Skip to content

filesystem

FILE_BUNDLE_IMPORT_AVAILABLE_COLUMNS

logger

Classes

FileBundle (KiaraModel) pydantic-model

Describes properties for the 'file_bundle' value type.

Source code in kiara/models/filesystem.py
class FileBundle(KiaraModel):
    """Describes properties for the 'file_bundle' value type."""

    _kiara_model_id = "instance.data.file_bundle"

    @classmethod
    def import_folder(
        cls,
        source: str,
        bundle_name: Optional[str] = None,
        import_config: Union[None, Mapping[str, Any], FolderImportConfig] = None,
    ) -> "FileBundle":
        """Create a file bundle from the files found below a folder on disk.

        Args:
            source: path to the folder to import.
            bundle_name: name for the bundle; defaults to the base name of ``source``.
            import_config: filter settings, either a ``FolderImportConfig``, a mapping
                used to construct one, or ``None`` for the defaults.

        Returns:
            A bundle whose ``included_files`` keys are paths relative to the folder.

        Raises:
            ValueError: if ``source`` is empty, does not exist or is not a folder.
            TypeError: if ``import_config`` has an unsupported type.
        """

        if not source:
            raise ValueError("No source path provided.")

        if not os.path.exists(os.path.realpath(source)):
            raise ValueError(f"Path does not exist: {source}")

        if not os.path.isdir(os.path.realpath(source)):
            raise ValueError(f"Path is not a folder: {source}")

        # Drop trailing separators so 'basename' yields the folder name, but never
        # reduce a root path like '/' to '' (which would resolve to the cwd).
        stripped = source.rstrip(os.path.sep)
        if stripped:
            source = stripped

        abs_path = os.path.abspath(source)

        if import_config is None:
            _import_config = FolderImportConfig()
        elif isinstance(import_config, Mapping):
            _import_config = FolderImportConfig(**import_config)
        elif isinstance(import_config, FolderImportConfig):
            _import_config = import_config
        else:
            raise TypeError(
                f"Invalid type for folder import config: {type(import_config)}."
            )

        included_files: Dict[str, FileModel] = {}
        exclude_dirs = _import_config.exclude_dirs
        # 'str.endswith' accepts a tuple, so each file needs a single call per filter.
        invalid_extensions = tuple(_import_config.exclude_files or ())
        valid_extensions = tuple(_import_config.include_files or ())

        sum_size = 0

        def include_file(filename: str) -> bool:
            # Exclusions take precedence over inclusions.
            if invalid_extensions and filename.endswith(invalid_extensions):
                return False
            if not valid_extensions:
                return True
            return filename.endswith(valid_extensions)

        for root, dirnames, filenames in os.walk(abs_path, topdown=True):

            if exclude_dirs:
                # In-place assignment prunes the walk (requires topdown=True).
                dirnames[:] = [d for d in dirnames if d not in exclude_dirs]

            for filename in filenames:
                full_path = os.path.join(root, filename)
                if not os.path.isfile(full_path) or not include_file(filename):
                    continue

                rel_path = os.path.relpath(full_path, abs_path)

                file_model = FileModel.load_file(full_path)
                sum_size = sum_size + file_model.size
                included_files[rel_path] = file_model

        if bundle_name is None:
            bundle_name = os.path.basename(source)

        bundle = FileBundle.create_from_file_models(
            files=included_files,
            path=abs_path,
            bundle_name=bundle_name,
            sum_size=sum_size,
        )
        return bundle

    @classmethod
    def create_from_file_models(
        cls,
        files: Mapping[str, FileModel],
        bundle_name: str,
        path: Optional[str] = None,
        sum_size: Optional[int] = None,
    ) -> "FileBundle":
        """Assemble a bundle from already created file models.

        Args:
            files: the file models, keyed by their path relative to the bundle root.
            bundle_name: the name of the new bundle.
            path: the root path of the bundle on disk, if there is one.
            sum_size: the combined size of all files; computed from ``files`` if ``None``.

        Returns:
            The new bundle, with its private ``_path`` set to ``path``.
        """

        result: Dict[str, Any] = {}

        result["included_files"] = files
        result["number_of_files"] = len(files)
        result["bundle_name"] = bundle_name

        if sum_size is None:
            sum_size = 0
            for f in files.values():
                sum_size = sum_size + f.size
        result["size"] = sum_size

        bundle = FileBundle(**result)
        bundle._path = path
        return bundle

    _file_bundle_hash: Optional[int] = PrivateAttr(default=None)

    bundle_name: str = Field(description="The name of this bundle.")
    number_of_files: int = Field(
        description="How many files are included in this bundle."
    )
    included_files: Dict[str, FileModel] = Field(
        description="A map of all the included files, incl. their properties. Uses the relative path of each file as key."
    )
    size: int = Field(description="The size of all files in this folder, combined.")
    _path: Optional[str] = PrivateAttr(default=None)

    @property
    def path(self) -> str:
        """The root path of this bundle; raises if no path was set."""
        if self._path is None:
            # TODO: better explanation, offer remedy like copying into temp folder
            raise Exception(
                "File bundle path not set, it appears this bundle is comprised of symlinks only."
            )
        return self._path

    def _retrieve_id(self) -> str:
        return str(self.file_bundle_hash)

    def _retrieve_data_to_hash(self) -> Any:

        return {
            "bundle_name": self.bundle_name,
            "included_files": {
                k: v.instance_cid for k, v in self.included_files.items()
            },
        }

    def get_relative_path(self, file: FileModel):
        """Return the path of ``file`` relative to the root path of this bundle."""
        return os.path.relpath(file.path, self.path)

    def read_text_file_contents(self, ignore_errors: bool = False) -> Mapping[str, str]:
        """Read all included files as UTF-8 text.

        Args:
            ignore_errors: if set, files whose content can't be read are logged and
                skipped instead of aborting the whole operation.

        Returns:
            A map with the relative path of each file as key and its text as value.

        Raises:
            Exception: if reading a file fails and ``ignore_errors`` is not set.
        """

        content_dict: Dict[str, str] = {}

        def read_file(rel_path: str, full_path: str):
            with open(full_path, encoding="utf-8") as f:
                try:
                    content_dict[rel_path] = f.read()
                except Exception as e:
                    if ignore_errors:
                        log_message(f"Can't read file: {e}")
                        logger.warning("ignore.file", path=full_path, reason=str(e))
                    else:
                        raise Exception(
                            f"Can't read file (as text) '{full_path}': {e}"
                        ) from e

        # TODO: common ignore files and folders
        for rel_path, f in self.included_files.items():
            if f._path:
                path = f._path
            else:
                # The file model carries no path of its own: the keys of
                # 'included_files' are relative to the bundle root, so resolve there.
                path = os.path.join(self.path, rel_path)
            read_file(rel_path=rel_path, full_path=path)

        return content_dict

    @property
    def file_bundle_hash(self) -> int:
        """Hash over the hashes of all included files (cached after first access)."""

        # TODO: use sha256?
        if self._file_bundle_hash is not None:
            return self._file_bundle_hash

        obj = {k: v.file_hash for k, v in self.included_files.items()}
        h = DeepHash(obj, hasher=KIARA_HASH_FUNCTION)

        self._file_bundle_hash = h[obj]
        return self._file_bundle_hash

    def copy_bundle(
        self, target_path: str, bundle_name: Optional[str] = None
    ) -> "FileBundle":
        """Copy all files of this bundle below ``target_path``.

        Args:
            target_path: the folder that becomes the root of the copied bundle.
            bundle_name: name of the new bundle; defaults to the base name of
                ``target_path``.

        Returns:
            A new bundle that describes the copied files.

        Raises:
            Exception: if ``target_path`` equals the path of this bundle.
        """

        if target_path == self.path:
            raise Exception(f"Target path and current path are the same: {target_path}")

        result = {}
        for rel_path, item in self.included_files.items():
            _target_path = os.path.join(target_path, rel_path)
            new_fm = item.copy_file(_target_path)
            result[rel_path] = new_fm

        if bundle_name is None:
            bundle_name = os.path.basename(target_path)

        fb = FileBundle.create_from_file_models(
            files=result,
            bundle_name=bundle_name,
            path=target_path,
            sum_size=self.size,
        )
        # The content is unchanged, so an already computed hash stays valid.
        if self._file_bundle_hash is not None:
            fb._file_bundle_hash = self._file_bundle_hash

        return fb

    def create_renderable(self, **config: Any) -> RenderableType:
        """Render a summary table of this bundle.

        Reads ``show_bundle_hash`` (default ``False``) from ``config`` and forwards
        the whole config to the table that lists the included files.
        """

        show_bundle_hash = config.get("show_bundle_hash", False)

        table = Table(show_header=False, box=box.SIMPLE)
        table.add_column("key")
        table.add_column("value", style="i")

        table.add_row("bundle name", self.bundle_name)
        table.add_row("number_of_files", str(self.number_of_files))
        table.add_row("size", str(self.size))
        if show_bundle_hash:
            table.add_row("bundle_hash", str(self.file_bundle_hash))

        content = self._create_content_table(**config)
        table.add_row("included files", content)

        return table

    def _create_content_table(self, **render_config: Any) -> Table:
        """Create a table of the included files (relative path and size).

        If the bundle holds more files than ``max_no_files`` (default 40, a negative
        value disables the limit), only the first and last entries are listed.
        """

        max_no_included_files = render_config.get("max_no_files", 40)

        table = Table(show_header=True, box=box.SIMPLE)
        table.add_column("(relative) path")
        table.add_column("size")

        if (
            max_no_included_files < 0
            or len(self.included_files) <= max_no_included_files
        ):
            for f, model in self.included_files.items():
                row = [f, str(model.size)]
                table.add_row(*row)
        else:
            files = list(self.included_files.keys())
            half = int((max_no_included_files - 1) / 2)
            head = files[0:half]
            # Slice from an explicit start index: 'files[-half:]' would return the
            # whole list for 'half == 0' instead of nothing.
            tail = files[len(files) - half :]  # noqa
            for rel_path in head:
                model = self.included_files[rel_path]
                row = [rel_path, str(model.size)]
                table.add_row(*row)
            table.add_row("   ... output skipped ...", "")
            table.add_row("   ... output skipped ...", "")
            for rel_path in tail:
                model = self.included_files[rel_path]
                row = [rel_path, str(model.size)]
                table.add_row(*row)

        return table

    def __repr__(self):
        return f"FileBundle(name={self.bundle_name})"

    def __str__(self):
        return self.__repr__()

Attributes

bundle_name: str pydantic-field required

The name of this bundle.

file_bundle_hash: int property readonly
included_files: Dict[str, kiara.models.filesystem.FileModel] pydantic-field required

A map of all the included files, incl. their properties. Uses the relative path of each file as key.

number_of_files: int pydantic-field required

How many files are included in this bundle.

path: str property readonly
size: int pydantic-field required

The size of all files in this folder, combined.

copy_bundle(self, target_path, bundle_name=None)
Source code in kiara/models/filesystem.py
def copy_bundle(
    self, target_path: str, bundle_name: Optional[str] = None
) -> "FileBundle":
    """Copy all files of this bundle below ``target_path`` and describe the copy.

    Args:
        target_path: the folder that becomes the root of the copied bundle.
        bundle_name: name of the new bundle; the base name of ``target_path`` is
            used when omitted.

    Returns:
        A new bundle for the copied files; it reuses this bundle's size and, if
        already computed, its bundle hash.

    Raises:
        Exception: if ``target_path`` equals the path of this bundle.
    """

    if target_path == self.path:
        raise Exception(f"Target path and current path are the same: {target_path}")

    copied_files = {
        rel_path: file_model.copy_file(os.path.join(target_path, rel_path))
        for rel_path, file_model in self.included_files.items()
    }

    new_name = os.path.basename(target_path) if bundle_name is None else bundle_name

    new_bundle = FileBundle.create_from_file_models(
        files=copied_files,
        bundle_name=new_name,
        path=target_path,
        sum_size=self.size,
    )

    cached_hash = self._file_bundle_hash
    if cached_hash is not None:
        new_bundle._file_bundle_hash = cached_hash

    return new_bundle
create_from_file_models(files, bundle_name, path=None, sum_size=None) classmethod
Source code in kiara/models/filesystem.py
@classmethod
def create_from_file_models(
    cls,
    files: Mapping[str, FileModel],
    bundle_name: str,
    path: Optional[str] = None,
    sum_size: Optional[int] = None,
) -> "FileBundle":
    """Assemble a bundle from already created file models.

    Args:
        files: the file models, keyed by their path relative to the bundle root.
        bundle_name: the name of the new bundle.
        path: the root path of the bundle on disk, if there is one.
        sum_size: the combined size of all files; when ``None`` it is computed by
            adding up the ``size`` of every file model.

    Returns:
        The new bundle, with its private ``_path`` set to ``path``.
    """

    total_size = sum_size
    if total_size is None:
        total_size = sum(file_model.size for file_model in files.values())

    new_bundle = FileBundle(
        included_files=files,
        number_of_files=len(files),
        bundle_name=bundle_name,
        size=total_size,
    )
    new_bundle._path = path
    return new_bundle
create_renderable(self, **config)
Source code in kiara/models/filesystem.py
def create_renderable(self, **config: Any) -> RenderableType:
    """Render a two-column summary table of this bundle.

    The bundle hash row is only added when ``config`` contains a truthy
    ``show_bundle_hash``. The whole config is forwarded to the helper that builds
    the nested table of included files.
    """

    rows = [
        ("bundle name", self.bundle_name),
        ("number_of_files", str(self.number_of_files)),
        ("size", str(self.size)),
    ]
    if config.get("show_bundle_hash", False):
        rows.append(("bundle_hash", str(self.file_bundle_hash)))
    rows.append(("included files", self._create_content_table(**config)))

    summary = Table(show_header=False, box=box.SIMPLE)
    summary.add_column("key")
    summary.add_column("value", style="i")
    for label, value in rows:
        summary.add_row(label, value)

    return summary
get_relative_path(self, file)
Source code in kiara/models/filesystem.py
def get_relative_path(self, file: FileModel):
    """Return the path of ``file`` relative to the root path of this bundle."""
    # Resolve the file path first, then the bundle path, so a missing path on
    # either side surfaces in the same order as before.
    file_location = file.path
    bundle_root = self.path
    return os.path.relpath(file_location, start=bundle_root)
import_folder(source, bundle_name=None, import_config=None) classmethod
Source code in kiara/models/filesystem.py
@classmethod
def import_folder(
    cls,
    source: str,
    bundle_name: Optional[str] = None,
    import_config: Union[None, Mapping[str, Any], FolderImportConfig] = None,
) -> "FileBundle":
    """Create a file bundle from the files found below a folder on disk.

    Args:
        source: path to the folder to import.
        bundle_name: name for the bundle; defaults to the base name of ``source``.
        import_config: filter settings, either a ``FolderImportConfig``, a mapping
            used to construct one, or ``None`` for the defaults.

    Returns:
        A bundle whose ``included_files`` keys are paths relative to the folder.

    Raises:
        ValueError: if ``source`` is empty, does not exist or is not a folder.
        TypeError: if ``import_config`` has an unsupported type.
    """

    if not source:
        raise ValueError("No source path provided.")

    if not os.path.exists(os.path.realpath(source)):
        raise ValueError(f"Path does not exist: {source}")

    if not os.path.isdir(os.path.realpath(source)):
        raise ValueError(f"Path is not a folder: {source}")

    # Drop trailing separators so 'basename' yields the folder name, but never
    # reduce a root path like '/' to '' (which would resolve to the cwd).
    stripped = source.rstrip(os.path.sep)
    if stripped:
        source = stripped

    abs_path = os.path.abspath(source)

    if import_config is None:
        _import_config = FolderImportConfig()
    elif isinstance(import_config, Mapping):
        _import_config = FolderImportConfig(**import_config)
    elif isinstance(import_config, FolderImportConfig):
        _import_config = import_config
    else:
        raise TypeError(
            f"Invalid type for folder import config: {type(import_config)}."
        )

    included_files: Dict[str, FileModel] = {}
    exclude_dirs = _import_config.exclude_dirs
    # 'str.endswith' accepts a tuple, so each file needs a single call per filter.
    invalid_extensions = tuple(_import_config.exclude_files or ())
    valid_extensions = tuple(_import_config.include_files or ())

    sum_size = 0

    def include_file(filename: str) -> bool:
        # Exclusions take precedence over inclusions.
        if invalid_extensions and filename.endswith(invalid_extensions):
            return False
        if not valid_extensions:
            return True
        return filename.endswith(valid_extensions)

    for root, dirnames, filenames in os.walk(abs_path, topdown=True):

        if exclude_dirs:
            # In-place assignment prunes the walk (requires topdown=True).
            dirnames[:] = [d for d in dirnames if d not in exclude_dirs]

        for filename in filenames:
            full_path = os.path.join(root, filename)
            if not os.path.isfile(full_path) or not include_file(filename):
                continue

            rel_path = os.path.relpath(full_path, abs_path)

            file_model = FileModel.load_file(full_path)
            sum_size = sum_size + file_model.size
            included_files[rel_path] = file_model

    if bundle_name is None:
        bundle_name = os.path.basename(source)

    bundle = FileBundle.create_from_file_models(
        files=included_files,
        path=abs_path,
        bundle_name=bundle_name,
        sum_size=sum_size,
    )
    return bundle
read_text_file_contents(self, ignore_errors=False)
Source code in kiara/models/filesystem.py
def read_text_file_contents(self, ignore_errors: bool = False) -> Mapping[str, str]:
    """Read all included files as UTF-8 text.

    Args:
        ignore_errors: if set, files whose content can't be read are logged and
            skipped instead of aborting the whole operation.

    Returns:
        A map with the relative path of each file as key and its text as value.

    Raises:
        Exception: if reading a file fails and ``ignore_errors`` is not set.
    """

    content_dict: Dict[str, str] = {}

    def read_file(rel_path: str, full_path: str):
        with open(full_path, encoding="utf-8") as f:
            try:
                content_dict[rel_path] = f.read()
            except Exception as e:
                if ignore_errors:
                    log_message(f"Can't read file: {e}")
                    logger.warning("ignore.file", path=full_path, reason=str(e))
                else:
                    raise Exception(
                        f"Can't read file (as text) '{full_path}': {e}"
                    ) from e

    # TODO: common ignore files and folders
    for rel_path, f in self.included_files.items():
        if f._path:
            path = f._path
        else:
            # The file model carries no path of its own: the keys of
            # 'included_files' are relative to the bundle root, so resolve there.
            path = os.path.join(self.path, rel_path)
        read_file(rel_path=rel_path, full_path=path)

    return content_dict

FileModel (KiaraModel) pydantic-model

Describes properties for the 'file' value type.

Source code in kiara/models/filesystem.py
class FileModel(KiaraModel):
    """Describes properties for the 'file' value type."""

    _kiara_model_id = "instance.data.file"

    @classmethod
    def load_file(
        cls,
        source: str,
        file_name: Optional[str] = None,
    ):
        """Create a file model from the metadata of a file on disk.

        The file itself is neither copied nor moved; the model only records its
        size, mime type, name and absolute path.

        Args:
            source: path to an existing file.
            file_name: the name to record; defaults to the base name of ``source``.

        Returns:
            The new file model, with its private ``_path`` set to the absolute path.

        Raises:
            ValueError: if ``source`` is empty, does not exist or is not a file.
        """

        import filetype
        import mimetypes

        if not source:
            raise ValueError("No source path provided.")

        if not os.path.exists(os.path.realpath(source)):
            raise ValueError(f"Path does not exist: {source}")

        if not os.path.isfile(os.path.realpath(source)):
            raise ValueError(f"Path is not a file: {source}")

        if file_name is None:
            file_name = os.path.basename(source)

        path: str = os.path.abspath(source)

        file_stats = os.stat(path)
        size = file_stats.st_size

        # Guess from the file name first; fall back to 'filetype' and finally to a
        # generic binary type.
        r = mimetypes.guess_type(path)
        if r[0] is not None:
            mime_type = r[0]
        else:
            _mime_type = filetype.guess(path)
            if not _mime_type:
                mime_type = "application/octet-stream"
            else:
                mime_type = _mime_type.MIME

        m = FileModel(
            mime_type=mime_type,
            size=size,
            file_name=file_name,
        )
        m._path = path
        return m

    mime_type: str = Field(description="The mime type of the file.")
    # The text must be passed as 'description': the first positional argument of
    # 'Field' is the default value, not the description.
    file_name: str = Field(description="The name of the file.")
    size: int = Field(description="The size of the file.")

    _path: Optional[str] = PrivateAttr(default=None)
    _file_hash: Optional[str] = PrivateAttr(default=None)
    _file_cid: Optional[CID] = PrivateAttr(default=None)

    @property
    def path(self) -> str:
        """The path of the file; raises if no path was set."""
        if self._path is None:
            raise Exception("File path not set for file model.")
        return self._path

    def _retrieve_data_to_hash(self) -> Any:
        data = {
            "file_name": self.file_name,
            "file_cid": self.file_cid,
        }
        return data

    def get_category_alias(self) -> str:
        return "instance.file_model"

    def copy_file(self, target: str, new_name: Optional[str] = None) -> "FileModel":
        """Copy the file to ``target`` and return a model for the copy.

        Missing parent folders of ``target`` are created.

        Args:
            target: the path of the copy.
            new_name: the file name to record for the copy; defaults to the base
                name of ``target``.

        Returns:
            The file model that describes the copied file.
        """

        target_path: str = os.path.abspath(target)
        os.makedirs(os.path.dirname(target_path), exist_ok=True)

        shutil.copy2(self.path, target_path)
        fm = FileModel.load_file(target, file_name=new_name)

        # The copy has identical content, so already computed digests stay valid
        # and the copy does not need to be hashed again.
        if self._file_hash is not None:
            fm._file_hash = self._file_hash
        if self._file_cid is not None:
            fm._file_cid = self._file_cid

        return fm

    @property
    def file_hash(self) -> str:
        """String form of the file CID (cached after first access)."""

        if self._file_hash is not None:
            return self._file_hash

        self._file_hash = str(self.file_cid)
        return self._file_hash

    @property
    def file_cid(self) -> CID:
        """CID computed from the file at ``path`` (cached after first access)."""

        if self._file_cid is not None:
            return self._file_cid

        # TODO: auto-set codec?
        self._file_cid = compute_cid_from_file(file=self.path, codec="raw")
        return self._file_cid

    @property
    def file_name_without_extension(self) -> str:
        """The part of the file name before its first dot."""

        return self.file_name.split(".")[0]

    def read_text(self, max_lines: int = -1) -> str:
        """Read the content of a file as text.

        Args:
            max_lines: the maximum number of lines to read; a value of 0 or less
                reads the whole file.

        Returns:
            The text; if the file has fewer than ``max_lines`` lines, all of it.
        """

        import itertools

        with open(self.path, "rt") as f:
            if max_lines <= 0:
                content = f.read()
            else:
                # 'islice' stops cleanly at end of file; 'next()' inside a generator
                # expression would surface as RuntimeError for short files.
                content = "".join(itertools.islice(f, max_lines))
        return content

    def read_bytes(self, length: int = -1) -> bytes:
        """Read the content of a file as bytes.

        Args:
            length: the maximum number of bytes to read; a value of 0 or less reads
                the whole file.
        """

        with open(self.path, "rb") as f:
            if length <= 0:
                content = f.read()
            else:
                content = f.read(length)
        return content

    def __repr__(self):
        return f"FileModel(name={self.file_name})"

    def __str__(self):
        return self.__repr__()

Attributes

file_cid: CID property readonly
file_hash: str property readonly
file_name: str pydantic-field
file_name_without_extension: str property readonly
mime_type: str pydantic-field required

The mime type of the file.

path: str property readonly
size: int pydantic-field required

The size of the file.

Methods

copy_file(self, target, new_name=None)
Source code in kiara/models/filesystem.py
def copy_file(self, target: str, new_name: Optional[str] = None) -> "FileModel":
    """Copy the file to ``target`` and return a model for the copy.

    Missing parent folders of ``target`` are created.

    Args:
        target: the path of the copy.
        new_name: the file name to record for the copy; defaults to the base name
            of ``target``.

    Returns:
        The file model that describes the copied file.
    """

    target_path: str = os.path.abspath(target)
    os.makedirs(os.path.dirname(target_path), exist_ok=True)

    shutil.copy2(self.path, target_path)
    fm = FileModel.load_file(target, file_name=new_name)

    # The copy has identical content, so already computed digests stay valid and
    # the copy does not need to be hashed again.
    if self._file_hash is not None:
        fm._file_hash = self._file_hash
    if self._file_cid is not None:
        fm._file_cid = self._file_cid

    return fm
get_category_alias(self)
Source code in kiara/models/filesystem.py
def get_category_alias(self) -> str:
    """Return the fixed category alias used for file models."""
    category_alias = "instance.file_model"
    return category_alias
load_file(source, file_name=None) classmethod

Utility method to read the metadata (size, mime type, name) of a file from disk and create a file model from it.

Source code in kiara/models/filesystem.py
@classmethod
def load_file(
    cls,
    source: str,
    file_name: Optional[str] = None,
):
    """Create a file model from the metadata of a file on disk.

    The model records the size, mime type, name and absolute path of the file.

    Args:
        source: path to an existing file.
        file_name: the name to record; defaults to the base name of ``source``.

    Returns:
        The new file model, with its private ``_path`` set to the absolute path.

    Raises:
        ValueError: if ``source`` is empty, does not exist or is not a file.
    """

    import filetype
    import mimetypes

    if not source:
        raise ValueError("No source path provided.")

    resolved_source = os.path.realpath(source)
    if not os.path.exists(resolved_source):
        raise ValueError(f"Path does not exist: {source}")
    if not os.path.isfile(resolved_source):
        raise ValueError(f"Path is not a file: {source}")

    recorded_name = os.path.basename(source) if file_name is None else file_name
    abs_source: str = os.path.abspath(source)

    byte_size = os.stat(abs_source).st_size

    # Guess from the file name first; fall back to 'filetype' and finally to a
    # generic binary type.
    name_based_type = mimetypes.guess_type(abs_source)[0]
    if name_based_type is not None:
        detected_type = name_based_type
    else:
        content_based = filetype.guess(abs_source)
        detected_type = (
            content_based.MIME if content_based else "application/octet-stream"
        )

    model = FileModel(
        mime_type=detected_type,
        size=byte_size,
        file_name=recorded_name,
    )
    model._path = abs_source
    return model
read_bytes(self, length=-1)

Read the content of a file.

Source code in kiara/models/filesystem.py
def read_bytes(self, length: int = -1) -> bytes:
    """Read the content of a file as bytes.

    Args:
        length: the maximum number of bytes to read; a value of 0 or less reads
            the whole file.
    """

    # A negative size tells 'read' to return everything up to end of file.
    byte_count = length if length > 0 else -1
    with open(self.path, "rb") as stream:
        return stream.read(byte_count)
read_text(self, max_lines=-1)

Read the content of a file.

Source code in kiara/models/filesystem.py
def read_text(self, max_lines: int = -1) -> str:
    """Read the content of a file as text.

    Args:
        max_lines: the maximum number of lines to read; a value of 0 or less reads
            the whole file.

    Returns:
        The text; if the file has fewer than ``max_lines`` lines, all of it.
    """

    import itertools

    with open(self.path, "rt") as f:
        if max_lines <= 0:
            content = f.read()
        else:
            # 'islice' stops cleanly at end of file; 'next()' inside a generator
            # expression would surface as RuntimeError for short files.
            content = "".join(itertools.islice(f, max_lines))
    return content

FolderImportConfig (BaseModel) pydantic-model

Source code in kiara/models/filesystem.py
class FolderImportConfig(BaseModel):
    """Filter settings for importing a folder as a file bundle."""

    include_files: Optional[List[str]] = Field(
        description="A list of strings, include all files where the filename ends with that string.",
        default=None,
    )
    # Directory names are compared for equality by the folder import, not by
    # suffix, so the description states an exact match.
    exclude_dirs: Optional[List[str]] = Field(
        description="A list of strings, exclude all folders whose name matches one of those strings exactly.",
        default=None,
    )
    exclude_files: Optional[List[str]] = Field(
        description=f"A list of strings, exclude all files that match those (takes precedence over 'include_files'). Defaults to: {DEFAULT_EXCLUDE_FILES}.",
        default=DEFAULT_EXCLUDE_FILES,
    )

Attributes

exclude_dirs: List[str] pydantic-field

A list of strings, exclude all folders whose name matches one of those strings exactly.

exclude_files: List[str] pydantic-field

A list of strings, exclude all files that match those (takes precedence over 'include_files'). Defaults to: ['.DS_Store'].

include_files: List[str] pydantic-field

A list of strings, include all files where the filename ends with that string.