
hashfs

HashFS is a content-addressable file management system. What does that mean? Simply put, HashFS manages a directory where files are saved based on the file's hash.

Typical use cases for this kind of system are ones where:

  • Files are written once and never change (e.g. image storage).
  • It's desirable to have no duplicate files (e.g. user uploads).
  • File metadata is stored elsewhere (e.g. in a database).

Adapted from: https://github.com/dgilland/hashfs
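
A minimal quick-start sketch. The import path is assumed from the source location referenced below (kiara/utils/hashfs/__init__.py) and the storage root is just an example; adjust both to your setup.

import io
from kiara.utils.hashfs import HashFS  # assumed import path

# Create a store rooted at an example directory.
fs = HashFS("/tmp/hashfs-demo", depth=4, width=1, algorithm="sha256")

# Store some content; the sha256 digest of the bytes becomes the address.
address = fs.put(io.BytesIO(b"hello hashfs"))
print(address.id)       # hex digest of the content
print(address.relpath)  # sharded path relative to the root
print(address.abspath)  # absolute path of the stored file

# Read the content back by its hash id, then delete it again.
with fs.open(address.id) as buf:
    assert buf.read() == b"hello hashfs"
fs.delete(address.id)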

License

The MIT License (MIT)

Copyright (c) 2015, Derrick Gilland

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Classes

HashFS

Bases: object

Content addressable file manager.

Attributes:

  • root (str): Directory path used as root of storage space.
  • depth (int): Depth of subfolders when saving a file.
  • width (int): Width of each subfolder to create when saving a file.
  • algorithm (str): Hash algorithm to use when computing file hash. Algorithm should be available in the hashlib module. Defaults to 'sha256'.
  • fmode (int): File mode permission to set when adding files to directory. Defaults to 0o664, which allows the owner and group to read/write and everyone else to read.
  • dmode (int): Directory mode permission to set for subdirectories. Defaults to 0o755, which allows the owner to read/write/execute and the group and everyone else to read and execute.
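
For illustration, depth and width control how the hash digest is split into nested subfolders under root. A sketch with a hypothetical digest:

# Hypothetical digest, shortened for illustration.
digest = "ab13c905f2d4..."
#
# depth=4, width=1 (the defaults): four single-character subfolders,
# with the remainder of the digest as the filename:
#   <root>/a/b/1/3/c905f2d4...
#
# depth=2, width=2: two two-character subfolders instead:
#   <root>/ab/13/c905f2d4...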

Source code in kiara/utils/hashfs/__init__.py
class HashFS(object):
    """Content addressable file manager.

    Attributes:
        root (str): Directory path used as root of storage space.
        depth (int, optional): Depth of subfolders when saving a
            file.
        width (int, optional): Width of each subfolder to create when saving a
            file.
        algorithm (str): Hash algorithm to use when computing file hash.
            Algorithm should be available in ``hashlib`` module. Defaults to
            ``'sha256'``.
        fmode (int, optional): File mode permission to set when adding files to
            directory. Defaults to ``0o664`` which allows owner/group to
            read/write and everyone else to read.
        dmode (int, optional): Directory mode permission to set for
            subdirectories. Defaults to ``0o755`` which allows the owner to
            read/write/execute and the group and everyone else to read and
            execute.
    """

    def __init__(
        self,
        root: str,
        depth: int = 4,
        width: int = 1,
        algorithm: str = "sha256",
        fmode=0o664,
        dmode=0o755,
    ):
        self.root: str = os.path.realpath(root)
        self.depth: int = depth
        self.width: int = width
        self.algorithm: str = algorithm
        self.fmode = fmode
        self.dmode = dmode

    def put(self, file: BinaryIO) -> "HashAddress":
        """Store contents of `file` on disk using its content hash for the
        address.

        Args:
            file (mixed): Readable object or path to file.

        Returns:
            HashAddress: File's hash address.
        """
        stream = Stream(file)

        with closing(stream):
            id = self.computehash(stream)
            filepath, is_duplicate = self._copy(stream, id)

        return HashAddress(id, self.relpath(filepath), filepath, is_duplicate)

    def put_with_precomputed_hash(
        self, file: Union[str, Path, BinaryIO], hash_id: str
    ) -> "HashAddress":

        stream = Stream(file)
        with closing(stream):
            filepath, is_duplicate = self._copy(stream=stream, id=hash_id)

        return HashAddress(hash_id, self.relpath(filepath), filepath, is_duplicate)

    def _copy(self, stream: "Stream", id: str):
        """Copy the contents of `stream` onto disk with an optional file
        extension appended. The copy process uses a temporary file to store the
        initial contents and then moves that file to it's final location.
        """

        filepath = self.idpath(id)

        if not os.path.isfile(filepath):
            # Only move file if it doesn't already exist.
            is_duplicate = False
            fname = self._mktempfile(stream)
            self.makepath(os.path.dirname(filepath))
            shutil.move(fname, filepath)
        else:
            is_duplicate = True

        return (filepath, is_duplicate)

    def _mktempfile(self, stream):
        """Create a named temporary file from a :class:`Stream` object and
        return its filename.
        """
        tmp = NamedTemporaryFile(delete=False)

        if self.fmode is not None:
            oldmask = os.umask(0)

            try:
                os.chmod(tmp.name, self.fmode)
            finally:
                os.umask(oldmask)

        for data in stream:
            tmp.write(to_bytes(data))

        tmp.close()

        return tmp.name

    def get(self, file):
        """Return :class:`HashAdress` from given id or path. If `file` does not
        refer to a valid file, then ``None`` is returned.

        Args:
            file (str): Address ID or path of file.

        Returns:
            HashAddress: File's hash address.
        """
        realpath = self.realpath(file)

        if realpath is None:
            return None
        else:
            return HashAddress(self.unshard(realpath), self.relpath(realpath), realpath)

    def open(self, file, mode="rb"):
        """Return open buffer object from given id or path.

        Args:
            file (str): Address ID or path of file.
            mode (str, optional): Mode to open file in. Defaults to ``'rb'``.

        Returns:
            Buffer: An ``io`` buffer dependent on the `mode`.

        Raises:
            IOError: If file doesn't exist.
        """
        realpath = self.realpath(file)
        if realpath is None:
            raise IOError("Could not locate file: {0}".format(file))

        return io.open(realpath, mode)

    def delete(self, file):
        """Delete file using id or path. Remove any empty directories after
        deleting. No exception is raised if file doesn't exist.

        Args:
            file (str): Address ID or path of file.
        """
        realpath = self.realpath(file)
        if realpath is None:
            return

        try:
            os.remove(realpath)
        except OSError:  # pragma: no cover
            pass
        else:
            self.remove_empty(os.path.dirname(realpath))

    def remove_empty(self, subpath):
        """Successively remove all empty folders starting with `subpath` and
        proceeding "up" through directory tree until reaching the :attr:`root`
        folder.
        """
        # Don't attempt to remove any folders if subpath is not a
        # subdirectory of the root directory.
        if not self.haspath(subpath):
            return

        while subpath != self.root:
            if len(os.listdir(subpath)) > 0 or os.path.islink(subpath):
                break
            os.rmdir(subpath)
            subpath = os.path.dirname(subpath)

    def files(self):
        """Return generator that yields all files in the :attr:`root`
        directory.
        """
        for folder, subfolders, files in walk(self.root):
            for file in files:
                yield os.path.abspath(os.path.join(folder, file))

    def folders(self):
        """Return generator that yields all folders in the :attr:`root`
        directory that contain files.
        """
        for folder, subfolders, files in walk(self.root):
            if files:
                yield folder

    def count(self):
        """Return count of the number of files in the :attr:`root` directory."""
        count = 0
        for _ in self:
            count += 1
        return count

    def size(self):
        """Return the total size in bytes of all files in the :attr:`root`
        directory.
        """
        total = 0

        for path in self.files():
            total += os.path.getsize(path)

        return total

    def exists(self, file):
        """Check whether a given file id or path exists on disk."""
        return bool(self.realpath(file))

    def haspath(self, path):
        """Return whether `path` is a subdirectory of the :attr:`root`
        directory.
        """
        return issubdir(path, self.root)

    def makepath(self, path):
        """Physically create the folder path on disk."""
        try:
            os.makedirs(path, self.dmode)
        except FileExistsError:
            assert os.path.isdir(path), "expected {} to be a directory".format(path)

    def relpath(self, path):
        """Return `path` relative to the :attr:`root` directory."""
        return os.path.relpath(path, self.root)

    def realpath(self, file):
        """Attempt to determine the real path of a file id or path through
        successive checking of candidate paths. If the real path is stored with
        an extension, the path is considered a match if the basename matches
        the expected file path of the id.
        """
        # Check for absolute path.
        if os.path.isfile(file):
            return file

        # Check for relative path.
        relpath = os.path.join(self.root, file)
        if os.path.isfile(relpath):
            return relpath

        # Check for sharded path.
        filepath = self.idpath(file)
        if os.path.isfile(filepath):
            return filepath

        # Check for sharded path with any extension.
        paths = glob.glob("{0}.*".format(filepath))
        if paths:
            return paths[0]

        # Could not determine a match.
        return None

    def idpath(self, id):
        """Build the file path for a given hash id. Optionally, append a
        file extension.
        """
        paths = self.shard(id)

        return os.path.join(self.root, *paths)

    def computehash(self, stream) -> str:
        """Compute hash of file using :attr:`algorithm`."""
        hashobj = hashlib.new(self.algorithm)
        for data in stream:
            hashobj.update(to_bytes(data))
        return hashobj.hexdigest()

    def shard(self, id):
        """Shard content ID into subfolders."""
        return shard(id, self.depth, self.width)

    def unshard(self, path):
        """Unshard path to determine hash value."""
        if not self.haspath(path):
            raise ValueError(
                "Cannot unshard path. The path {0!r} is not "
                "a subdirectory of the root directory {1!r}".format(path, self.root)
            )

        return os.path.splitext(self.relpath(path))[0].replace(os.sep, "")

    def repair(self):
        """Repair any file locations whose content address doesn't match it's
        file path.
        """
        repaired = []
        corrupted = tuple(self.corrupted())
        oldmask = os.umask(0)

        try:
            for path, address in corrupted:
                if os.path.isfile(address.abspath):
                    # File already exists so just delete corrupted path.
                    os.remove(path)
                else:
                    # File doesn't exist, so move it.
                    self.makepath(os.path.dirname(address.abspath))
                    shutil.move(path, address.abspath)

                os.chmod(address.abspath, self.fmode)
                repaired.append((path, address))
        finally:
            os.umask(oldmask)

        return repaired

    def corrupted(self):
        """Return generator that yields corrupted files as ``(path, address)``
        where ``path`` is the path of the corrupted file and ``address`` is
        the :class:`HashAddress` of the expected location.
        """
        for path in self.files():
            stream = Stream(path)

            with closing(stream):
                id = self.computehash(stream)

            expected_path = self.idpath(id)

            if expected_path != path:
                yield (
                    path,
                    HashAddress(id, self.relpath(expected_path), expected_path),
                )

    def __contains__(self, file):
        """Return whether a given file id or path is contained in the
        :attr:`root` directory.
        """
        return self.exists(file)

    def __iter__(self):
        """Iterate over all files in the :attr:`root` directory."""
        return self.files()

    def __len__(self):
        """Return count of the number of files in the :attr:`root` directory."""
        return self.count()

Attributes

root: str = os.path.realpath(root) instance-attribute
depth: int = depth instance-attribute
width: int = width instance-attribute
algorithm: str = algorithm instance-attribute
fmode = fmode instance-attribute
dmode = dmode instance-attribute

Functions

put(file: BinaryIO) -> HashAddress

Store contents of file on disk using its content hash for the address.

Parameters:

  • file (mixed, required): Readable object or path to file.

Returns:

  • HashAddress: File's hash address.
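
A short usage sketch (the import path and example root are assumptions):

import io
from kiara.utils.hashfs import HashFS  # assumed import path

fs = HashFS("/tmp/hashfs-demo")

# Storing the same content twice yields the same address; the second call
# finds the existing file and marks the result as a duplicate.
first = fs.put(io.BytesIO(b"same content"))
second = fs.put(io.BytesIO(b"same content"))
assert first.id == second.id
assert second.is_duplicate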

Source code in kiara/utils/hashfs/__init__.py
def put(self, file: BinaryIO) -> "HashAddress":
    """Store contents of `file` on disk using its content hash for the
    address.

    Args:
        file (mixed): Readable object or path to file.

    Returns:
        HashAddress: File's hash address.
    """
    stream = Stream(file)

    with closing(stream):
        id = self.computehash(stream)
        filepath, is_duplicate = self._copy(stream, id)

    return HashAddress(id, self.relpath(filepath), filepath, is_duplicate)
put_with_precomputed_hash(file: Union[str, Path, BinaryIO], hash_id: str) -> HashAddress
Source code in kiara/utils/hashfs/__init__.py
def put_with_precomputed_hash(
    self, file: Union[str, Path, BinaryIO], hash_id: str
) -> "HashAddress":

    stream = Stream(file)
    with closing(stream):
        filepath, is_duplicate = self._copy(stream=stream, id=hash_id)

    return HashAddress(hash_id, self.relpath(filepath), filepath, is_duplicate)
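
put_with_precomputed_hash() has no docstring; from the implementation it stores the stream under a caller-supplied hash_id without re-hashing the content. A hedged sketch (the import path and example root are assumptions, and it is assumed the supplied digest was produced with the store's configured algorithm so later lookups and repair() stay consistent):

import hashlib
import io
from kiara.utils.hashfs import HashFS  # assumed import path

fs = HashFS("/tmp/hashfs-demo")

data = b"already hashed elsewhere"
digest = hashlib.sha256(data).hexdigest()  # computed by the caller

# The store trusts the supplied id and skips computehash().
address = fs.put_with_precomputed_hash(io.BytesIO(data), digest)
assert address.id == digest
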
get(file)

Return :class:HashAddress from given id or path. If file does not refer to a valid file, then None is returned.

Parameters:

  • file (str, required): Address ID or path of file.

Returns:

  • HashAddress: File's hash address.
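
A small sketch (the import path and example root are assumptions):

import io
from kiara.utils.hashfs import HashFS  # assumed import path

fs = HashFS("/tmp/hashfs-demo")

stored = fs.put(io.BytesIO(b"look me up"))
assert fs.get(stored.id) is not None   # HashAddress for a stored id
assert fs.get("0" * 64) is None        # None for an id that is not stored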

Source code in kiara/utils/hashfs/__init__.py
def get(self, file):
    """Return :class:`HashAdress` from given id or path. If `file` does not
    refer to a valid file, then ``None`` is returned.

    Args:
        file (str): Address ID or path of file.

    Returns:
        HashAddress: File's hash address.
    """
    realpath = self.realpath(file)

    if realpath is None:
        return None
    else:
        return HashAddress(self.unshard(realpath), self.relpath(realpath), realpath)
open(file, mode = 'rb')

Return open buffer object from given id or path.

Parameters:

  • file (str, required): Address ID or path of file.
  • mode (str, optional): Mode to open file in. Defaults to 'rb'.

Returns:

  • Buffer: An io buffer dependent on the mode.

Raises:

  • IOError: If file doesn't exist.
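
A short sketch (the import path and example root are assumptions):

import io
from kiara.utils.hashfs import HashFS  # assumed import path

fs = HashFS("/tmp/hashfs-demo")

address = fs.put(io.BytesIO(b"raw bytes"))
with fs.open(address.id) as buf:       # binary mode ('rb') by default
    assert buf.read() == b"raw bytes"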

Source code in kiara/utils/hashfs/__init__.py
def open(self, file, mode="rb"):
    """Return open buffer object from given id or path.

    Args:
        file (str): Address ID or path of file.
        mode (str, optional): Mode to open file in. Defaults to ``'rb'``.

    Returns:
        Buffer: An ``io`` buffer dependent on the `mode`.

    Raises:
        IOError: If file doesn't exist.
    """
    realpath = self.realpath(file)
    if realpath is None:
        raise IOError("Could not locate file: {0}".format(file))

    return io.open(realpath, mode)
delete(file)

Delete file using id or path. Remove any empty directories after deleting. No exception is raised if file doesn't exist.

Parameters:

  • file (str, required): Address ID or path of file.
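
A short sketch (the import path and example root are assumptions):

import io
from kiara.utils.hashfs import HashFS  # assumed import path

fs = HashFS("/tmp/hashfs-demo")

address = fs.put(io.BytesIO(b"short-lived"))
fs.delete(address.id)            # removes the file and prunes empty shard folders
assert not fs.exists(address.id)
fs.delete(address.id)            # deleting again is a silent no-op
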
Source code in kiara/utils/hashfs/__init__.py
def delete(self, file):
    """Delete file using id or path. Remove any empty directories after
    deleting. No exception is raised if file doesn't exist.

    Args:
        file (str): Address ID or path of file.
    """
    realpath = self.realpath(file)
    if realpath is None:
        return

    try:
        os.remove(realpath)
    except OSError:  # pragma: no cover
        pass
    else:
        self.remove_empty(os.path.dirname(realpath))
remove_empty(subpath)

Successively remove all empty folders starting with subpath and proceeding "up" through the directory tree until reaching the :attr:root folder.

Source code in kiara/utils/hashfs/__init__.py
def remove_empty(self, subpath):
    """Successively remove all empty folders starting with `subpath` and
    proceeding "up" through directory tree until reaching the :attr:`root`
    folder.
    """
    # Don't attempt to remove any folders if subpath is not a
    # subdirectory of the root directory.
    if not self.haspath(subpath):
        return

    while subpath != self.root:
        if len(os.listdir(subpath)) > 0 or os.path.islink(subpath):
            break
        os.rmdir(subpath)
        subpath = os.path.dirname(subpath)
files()

Return generator that yields all files in the :attr:root directory.

Source code in kiara/utils/hashfs/__init__.py
def files(self):
    """Return generator that yields all files in the :attr:`root`
    directory.
    """
    for folder, subfolders, files in walk(self.root):
        for file in files:
            yield os.path.abspath(os.path.join(folder, file))
folders()

Return generator that yields all folders in the :attr:root directory that contain files.

Source code in kiara/utils/hashfs/__init__.py
def folders(self):
    """Return generator that yields all folders in the :attr:`root`
    directory that contain files.
    """
    for folder, subfolders, files in walk(self.root):
        if files:
            yield folder
count()

Return count of the number of files in the :attr:root directory.

Source code in kiara/utils/hashfs/__init__.py
def count(self):
    """Return count of the number of files in the :attr:`root` directory."""
    count = 0
    for _ in self:
        count += 1
    return count
size()

Return the total size in bytes of all files in the :attr:root directory.

Source code in kiara/utils/hashfs/__init__.py
def size(self):
    """Return the total size in bytes of all files in the :attr:`root`
    directory.
    """
    total = 0

    for path in self.files():
        total += os.path.getsize(path)

    return total
exists(file)

Check whether a given file id or path exists on disk.

Source code in kiara/utils/hashfs/__init__.py
def exists(self, file):
    """Check whether a given file id or path exists on disk."""
    return bool(self.realpath(file))
haspath(path)

Return whether path is a subdirectory of the :attr:root directory.

Source code in kiara/utils/hashfs/__init__.py
def haspath(self, path):
    """Return whether `path` is a subdirectory of the :attr:`root`
    directory.
    """
    return issubdir(path, self.root)
makepath(path)

Physically create the folder path on disk.

Source code in kiara/utils/hashfs/__init__.py
def makepath(self, path):
    """Physically create the folder path on disk."""
    try:
        os.makedirs(path, self.dmode)
    except FileExistsError:
        assert os.path.isdir(path), "expected {} to be a directory".format(path)
relpath(path)

Return path relative to the :attr:root directory.

Source code in kiara/utils/hashfs/__init__.py
def relpath(self, path):
    """Return `path` relative to the :attr:`root` directory."""
    return os.path.relpath(path, self.root)
realpath(file)

Attempt to determine the real path of a file id or path through successive checking of candidate paths. If the real path is stored with an extension, the path is considered a match if the basename matches the expected file path of the id.
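
A sketch of the lookup order in practice (the import path and example root are assumptions):

import io
from kiara.utils.hashfs import HashFS  # assumed import path

fs = HashFS("/tmp/hashfs-demo")
address = fs.put(io.BytesIO(b"resolve me"))

# Each accepted form resolves to the stored file.
assert fs.realpath(address.abspath) is not None    # absolute path
assert fs.realpath(address.relpath) is not None    # path relative to the root
assert fs.realpath(address.id) == address.abspath  # bare hash id via sharded lookup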

Source code in kiara/utils/hashfs/__init__.py
def realpath(self, file):
    """Attempt to determine the real path of a file id or path through
    successive checking of candidate paths. If the real path is stored with
    an extension, the path is considered a match if the basename matches
    the expected file path of the id.
    """
    # Check for absolute path.
    if os.path.isfile(file):
        return file

    # Check for relative path.
    relpath = os.path.join(self.root, file)
    if os.path.isfile(relpath):
        return relpath

    # Check for sharded path.
    filepath = self.idpath(file)
    if os.path.isfile(filepath):
        return filepath

    # Check for sharded path with any extension.
    paths = glob.glob("{0}.*".format(filepath))
    if paths:
        return paths[0]

    # Could not determine a match.
    return None
idpath(id)

Build the file path for a given hash id.

Source code in kiara/utils/hashfs/__init__.py
def idpath(self, id):
    """Build the file path for a given hash id. Optionally, append a
    file extension.
    """
    paths = self.shard(id)

    return os.path.join(self.root, *paths)
computehash(stream) -> str

Compute hash of file using :attr:algorithm.

Source code in kiara/utils/hashfs/__init__.py
def computehash(self, stream) -> str:
    """Compute hash of file using :attr:`algorithm`."""
    hashobj = hashlib.new(self.algorithm)
    for data in stream:
        hashobj.update(to_bytes(data))
    return hashobj.hexdigest()
shard(id)

Shard content ID into subfolders.

Source code in kiara/utils/hashfs/__init__.py
def shard(self, id):
    """Shard content ID into subfolders."""
    return shard(id, self.depth, self.width)
unshard(path)

Unshard path to determine hash value.
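
A small round-trip sketch covering shard() and unshard() (the import path and example root are assumptions):

import io
from kiara.utils.hashfs import HashFS  # assumed import path

fs = HashFS("/tmp/hashfs-demo")
address = fs.put(io.BytesIO(b"round trip"))

parts = fs.shard(address.id)            # subfolder tokens plus the remainder
assert "".join(parts) == address.id     # sharding only splits the digest
assert fs.unshard(address.abspath) == address.id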

Source code in kiara/utils/hashfs/__init__.py
def unshard(self, path):
    """Unshard path to determine hash value."""
    if not self.haspath(path):
        raise ValueError(
            "Cannot unshard path. The path {0!r} is not "
            "a subdirectory of the root directory {1!r}".format(path, self.root)
        )

    return os.path.splitext(self.relpath(path))[0].replace(os.sep, "")
repair()

Repair any file locations whose content address doesn't match its file path.

Source code in kiara/utils/hashfs/__init__.py
def repair(self):
    """Repair any file locations whose content address doesn't match it's
    file path.
    """
    repaired = []
    corrupted = tuple(self.corrupted())
    oldmask = os.umask(0)

    try:
        for path, address in corrupted:
            if os.path.isfile(address.abspath):
                # File already exists so just delete corrupted path.
                os.remove(path)
            else:
            # File doesn't exist, so move it.
                self.makepath(os.path.dirname(address.abspath))
                shutil.move(path, address.abspath)

            os.chmod(address.abspath, self.fmode)
            repaired.append((path, address))
    finally:
        os.umask(oldmask)

    return repaired
corrupted()

Return generator that yields corrupted files as (path, address) where path is the path of the corrupted file and address is the :class:HashAddress of the expected location.
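
A hedged sketch combining corrupted() with repair() above: here "corrupted" means a file that no longer sits at the sharded location its content hash implies, for example after the same root is reopened with a different depth/width layout (the import path and example root are assumptions):

import io
from kiara.utils.hashfs import HashFS  # assumed import path

# Populate a store with one layout, then reopen it with another.
HashFS("/tmp/hashfs-demo", depth=4, width=1).put(io.BytesIO(b"laid out one way"))
fs = HashFS("/tmp/hashfs-demo", depth=2, width=2)

# The existing file is now reported as misplaced relative to the new layout.
for path, expected in fs.corrupted():
    print("misplaced:", path, "->", expected.relpath)

# repair() moves (or deduplicates) every misplaced file into place.
print("repaired:", len(fs.repair()))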

Source code in kiara/utils/hashfs/__init__.py
def corrupted(self):
    """Return generator that yields corrupted files as ``(path, address)``
    where ``path`` is the path of the corrupted file and ``address`` is
    the :class:`HashAddress` of the expected location.
    """
    for path in self.files():
        stream = Stream(path)

        with closing(stream):
            id = self.computehash(stream)

        expected_path = self.idpath(id)

        if expected_path != path:
            yield (
                path,
                HashAddress(id, self.relpath(expected_path), expected_path),
            )

HashAddress

Bases: namedtuple(HashAddress, [id, relpath, abspath, is_duplicate])

File address containing the file's path on disk and its content hash ID.

Attributes:

  • id (str): Hash ID (hexdigest) of file contents.
  • relpath (str): Relative path location to :attr:HashFS.root.
  • abspath (str): Absolute path location of file on disk.
  • is_duplicate (boolean): Whether the hash address created was a duplicate of a previously existing file. Can only be True after a put operation. Defaults to False.
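
Since HashAddress is a plain namedtuple, its fields can be accessed by name or unpacked positionally. A sketch with illustrative values (import path assumed):

from kiara.utils.hashfs import HashAddress  # assumed import path

# Illustrative values only.
address = HashAddress("ab13c9...", "a/b/1/3/c9...", "/tmp/hashfs-demo/a/b/1/3/c9...")
file_id, relpath, abspath, is_duplicate = address
assert is_duplicate is False   # default when not passed explicitly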

Source code in kiara/utils/hashfs/__init__.py
class HashAddress(
    namedtuple("HashAddress", ["id", "relpath", "abspath", "is_duplicate"])
):
    """File address containing file's path on disk and it's content hash ID.

    Attributes:
        id (str): Hash ID (hexdigest) of file contents.
        relpath (str): Relative path location to :attr:`HashFS.root`.
        abspath (str): Absolute path location of file on disk.
        is_duplicate (boolean, optional): Whether the hash address created was
            a duplicate of a previously existing file. Can only be ``True``
            after a put operation. Defaults to ``False``.
    """

    def __new__(cls, id, relpath, abspath, is_duplicate=False):
        return super(HashAddress, cls).__new__(cls, id, relpath, abspath, is_duplicate)  # type: ignore

Stream

Bases: object

Common interface for file-like objects.

The input obj can be a file-like object or a path to a file. If obj is a path to a file, then it will be opened until :meth:close is called. If obj is a file-like object, then its original position will be restored when :meth:close is called instead of closing the object automatically. Closing of the stream is deferred to whatever process passed the stream in.

Successive readings of the stream are supported without having to manually set its position back to 0.
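
A small sketch of the deferred-close behaviour with a caller-owned file object (import path assumed):

import io
from contextlib import closing
from kiara.utils.hashfs import Stream  # assumed import path

buf = io.BytesIO(b"streamed content")
buf.seek(3)                      # pretend the caller already read a few bytes

with closing(Stream(buf)) as stream:
    assert b"".join(stream) == b"streamed content"   # iteration starts at 0

assert not buf.closed            # caller-owned object is left open...
assert buf.tell() == 3           # ...and returned to its original position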

Source code in kiara/utils/hashfs/__init__.py
class Stream(object):
    """Common interface for file-like objects.

    The input `obj` can be a file-like object or a path to a file. If `obj` is
    a path to a file, then it will be opened until :meth:`close` is called.
    If `obj` is a file-like object, then its original position will be
    restored when :meth:`close` is called instead of closing the object
    automatically. Closing of the stream is deferred to whatever process passed
    the stream in.

    Successive readings of the stream are supported without having to manually
    set its position back to ``0``.
    """

    def __init__(self, obj: Union[BinaryIO, str, Path]):
        if hasattr(obj, "read"):
            pos = obj.tell()  # type: ignore
        elif os.path.isfile(obj):  # type: ignore
            obj = io.open(obj, "rb")  # type: ignore
            pos = None
        else:
            raise ValueError("Object must be a valid file path or a readable object")

        try:
            file_stat = os.stat(obj.name)  # type: ignore
            buffer_size = file_stat.st_blksize
        except Exception:
            buffer_size = 8192

        self._obj: BinaryIO = obj  # type: ignore
        self._pos = pos
        self._buffer_size = buffer_size

    def __iter__(self):
        """Read underlying IO object and yield results. Return object to
        original position if we didn't open it originally.
        """
        self._obj.seek(0)

        while True:
            data = self._obj.read(self._buffer_size)

            if not data:
                break

            yield data

        if self._pos is not None:
            self._obj.seek(self._pos)

    def close(self):
        """Close underlying IO object if we opened it, else return it to
        original position.
        """
        if self._pos is None:
            self._obj.close()
        else:
            self._obj.seek(self._pos)

Functions

close()

Close underlying IO object if we opened it, else return it to original position.

Source code in kiara/utils/hashfs/__init__.py
def close(self):
    """Close underlying IO object if we opened it, else return it to
    original position.
    """
    if self._pos is None:
        self._obj.close()
    else:
        self._obj.seek(self._pos)

Functions

to_bytes(text: Union[str, bytes])

Source code in kiara/utils/hashfs/__init__.py
def to_bytes(text: Union[str, bytes]):
    if not isinstance(text, bytes):
        text = bytes(text, "utf8")
    return text

compact(items: List[Any]) -> List[Any]

Return only truthy elements of items.

Source code in kiara/utils/hashfs/__init__.py
def compact(items: List[Any]) -> List[Any]:
    """Return only truthy elements of `items`."""
    return [item for item in items if item]

issubdir(subpath: str, path: str)

Return whether subpath is a sub-directory of path.

Source code in kiara/utils/hashfs/__init__.py
def issubdir(subpath: str, path: str):
    """Return whether `subpath` is a sub-directory of `path`."""
    # Append os.sep so that paths like /usr/var2/log don't match /usr/var.
    path = os.path.realpath(path) + os.sep
    subpath = os.path.realpath(subpath)
    return subpath.startswith(path)

shard(digest, depth, width)

Source code in kiara/utils/hashfs/__init__.py
def shard(digest, depth, width):
    # This creates a list of `depth` number of tokens with width
    # `width` from the first part of the id plus the remainder.
    return compact(
        [digest[i * width : width * (i + 1)] for i in range(depth)]
        + [digest[depth * width :]]
    )
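
A quick illustration with a hypothetical digest (import path assumed):

from kiara.utils.hashfs import shard  # assumed import path

digest = "ab13c905f2d4"
print(shard(digest, depth=4, width=1))   # ['a', 'b', '1', '3', 'c905f2d4']
print(shard(digest, depth=2, width=2))   # ['ab', '13', 'c905f2d4']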