Skip to content

metadata_models

This module contains the metadata models that are used in the kiara_modules.core package.

Metadata models are convenience wrappers that make it easier for kiara to find, create, manage and version metadata that is attached to data, as well as kiara modules. It is possible to register metadata using a JSON schema string, but it is recommended to create a metadata model, because it is much easier overall.

Metadata models must be a sub-class of kiara.metadata.MetadataModel.

ArrayMetadata (HashedMetadataModel) pydantic-model

Describes properties of the 'array' type.

Source code in core/metadata_models.py
class ArrayMetadata(HashedMetadataModel):
    """Describes properties of the 'array' type."""

    _metadata_key: typing.ClassVar[str] = "array"

    # number of elements in the array
    length: int = Field(description="The number of elements the array contains.")
    # combined byte size of all array elements
    size: int = Field(
        description="Total number of bytes consumed by the elements of the array."
    )

    def _obj_to_hash(self) -> typing.Any:
        # both metadata attributes feed into the hash source
        return {attr: getattr(self, attr) for attr in ("length", "size")}

    def get_category_alias(self) -> str:
        return "instance.metadata.array"

length: int pydantic-field required

The number of elements the array contains.

size: int pydantic-field required

Total number of bytes consumed by the elements of the array.

ColumnSchema (BaseModel) pydantic-model

Describes properties of a single column of the 'table' data type.

Source code in core/metadata_models.py
class ColumnSchema(BaseModel):
    """Describes properties of a single column of the 'table' data type."""

    # key under which this metadata model is registered
    _metadata_key: typing.ClassVar[str] = "column"

    # backend-specific type name of the column
    type_name: str = Field(
        description="The type name of the column (backend-specific)."
    )
    # arbitrary extra per-column metadata; a fresh empty dict per instance
    metadata: typing.Dict[str, typing.Any] = Field(
        description="Other metadata for the column.", default_factory=dict
    )

metadata: Dict[str, Any] pydantic-field

Other metadata for the column.

type_name: str pydantic-field required

The type name of the column (backend-specific).

FolderImportConfig (BaseModel) pydantic-model

Source code in core/metadata_models.py
class FolderImportConfig(BaseModel):
    # Filter configuration used when importing a folder as a file bundle.
    # All matching is suffix-based ('endswith'), per the field descriptions.

    # include only files whose filename ends with one of these; None -> include all
    include_files: typing.Optional[typing.List[str]] = Field(
        description="A list of strings, include all files where the filename ends with that string.",
        default=None,
    )
    # skip folders whose name ends with one of these strings
    exclude_dirs: typing.Optional[typing.List[str]] = Field(
        description="A list of strings, exclude all folders whose name ends with that string.",
        default=None,
    )
    # exclusion takes precedence over 'include_files'; default comes from the
    # module-level DEFAULT_EXCLUDE_FILES constant
    exclude_files: typing.Optional[typing.List[str]] = Field(
        description=f"A list of strings, exclude all files that match those (takes precedence over 'include_files'). Defaults to: {DEFAULT_EXCLUDE_FILES}.",
        default=DEFAULT_EXCLUDE_FILES,
    )

exclude_dirs: List[str] pydantic-field

A list of strings, exclude all folders whose name ends with that string.

exclude_files: List[str] pydantic-field

A list of strings, exclude all files that match those (takes precedence over 'include_files'). Defaults to: ['.DS_Store'].

include_files: List[str] pydantic-field

A list of strings, include all files where the filename ends with that string.

KiaraDatabase (MetadataModel) pydantic-model

Source code in core/metadata_models.py
class KiaraDatabase(MetadataModel):
    """Metadata model describing (and giving access to) a sqlite database file."""

    _metadata_key: typing.ClassVar[str] = "database"

    @classmethod
    def create_in_temp_dir(cls, init_sql: typing.Optional[str] = None):
        """Create a new sqlite database in a temporary directory.

        The temporary directory is removed at interpreter exit.

        Arguments:
          init_sql: an optional sql script to execute against the new database
        """

        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")

        def cleanup():
            # remove the whole temp directory; the previous version passed the
            # db *file* path to rmtree, which silently failed (ignore_errors)
            # and left the temp directory behind
            shutil.rmtree(temp_f, ignore_errors=True)

        atexit.register(cleanup)

        db = cls(db_file_path=db_path)
        db.create_if_not_exists()

        if init_sql:
            db.execute_sql(sql_script=init_sql, invalidate=True)

        return db

    # path to the sqlite file backing this database
    db_file_path: str = Field(description="The path to the sqlite database file.")
    # lazily created/cached sqlalchemy objects and schema information
    _cached_engine = PrivateAttr(default=None)
    _cached_inspector = PrivateAttr(default=None)
    _table_names = PrivateAttr(default=None)
    _table_schemas = PrivateAttr(default=None)

    def get_id(self) -> str:
        return self.db_file_path

    def get_category_alias(self) -> str:
        return "instance.metadata.database"

    @validator("db_file_path", allow_reuse=True)
    def ensure_absolute_path(cls, path: str):
        """Resolve the db file path to an absolute path; its parent folder must exist."""

        path = os.path.abspath(path)
        if not os.path.exists(os.path.dirname(path)):
            raise ValueError(f"Parent folder for database file does not exist: {path}")
        return path

    @property
    def db_url(self) -> str:
        # sqlalchemy-style url for this sqlite file
        return f"sqlite:///{self.db_file_path}"

    def get_sqlalchemy_engine(self) -> "Engine":
        """Return the sqlalchemy engine for this database (created once, then cached)."""

        if self._cached_engine is not None:
            return self._cached_engine

        from sqlalchemy import create_engine

        self._cached_engine = create_engine(self.db_url, future=True)
        # with self._cached_engine.connect() as con:
        #     con.execute(text("PRAGMA query_only = ON"))

        return self._cached_engine

    def create_if_not_exists(self):
        """Create the database if it does not exist yet."""

        from sqlalchemy_utils import create_database, database_exists

        if not database_exists(self.db_url):
            create_database(self.db_url)

    def execute_sql(self, sql_script: str, invalidate: bool = False):
        """Execute an sql script.

        Arguments:
          sql_script: the sql script
          invalidate: whether to invalidate cached values within this object
        """

        self.create_if_not_exists()
        conn = self.get_sqlalchemy_engine().raw_connection()
        try:
            cursor = conn.cursor()
            cursor.executescript(sql_script)
            conn.commit()
        finally:
            # release the connection even if the script fails
            conn.close()

        if invalidate:
            self._cached_inspector = None
            self._table_names = None
            self._table_schemas = None

    def copy_database_file(self, target: str):
        """Copy the database file to 'target', return a new KiaraDatabase for it."""

        # exist_ok=True: copying into an already existing folder is fine
        os.makedirs(os.path.dirname(target), exist_ok=True)

        shutil.copy2(self.db_file_path, target)

        new_db = KiaraDatabase(db_file_path=target)
        return new_db

    def get_sqlalchemy_inspector(self) -> "Inspector":
        """Return the sqlalchemy inspector for this database (created once, then cached)."""

        if self._cached_inspector is not None:
            return self._cached_inspector

        self._cached_inspector = inspect(self.get_sqlalchemy_engine())
        return self._cached_inspector

    @property
    def table_names(self) -> typing.Iterable[str]:
        # cached list of table names
        if self._table_names is not None:
            return self._table_names

        self._table_names = self.get_sqlalchemy_inspector().get_table_names()
        return self._table_names

    def get_schema_for_table(self, table_name: str):
        """Return the column schema (column name -> column info) for one table.

        Schemas for all tables are computed and cached on first access.

        Raises:
          Exception: if the database contains no table with that name
        """

        if self._table_schemas is not None:
            if table_name not in self._table_schemas.keys():
                raise Exception(
                    f"Can't get table schema, database does not contain table with name '{table_name}'."
                )
            return self._table_schemas[table_name]

        ts: typing.Dict[str, typing.Dict[str, typing.Any]] = {}
        inspector = self.get_sqlalchemy_inspector()
        for tn in inspector.get_table_names():
            # reuse the already-fetched inspector instead of looking it up per table
            columns = inspector.get_columns(tn)
            ts[tn] = {}
            for c in columns:
                ts[tn][c["name"]] = c

        self._table_schemas = ts
        if table_name not in self._table_schemas.keys():
            raise Exception(
                f"Can't get table schema, database does not contain table with name '{table_name}'."
            )

        return self._table_schemas[table_name]

db_file_path: str pydantic-field required

The path to the sqlite database file.

execute_sql(self, sql_script, invalidate=False)

Execute an sql script.

Parameters:

Name Type Description Default
sql_script str

the sql script

required
invalidate bool

whether to invalidate cached values within this object

False
Source code in core/metadata_models.py
def execute_sql(self, sql_script: str, invalidate: bool = False):
    """Execute an sql script.

    Arguments:
      sql_script: the sql script
      invalidate: whether to invalidate cached values within this object
    """

    self.create_if_not_exists()
    conn = self.get_sqlalchemy_engine().raw_connection()
    cursor = conn.cursor()
    cursor.executescript(sql_script)
    conn.commit()
    conn.close()

    if invalidate:
        self._cached_inspector = None
        self._table_names = None
        self._table_schemas = None

KiaraDatabaseInfo (HashedMetadataModel) pydantic-model

Source code in core/metadata_models.py
class KiaraDatabaseInfo(HashedMetadataModel):
    # Aggregated information about a database: table/view names, per-table
    # metadata, and the size of the backing file.

    _metadata_key: typing.ClassVar[str] = "database_info"

    table_names: typing.List[str] = Field(
        description="The names of all tables in this database."
    )
    view_names: typing.List[str] = Field(
        description="The names of all views in this database."
    )
    tables: typing.Dict[str, TableMetadata] = Field(
        description="Information about the tables within this database."
    )
    size: int = Field(description="The size of the database file.")

    def _obj_to_hash(self) -> typing.Any:
        # every field contributes to the hash source
        field_names = ("table_names", "view_names", "tables", "size")
        return {name: getattr(self, name) for name in field_names}

    def get_category_alias(self) -> str:
        return "instance.metadata.database_info"

size: int pydantic-field required

The size of the database file.

table_names: List[str] pydantic-field required

The names of all tables in this database.

tables: Dict[str, kiara_modules.core.metadata_models.TableMetadata] pydantic-field required

Information about the tables within this database.

view_names: List[str] pydantic-field required

The names of all views in this database.

KiaraFile (MetadataModel) pydantic-model

Describes properties for the 'file' value type.

Source code in core/metadata_models.py
class KiaraFile(MetadataModel):
    """Describes properties for the 'file' value type."""

    _metadata_key: typing.ClassVar[str] = "file"

    @classmethod
    def load_file(
        cls,
        source: str,
        target: typing.Optional[str] = None,
        incl_orig_path: bool = False,
    ):
        """Utility method to read metadata of a file from disk and optionally move it into a data archive location.

        Arguments:
          source: the path of the file to read
          target: if provided, copy the file to this (not yet existing) path
          incl_orig_path: whether to record the original path in the metadata

        Raises:
          ValueError: if 'source' is empty/missing/not a file, or 'target' exists
        """

        import mimetypes

        import filetype

        if not source:
            raise ValueError("No source path provided.")

        if not os.path.exists(os.path.realpath(source)):
            raise ValueError(f"Path does not exist: {source}")

        if not os.path.isfile(os.path.realpath(source)):
            raise ValueError(f"Path is not a file: {source}")

        orig_filename = os.path.basename(source)
        orig_path: str = os.path.abspath(source)
        file_import_time = datetime.datetime.now().isoformat()  # TODO: timezone

        file_stats = os.stat(orig_path)
        size = file_stats.st_size

        if target:
            if os.path.exists(target):
                raise ValueError(f"Target path exists: {target}")
            os.makedirs(os.path.dirname(target), exist_ok=True)
            shutil.copy2(source, target)
        else:
            target = orig_path

        # extension-based guess first, then content-based detection as fallback
        r = mimetypes.guess_type(target)
        if r[0] is not None:
            mime_type = r[0]
        else:
            _mime_type = filetype.guess(target)
            if not _mime_type:
                mime_type = "application/octet-stream"
            else:
                mime_type = _mime_type.MIME

        if not incl_orig_path:
            _orig_path: typing.Optional[str] = None
        else:
            _orig_path = orig_path

        # use 'cls' so subclasses get instances of their own type
        m = cls(
            orig_filename=orig_filename,
            orig_path=_orig_path,
            import_time=file_import_time,
            mime_type=mime_type,
            size=size,
            file_name=orig_filename,
            path=target,
        )
        return m

    # lazily computed sha3-256 hexdigest of the file content
    _file_hash: typing.Optional[str] = PrivateAttr(default=None)

    orig_filename: str = Field(
        description="The original filename of this file at the time of import."
    )
    orig_path: typing.Optional[str] = Field(
        description="The original path to this file at the time of import.",
        default=None,
    )
    import_time: str = Field(description="The time when the file was imported.")
    mime_type: str = Field(description="The mime type of the file.")
    # fixed: the string used to be passed positionally (Field("The name of the
    # file.")), which made it the field's *default value* instead of its
    # description; the field is now required with a proper description
    file_name: str = Field(description="The name of the file.")
    size: int = Field(description="The size of the file.")
    path: str = Field(description="The archive path of the file.")
    is_onboarded: bool = Field(
        description="Whether the file is imported into the kiara data store.",
        default=False,
    )

    def get_id(self) -> str:
        return self.path

    def get_category_alias(self) -> str:
        return "instance.metadata.file"

    def copy_file(
        self, target: str, incl_orig_path: bool = False, is_onboarded: bool = False
    ):
        """Copy this file to 'target' and return the new file's metadata.

        Import-related attributes (orig filename/path, import time) and an
        already-computed file hash are carried over from this instance.
        """

        fm = KiaraFile.load_file(self.path, target)
        if incl_orig_path:
            fm.orig_path = self.orig_path
        else:
            fm.orig_path = None
        fm.orig_filename = self.orig_filename
        fm.import_time = self.import_time
        # avoid re-hashing the copy if this file's hash is already known
        if self._file_hash is not None:
            fm._file_hash = self._file_hash

        fm.is_onboarded = is_onboarded

        return fm

    @property
    def file_hash(self):
        """The sha3-256 hexdigest of the file content (computed once, then cached)."""

        if self._file_hash is not None:
            return self._file_hash

        # renamed from 'sha256_hash': the algorithm used is sha3-256
        sha3_hash = hashlib.sha3_256()
        with open(self.path, "rb") as f:
            # read and update hash in blocks of 4K
            for byte_block in iter(lambda: f.read(4096), b""):
                sha3_hash.update(byte_block)

        self._file_hash = sha3_hash.hexdigest()
        return self._file_hash

    @property
    def file_name_without_extension(self) -> str:
        """The file name up to the first '.' (so 'a.tar.gz' -> 'a')."""

        return self.file_name.split(".")[0]

    @property
    def import_time_as_datetime(self) -> datetime.datetime:
        """The import time, parsed into a datetime object."""
        from dateutil import parser

        return parser.parse(self.import_time)

    def read_content(
        self, as_str: bool = True, max_lines: int = -1
    ) -> typing.Union[str, bytes]:
        """Read the content of a file.

        Arguments:
          as_str: read in text mode (returns str) if True, otherwise binary (bytes)
          max_lines: if > 0, read at most this many lines

        Returns:
          the file content, as str or bytes depending on 'as_str'
        """

        import itertools

        mode = "r" if as_str else "rb"

        with open(self.path, mode) as f:
            if max_lines <= 0:
                content = f.read()
            else:
                # fixed: joining with "" raised TypeError for bytes lines in
                # 'rb' mode, and next() in a genexp raised RuntimeError on
                # files shorter than max_lines (PEP 479); islice plus a
                # type-matched joiner handles both
                joiner: typing.Union[str, bytes] = "" if as_str else b""
                content = joiner.join(itertools.islice(f, max_lines))
        return content

    def __repr__(self):
        return f"FileMetadata(name={self.file_name})"

    def __str__(self):
        return self.__repr__()

import_time: str pydantic-field required

The time when the file was imported.

is_onboarded: bool pydantic-field

Whether the file is imported into the kiara data store.

mime_type: str pydantic-field required

The mime type of the file.

orig_filename: str pydantic-field required

The original filename of this file at the time of import.

orig_path: str pydantic-field

The original path to this file at the time of import.

path: str pydantic-field required

The archive path of the file.

size: int pydantic-field required

The size of the file.

__repr__(self) special

Return repr(self).

Source code in core/metadata_models.py
def __repr__(self):
    return f"FileMetadata(name={self.file_name})"

__str__(self) special

Return str(self).

Source code in core/metadata_models.py
def __str__(self):
    return self.__repr__()

load_file(source, target=None, incl_orig_path=False) classmethod

Utility method to read metadata of a file from disk and optionally move it into a data archive location.

Source code in core/metadata_models.py
@classmethod
def load_file(
    cls,
    source: str,
    target: typing.Optional[str] = None,
    incl_orig_path: bool = False,
):
    """Utility method to read metadata of a file from disk and optionally move it into a data archive location."""

    import mimetypes

    import filetype

    if not source:
        raise ValueError("No source path provided.")

    if not os.path.exists(os.path.realpath(source)):
        raise ValueError(f"Path does not exist: {source}")

    if not os.path.isfile(os.path.realpath(source)):
        raise ValueError(f"Path is not a file: {source}")

    orig_filename = os.path.basename(source)
    orig_path: str = os.path.abspath(source)
    file_import_time = datetime.datetime.now().isoformat()  # TODO: timezone

    file_stats = os.stat(orig_path)
    size = file_stats.st_size

    if target:
        if os.path.exists(target):
            raise ValueError(f"Target path exists: {target}")
        os.makedirs(os.path.dirname(target), exist_ok=True)
        shutil.copy2(source, target)
    else:
        target = orig_path

    r = mimetypes.guess_type(target)
    if r[0] is not None:
        mime_type = r[0]
    else:
        _mime_type = filetype.guess(target)
        if not _mime_type:
            mime_type = "application/octet-stream"
        else:
            mime_type = _mime_type.MIME

    if not incl_orig_path:
        _orig_path: typing.Optional[str] = None
    else:
        _orig_path = orig_path

    m = KiaraFile(
        orig_filename=orig_filename,
        orig_path=_orig_path,
        import_time=file_import_time,
        mime_type=mime_type,
        size=size,
        file_name=orig_filename,
        path=target,
    )
    return m

read_content(self, as_str=True, max_lines=-1)

Read the content of a file.

Source code in core/metadata_models.py
def read_content(
    self, as_str: bool = True, max_lines: int = -1
) -> typing.Union[str, bytes]:
    """Read the content of a file."""

    mode = "r" if as_str else "rb"

    with open(self.path, mode) as f:
        if max_lines <= 0:
            content = f.read()
        else:
            content = "".join((next(f) for x in range(max_lines)))
    return content

KiaraFileBundle (MetadataModel) pydantic-model

Describes properties for the 'file_bundle' value type.

Source code in core/metadata_models.py
class KiaraFileBundle(MetadataModel):
    """Describes properties for the 'file_bundle' value type."""

    _metadata_key: typing.ClassVar[str] = "file_bundle"

    @classmethod
    def import_folder(
        cls,
        source: str,
        target: typing.Optional[str] = None,
        import_config: typing.Union[
            None, typing.Mapping[str, typing.Any], FolderImportConfig
        ] = None,
        incl_orig_path: bool = False,
    ):
        """Import all (matching) files under 'source' as a file bundle.

        Arguments:
          source: the folder to import
          target: if provided, matching files are copied under this (not yet existing) path
          incl_orig_path: whether to record original paths in the metadata
          import_config: include/exclude filters; a mapping, a FolderImportConfig,
            or None (use defaults)

        Raises:
          ValueError: if 'source' is empty/missing/not a folder, or 'target' exists
          TypeError: if 'import_config' has an unsupported type
        """

        if not source:
            raise ValueError("No source path provided.")

        if not os.path.exists(os.path.realpath(source)):
            raise ValueError(f"Path does not exist: {source}")

        if not os.path.isdir(os.path.realpath(source)):
            # fixed: this error message used to (incorrectly) say 'not a file'
            raise ValueError(f"Path is not a folder: {source}")

        if target and os.path.exists(target):
            raise ValueError(f"Target path already exists: {target}")

        # normalize away trailing path separators
        if source.endswith(os.path.sep):
            source = source[0:-1]

        if target and target.endswith(os.path.sep):
            target = target[0:-1]

        if import_config is None:
            _import_config = FolderImportConfig()
        elif isinstance(import_config, typing.Mapping):
            _import_config = FolderImportConfig(**import_config)
        elif isinstance(import_config, FolderImportConfig):
            _import_config = import_config
        else:
            raise TypeError(
                f"Invalid type for folder import config: {type(import_config)}."
            )

        included_files: typing.Dict[str, KiaraFile] = {}
        exclude_dirs = _import_config.exclude_dirs
        invalid_extensions = _import_config.exclude_files

        valid_extensions = _import_config.include_files

        sum_size = 0

        def include_file(filename: str) -> bool:
            # exclusion wins over inclusion; an empty include list means
            # 'include everything that is not excluded'
            if invalid_extensions and any(
                filename.endswith(ext) for ext in invalid_extensions
            ):
                return False
            if not valid_extensions:
                return True
            else:
                return any(filename.endswith(ext) for ext in valid_extensions)

        for root, dirnames, filenames in os.walk(source, topdown=True):

            # prune excluded dirs in-place so os.walk doesn't descend into them
            if exclude_dirs:
                dirnames[:] = [d for d in dirnames if d not in exclude_dirs]

            for filename in [
                f
                for f in filenames
                if os.path.isfile(os.path.join(root, f)) and include_file(f)
            ]:

                full_path = os.path.join(root, filename)
                rel_path = os.path.relpath(full_path, source)
                if target:
                    target_path: typing.Optional[str] = os.path.join(target, rel_path)
                else:
                    target_path = None

                file_model = KiaraFile.load_file(
                    full_path, target_path, incl_orig_path=incl_orig_path
                )
                sum_size = sum_size + file_model.size
                included_files[rel_path] = file_model

        orig_bundle_name = os.path.basename(source)
        if incl_orig_path:
            orig_path: typing.Optional[str] = source
        else:
            orig_path = None

        if target:
            path = target
        else:
            path = source

        return cls.create_from_file_models(
            files=included_files,
            orig_bundle_name=orig_bundle_name,
            orig_path=orig_path,
            path=path,
            sum_size=sum_size,
        )

    @classmethod
    def create_from_file_models(
        cls,  # fixed: first parameter of a classmethod was named 'self'
        files: typing.Mapping[str, KiaraFile],
        orig_bundle_name: str,
        orig_path: typing.Optional[str],
        path: str,
        sum_size: typing.Optional[int] = None,
        is_onboarded: bool = False,
    ):
        """Assemble a bundle from already-created KiaraFile models.

        'sum_size' is computed from the individual files if not provided.
        """

        result: typing.Dict[str, typing.Any] = {}

        result["included_files"] = files

        result["orig_path"] = orig_path
        result["path"] = path
        result["import_time"] = datetime.datetime.now().isoformat()
        result["number_of_files"] = len(files)
        result["bundle_name"] = os.path.basename(result["path"])
        result["orig_bundle_name"] = orig_bundle_name
        result["is_onboarded"] = is_onboarded

        if sum_size is None:
            sum_size = sum(f.size for f in files.values())
        result["size"] = sum_size

        # use 'cls' so subclasses get instances of their own type
        return cls(**result)

    # lazily computed combined hash over all included files
    _file_bundle_hash: typing.Optional[str] = PrivateAttr(default=None)

    orig_bundle_name: str = Field(
        description="The original name of this folder at the time of import."
    )
    bundle_name: str = Field(description="The name of this bundle.")
    orig_path: typing.Optional[str] = Field(
        description="The original path to this folder at the time of import.",
        default=None,
    )
    import_time: str = Field(description="The time when the file was imported.")
    number_of_files: int = Field(
        description="How many files are included in this bundle."
    )
    included_files: typing.Dict[str, KiaraFile] = Field(
        description="A map of all the included files, incl. their properties. Uses the relative path of each file as key."
    )
    size: int = Field(description="The size of all files in this folder, combined.")
    path: str = Field(description="The archive path of the folder.")
    is_onboarded: bool = Field(
        description="Whether this bundle is imported into the kiara data store.",
        default=False,
    )

    def get_id(self) -> str:
        return self.path

    def get_category_alias(self) -> str:
        return "instance.metadata.file_bundle"

    def get_relative_path(self, file: KiaraFile):
        """Return the path of 'file' relative to this bundle's root."""

        return os.path.relpath(file.path, self.path)

    def read_text_file_contents(
        self, ignore_errors: bool = False
    ) -> typing.Mapping[str, str]:
        """Read all included files as utf-8 text, keyed by relative path.

        Arguments:
          ignore_errors: if True, unreadable files are logged and skipped
            instead of raising
        """

        content_dict: typing.Dict[str, str] = {}

        def read_file(rel_path: str, fm: KiaraFile):
            # open() moved inside the try block, so errors raised by open()
            # itself are also covered by the 'ignore_errors' handling
            try:
                with open(fm.path, encoding="utf-8") as f:
                    content = f.read()
                content_dict[rel_path] = content  # type: ignore
            except Exception as e:
                if ignore_errors:
                    log_message(f"Can't read file: {e}")
                    log.warning(f"Ignoring file: {fm.path}")
                else:
                    # fixed: error message was missing the closing quote
                    raise Exception(f"Can't read file (as text) '{fm.path}': {e}")

        # TODO: common ignore files and folders
        for f in self.included_files.values():
            rel_path = self.get_relative_path(f)
            read_file(rel_path=rel_path, fm=f)

        return content_dict

    @property
    def file_bundle_hash(self):
        """A sha3-256 hash over relative paths + file hashes (computed once, then cached)."""

        if self._file_bundle_hash is not None:
            return self._file_bundle_hash

        # sort by relative path so the hash is independent of insertion order
        hashes = ""
        for path in sorted(self.included_files.keys()):
            fm = self.included_files[path]
            hashes = hashes + "_" + path + "_" + fm.file_hash

        self._file_bundle_hash = hashlib.sha3_256(hashes.encode("utf-8")).hexdigest()
        return self._file_bundle_hash

    def copy_bundle(
        self, target_path: str, incl_orig_path: bool = False, is_onboarded: bool = False
    ) -> "KiaraFileBundle":
        """Copy all included files to 'target_path', return the new bundle's metadata."""

        if target_path == self.path:
            raise Exception(f"Target path and current path are the same: {target_path}")

        result = {}
        for rel_path, item in self.included_files.items():
            _target_path = os.path.join(target_path, rel_path)
            new_fm = item.copy_file(_target_path, is_onboarded=is_onboarded)
            result[rel_path] = new_fm

        if incl_orig_path:
            orig_path = self.orig_path
        else:
            orig_path = None
        fb = KiaraFileBundle.create_from_file_models(
            result,
            orig_bundle_name=self.orig_bundle_name,
            orig_path=orig_path,
            path=target_path,
            sum_size=self.size,
            is_onboarded=is_onboarded,
        )
        # carry over an already-computed bundle hash to avoid re-hashing
        if self._file_bundle_hash is not None:
            fb._file_bundle_hash = self._file_bundle_hash

        return fb

    def __repr__(self):
        return f"FileBundle(name={self.bundle_name})"

    def __str__(self):
        return self.__repr__()

bundle_name: str pydantic-field required

The name of this bundle.

import_time: str pydantic-field required

The time when the file was imported.

included_files: Dict[str, kiara_modules.core.metadata_models.KiaraFile] pydantic-field required

A map of all the included files, incl. their properties. Uses the relative path of each file as key.

is_onboarded: bool pydantic-field

Whether this bundle is imported into the kiara data store.

number_of_files: int pydantic-field required

How many files are included in this bundle.

orig_bundle_name: str pydantic-field required

The original name of this folder at the time of import.

orig_path: str pydantic-field

The original path to this folder at the time of import.

path: str pydantic-field required

The archive path of the folder.

size: int pydantic-field required

The size of all files in this folder, combined.

__repr__(self) special

Return repr(self).

Source code in core/metadata_models.py
def __repr__(self):
    return f"FileBundle(name={self.bundle_name})"

__str__(self) special

Return str(self).

Source code in core/metadata_models.py
def __str__(self):
    return self.__repr__()

TableMetadata (HashedMetadataModel) pydantic-model

Describes properties for the 'table' data type.

Source code in core/metadata_models.py
class TableMetadata(HashedMetadataModel):
    """Describes properties for the 'table' data type."""

    _metadata_key: typing.ClassVar[str] = "table"

    column_names: typing.List[str] = Field(
        description="The name of the columns of the table."
    )
    column_schema: typing.Dict[str, ColumnSchema] = Field(
        description="The schema description of the table."
    )
    rows: int = Field(description="The number of rows the table contains.")
    size: typing.Optional[int] = Field(
        description="The tables size in bytes.", default=None
    )

    def _obj_to_hash(self) -> typing.Any:
        # serialize column schema models to plain dicts so the hash source is
        # built from simple data structures
        schemas = {name: model.dict() for name, model in self.column_schema.items()}
        return {
            "column_names": self.column_names,
            "column_schemas": schemas,
            "rows": self.rows,
            "size": self.size,
        }

    def get_category_alias(self) -> str:
        return "instance.metadata.table"

column_names: List[str] pydantic-field required

The name of the columns of the table.

column_schema: Dict[str, kiara_modules.core.metadata_models.ColumnSchema] pydantic-field required

The schema description of the table.

rows: int pydantic-field required

The number of rows the table contains.

size: int pydantic-field

The tables size in bytes.