Skip to content

core

Top-level package for kiara_modules.core.

array special

ArrayMetadataModule (ExtractMetadataModule)

Extract metadata from an 'array' value.

Source code in core/array/__init__.py
class ArrayMetadataModule(ExtractMetadataModule):
    """Extract metadata from an 'array' value."""

    _module_type_name = "metadata"

    @classmethod
    def _get_supported_types(cls) -> str:
        return "array"

    @classmethod
    def get_metadata_key(cls) -> str:
        return "array"

    def _get_metadata_schema(
        self, type: str
    ) -> typing.Union[str, typing.Type[BaseModel]]:
        # The same schema model applies regardless of the concrete type.
        return ArrayMetadata

    def extract_metadata(self, value: Value) -> typing.Mapping[str, typing.Any]:
        """Return the item count and in-memory byte size of the Arrow array."""

        import pyarrow as pa

        arr: pa.Array = value.get_value_data()
        metadata = {"length": len(arr), "size": arr.nbytes}
        return metadata

MapModule (KiaraModule)

Map a list of values into another list of values.

This module must be configured with the type (and optional) configuration of another kiara module. This 'child' module will then be used to compute the array items of the result.

Source code in core/array/__init__.py
class MapModule(KiaraModule):
    """Map a list of values into another list of values.

    This module must be configured with the type (and optional) configuration of another *kiara* module. This 'child'
    module will then be used to compute the array items of the result.
    """

    _config_cls = MapModuleConfig

    def module_instance_doc(self) -> str:
        """Build documentation for this configured instance, linking to the child module's docs."""

        config: MapModuleConfig = self.config  # type: ignore

        module_type = config.module_type
        module_config = config.module_config

        # Instantiate the child module just to access its type metadata.
        m = self._kiara.create_module(
            module_type=module_type, module_config=module_config
        )
        type_md = m.get_type_metadata()
        doc = type_md.documentation.full_doc
        link = type_md.context.get_url_for_reference("module_doc")
        if not link:
            link_str = f"``{module_type}``"
        else:
            link_str = f"[``{module_type}``]({link})"

        result = f"""Map the values of the input list onto a new list of the same length, using the {link_str} module."""

        if doc and doc != "-- n/a --":
            result = result + f"\n\n``{module_type}`` documentation:\n\n{doc}"
        return result

    def __init__(self, *args, **kwargs):

        # The child module and its mapped input/output names are resolved
        # lazily on first access (see the properties below) and cached here.
        self._child_module: typing.Optional[KiaraModule] = None
        self._module_input_name: typing.Optional[str] = None
        self._module_output_name: typing.Optional[str] = None
        super().__init__(*args, **kwargs)

    @property
    def child_module(self) -> KiaraModule:
        """The configured child module; created on first access, then cached."""

        if self._child_module is not None:
            return self._child_module

        module_name = self.get_config_value("module_type")
        module_config = self.get_config_value("module_config")
        self._child_module = self._kiara.create_module(
            id="map_module_child", module_type=module_name, module_config=module_config
        )
        return self._child_module

    @property
    def module_input_name(self) -> str:
        """Name of the child-module input that receives each array item.

        Falls back to the child's single input when not configured; raises
        KiaraProcessingException when the choice is ambiguous.
        """

        if self._module_input_name is not None:
            return self._module_input_name

        self._module_input_name = self.get_config_value("input_name")
        if self._module_input_name is None:
            if len(list(self.child_module.input_names)) == 1:
                self._module_input_name = next(iter(self.child_module.input_names))
            else:
                raise KiaraProcessingException(
                    f"No 'input_name' specified, and configured module has more than one inputs. Please specify an 'input_name' value in your module config, pick one of: {', '.join(self.child_module.input_names)}"
                )

        return self._module_input_name

    @property
    def module_output_name(self) -> str:
        """Name of the child-module output that yields each result item.

        Falls back to the child's single output when not configured; raises
        KiaraProcessingException when the choice is ambiguous.
        """

        if self._module_output_name is not None:
            return self._module_output_name

        self._module_output_name = self.get_config_value("output_name")
        if self._module_output_name is None:
            if len(list(self.child_module.output_names)) == 1:
                self._module_output_name = next(iter(self.child_module.output_names))
            else:
                raise KiaraProcessingException(
                    f"No 'output_name' specified, and configured module has more than one outputs. Please specify an 'output_name' value in your module config, pick one of: {', '.join(self.child_module.output_names)}"
                )

        return self._module_output_name

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Input schema: the source 'array' plus the child module's non-mapped inputs."""

        inputs: typing.Dict[
            str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
        ] = {
            "array": {
                "type": "array",
                "doc": "The array containing the values the filter is applied on.",
            }
        }
        for input_name, schema in self.child_module.input_schemas.items():
            # 'array' is reserved for this module's own input field.
            assert input_name != "array"
            if input_name == self.module_input_name:
                continue
            inputs[input_name] = schema
        return inputs

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Output schema: a single 'array' holding the mapped values."""

        outputs = {
            "array": {
                "type": "array",
                "doc": "An array of equal length to the input array, containing the 'mapped' values.",
            }
        }
        return outputs

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Apply the child module to every item of the input array."""

        import pyarrow as pa

        input_array: pa.Array = inputs.get_value_data("array")

        # Collect the constant (non-mapped) inputs that are passed through
        # to the child module unchanged for every item.
        init_data: typing.Dict[str, typing.Any] = {}
        for input_name in self.input_schemas.keys():
            if input_name in ["array", self.module_input_name]:
                continue

            init_data[input_name] = inputs.get_value_obj(input_name)

        result_list = map_with_module(
            input_array,
            module_input_name=self.module_input_name,
            module_obj=self.child_module,
            init_data=init_data,
            module_output_name=self.module_output_name,
        )
        outputs.set_value("array", pa.array(result_list))

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]": { "type": "[value_type]", "doc*": "[a description of this input]", "optional*": [boolean whether this input is optional or required (defaults to 'False')] }, "[other_input_field_name]": { "type": ... }, ... }

Source code in core/array/__init__.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Input schema: the source 'array' plus all child-module inputs except the mapped one."""

    schema: typing.Dict[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ] = {
        "array": {
            "type": "array",
            "doc": "The array containing the values the filter is applied on.",
        }
    }
    mapped_input = self.module_input_name
    for name, field_schema in self.child_module.input_schemas.items():
        # 'array' is reserved for this module's own input field
        assert name != "array"
        if name != mapped_input:
            schema[name] = field_schema
    return schema

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]": { "type": "[value_type]", "doc*": "[a description of this output]" }, "[other_output_field_name]": { "type": ... }, ... }

Source code in core/array/__init__.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Output schema: a single 'array' field holding the mapped values."""

    return {
        "array": {
            "type": "array",
            "doc": "An array of equal length to the input array, containing the 'mapped' values.",
        }
    }

module_instance_doc(self)

Return documentation for this instance of the module.

If not overwritten, will return this class' method doc().

Source code in core/array/__init__.py
def module_instance_doc(self) -> str:
    """Build the documentation string for this configured map-module instance."""

    config: MapModuleConfig = self.config  # type: ignore

    module_type = config.module_type
    child_config = config.module_config

    # Instantiate the child module just to access its type metadata.
    child = self._kiara.create_module(
        module_type=module_type, module_config=child_config
    )
    metadata = child.get_type_metadata()
    child_doc = metadata.documentation.full_doc
    url = metadata.context.get_url_for_reference("module_doc")
    link_str = f"[``{module_type}``]({url})" if url else f"``{module_type}``"

    result = f"""Map the values of the input list onto a new list of the same length, using the {link_str} module."""

    if child_doc and child_doc != "-- n/a --":
        result = result + f"\n\n``{module_type}`` documentation:\n\n{child_doc}"
    return result

MapModuleConfig (ModuleTypeConfigSchema) pydantic-model

Source code in core/array/__init__.py
class MapModuleConfig(ModuleTypeConfigSchema):
    # Configuration for 'MapModule': which child module to run, how it is
    # configured, and which of its inputs/outputs carry the per-item data.
    # (No class docstring on purpose: pydantic would expose it as the
    # model's schema description.)

    module_type: str = Field(
        description="The name of the kiara module to use to filter the input data."
    )
    module_config: typing.Optional[typing.Dict[str, typing.Any]] = Field(
        description="The config for the kiara filter module.", default_factory=dict
    )
    input_name: typing.Optional[str] = Field(
        description="The name of the input name of the module which will receive the items from our input array. Can be omitted if the configured module only has a single input.",
        default=None,
    )
    output_name: typing.Optional[str] = Field(
        description="The name of the output name of the module which will receive the items from our input array. Can be omitted if the configured module only has a single output.",
        default=None,
    )

input_name: str pydantic-field

The name of the input name of the module which will receive the items from our input array. Can be omitted if the configured module only has a single input.

module_config: Dict[str, Any] pydantic-field

The config for the kiara filter module.

module_type: str pydantic-field required

The name of the kiara module to use to filter the input data.

output_name: str pydantic-field

The name of the output name of the module which will receive the items from our input array. Can be omitted if the configured module only has a single output.

SampleArrayModule (SampleValueModule)

Sample an array.

Samples are used to randomly select a subset of a dataset, which helps test queries and workflows on smaller versions of the original data, to adjust parameters before a full run.

Source code in core/array/__init__.py
class SampleArrayModule(SampleValueModule):
    """Sample an array.

    Samples are used to randomly select a subset of a dataset, which helps test queries and workflows on smaller versions
    of the original data, to adjust parameters before a full run.
    """

    _module_type_name = "sample"

    @classmethod
    def get_value_type(cls) -> str:
        """Return the value type this sample module operates on."""
        return "array"

    def sample_percent(self, value: Value, sample_size: int):
        """Randomly sample ``sample_size`` percent of the array's items.

        Uses duckdb bernoulli sampling; returns the input array unchanged
        when ``sample_size`` is 100 or more.
        """

        import duckdb
        import pyarrow as pa

        array: pa.Array = value.get_value_data()

        if sample_size >= 100:
            return array

        # Wrap the array in a single-column table so duckdb can query it.
        table = pa.Table.from_arrays([array], names=["column"])
        query = f"SELECT * FROM data USING SAMPLE {sample_size} PERCENT (bernoulli);"

        rel_from_arrow = duckdb.arrow(table)
        result: duckdb.DuckDBPyResult = rel_from_arrow.query("data", query)

        result_table: pa.Table = result.fetch_arrow_table()
        return result_table.column("column")

    def sample_rows(self, value: Value, sample_size: int):
        """Randomly sample ``sample_size`` rows from the array.

        Returns the input array unchanged when it has no more than
        ``sample_size`` items.
        """

        import duckdb
        import pyarrow as pa

        array: pa.Array = value.get_value_data()

        if sample_size >= len(array):
            return array

        # Wrap the array in a single-column table so duckdb can query it.
        table = pa.Table.from_arrays([array], names=["column"])
        query = f"SELECT * FROM data USING SAMPLE {sample_size};"

        rel_from_arrow = duckdb.arrow(table)
        result: duckdb.DuckDBPyResult = rel_from_arrow.query("data", query)

        result_table: pa.Table = result.fetch_arrow_table()
        return result_table.column("column")

    def sample_rows_from_start(self, value: Value, sample_size: int):
        """Take the first ``sample_size`` items of the array (zero-copy slice)."""

        import pyarrow as pa

        array: pa.Array = value.get_value_data()

        if sample_size >= len(array):
            return array

        result_array = array.slice(0, sample_size)
        return result_array

    def sample_rows_to_end(self, value: Value, sample_size: int):
        """Take the last ``sample_size`` items of the array (zero-copy slice)."""

        import pyarrow as pa

        array: pa.Array = value.get_value_data()

        if sample_size >= len(array):
            return array

        result_array = array.slice(len(array) - sample_size)
        return result_array

get_value_type() classmethod

Return the value type for this sample module.

Source code in core/array/__init__.py
@classmethod
def get_value_type(cls) -> str:
    """Return the value type for this sample module."""
    return "array"

StoreArrayTypeModule (StoreValueTypeModule)

Save an Arrow array to a file.

This module wraps the input array into an Arrow Table, and saves this table as a feather file.

The output of this module is a dictionary representing the configuration to be used with kiara to re-assemble the array object from disk.

Source code in core/array/__init__.py
class StoreArrayTypeModule(StoreValueTypeModule):
    """Save an Arrow array to a file.

    This module wraps the input array into an Arrow Table, and saves this table as a feather file.

    The output of this module is a dictionary representing the configuration to be used with *kiara* to re-assemble
    the array object from disk.
    """

    _module_type_name = "store"

    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return "array"

    def store_value(self, value: Value, base_path: str):
        """Save the value, and return the load config needed to load it again.

        Raises KiaraProcessingException if the target file already exists.
        """

        import pyarrow as pa
        from pyarrow import feather

        array: pa.Array = value.get_value_data()

        path = os.path.join(base_path, ARRAY_SAVE_FILE_NAME)
        if os.path.exists(path):
            raise KiaraProcessingException(
                f"Can't write file, path already exists: {path}"
            )

        # The target file was checked above; the directory itself may already
        # exist, which is fine (consistent with the 'bytes' store module).
        os.makedirs(os.path.dirname(path), exist_ok=True)

        # Feather can only store tables, so wrap the array in a
        # single-column table first.
        table = pa.Table.from_arrays([array], names=[ARRAY_SAVE_COLUM_NAME])
        feather.write_feather(table, path)

        load_config = {
            "module_type": "array.restore",
            "inputs": {
                "base_path": base_path,
                "rel_path": ARRAY_SAVE_FILE_NAME,
                "format": "feather",
                "column_name": ARRAY_SAVE_COLUM_NAME,
            },
            "output_name": "array",
        }
        return load_config

store_value(self, value, base_path)

Save the value, and return the load config needed to load it again.

Source code in core/array/__init__.py
def store_value(self, value: Value, base_path: str):
    """Save the value, and return the load config needed to load it again.

    Raises KiaraProcessingException if the target file already exists.
    """

    import pyarrow as pa
    from pyarrow import feather

    array: pa.Array = value.get_value_data()

    path = os.path.join(base_path, ARRAY_SAVE_FILE_NAME)
    if os.path.exists(path):
        raise KiaraProcessingException(
            f"Can't write file, path already exists: {path}"
        )

    # The target file was checked above; the directory itself may already
    # exist, which is fine (consistent with the 'bytes' store module).
    os.makedirs(os.path.dirname(path), exist_ok=True)

    # Feather can only store tables, so wrap the array in a
    # single-column table first.
    table = pa.Table.from_arrays([array], names=[ARRAY_SAVE_COLUM_NAME])
    feather.write_feather(table, path)

    load_config = {
        "module_type": "array.restore",
        "inputs": {
            "base_path": base_path,
            "rel_path": ARRAY_SAVE_FILE_NAME,
            "format": "feather",
            "column_name": ARRAY_SAVE_COLUM_NAME,
        },
        "output_name": "array",
    }
    return load_config

bytes special

LoadBytesModule (KiaraModule)

Source code in core/bytes/__init__.py
class LoadBytesModule(KiaraModule):
    """Load the content of a file as bytes."""

    _module_type_name = "load"

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:

        return {
            "base_path": {
                "type": "string",
                "doc": "The base path to the file to read.",
            },
            "rel_path": {
                "type": "string",
                "doc": "The relative path of the file, within the base path.",
            },
        }

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:

        return {"bytes": {"type": "bytes", "doc": "The content of the file."}}

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Read the file at base_path/rel_path and emit its raw content.

        Raises KiaraProcessingException if the file does not exist.
        """

        # BUG FIX: the input schema declares 'base_path' and 'rel_path'
        # (there is no 'path' input, and the matching 'bytes.store' module's
        # load config supplies those two fields), so join them here.
        base_path = inputs.get_value_data("base_path")
        rel_path = inputs.get_value_data("rel_path")
        path = os.path.join(base_path, rel_path)

        if not os.path.exists(path):
            raise KiaraProcessingException(
                f"Can't read file, path does not exist: {path}"
            )

        with open(path, "rb") as f:
            content = f.read()

        outputs.set_value("bytes", content)

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/bytes/__init__.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Inputs: a base directory plus a file path relative to it."""

    schema = {
        "base_path": {
            "type": "string",
            "doc": "The base path to the file to read.",
        },
        "rel_path": {
            "type": "string",
            "doc": "The relative path of the file, within the base path.",
        },
    }
    return schema

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/bytes/__init__.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Single output: the raw file content."""

    schema = {"bytes": {"type": "bytes", "doc": "The content of the file."}}
    return schema

StoreBytesTypeModule (StoreValueTypeModule)

Source code in core/bytes/__init__.py
class StoreBytesTypeModule(StoreValueTypeModule):
    """Save a 'bytes' value to a file on disk."""

    _module_type_name = "store"

    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return "bytes"

    def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
        """Save the value, and return the load config needed to load it again.

        Raises KiaraProcessingException if the target file already exists.
        """

        path = os.path.join(base_path, BYTES_SAVE_FILE_NAME)

        if os.path.exists(path):
            raise KiaraProcessingException(
                f"Can't write bytes, target path already exists: {path}"
            )

        os.makedirs(os.path.dirname(path), exist_ok=True)

        # renamed from 'bytes' to avoid shadowing the builtin
        data = value.get_value_data()

        with open(path, "wb") as f:
            f.write(data)

        load_config = {
            "module_type": "bytes.load",
            "inputs": {"base_path": base_path, "rel_path": BYTES_SAVE_FILE_NAME},
            "output_name": "bytes",
        }
        return load_config

store_value(self, value, base_path)

Save the value, and return the load config needed to load it again.

Source code in core/bytes/__init__.py
def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
    """Save the value, and return the load config needed to load it again.

    Raises KiaraProcessingException if the target file already exists.
    """

    path = os.path.join(base_path, BYTES_SAVE_FILE_NAME)

    if os.path.exists(path):
        raise KiaraProcessingException(
            f"Can't write bytes, target path already exists: {path}"
        )

    os.makedirs(os.path.dirname(path), exist_ok=True)

    # renamed from 'bytes' to avoid shadowing the builtin
    data = value.get_value_data()

    with open(path, "wb") as f:
        f.write(data)

    load_config = {
        "module_type": "bytes.load",
        "inputs": {"base_path": base_path, "rel_path": BYTES_SAVE_FILE_NAME},
        "output_name": "bytes",
    }
    return load_config

msgpack

DeserializeFromMsgPackModule (KiaraModule)

Source code in core/bytes/msgpack.py
class DeserializeFromMsgPackModule(KiaraModule):
    """Deserialize a msgpack payload back into a value, its type name and its metadata.

    Dispatches on the 'value_type' field embedded in the payload to a
    matching ``to_<value_type>`` method of this class.
    """

    _module_type_name = "to_value"
    _config_cls = SerializeToMsgPackModuleConfig

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Single input: the msgpack-serialized payload."""

        return {"bytes": {"type": "bytes", "doc": "The msgpack-serialized value."}}

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Outputs: the value's type name, the deserialized data, and its metadata."""

        return {
            "value_type": {"type": "string", "doc": "The type of the value."},
            "value_data": {
                "type": "any",
                "doc": f"The {self.get_config_value('value_type')} value.",
            },
            "value_metadata": {
                "type": "dict",
                "doc": "A dictionary with metadata of the serialized table. The result dict has the metadata key as key, and two sub-values under each key: 'metadata_item' (the actual metadata) and 'metadata_item_schema' (the schema for the metadata).",
            },
        }

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Unpack the msgpack payload and emit its type name, metadata and data."""

        import msgpack

        msg = inputs.get_value_data("bytes")

        unpacked = msgpack.unpackb(msg, raw=False)

        value_type = unpacked["value_type"]
        outputs.set_value("value_type", value_type)

        metadata = unpacked["metadata"]
        outputs.set_value("value_metadata", metadata)

        new_data = unpacked["data"]

        # Dispatch on value type: each supported type needs a 'to_<type>' method.
        if not hasattr(self, f"to_{value_type}"):
            raise KiaraProcessingException(
                f"Value type not supported for msgpack deserialization: {value_type}"
            )

        func = getattr(self, f"to_{value_type}")
        obj = func(data=new_data)
        outputs.set_value("value_data", obj)

    def to_table(self, data: bytes) -> typing.Any:
        """Reconstruct an Arrow table from Arrow IPC-stream bytes."""

        import pyarrow as pa

        reader = pa.ipc.open_stream(data)

        # Re-assemble the table from all record batches in the stream.
        batches = [b for b in reader]
        new_table = pa.Table.from_batches(batches)

        return new_table

    def to_boolean(self, data: bytes):
        """Booleans round-trip natively through msgpack; return the value as-is."""

        return data
create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/bytes/msgpack.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Single input: the msgpack-serialized payload."""

    schema = {"bytes": {"type": "bytes", "doc": "The msgpack-serialized value."}}
    return schema
create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/bytes/msgpack.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Outputs: the value's type name, the deserialized data, and its metadata."""

    configured_type = self.get_config_value("value_type")
    return {
        "value_type": {"type": "string", "doc": "The type of the value."},
        "value_data": {
            "type": "any",
            "doc": f"The {configured_type} value.",
        },
        "value_metadata": {
            "type": "dict",
            "doc": "A dictionary with metadata of the serialized table. The result dict has the metadata key as key, and two sub-values under each key: 'metadata_item' (the actual metadata) and 'metadata_item_schema' (the schema for the metadata).",
        },
    }

SerializeToMsgPackModule (KiaraModule)

Source code in core/bytes/msgpack.py
class SerializeToMsgPackModule(KiaraModule):
    """Serialize a value of the configured type into a msgpack payload.

    Dispatches to a matching ``from_<value_type>`` method of this class;
    the payload bundles the type name, the value's metadata and the data.
    """

    _module_type_name = "from_value"
    _config_cls = SerializeToMsgPackModuleConfig

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Single input: a value of the configured type."""

        return {
            "value_item": {
                "type": self.config.get("value_type"),
                "doc": f"A {self.get_config_value('value_type')} value.",
            }
        }

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Single output: the msgpack-serialized bytes."""

        return {
            "bytes": {
                "type": "bytes",
                "doc": f"The msgpack-serialized {self.get_config_value('value_type')} value.",
            }
        }

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Serialize the input value (plus its metadata) into a msgpack payload."""

        import msgpack

        type_name: str = self.get_config_value("value_type")

        # Dispatch on the configured type: each supported type needs a
        # 'from_<type>' method.
        if not hasattr(self, f"from_{type_name}"):
            raise KiaraProcessingException(
                f"Value type not supported for msgpack serialization: {type_name}"
            )

        func = getattr(self, f"from_{type_name}")

        value = inputs.get_value_obj("value_item")

        metadata = value.get_metadata(also_return_schema=True)

        # Bundle type name, metadata and type-specific serialized data
        # into one envelope, then pack the envelope itself.
        msg = func(value=value)
        data = {"value_type": value.type_name, "metadata": metadata, "data": msg}

        msg = msgpack.packb(data, use_bin_type=True)

        outputs.set_value("bytes", msg)

    def from_table(self, value: Value) -> bytes:
        """Serialize an Arrow table into Arrow IPC-stream bytes."""

        import pyarrow as pa

        table_val: Value = value
        table: pa.Table = table_val.get_value_data()

        sink = pa.BufferOutputStream()
        writer = pa.ipc.new_stream(sink, table.schema)

        writer.write(table)
        writer.close()

        buf: pa.Buffer = sink.getvalue()
        # NOTE(review): this returns a memoryview over the Arrow buffer
        # (zero-copy), not bytes as annotated; msgpack accepts buffer
        # objects, but confirm callers don't rely on an actual bytes object.
        return memoryview(buf)

    def from_boolean(self, value: Value) -> bytes:
        """Booleans are handled natively by msgpack; return the raw value."""

        return value.get_value_data()
create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/bytes/msgpack.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Single input: a value of the configured type."""

    configured_type = self.config.get("value_type")
    schema = {
        "value_item": {
            "type": configured_type,
            "doc": f"A {self.get_config_value('value_type')} value.",
        }
    }
    return schema
create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/bytes/msgpack.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Single output: the msgpack-serialized bytes."""

    configured_type = self.get_config_value("value_type")
    return {
        "bytes": {
            "type": "bytes",
            "doc": f"The msgpack-serialized {configured_type} value.",
        }
    }

SerializeToMsgPackModuleConfig (ModuleTypeConfigSchema) pydantic-model

Source code in core/bytes/msgpack.py
class SerializeToMsgPackModuleConfig(ModuleTypeConfigSchema):
    # Shared configuration for the msgpack serialize/deserialize modules.
    # (No class docstring on purpose: pydantic would expose it as the
    # model's schema description.)

    # The value type the (de)serializer is configured for; selects the
    # 'from_<type>' / 'to_<type>' dispatch method.
    value_type: str = Field(description="The value type to serialize/deserialize.")
value_type: str pydantic-field required

The value type to serialize/deserialize.

database special

BaseDatabaseInfoMetadataModule (ExtractMetadataModule)

Extract extended metadata (like tables, schemas) from a database object.

Source code in core/database/__init__.py
class BaseDatabaseInfoMetadataModule(ExtractMetadataModule):
    """Extract extended metadata (like tables, schemas) from a database object."""

    def _get_metadata_schema(
        self, type: str
    ) -> typing.Union[str, typing.Type[BaseModel]]:
        return KiaraDatabase

    def extract_metadata(self, value: Value) -> BaseModel:
        """Inspect the database: per-table columns, row counts and sizes, plus file size."""

        from sqlalchemy import inspect, text

        database: KiaraDatabase = value.get_value_data()
        inspector: Inspector = inspect(database.get_sqlalchemy_engine())

        table_names = inspector.get_table_names()
        view_names = inspector.get_view_names()

        table_infos = {}
        for table in table_names:
            columns = inspector.get_columns(table_name=table)
            columns_info = {}
            with database.get_sqlalchemy_engine().connect() as con:
                result = con.execute(text(f"SELECT count(*) from {table}"))
                num_rows = result.fetchone()[0]
                try:
                    # Use single quotes for the string literal: in SQLite,
                    # double-quoted tokens are identifiers first (the
                    # string-literal fallback is a deprecated misfeature).
                    result = con.execute(
                        text(
                            f"SELECT SUM(\"pgsize\") FROM \"dbstat\" WHERE name='{table}'"
                        )
                    )
                    table_size = result.fetchone()[0]
                except Exception:
                    # The 'dbstat' virtual table is an optional SQLite
                    # feature; fall back to 'unknown size'.
                    table_size = None

            for column in columns:
                column_name = column["name"]
                cs = ColumnSchema(
                    type_name=column["type"].__visit_name__,
                    metadata={
                        "is_primary_key": column["primary_key"] == 1,
                        "nullable": column["nullable"],
                    },
                )
                columns_info[column_name] = cs
            table_infos[table] = TableMetadata(
                column_names=list(columns_info.keys()),
                column_schema=columns_info,
                rows=num_rows,
                size=table_size,
            )

        # Overall size is taken from the database file on disk.
        file_stats = os.stat(database.db_file_path)
        size = file_stats.st_size

        kdi = KiaraDatabaseInfo(
            table_names=table_names,
            view_names=view_names,
            tables=table_infos,
            size=size,
        )
        return kdi

BaseDatabaseMetadataModule (ExtractMetadataModule)

Extract basic metadata from a database object.

Source code in core/database/__init__.py
class BaseDatabaseMetadataModule(ExtractMetadataModule):
    """Extract basic metadata from a database object."""

    def _get_metadata_schema(
        self, type: str
    ) -> typing.Union[str, typing.Type[BaseModel]]:
        # the database model itself doubles as the metadata schema
        return KiaraDatabase

    def extract_metadata(self, value: Value) -> KiaraDatabase:
        # the value data already is a 'KiaraDatabase' instance, so nothing
        # needs to be computed here
        db: KiaraDatabase = value.get_value_data()
        return db

ConvertToDatabaseModule (CreateValueModule)

Create a database from files, file_bundles, etc.

Source code in core/database/__init__.py
class ConvertToDatabaseModule(CreateValueModule):
    """Create a database from files, file_bundles, etc."""

    _module_type_name = "create"
    _config_cls = DatabaseConversionModuleConfig

    @classmethod
    def get_target_value_type(cls) -> str:
        return "database"

    def from_table(self, value: Value):
        """Convert an Arrow table into a new sqlite database containing a single table.

        Returns:
            the new 'KiaraDatabase' instance (backed by a temp-dir sqlite file)
        """

        import pyarrow as pa
        from sqlalchemy import MetaData, Table

        from kiara_modules.core.table.utils import (
            create_sqlite_schema_data_from_arrow_table,
        )

        table: pa.Table = value.get_value_data()
        # maybe we could check the values lineage, to find the best table name?
        table_name = value.id.replace("-", "_")

        # create an index for every column named 'id' (case-insensitive)
        index_columns = [cn for cn in table.column_names if cn.lower() == "id"]

        column_info: SqliteTableSchema = create_sqlite_schema_data_from_arrow_table(
            table=table, index_columns=index_columns
        )

        init_sql = create_table_init_sql(
            table_name=table_name, table_schema=column_info
        )

        db = KiaraDatabase.create_in_temp_dir(init_sql=init_sql)

        # map of incoming column names to target column names; currently always
        # empty, kept as a hook for future column re-mapping
        nodes_column_map: typing.Dict[str, typing.Any] = {}

        # engine and reflected table are loop-invariant, so set them up once
        engine = db.get_sqlalchemy_engine()
        _metadata_obj = MetaData()
        sqlite_table = Table(table_name, _metadata_obj, autoload_with=engine)

        for batch in table.to_batches(DEFAULT_DB_CHUNK_SIZE):
            batch_dict = batch.to_pydict()

            for k, v in nodes_column_map.items():
                if k in batch_dict.keys():
                    _data = batch_dict.pop(k)
                    if v in batch_dict.keys():
                        # fixed: this message was missing its 'f' prefix before
                        raise Exception(f"Duplicate column name after mapping: {v}")
                    batch_dict[v] = _data

            # transpose the column-oriented batch dict into a list of row dicts
            data = [dict(zip(batch_dict, t)) for t in zip(*batch_dict.values())]

            with engine.connect() as conn:
                with conn.begin():
                    conn.execute(sqlite_table.insert(), data)

        return db

    def from_csv_file(self, value: Value):
        """Import a single csv file into a new sqlite database file.

        Returns:
            the path to the new database file (the temp dir is removed at exit)
        """

        f = tempfile.mkdtemp()
        db_path = os.path.join(f, "db.sqlite")

        def cleanup():
            shutil.rmtree(f, ignore_errors=True)

        atexit.register(cleanup)

        create_sqlite_table_from_file(
            target_db_file=db_path, file_item=value.get_value_data()
        )

        return db_path

    def from_csv_file_bundle(self, value: Value):
        """Import every csv file of a bundle as a table into a new sqlite database.

        Table names are derived from the file names (de-duplicated via 'find_free_id').
        Depending on the 'ignore_errors' config, files that fail to import are either
        skipped (with a log message) or abort the whole conversion.

        Returns:
            the path to the new database file (the temp dir is removed at exit)
        """

        include_file_information: bool = True
        include_raw_content_in_file_info: bool = False

        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")

        def cleanup():
            # fixed: this used to rmtree the db *file* path, which silently
            # failed and leaked the temporary directory
            shutil.rmtree(temp_f, ignore_errors=True)

        atexit.register(cleanup)

        db = KiaraDatabase(db_file_path=db_path)
        db.create_if_not_exists()

        bundle: KiaraFileBundle = value.get_value_data()
        table_names: typing.List[str] = []
        for rel_path in sorted(bundle.included_files.keys()):
            file_item = bundle.included_files[rel_path]
            table_name = find_free_id(
                stem=file_item.file_name_without_extension, current_ids=table_names
            )
            try:
                table_names.append(table_name)
                create_sqlite_table_from_file(
                    target_db_file=db_path, file_item=file_item, table_name=table_name
                )
            except Exception as e:
                # fixed: the condition previously ended in '... or True', which
                # made the 'ignore_errors' config setting meaningless
                if self.get_config_value("ignore_errors") is True:
                    log_message(
                        f"Ignoring file '{rel_path}': could not import data from file -- {e}"
                    )
                    continue
                raise KiaraProcessingException(e)

        if include_file_information:
            create_table_from_file_bundle(
                file_bundle=value.get_value_data(),
                db_file_path=db_path,
                table_name="source_files_metadata",
                include_content=include_raw_content_in_file_info,
            )

        return db_path

    def from_text_file_bundle(self, value: Value):
        """Import a text file bundle as a file-metadata table (incl. raw content)."""

        return create_table_from_file_bundle(
            file_bundle=value.get_value_data(), include_content=True
        )

DatabaseConversionModuleConfig (CreateValueModuleConfig) pydantic-model

Source code in core/database/__init__.py
class DatabaseConversionModuleConfig(CreateValueModuleConfig):
    """Configuration for modules that convert values into databases."""

    # if True, items that fail to convert are skipped (and logged) instead of
    # aborting the whole conversion
    ignore_errors: bool = Field(
        description="Whether to ignore convert errors and omit the failed items.",
        default=False,
    )

ignore_errors: bool pydantic-field

Whether to ignore convert errors and omit the failed items.

DatabaseInfoMetadataModule (BaseDatabaseInfoMetadataModule)

Extract extended metadata (like tables, schemas) from a database object.

Source code in core/database/__init__.py
class DatabaseInfoMetadataModule(BaseDatabaseInfoMetadataModule):
    """Extract extended metadata (like tables, schemas) from a database object."""

    _module_type_name = "info"

    @classmethod
    def get_metadata_key(cls) -> str:
        # the extracted metadata is stored under this key
        return "database_info"

    @classmethod
    def _get_supported_types(cls) -> str:
        # this module only handles values of type 'database'
        return "database"

DatabaseMetadataModule (BaseDatabaseMetadataModule)

Extract basic metadata from a database object.

Source code in core/database/__init__.py
class DatabaseMetadataModule(BaseDatabaseMetadataModule):
    """Extract basic metadata from a database object."""

    _module_type_name = "metadata"

    @classmethod
    def get_metadata_key(cls) -> str:
        # the extracted metadata is stored under this key
        return "database"

    @classmethod
    def _get_supported_types(cls) -> str:
        # this module only handles values of type 'database'
        return "database"

LoadDatabaseConfig (ModuleTypeConfigSchema) pydantic-model

Source code in core/database/__init__.py
class LoadDatabaseConfig(ModuleTypeConfigSchema):
    """Configuration for the 'database.load' module."""

    # allows database sub-types to re-use the load module while keeping their
    # own type name on the output
    value_type: str = Field(
        description="The type of the value to be stored (if database sub-type).",
        default="database",
    )

value_type: str pydantic-field

The type of the value to be stored (if database sub-type).

LoadDatabaseModule (KiaraModule)

Source code in core/database/__init__.py
class LoadDatabaseModule(KiaraModule):
    """Resolve the on-disk location of a stored database file."""

    _module_type_name = "load"
    _config_cls = LoadDatabaseConfig

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # the two string inputs together identify the database file on disk
        return {
            "base_path": {
                "type": "string",
                "doc": "The path to the base directory where the database file is stored.",
            },
            "rel_path": {
                "type": "string",
                "doc": "The relative path of the database file within the base directory.",
            },
        }

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        value_type = self.get_config_value("value_type")
        # mention the (sub-)type in the output doc if it is not the default
        msg = f" (as '{value_type}')" if value_type != "database" else ""

        return {
            "database": {"type": value_type, "doc": f"The database value object{msg}."}
        }

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:

        full_path = os.path.join(
            inputs.get_value_data("base_path"), inputs.get_value_data("rel_path")
        )
        outputs.set_value("database", full_path)

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/database/__init__.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    return {
        "base_path": {
            "type": "string",
            "doc": "The path to the base directory where the database file is stored.",
        },
        "rel_path": {
            "type": "string",
            "doc": "The relative path of the database file within the base directory.",
        },
    }

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/database/__init__.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    value_type = self.get_config_value("value_type")
    if value_type != "database":
        msg = f" (as '{value_type}')"
    else:
        msg = ""

    outputs: typing.Mapping[str, typing.Any] = {
        "database": {"type": value_type, "doc": f"The database value object{msg}."}
    }
    return outputs

StoreDatabaseTypeModule (StoreValueTypeModule)

Save an sqlite database to a file.

Source code in core/database/__init__.py
class StoreDatabaseTypeModule(StoreValueTypeModule):
    """Save an sqlite database to a file."""

    _module_type_name = "store"

    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        # handles 'database' values (incl. sub-types, via 'value_type' below)
        return "database"

    def store_value(self, value: Value, base_path: str):
        """Copy the database file into 'base_path' and return the matching load config."""

        value_type = value.type_name
        # TODO: assert type inherits from database

        database: KiaraDatabase = value.get_value_data()

        path = os.path.join(base_path, DEFAULT_DATABASE_SAVE_FILE_NAME)
        # never overwrite an existing file
        if os.path.exists(path):
            raise KiaraProcessingException(
                f"Can't write file, path already exists: {path}"
            )

        new_db = database.copy_database_file(path)

        # describes how to re-create this value later via 'database.load'
        load_config = {
            "module_type": "database.load",
            "module_config": {"value_type": value_type},
            "inputs": {
                "base_path": base_path,
                "rel_path": DEFAULT_DATABASE_SAVE_FILE_NAME,
            },
            "output_name": "database",
        }
        return load_config, new_db

store_value(self, value, base_path)

Save the value, and return the load config needed to load it again.

Source code in core/database/__init__.py
def store_value(self, value: Value, base_path: str):

    value_type = value.type_name
    # TODO: assert type inherits from database

    database: KiaraDatabase = value.get_value_data()

    path = os.path.join(base_path, DEFAULT_DATABASE_SAVE_FILE_NAME)
    if os.path.exists(path):
        raise KiaraProcessingException(
            f"Can't write file, path already exists: {path}"
        )

    new_db = database.copy_database_file(path)

    load_config = {
        "module_type": "database.load",
        "module_config": {"value_type": value_type},
        "inputs": {
            "base_path": base_path,
            "rel_path": DEFAULT_DATABASE_SAVE_FILE_NAME,
        },
        "output_name": "database",
    }
    return (load_config, new_db)

query

QueryDatabaseSQLModuleConfig (ModuleTypeConfigSchema) pydantic-model

Source code in core/database/query.py
class QueryDatabaseSQLModuleConfig(ModuleTypeConfigSchema):
    """Configuration for sql query modules.

    If 'query' is set here, the module will not ask for a 'query' input.
    """

    query: typing.Optional[str] = Field(
        description="The query to execute. If not specified, the user will be able to provide their own.",
        default=None,
    )
query: str pydantic-field

The query to execute. If not specified, the user will be able to provide their own.

QueryTableSQL (KiaraModule)

Execute a sql query against an (Arrow) table.

Source code in core/database/query.py
class QueryTableSQL(KiaraModule):
    """Execute a sql query against a database.

    The query result is returned as an (Arrow) table.
    """

    _module_type_name = "sql"
    _config_cls = QueryDatabaseSQLModuleConfig

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:

        inputs: typing.Dict[str, typing.Any] = {
            "database": {
                "type": "database",
                "doc": "The database to query",
            }
        }

        # only ask for a 'query' input if no query was pre-configured
        if self.get_config_value("query") is None:
            inputs["query"] = {"type": "string", "doc": "The query."}

        return inputs

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:

        return {"query_result": {"type": "table", "doc": "The query result."}}

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:

        import pandas as pd
        import pyarrow as pa

        # a pre-configured query takes precedence over the 'query' input
        if self.get_config_value("query") is None:
            _query: str = inputs.get_value_data("query")
        else:
            _query = self.get_config_value("query")

        _database: KiaraDatabase = inputs.get_value_data("database")

        # can't re-use the default engine, because pandas does not support having the 'future' flag set to 'True'
        engine = create_engine(_database.db_url)
        df = pd.read_sql(_query, con=engine)
        table = pa.Table.from_pandas(df)

        outputs.set_value("query_result", table)
create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/database/query.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    inputs = {
        "database": {
            "type": "database",
            "doc": "The database to query",
        }
    }

    if self.get_config_value("query") is None:
        inputs["query"] = {"type": "string", "doc": "The query."}

    return inputs
create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/database/query.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    return {"query_result": {"type": "table", "doc": "The query result."}}

utils

SqliteColumnAttributes (BaseModel) pydantic-model

Source code in core/database/utils.py
class SqliteColumnAttributes(BaseModel):
    """Attributes of a single sqlite table column: data type, extra column init
    information, and whether an index should be created for it."""

    data_type: str = Field(
        description="The type of the data in this column.", default="ANY"
    )
    extra_column_info: typing.List[str] = Field(
        description="Additional init information for the column.", default_factory=list
    )
    create_index: bool = Field(
        description="Whether to create an index for this column or not.", default=False
    )
create_index: bool pydantic-field

Whether to create an index for this column or not.

data_type: str pydantic-field

The type of the data in this column.

extra_column_info: List[str] pydantic-field

Additional init information for the column.

SqliteTableSchema (BaseModel) pydantic-model

Source code in core/database/utils.py
class SqliteTableSchema(BaseModel):
    """Schema description of a single sqlite table: its columns, extra schema
    statements, and an optional column-name mapping for incoming data."""

    columns: typing.Dict[str, SqliteColumnAttributes] = Field(
        description="The table columns and their attributes."
    )
    extra_schema: typing.List[str] = Field(
        description="Extra schema information for this table.", default_factory=list
    )
    column_map: typing.Dict[str, str] = Field(
        description="A dictionary describing how to map incoming data column names. Values in this dict point to keys in this models 'columns' attribute.",
        default_factory=dict,
    )
column_map: Dict[str, str] pydantic-field

A dictionary describing how to map incoming data column names. Values in this dict point to keys in this models 'columns' attribute.

columns: Dict[str, kiara_modules.core.database.utils.SqliteColumnAttributes] pydantic-field required

The table columns and their attributes.

extra_schema: List[str] pydantic-field

Extra schema information for this table.

create_table_init_sql(table_name, table_schema, schema_template_str=None)

Create an sql script to initialize a table.

Parameters:

Name Type Description Default
table_schema

a 'SqliteTableSchema' object describing the table columns (data type, extra column info, index creation) plus any extra schema information

required
Source code in core/database/utils.py
def create_table_init_sql(
    table_name: str,
    table_schema: SqliteTableSchema,
    schema_template_str: typing.Optional[str] = None,
):
    """Create an sql script to initialize a table.

    Arguments:
        table_name: the name of the table to create
        table_schema: the table columns (and their attributes), plus any extra schema information
        schema_template_str: an optional jinja template string; defaults to the bundled 'sqlite_schama.sql.j2' template

    Returns:
        the rendered sql script as a string
    """

    if schema_template_str is None:
        # NOTE: 'schama' is the actual (misspelled) file name on disk
        template_path = Path(TEMPLATES_FOLDER) / "sqlite_schama.sql.j2"
        schema_template_str = template_path.read_text()

    template = Environment(loader=BaseLoader()).from_string(schema_template_str)

    index_columns = []
    column_lines = []
    for column_name, attrs in table_schema.columns.items():
        line = f"    {column_name}    {attrs.data_type}"
        if attrs.extra_column_info:
            line = f"{line}    {' '.join(attrs.extra_column_info)}"

        column_lines.append(line)
        if attrs.create_index:
            index_columns.append(column_name)

    column_lines.extend(table_schema.extra_schema)

    rendered = template.render(
        table_name=table_name, column_info=column_lines, index_columns=index_columns
    )
    return rendered

date

A collection of date related modules.

Most of those are very bare-bones, not really dealing with more advanced (but very important) concepts like timezones and resolution yet.

DateRangeCheckModule (KiaraModule)

Check whether a date falls within a specified date range.

If neither of the inputs 'earliest' or 'latest' is set, this module will always return 'True'.

Return True if that's the case, otherwise False.

Source code in core/date.py
class DateRangeCheckModule(KiaraModule):
    """Check whether a date falls within a specified date range.

    If neither of the inputs 'earliest' or 'latest' is set, this module will always return 'True'.

    Return ``True`` if that's the case, otherwise ``False``.
    """

    _module_type_name = "range_check"

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # 'earliest' and 'latest' are optional; a missing bound is not enforced
        return {
            "date": {"type": "date", "doc": "The date to check."},
            "earliest": {
                "type": "date",
                "doc": "The earliest date that is allowed.",
                "optional": True,
            },
            "latest": {
                "type": "date",
                "doc": "The latest date that is allowed.",
                "optional": True,
            },
        }

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "within_range": {
                "type": "boolean",
                "doc": "A boolean indicating whether the provided date was within the allowed range ('true'), or not ('false')",
            }
        }

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:

        from dateutil import parser

        date_value = inputs.get_value_data("date")
        earliest: typing.Optional[datetime.datetime] = inputs.get_value_data("earliest")
        latest: typing.Optional[datetime.datetime] = inputs.get_value_data("latest")

        # no bounds configured: trivially within range
        if not earliest and not latest:
            outputs.set_value("within_range", True)
            return

        # normalize pyarrow scalars and strings into datetime objects
        if hasattr(date_value, "as_py"):
            date_value = date_value.as_py()
        if isinstance(date_value, str):
            date_value = parser.parse(date_value)

        lower_ok = earliest <= date_value if earliest else True
        upper_ok = date_value <= latest if latest else True

        outputs.set_value("within_range", lower_ok and upper_ok)

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/date.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    inputs: typing.Dict[str, typing.Dict[str, typing.Any]] = {
        "date": {"type": "date", "doc": "The date to check."},
        "earliest": {
            "type": "date",
            "doc": "The earliest date that is allowed.",
            "optional": True,
        },
        "latest": {
            "type": "date",
            "doc": "The latest date that is allowed.",
            "optional": True,
        },
    }

    return inputs

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/date.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    outputs = {
        "within_range": {
            "type": "boolean",
            "doc": "A boolean indicating whether the provided date was within the allowed range ('true'), or not ('false')",
        }
    }
    return outputs

ExtractDateModule (KiaraModule)

Extract a date object from a string.

This module is not really smart yet, currently it uses the following regex to extract a date (which might fail in a lot of cases):

r"_(\d{4}-\d{2}-\d{2})_"
Source code in core/date.py
class ExtractDateModule(KiaraModule):
    """Extract a date object from a string.

    This module is not really smart yet, currently it uses the following regex to extract a date (which might fail in a lot of cases):

        r"_(\d{4}-\d{2}-\d{2})_"

    """

    _module_type_name = "extract_from_string"

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:

        return {"text": {"type": "string", "doc": "The input string."}}

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "date": {"type": "date", "doc": "The date extracted from the input string."}
        }

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Extract the first '_YYYY-MM-DD_' date found in the input string.

        Raises:
            ValueError: if the input string contains no matching date
        """

        from dateutil import parser

        text = inputs.get_value_data("text")
        date_matches = re.findall(r"_(\d{4}-\d{2}-\d{2})_", text)
        if not date_matches:
            # previously a bare 'assert', which is stripped under 'python -O';
            # raise an explicit, descriptive error instead
            raise ValueError(
                "Could not extract date from input string: no match for pattern '_YYYY-MM-DD_'."
            )
        d_obj = parser.parse(date_matches[0])  # type: ignore

        outputs.set_value("date", d_obj)

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/date.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    return {"text": {"type": "string", "doc": "The input string."}}

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/date.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {
        "date": {"type": "date", "doc": "The date extracted from the input string."}
    }

defaults

KIARA_MODULES_CORE_BASE_FOLDER

Marker to indicate the base folder for the kiara network module package.

KIARA_MODULES_CORE_RESOURCES_FOLDER

Default resources folder for this package.

dev

Modules that are useful for kiara as well as pipeline-development, as well as testing.

DummyModule (KiaraModule)

Module that simulates processing, but uses hard-coded outputs as a result.

Source code in core/dev.py
class DummyModule(KiaraModule):
    """Module that simulates processing, but uses hard-coded outputs as a result."""

    _config_cls = DummyProcessingModuleConfig

    def _build_schemas(self, config_key: str) -> typing.Mapping[str, ValueSchema]:
        # shared helper: turn the raw schema dicts from the config under
        # 'config_key' into validated 'ValueSchema' objects
        result = {}
        for k, v in self.config.get(config_key).items():  # type: ignore
            schema = ValueSchema(**v)
            schema.validate_types(self._kiara)
            result[k] = schema
        return result

    def create_input_schema(self) -> typing.Mapping[str, ValueSchema]:
        """The input schema for the ``dummy`` module is created at object creation time from the ``input_schemas`` config parameter."""

        return self._build_schemas("input_schema")

    def create_output_schema(self) -> typing.Mapping[str, ValueSchema]:
        """The output schema for the ``dummy`` module is created at object creation time from the ``output_schemas`` config parameter."""

        return self._build_schemas("output_schema")

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Returns the hardcoded output values that are set in the ``outputs`` config field.

        Optionally, this module can simulate processing by waiting a configured amount of time (seconds -- specified in the ``delay`` config parameter).

        Raises:
            NotImplementedError: if a declared output field has no configured (dummy) value
        """

        time.sleep(self.config.get("delay"))  # type: ignore

        output_values: typing.Mapping = self.config.get("outputs")  # type: ignore

        value_dict = {}
        for output_name in self.output_names:
            if output_name not in output_values.keys():
                # generating fake values for un-configured outputs is not supported (yet)
                raise NotImplementedError()
            value_dict[output_name] = output_values[output_name]
        outputs.set_values(**value_dict)

create_input_schema(self)

The input schema for the dummy module is created at object creation time from the input_schemas config parameter.

Source code in core/dev.py
def create_input_schema(self) -> typing.Mapping[str, ValueSchema]:
    """The input schema for the ``dummy`` module is created at object creation time from the ``input_schemas`` config parameter."""

    result = {}
    for k, v in self.config.get("input_schema").items():  # type: ignore
        schema = ValueSchema(**v)
        schema.validate_types(self._kiara)
        result[k] = schema
    return result

create_output_schema(self)

The output schema for the dummy module is created at object creation time from the output_schemas config parameter.

Source code in core/dev.py
def create_output_schema(self) -> typing.Mapping[str, ValueSchema]:
    """The output schema for the ``dummy`` module is created at object creation time from the ``output_schemas`` config parameter."""

    result = {}
    for k, v in self.config.get("output_schema").items():  # type: ignore
        schema = ValueSchema(**v)
        schema.validate_types(self._kiara)
        result[k] = schema
    return result

process(self, inputs, outputs)

Returns the hardcoded output values that are set in the outputs config field.

Optionally, this module can simulate processing by waiting a configured amount of time (seconds -- specified in the delay config parameter).

Source code in core/dev.py
def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
    """Returns the hardcoded output values that are set in the ``outputs`` config field.

    Optionally, this module can simulate processing by waiting a configured amount of time (seconds -- specified in the ``delay`` config parameter).
    """

    time.sleep(self.config.get("delay"))  # type: ignore

    output_values: typing.Mapping = self.config.get("outputs")  # type: ignore

    value_dict = {}
    for output_name in self.output_names:
        if output_name not in output_values.keys():
            raise NotImplementedError()
            # v = self.output_schemas[output_name].type_obj.fake_value()
            # value_dict[output_name] = v
        else:
            value_dict[output_name] = output_values[output_name]
    outputs.set_values(**value_dict)

DummyProcessingModuleConfig (ModuleTypeConfigSchema) pydantic-model

Configuration for the 'dummy' processing module.

Source code in core/dev.py
class DummyProcessingModuleConfig(ModuleTypeConfigSchema):
    """Configuration for the 'dummy' processing module."""

    # optional free-form documentation for the configured module instance
    documentation: typing.Optional[str] = None

    # maps input field names to (serialized) value schema dicts
    input_schema: typing.Dict[str, typing.Dict] = Field(
        description="The input schema for this module."
    )
    # maps output field names to (serialized) value schema dicts
    output_schema: typing.Dict[str, typing.Dict] = Field(
        description="The output schema for this module."
    )
    # hardcoded values that 'process' returns as the module result
    outputs: typing.Dict[str, typing.Any] = Field(
        description="The (dummy) output for this module.", default_factory=dict
    )
    delay: float = Field(
        description="The delay in seconds from processing start to when the (dummy) outputs are returned.",
        default=0,
    )

delay: float pydantic-field

The delay in seconds from processing start to when the (dummy) outputs are returned.

input_schema: Dict[str, Dict] pydantic-field required

The input schema for this module.

output_schema: Dict[str, Dict] pydantic-field required

The output schema for this module.

outputs: Dict[str, Any] pydantic-field

The (dummy) output for this module.

dict

SaveDictModule (StoreValueTypeModule)

Source code in core/dict.py
class SaveDictModule(StoreValueTypeModule):
    # stores 'dict' values by serializing them to a json file on disk

    _config_cls = JsonSerializationConfig
    _module_type_name = "store"

    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return "dict"

    def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:

        import orjson

        serialization_options = self.get_config_value("options")
        target_file_name = self.get_config_value("file_name")
        serialized = orjson.dumps(value.get_value_data(), option=serialization_options)

        target_dir = Path(base_path)
        target_dir.mkdir(parents=True, exist_ok=True)

        (target_dir / target_file_name).write_bytes(serialized)

        # the config needed to restore the value via 'generic.restore_from_json'
        return {
            "module_type": "generic.restore_from_json",
            "base_path_input_name": "base_path",
            "inputs": {
                "base_path": base_path,
                "file_name": self.get_config_value("file_name"),
            },
            "output_name": "value_item",
        }

store_value(self, value, base_path)

Save the value, and return the load config needed to load it again.

Source code in core/dict.py
def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
    """Serialize the dict value to a json file, return the matching load config."""

    import orjson

    serialized: bytes = orjson.dumps(
        value.get_value_data(), option=self.get_config_value("options")
    )

    target_dir = Path(base_path)
    target_dir.mkdir(parents=True, exist_ok=True)
    (target_dir / self.get_config_value("file_name")).write_bytes(serialized)

    return {
        "module_type": "generic.restore_from_json",
        "base_path_input_name": "base_path",
        "inputs": {
            "base_path": base_path,
            "file_name": self.get_config_value("file_name"),
        },
        "output_name": "value_item",
    }

file

DefaultFileImportModule (FileImportModule)

Import an external file into a kiara session.

Source code in core/file.py
class DefaultFileImportModule(FileImportModule):
    """Import an external file into a kiara session."""

    _module_type_name = "import"

    def import_from__file_path__string(self, source: str) -> KiaraFile:
        # delegate to the KiaraFile model, which builds the file representation
        return KiaraFile.load_file(source)

LoadLocalFileModule (KiaraModule)

Load a file and its metadata.

This module does not read or load the content of a file, but contains the path to the local representation/version of the file so it can be read by a subsequent process.

Source code in core/file.py
class LoadLocalFileModule(KiaraModule):
    """Load a file and its metadata.

    This module does not read or load the content of a file, but contains the path to the local representation/version of the
    file so it can be read by a subsequent process.
    """

    # _config_cls = ImportLocalPathConfig
    _module_type_name = "load"

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # two string inputs which together locate the file on disk
        input_fields = {
            "base_path": {
                "type": "string",
                "doc": "The path to the base directory where the file is stored.",
            },
            "rel_path": {
                "type": "string",
                "doc": "The relative path of the file within the base directory.",
            },
        }
        return input_fields

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        output_fields = {
            "file": {
                "type": "file",
                "doc": "A representation of the original file content in the kiara data registry.",
            }
        }
        return output_fields

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:

        full_path = os.path.join(
            inputs.get_value_data("base_path"), inputs.get_value_data("rel_path")
        )
        outputs.set_value("file", KiaraFile.load_file(full_path))

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{
    "[input_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this input]",
        "optional*": [boolean: whether this input is optional or required (defaults to 'False')]
    },
    "[other_input_field_name]": {
        "type": ...
    },
    ...
}

Source code in core/file.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Describe the two string inputs needed to locate the file on disk."""
    fields: typing.Dict[str, typing.Any] = {}
    fields["base_path"] = {
        "type": "string",
        "doc": "The path to the base directory where the file is stored.",
    }
    fields["rel_path"] = {
        "type": "string",
        "doc": "The relative path of the file within the base directory.",
    }
    return fields

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{
    "[output_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this output]"
    },
    "[other_output_field_name]": {
        "type": ...
    },
    ...
}

Source code in core/file.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Single output: the file model registered in the kiara data registry."""
    schema = {
        "file": {
            "type": "file",
            "doc": "A representation of the original file content in the kiara data registry.",
        }
    }
    return schema

StoreFileTypeModule (StoreValueTypeModule)

Save a file to disk.

Source code in core/file.py
class StoreFileTypeModule(StoreValueTypeModule):
    """Save a file to disk."""

    _module_type_name = "store"

    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return "file"

    def store_value(
        self, value: Value, base_path: str
    ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Any]:

        source_file = value.get_value_data()
        target = os.path.join(base_path, source_file.file_name)

        os.makedirs(os.path.dirname(target), exist_ok=True)

        # refuse to overwrite anything that is already in the store
        if os.path.exists(target):
            raise KiaraProcessingException(
                f"Can't save file, path already exists: {target}"
            )

        copied = source_file.copy_file(target, is_onboarded=True)

        load_config = {
            "module_type": "file.load",
            "base_path_input_name": "base_path",
            "inputs": {"base_path": base_path, "rel_path": source_file.file_name},
            "output_name": "file",
        }
        return (load_config, copied)

store_value(self, value, base_path)

Save the value, and return the load config needed to load it again.

Source code in core/file.py
def store_value(
    self, value: Value, base_path: str
) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Any]:
    """Copy the file into 'base_path', return (load config, copied file model)."""

    file_item = value.get_value_data()
    destination = os.path.join(base_path, file_item.file_name)

    os.makedirs(os.path.dirname(destination), exist_ok=True)

    # never overwrite existing data in the store
    if os.path.exists(destination):
        raise KiaraProcessingException(
            f"Can't save file, path already exists: {destination}"
        )

    stored = file_item.copy_file(destination, is_onboarded=True)

    return (
        {
            "module_type": "file.load",
            "base_path_input_name": "base_path",
            "inputs": {"base_path": base_path, "rel_path": file_item.file_name},
            "output_name": "file",
        },
        stored,
    )

file_bundle

DefaultFileBundleImportModule (FileBundleImportModule)

Import a file bundle into the kiara data store.

This module will support multiple source types and profiles in the future, but at the moment only import from local folder is supported. Thus, requiring the config value 'local' for 'source_profile', and 'folder_path' for 'source_type'.

Source code in core/file_bundle.py
class DefaultFileBundleImportModule(FileBundleImportModule):
    """Import a file bundle into the kiara data store.

    This module will support multiple source types and profiles in the future, but at the moment only import from
    local folder is supported. Thus, requiring the config value 'local' for 'source_profile', and 'folder_path' for 'source_type'.
    """

    _module_type_name = "import"

    def import_from__folder_path__string(self, source: str) -> KiaraFileBundle:
        # the actual folder import is handled by the KiaraFileBundle model
        return KiaraFileBundle.import_folder(source)

LoadFileBundleModule (KiaraModule)

Load a file bundle and its metadata.

This module does not read or load the content of all included files, but contains the path to the local representation/version of them so they can be read by a subsequent process.

Source code in core/file_bundle.py
class LoadFileBundleModule(KiaraModule):
    """Load a file bundle and its metadata.

    This module does not read or load the content of all included files, but contains the path to the local representation/version of them
    so they can be read by a subsequent process.
    """

    _module_type_name = "load"

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # 'base_path'/'rel_path' locate the folder; the three (optional) list
        # inputs control which files end up in the bundle
        return {
            "base_path": {
                "type": "string",
                "doc": "The base path where the folder lives.",
            },
            "rel_path": {
                "type": "string",
                "doc": "The relative path of the folder, within the base path location.",
            },
            "include_files": {
                "type": "list",
                "doc": "A list of strings, include all files where the filename ends with one of those strings.\n\nOnly full string matches are supported at the moment, globs and regex might be in the future.",
                "optional": True,
            },
            "exclude_dirs": {
                "type": "list",
                "doc": f"A list of strings, exclude all folders whose name ends with that string. Defaults to: {DEFAULT_EXCLUDE_DIRS}.\n\nOnly full string matches are supported at the moment, globs and regex might be in the future.",
                "default": DEFAULT_EXCLUDE_DIRS,
                "optional": True,
            },
            "exclude_files": {
                "type": "list",
                "doc": f"A list of strings, exclude all files that end with that one of those strings (takes precedence over 'include_files'). Defaults to: {DEFAULT_EXCLUDE_FILES}\n\nOnly full string matches are supported at the moment, globs and regex might be in the future.",
                "default": DEFAULT_EXCLUDE_FILES,
                "optional": True,
            },
        }

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:

        return {
            "file_bundle": {
                "type": "file_bundle",
                "doc": "The collection of files contained in the bundle.",
            }
        }

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Import the folder at base_path/rel_path as a file bundle, applying the configured filters."""

        base_path = inputs.get_value_data("base_path")
        rel_path = inputs.get_value_data("rel_path")

        path = os.path.join(base_path, rel_path)

        included_files = inputs.get_value_data("include_files")
        excluded_dirs = inputs.get_value_data("exclude_dirs")
        excluded_files = inputs.get_value_data("exclude_files")

        # fix: this keyword was 'excluded_files', which does not match the
        # 'exclude_files' name used by the input schema and the other two
        # keywords here; since pydantic models ignore unknown fields by
        # default, the user-provided exclude list was silently dropped
        import_config = FolderImportConfig(
            include_files=included_files,
            exclude_dirs=excluded_dirs,
            exclude_files=excluded_files,
        )

        bundle = KiaraFileBundle.import_folder(source=path, import_config=import_config)

        outputs.set_values(file_bundle=bundle)

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{
    "[input_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this input]",
        "optional*": [boolean: whether this input is optional or required (defaults to 'False')]
    },
    "[other_input_field_name]": {
        "type": ...
    },
    ...
}

Source code in core/file_bundle.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Describe the path inputs plus the include/exclude filter lists."""
    fields: typing.Dict[str, typing.Any] = {
        "base_path": {
            "type": "string",
            "doc": "The base path where the folder lives.",
        },
        "rel_path": {
            "type": "string",
            "doc": "The relative path of the folder, within the base path location.",
        },
    }
    fields["include_files"] = {
        "type": "list",
        "doc": "A list of strings, include all files where the filename ends with one of those strings.\n\nOnly full string matches are supported at the moment, globs and regex might be in the future.",
        "optional": True,
    }
    fields["exclude_dirs"] = {
        "type": "list",
        "doc": f"A list of strings, exclude all folders whose name ends with that string. Defaults to: {DEFAULT_EXCLUDE_DIRS}.\n\nOnly full string matches are supported at the moment, globs and regex might be in the future.",
        "default": DEFAULT_EXCLUDE_DIRS,
        "optional": True,
    }
    fields["exclude_files"] = {
        "type": "list",
        "doc": f"A list of strings, exclude all files that end with that one of those strings (takes precedence over 'include_files'). Defaults to: {DEFAULT_EXCLUDE_FILES}\n\nOnly full string matches are supported at the moment, globs and regex might be in the future.",
        "default": DEFAULT_EXCLUDE_FILES,
        "optional": True,
    }
    return fields

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{
    "[output_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this output]"
    },
    "[other_output_field_name]": {
        "type": ...
    },
    ...
}

Source code in core/file_bundle.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Single output: the imported file bundle."""

    schema = {
        "file_bundle": {
            "type": "file_bundle",
            "doc": "The collection of files contained in the bundle.",
        }
    }
    return schema

StoreFileBundleType (StoreValueTypeModule)

Save a file bundle to disk.

Source code in core/file_bundle.py
class StoreFileBundleType(StoreValueTypeModule):
    """Save a file bundle to disk."""

    _module_type_name = "store"

    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return "file_bundle"

    def store_value(
        self, value: Value, base_path: str
    ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Any]:

        source_bundle: KiaraFileBundle = value.get_value_data()
        bundle_rel_path = source_bundle.bundle_name

        # copy the whole bundle below the store's base path; the copy (not the
        # input value) is what gets registered as onboarded
        copied_bundle = source_bundle.copy_bundle(
            os.path.join(base_path, bundle_rel_path), is_onboarded=True
        )

        load_config = {
            "module_type": "file_bundle.load",
            "base_path_input_name": "base_path",
            "inputs": {"base_path": base_path, "rel_path": bundle_rel_path},
            "output_name": "file_bundle",
        }

        return (load_config, copied_bundle)

store_value(self, value, base_path)

Save the value, and return the load config needed to load it again.

Source code in core/file_bundle.py
def store_value(
    self, value: Value, base_path: str
) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Any]:
    """Copy the bundle below 'base_path', return (load config, copied bundle)."""

    source_bundle: KiaraFileBundle = value.get_value_data()
    bundle_rel_path = source_bundle.bundle_name

    stored_bundle = source_bundle.copy_bundle(
        os.path.join(base_path, bundle_rel_path), is_onboarded=True
    )

    return (
        {
            "module_type": "file_bundle.load",
            "base_path_input_name": "base_path",
            "inputs": {"base_path": base_path, "rel_path": bundle_rel_path},
            "output_name": "file_bundle",
        },
        stored_bundle,
    )

generic

JsonSerializationConfig (StoreValueModuleConfig) pydantic-model

Source code in core/generic.py
class JsonSerializationConfig(StoreValueModuleConfig):
    """Configuration for modules that persist a value as a json file using orjson."""

    # bitwise OR of orjson option flags; the default flags treat naive
    # datetimes as UTC and serialize numpy types natively
    options: int = Field(
        description="The options to use for the json serialization. Check https://github.com/ijl/orjson#quickstart for details.",
        default=orjson.OPT_NAIVE_UTC | orjson.OPT_SERIALIZE_NUMPY,
    )
    # the file name to write inside the store's base path
    file_name: str = Field(
        description="The name of the serialized file.", default="dict.json"
    )

file_name: str pydantic-field

The name of the serialized file.

options: int pydantic-field

The options to use for the json serialization. Check https://github.com/ijl/orjson#quickstart for details.

RestoreFromJsonDictModule (KiaraModule)

Source code in core/generic.py
class RestoreFromJsonDictModule(KiaraModule):
    # reads a json file (written by one of the 'store' modules) back into a dict

    _module_type_name = "restore_from_json"

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:

        fields: typing.Dict[str, typing.Any] = {}
        fields["base_path"] = {
            "type": "string",
            "doc": "The folder that contains the serialized dict.",
        }
        fields["file_name"] = {
            "type": "string",
            "doc": "The file name of the serialized dict.",
        }
        return fields

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:

        return {"value_item": {"type": "dict", "doc": "The deserialized dict value."}}

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:

        full_path = os.path.join(
            inputs.get_value_data("base_path"), inputs.get_value_data("file_name")
        )

        if not os.path.exists(full_path):
            raise KiaraProcessingException(
                f"Can't deserialize dict, path to file does not exist: {full_path}"
            )

        if not os.path.isfile(os.path.realpath(full_path)):
            raise KiaraProcessingException(
                f"Can't deserialize dict, path is not a file: {full_path}"
            )

        with open(full_path, "r") as json_file:
            raw = json_file.read()

        outputs.set_value("value_item", orjson.loads(raw))

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/generic.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Two string inputs: the folder and file name of the serialized dict."""

    fields: typing.Dict[str, typing.Any] = {}
    fields["base_path"] = {
        "type": "string",
        "doc": "The folder that contains the serialized dict.",
    }
    fields["file_name"] = {
        "type": "string",
        "doc": "The file name of the serialized dict.",
    }
    return fields

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/generic.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Single output: the restored dict."""

    schema = {"value_item": {"type": "dict", "doc": "The deserialized dict value."}}
    return schema

RestoreScalarModule (KiaraModule)

Utility module, only used internally.

Source code in core/generic.py
class RestoreScalarModule(KiaraModule):
    """Utility module, only used internally."""

    _module_type_name = "restore_scalar"
    _config_cls = RestoreScalarModuleConfig

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:

        configured_type = self.get_config_value("value_type")
        return {
            "scalar_data": {
                "type": configured_type,
                "doc": "The scalar value.",
            }
        }

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:

        configured_type = self.get_config_value("value_type")
        return {
            "value_item": {
                "type": configured_type,
                "doc": "The loaded item.",
            }
        }

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:

        # the scalar is passed straight through, input value object to output
        outputs.set_value("value_item", inputs.get_value_obj("scalar_data"))

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/generic.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Single input: the scalar data itself, typed via the 'value_type' config."""

    configured_type = self.get_config_value("value_type")
    return {
        "scalar_data": {
            "type": configured_type,
            "doc": "The scalar value.",
        }
    }

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/generic.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Single output: the restored scalar, typed via the 'value_type' config."""

    configured_type = self.get_config_value("value_type")
    return {
        "value_item": {
            "type": configured_type,
            "doc": "The loaded item.",
        }
    }

RestoreScalarModuleConfig (ModuleTypeConfigSchema) pydantic-model

Source code in core/generic.py
class RestoreScalarModuleConfig(ModuleTypeConfigSchema):
    """Configuration for the 'generic.restore_scalar' module."""

    # determines the value type of both the input and output field schemas
    value_type: str = Field(description="The value type of the scalar to load.")

value_type: str pydantic-field required

The value type of the scalar to load.

StoreScalarModule (StoreValueTypeModule)

Source code in core/generic.py
class StoreScalarModule(StoreValueTypeModule):
    # stores scalar values by embedding the data directly into the load config

    _module_type_name = "store"

    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return ["boolean", "integer", "float", "string"]

    def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:

        # scalars are small enough to persist inline; no file is written,
        # which is why 'base_path_input_name' is None
        return {
            "module_type": "generic.restore_scalar",
            "module_config": {"value_type": self.get_config_value("value_type")},
            "base_path_input_name": None,
            "inputs": {"scalar_data": value.get_value_data()},
            "output_name": "value_item",
        }

store_value(self, value, base_path)

Save the value, and return the load config needed to load it again.

Source code in core/generic.py
def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
    """Return a load config with the scalar data embedded inline; nothing is written to disk."""

    scalar = value.get_value_data()

    return {
        "module_type": "generic.restore_scalar",
        "module_config": {"value_type": self.get_config_value("value_type")},
        "base_path_input_name": None,
        "inputs": {"scalar_data": scalar},
        "output_name": "value_item",
    }

StoreScalarModuleConfig (ModuleTypeConfigSchema) pydantic-model

Source code in core/generic.py
class StoreScalarModuleConfig(ModuleTypeConfigSchema):
    """Configuration for the scalar 'store' module."""

    # recorded in the load config so the scalar can be restored with its type
    value_type: str = Field(description="The value type of the scalar to store.")

value_type: str pydantic-field required

The value type of the scalar to store.

json

ToJsonModuleOld (OldTypeConversionModule)

Convert arbitrary types into json.

Very early days for this module, it doesn't support a lot of types yet.

Source code in core/json.py
class ToJsonModuleOld(OldTypeConversionModule):
    """Convert arbitrary types into json.

    Very early days for this module, it doesn't support a lot of types yet.
    """

    _module_type_name = "to_json"

    # fix: these classmethods named their first parameter 'self'; the first
    # argument of a @classmethod is the class and should be named 'cls' (PEP 8)
    @classmethod
    def _get_supported_source_types(cls) -> typing.Union[typing.Iterable[str], str]:
        return JSON_SUPPORTED_SOURCE_TYPES

    @classmethod
    def _get_target_types(cls) -> typing.Union[typing.Iterable[str], str]:
        return ["json"]

    def convert(
        self, value: Value, config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        """Convert the value's data to a json string, using the shared 'convert_to_json' helper."""

        input_value: typing.Any = value.get_value_data()

        return convert_to_json(self._kiara, data=input_value, convert_config=config)

list

IncludedInListCheckModule (KiaraModule)

Check whether an element is in a list.

Source code in core/list.py
class IncludedInListCheckModule(KiaraModule):
    """Check whether an element is in a list."""

    _module_type_name = "contains"

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "list": {"type": "list", "doc": "The list."},
            "item": {
                "type": "any",
                "doc": "The element to check for inclusion in the list.",
            },
        }

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "is_included": {
                "type": "boolean",
                "doc": "Whether the element is in the list, or not.",
            }
        }

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:

        haystack = inputs.get_value_data("list")
        needle = inputs.get_value_data("item")
        # plain membership test, relying on the items' equality semantics
        outputs.set_value("is_included", needle in haystack)

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/list.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Inputs: the list to search, and the item to look for."""
    return {
        "list": {"type": "list", "doc": "The list."},
        "item": {
            "type": "any",
            "doc": "The element to check for inclusion in the list.",
        },
    }

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/list.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Single boolean output indicating membership."""
    result = {
        "is_included": {
            "type": "boolean",
            "doc": "Whether the element is in the list, or not.",
        }
    }
    return result

StoreDictModule (StoreValueTypeModule)

Source code in core/list.py
class StoreDictModule(StoreValueTypeModule):
    # NOTE(review): despite its name, this module (defined in core/list.py)
    # handles the 'list' value type; renaming the class would change the
    # public interface, so the name is kept as-is.

    _config_cls = JsonSerializationConfig
    _module_type_name = "store"

    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return "list"

    def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:

        import orjson

        json_options = self.get_config_value("options")
        out_file_name = self.get_config_value("file_name")
        json_bytes = orjson.dumps(value.get_value_data(), option=json_options)

        out_dir = Path(base_path)
        out_dir.mkdir(parents=True, exist_ok=True)

        (out_dir / out_file_name).write_bytes(json_bytes)

        # config needed to restore the value via 'generic.restore_from_json'
        return {
            "module_type": "generic.restore_from_json",
            "base_path_input_name": "base_path",
            "inputs": {
                "base_path": base_path,
                "file_name": self.get_config_value("file_name"),
            },
            "output_name": "value_item",
        }

store_value(self, value, base_path)

Save the value, and return the load config needed to load it again.

Source code in core/list.py
def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
    """Serialize the value to a JSON file below 'base_path' and return its load config."""

    import orjson

    serialization_options = self.get_config_value("options")
    target_file_name = self.get_config_value("file_name")
    serialized = orjson.dumps(value.get_value_data(), option=serialization_options)

    target_dir = Path(base_path)
    target_dir.mkdir(parents=True, exist_ok=True)

    (target_dir / target_file_name).write_bytes(serialized)

    restore_inputs = {
        "base_path": base_path,
        "file_name": self.get_config_value("file_name"),
    }
    return {
        "module_type": "generic.restore_from_json",
        "base_path_input_name": "base_path",
        "inputs": restore_inputs,
        "output_name": "value_item",
    }

logic

AndModule (LogicProcessingModule)

Returns 'True' if both inputs are 'True'.

Source code in core/logic.py
class AndModule(LogicProcessingModule):
    """Returns 'True' if both inputs are 'True'."""

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Two boolean inputs, 'a' and 'b', combined with a logical 'and'."""

        return {
            "a": {"type": "boolean", "doc": "A boolean describing this input state."},
            "b": {"type": "boolean", "doc": "A boolean describing this input state."},
        }

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """A single boolean output 'y', the result of 'a and b'."""

        return {
            "y": {
                "type": "boolean",
                "doc": "A boolean describing the module output state.",
            }
        }

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Compute 'a and b' after the configured delay."""

        # access the delay via 'config.get', consistent with the other logic
        # modules in this file (NotModule, OrModule)
        time.sleep(self.config.get("delay"))  # type: ignore

        outputs.set_value(
            "y", inputs.get_value_data("a") and inputs.get_value_data("b")
        )

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]": { "type": "[value_type]", "doc*": "[a description of this input]", "optional*": [boolean whether this input is optional or required (defaults to 'False')] }, "[other_input_field_name]": { "type": ..., ... }, ... }

Source code in core/logic.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Describe the two boolean inputs 'a' and 'b' of this module."""

    bool_field = {"type": "boolean", "doc": "A boolean describing this input state."}
    return {"a": dict(bool_field), "b": dict(bool_field)}

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]": { "type": "[value_type]", "doc*": "[a description of this output]" }, "[other_output_field_name]": { "type": ..., ... }, ... }

Source code in core/logic.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Describe the single boolean output 'y' of this module."""

    y_schema = {
        "type": "boolean",
        "doc": "A boolean describing the module output state.",
    }
    return {"y": y_schema}

LogicProcessingModuleConfig (ModuleTypeConfigSchema) pydantic-model

Config class for all the 'logic'-related modules.

Source code in core/logic.py
class LogicProcessingModuleConfig(ModuleTypeConfigSchema):
    """Config class for all the 'logic'-related modules."""

    # slept in each module's 'process' before the output is set (see the
    # logic modules in this file); defaults to no delay
    delay: float = Field(
        default=0,
        description="the delay in seconds from processing start to when the output is returned.",
    )

delay: float pydantic-field

the delay in seconds from processing start to when the output is returned.

NotModule (LogicProcessingModule)

Negates the input.

Source code in core/logic.py
class NotModule(LogicProcessingModule):
    """Negates the input."""

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """The not module only has one input, a boolean that will be negated by the module."""

        input_schema = {
            "a": {"type": "boolean", "doc": "A boolean describing this input state."}
        }
        return input_schema

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:

        """The output of this module is a single boolean, the negated input."""

        output_schema = {
            "y": {
                "type": "boolean",
                "doc": "A boolean describing the module output state.",
            }
        }
        return output_schema

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Negates the input boolean."""

        delay = self.config.get("delay")
        time.sleep(delay)  # type: ignore

        negated = not inputs.get_value_data("a")
        outputs.set_value("y", negated)

create_input_schema(self)

The not module only has one input, a boolean that will be negated by the module.

Source code in core/logic.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """The not module only has one input, a boolean that will be negated by the module."""

    a_schema = {"type": "boolean", "doc": "A boolean describing this input state."}
    return {"a": a_schema}

create_output_schema(self)

The output of this module is a single boolean, the negated input.

Source code in core/logic.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    """The output of this module is a single boolean, the negated input."""

    negated_schema = {
        "type": "boolean",
        "doc": "A boolean describing the module output state.",
    }
    return {"y": negated_schema}

process(self, inputs, outputs)

Negates the input boolean.

Source code in core/logic.py
def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
    """Negates the input boolean."""

    delay = self.config.get("delay")
    time.sleep(delay)  # type: ignore

    outputs.set_value("y", not inputs.get_value_data("a"))

OrModule (LogicProcessingModule)

Returns 'True' if one of the inputs is 'True'.

Source code in core/logic.py
class OrModule(LogicProcessingModule):
    """Returns 'True' if one of the inputs is 'True'."""

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Two boolean inputs, 'a' and 'b', combined with a logical 'or'."""

        bool_input = {
            "type": "boolean",
            "doc": "A boolean describing this input state.",
        }
        return {"a": dict(bool_input), "b": dict(bool_input)}

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """A single boolean output 'y', the result of 'a or b'."""

        return {
            "y": {
                "type": "boolean",
                "doc": "A boolean describing the module output state.",
            }
        }

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Compute 'a or b' after the configured delay."""

        time.sleep(self.config.get("delay"))  # type: ignore

        result = inputs.get_value_data("a") or inputs.get_value_data("b")
        outputs.set_value("y", result)

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]": { "type": "[value_type]", "doc*": "[a description of this input]", "optional*": [boolean whether this input is optional or required (defaults to 'False')] }, "[other_input_field_name]": { "type": ..., ... }, ... }

Source code in core/logic.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Describe the two boolean inputs 'a' and 'b' of this module."""

    state_doc = "A boolean describing this input state."
    return {
        "a": {"type": "boolean", "doc": state_doc},
        "b": {"type": "boolean", "doc": state_doc},
    }

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]": { "type": "[value_type]", "doc*": "[a description of this output]" }, "[other_output_field_name]": { "type": ..., ... }, ... }

Source code in core/logic.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Describe the single boolean output 'y' of this module."""

    result_schema = {
        "type": "boolean",
        "doc": "A boolean describing the module output state.",
    }
    return {"y": result_schema}

metadata_models

This module contains the metadata models that are used in the kiara_modules.core package.

Metadata models are convenience wrappers that make it easier for kiara to find, create, manage and version metadata that is attached to data, as well as kiara modules. It is possible to register metadata using a JSON schema string, but it is recommended to create a metadata model, because it is much easier overall.

Metadata models must be a sub-class of kiara.metadata.MetadataModel.

ArrayMetadata (HashedMetadataModel) pydantic-model

Describes properties of the 'array' type.

Source code in core/metadata_models.py
class ArrayMetadata(HashedMetadataModel):
    """Describes properties of the 'array' type."""

    _metadata_key: typing.ClassVar[str] = "array"

    # number of elements in the array
    length: int = Field(description="The number of elements the array contains.")
    # total bytes consumed by the array's elements
    size: int = Field(
        description="Total number of bytes consumed by the elements of the array."
    )

    def _obj_to_hash(self) -> typing.Any:
        # both fields participate in the metadata hash
        return {"length": self.length, "size": self.size}

    def get_category_alias(self) -> str:
        return "instance.metadata.array"

length: int pydantic-field required

The number of elements the array contains.

size: int pydantic-field required

Total number of bytes consumed by the elements of the array.

ColumnSchema (BaseModel) pydantic-model

Describes properties of a single column of the 'table' data type.

Source code in core/metadata_models.py
class ColumnSchema(BaseModel):
    """Describes properties of a single column of the 'table' data type."""

    _metadata_key: typing.ClassVar[str] = "column"

    # backend-specific type name of the column
    type_name: str = Field(
        description="The type name of the column (backend-specific)."
    )
    # free-form extra metadata attached to the column; defaults to empty
    metadata: typing.Dict[str, typing.Any] = Field(
        description="Other metadata for the column.", default_factory=dict
    )

metadata: Dict[str, Any] pydantic-field

Other metadata for the column.

type_name: str pydantic-field required

The type name of the column (backend-specific).

FolderImportConfig (BaseModel) pydantic-model

Source code in core/metadata_models.py
class FolderImportConfig(BaseModel):
    """Options controlling which files are picked up when importing a folder."""

    # suffix-based whitelist; 'None' means: include every file
    include_files: typing.Optional[typing.List[str]] = Field(
        description="A list of strings, include all files where the filename ends with that string.",
        default=None,
    )
    # NOTE(review): the description says 'ends with that string', but
    # KiaraFileBundle.import_folder compares directory names for exact
    # membership ('d not in exclude_dirs') — confirm which is intended
    exclude_dirs: typing.Optional[typing.List[str]] = Field(
        description="A list of strings, exclude all folders whose name ends with that string.",
        default=None,
    )
    # takes precedence over 'include_files'
    exclude_files: typing.Optional[typing.List[str]] = Field(
        description=f"A list of strings, exclude all files that match those (takes precedence over 'include_files'). Defaults to: {DEFAULT_EXCLUDE_FILES}.",
        default=DEFAULT_EXCLUDE_FILES,
    )

exclude_dirs: List[str] pydantic-field

A list of strings, exclude all folders whose name ends with that string.

exclude_files: List[str] pydantic-field

A list of strings, exclude all files that match those (takes precedence over 'include_files'). Defaults to: ['.DS_Store'].

include_files: List[str] pydantic-field

A list of strings, include all files where the filename ends with that string.

KiaraDatabase (MetadataModel) pydantic-model

Source code in core/metadata_models.py
class KiaraDatabase(MetadataModel):
    """Describes (and gives cached sqlalchemy access to) a sqlite database file."""

    _metadata_key: typing.ClassVar[str] = "database"

    @classmethod
    def create_in_temp_dir(cls, init_sql: typing.Optional[str] = None):
        """Create a new sqlite database file in a temporary directory.

        The temporary directory is removed at interpreter exit. If 'init_sql'
        is provided, it is executed against the freshly created database.
        """

        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")

        def cleanup():
            # remove the whole temporary directory; the previous code passed
            # the database *file* path to 'rmtree', which always failed
            # (silently, due to 'ignore_errors') and leaked the temp dir
            shutil.rmtree(temp_f, ignore_errors=True)

        atexit.register(cleanup)

        db = cls(db_file_path=db_path)
        db.create_if_not_exists()

        if init_sql:
            db.execute_sql(sql_script=init_sql, invalidate=True)

        return db

    db_file_path: str = Field(description="The path to the sqlite database file.")
    # lazily created sqlalchemy engine/inspector, plus cached table info
    _cached_engine = PrivateAttr(default=None)
    _cached_inspector = PrivateAttr(default=None)
    _table_names = PrivateAttr(default=None)
    _table_schemas = PrivateAttr(default=None)

    def get_id(self) -> str:
        return self.db_file_path

    def get_category_alias(self) -> str:
        return "instance.metadata.database"

    @validator("db_file_path", allow_reuse=True)
    def ensure_absolute_path(cls, path: str):
        """Normalize the db file path to an absolute path; its parent folder must exist."""

        path = os.path.abspath(path)
        if not os.path.exists(os.path.dirname(path)):
            raise ValueError(f"Parent folder for database file does not exist: {path}")
        return path

    @property
    def db_url(self) -> str:
        # sqlalchemy-style connection url for this sqlite file
        return f"sqlite:///{self.db_file_path}"

    def get_sqlalchemy_engine(self) -> "Engine":
        """Return the (cached) sqlalchemy engine for this database."""

        if self._cached_engine is not None:
            return self._cached_engine

        from sqlalchemy import create_engine

        self._cached_engine = create_engine(self.db_url, future=True)
        # with self._cached_engine.connect() as con:
        #     con.execute(text("PRAGMA query_only = ON"))

        return self._cached_engine

    def create_if_not_exists(self):
        """Create the database if it does not exist yet."""

        from sqlalchemy_utils import create_database, database_exists

        if not database_exists(self.db_url):
            create_database(self.db_url)

    def execute_sql(self, sql_script: str, invalidate: bool = False):
        """Execute an sql script.

        Arguments:
          sql_script: the sql script
          invalidate: whether to invalidate cached values within this object
        """

        self.create_if_not_exists()
        conn = self.get_sqlalchemy_engine().raw_connection()
        try:
            cursor = conn.cursor()
            cursor.executescript(sql_script)
            conn.commit()
        finally:
            # return the raw connection even if the script raised
            conn.close()

        if invalidate:
            self._cached_inspector = None
            self._table_names = None
            self._table_schemas = None

    def copy_database_file(self, target: str):
        """Copy the database file to 'target', returning a new KiaraDatabase for the copy."""

        target_dir = os.path.dirname(target)
        if target_dir:
            # 'exist_ok' so copying into an already-existing folder doesn't fail
            os.makedirs(target_dir, exist_ok=True)

        shutil.copy2(self.db_file_path, target)

        new_db = KiaraDatabase(db_file_path=target)
        return new_db

    def get_sqlalchemy_inspector(self) -> "Inspector":
        """Return the (cached) sqlalchemy inspector for this database."""

        if self._cached_inspector is not None:
            return self._cached_inspector

        self._cached_inspector = inspect(self.get_sqlalchemy_engine())
        return self._cached_inspector

    @property
    def table_names(self) -> typing.Iterable[str]:
        # cached list of table names in this database
        if self._table_names is not None:
            return self._table_names

        self._table_names = self.get_sqlalchemy_inspector().get_table_names()
        return self._table_names

    def get_schema_for_table(self, table_name: str):
        """Return the column schemas for 'table_name', building the cache on first use."""

        if self._table_schemas is not None:
            if table_name not in self._table_schemas.keys():
                raise Exception(
                    f"Can't get table schema, database does not contain table with name '{table_name}'."
                )
            return self._table_schemas[table_name]

        ts: typing.Dict[str, typing.Dict[str, typing.Any]] = {}
        inspector = self.get_sqlalchemy_inspector()
        for tn in inspector.get_table_names():
            # re-use the inspector fetched above instead of looking it up again
            columns = inspector.get_columns(tn)
            ts[tn] = {}
            for c in columns:
                ts[tn][c["name"]] = c

        self._table_schemas = ts
        if table_name not in self._table_schemas.keys():
            raise Exception(
                f"Can't get table schema, database does not contain table with name '{table_name}'."
            )

        return self._table_schemas[table_name]

db_file_path: str pydantic-field required

The path to the sqlite database file.

execute_sql(self, sql_script, invalidate=False)

Execute an sql script.

Parameters:

Name Type Description Default
sql_script str

the sql script

required
invalidate bool

whether to invalidate cached values within this object

False
Source code in core/metadata_models.py
def execute_sql(self, sql_script: str, invalidate: bool = False):
    """Execute an sql script.

    Arguments:
      sql_script: the sql script
      invalidate: whether to invalidate cached values within this object
    """

    # make sure the database file exists before connecting
    self.create_if_not_exists()
    conn = self.get_sqlalchemy_engine().raw_connection()
    cursor = conn.cursor()
    # 'executescript' allows multiple ';'-separated statements in one call
    cursor.executescript(sql_script)
    conn.commit()
    conn.close()

    if invalidate:
        # drop cached inspector/table info; it may be stale after the script ran
        self._cached_inspector = None
        self._table_names = None
        self._table_schemas = None

KiaraDatabaseInfo (HashedMetadataModel) pydantic-model

Source code in core/metadata_models.py
class KiaraDatabaseInfo(HashedMetadataModel):
    """Summary information (tables, views, size) about a database."""

    _metadata_key: typing.ClassVar[str] = "database_info"

    # names of all tables in the database
    table_names: typing.List[str] = Field(
        description="The names of all tables in this database."
    )
    # names of all views in the database
    view_names: typing.List[str] = Field(
        description="The names of all views in this database."
    )
    # per-table metadata, keyed by table name
    tables: typing.Dict[str, TableMetadata] = Field(
        description="Information about the tables within this database."
    )
    # size of the database file, in bytes (presumably — TODO confirm unit)
    size: int = Field(description="The size of the database file.")

    def _obj_to_hash(self) -> typing.Any:
        # all fields participate in the metadata hash
        return {
            "table_names": self.table_names,
            "view_names": self.view_names,
            "tables": self.tables,
            "size": self.size,
        }

    def get_category_alias(self) -> str:
        return "instance.metadata.database_info"

size: int pydantic-field required

The size of the database file.

table_names: List[str] pydantic-field required

The names of all tables in this database.

tables: Dict[str, kiara_modules.core.metadata_models.TableMetadata] pydantic-field required

Information about the tables within this database.

view_names: List[str] pydantic-field required

The names of all views in this database.

KiaraFile (MetadataModel) pydantic-model

Describes properties for the 'file' value type.

Source code in core/metadata_models.py
class KiaraFile(MetadataModel):
    """Describes properties for the 'file' value type."""

    _metadata_key: typing.ClassVar[str] = "file"

    @classmethod
    def load_file(
        cls,
        source: str,
        target: typing.Optional[str] = None,
        incl_orig_path: bool = False,
    ):
        """Utility method to read metadata of a file from disk and optionally move it into a data archive location."""

        import mimetypes

        import filetype

        if not source:
            raise ValueError("No source path provided.")

        if not os.path.exists(os.path.realpath(source)):
            raise ValueError(f"Path does not exist: {source}")

        if not os.path.isfile(os.path.realpath(source)):
            raise ValueError(f"Path is not a file: {source}")

        orig_filename = os.path.basename(source)
        orig_path: str = os.path.abspath(source)
        file_import_time = datetime.datetime.now().isoformat()  # TODO: timezone

        file_stats = os.stat(orig_path)
        size = file_stats.st_size

        if target:
            if os.path.exists(target):
                raise ValueError(f"Target path exists: {target}")
            os.makedirs(os.path.dirname(target), exist_ok=True)
            shutil.copy2(source, target)
        else:
            target = orig_path

        # cheap extension-based mime detection first, content-based
        # detection (via 'filetype') as fallback
        r = mimetypes.guess_type(target)
        if r[0] is not None:
            mime_type = r[0]
        else:
            _mime_type = filetype.guess(target)
            if not _mime_type:
                mime_type = "application/octet-stream"
            else:
                mime_type = _mime_type.MIME

        if not incl_orig_path:
            _orig_path: typing.Optional[str] = None
        else:
            _orig_path = orig_path

        m = KiaraFile(
            orig_filename=orig_filename,
            orig_path=_orig_path,
            import_time=file_import_time,
            mime_type=mime_type,
            size=size,
            file_name=orig_filename,
            path=target,
        )
        return m

    # lazily computed sha3-256 hex digest of the file content
    _file_hash: typing.Optional[str] = PrivateAttr(default=None)

    orig_filename: str = Field(
        description="The original filename of this file at the time of import."
    )
    orig_path: typing.Optional[str] = Field(
        description="The original path to this file at the time of import.",
        default=None,
    )
    import_time: str = Field(description="The time when the file was imported.")
    mime_type: str = Field(description="The mime type of the file.")
    # FIX: the description used to be passed positionally
    # ('Field("The name of the file.")'), which made that sentence the field's
    # *default value* and left the field undocumented
    file_name: str = Field(description="The name of the file.")
    size: int = Field(description="The size of the file.")
    path: str = Field(description="The archive path of the file.")
    is_onboarded: bool = Field(
        description="Whether the file is imported into the kiara data store.",
        default=False,
    )

    def get_id(self) -> str:
        return self.path

    def get_category_alias(self) -> str:
        return "instance.metadata.file"

    def copy_file(
        self, target: str, incl_orig_path: bool = False, is_onboarded: bool = False
    ):
        """Copy the file to 'target', carrying over original name/path/import-time metadata."""

        fm = KiaraFile.load_file(self.path, target)
        if incl_orig_path:
            fm.orig_path = self.orig_path
        else:
            fm.orig_path = None
        fm.orig_filename = self.orig_filename
        fm.import_time = self.import_time
        if self._file_hash is not None:
            # avoid re-hashing the copied content
            fm._file_hash = self._file_hash

        fm.is_onboarded = is_onboarded

        return fm

    @property
    def file_hash(self):
        """The sha3-256 hex digest of the file content (computed once, then cached)."""

        if self._file_hash is not None:
            return self._file_hash

        # renamed from the misleading 'sha256_hash': this is sha3-256
        hasher = hashlib.sha3_256()
        with open(self.path, "rb") as f:
            # Read and update hash string value in blocks of 4K
            for byte_block in iter(lambda: f.read(4096), b""):
                hasher.update(byte_block)

        self._file_hash = hasher.hexdigest()
        return self._file_hash

    @property
    def file_name_without_extension(self) -> str:
        # NOTE: strips everything after the *first* dot, so 'a.tar.gz' -> 'a'
        return self.file_name.split(".")[0]

    @property
    def import_time_as_datetime(self) -> datetime.datetime:
        from dateutil import parser

        return parser.parse(self.import_time)

    def read_content(
        self, as_str: bool = True, max_lines: int = -1
    ) -> typing.Union[str, bytes]:
        """Read the content of a file.

        If 'max_lines' is positive, at most that many lines are returned.
        (Previously, requesting more lines than the file contained raised a
        RuntimeError, because 'next()' inside a generator triggers PEP 479;
        and 'max_lines' combined with 'as_str=False' failed joining bytes
        lines with a str separator.)
        """

        from itertools import islice

        mode = "r" if as_str else "rb"

        with open(self.path, mode) as f:
            if max_lines <= 0:
                content = f.read()
            else:
                sep = "" if as_str else b""
                content = sep.join(islice(f, max_lines))
        return content

    def __repr__(self):
        return f"FileMetadata(name={self.file_name})"

    def __str__(self):
        return self.__repr__()

import_time: str pydantic-field required

The time when the file was imported.

is_onboarded: bool pydantic-field

Whether the file is imported into the kiara data store.

mime_type: str pydantic-field required

The mime type of the file.

orig_filename: str pydantic-field required

The original filename of this file at the time of import.

orig_path: str pydantic-field

The original path to this file at the time of import.

path: str pydantic-field required

The archive path of the file.

size: int pydantic-field required

The size of the file.

__repr__(self) special

Return repr(self).

Source code in core/metadata_models.py
def __repr__(self):
    """Return a short, debug-friendly representation."""
    return "FileMetadata(name={})".format(self.file_name)

__str__(self) special

Return str(self).

Source code in core/metadata_models.py
def __str__(self):
    # delegate to __repr__ so str() and repr() render identically
    return self.__repr__()

load_file(source, target=None, incl_orig_path=False) classmethod

Utility method to read metadata of a file from disk and optionally move it into a data archive location.

Source code in core/metadata_models.py
@classmethod
def load_file(
    cls,
    source: str,
    target: typing.Optional[str] = None,
    incl_orig_path: bool = False,
):
    """Utility method to read metadata of a file from disk and optionally move it into a data archive location.

    Arguments:
      source: path of the existing file to inspect
      target: if set, the file is copied there (that path must not exist yet)
      incl_orig_path: whether to record the original source path in the metadata

    Raises:
      ValueError: if 'source' is empty/missing/not a file, or 'target' already exists
    """

    import mimetypes

    import filetype

    if not source:
        raise ValueError("No source path provided.")

    if not os.path.exists(os.path.realpath(source)):
        raise ValueError(f"Path does not exist: {source}")

    if not os.path.isfile(os.path.realpath(source)):
        raise ValueError(f"Path is not a file: {source}")

    orig_filename = os.path.basename(source)
    orig_path: str = os.path.abspath(source)
    file_import_time = datetime.datetime.now().isoformat()  # TODO: timezone

    file_stats = os.stat(orig_path)
    size = file_stats.st_size

    if target:
        if os.path.exists(target):
            raise ValueError(f"Target path exists: {target}")
        os.makedirs(os.path.dirname(target), exist_ok=True)
        shutil.copy2(source, target)
    else:
        # no target: the metadata simply points at the original location
        target = orig_path

    # extension-based mime guess first, content-based detection as fallback
    r = mimetypes.guess_type(target)
    if r[0] is not None:
        mime_type = r[0]
    else:
        _mime_type = filetype.guess(target)
        if not _mime_type:
            mime_type = "application/octet-stream"
        else:
            mime_type = _mime_type.MIME

    if not incl_orig_path:
        _orig_path: typing.Optional[str] = None
    else:
        _orig_path = orig_path

    m = KiaraFile(
        orig_filename=orig_filename,
        orig_path=_orig_path,
        import_time=file_import_time,
        mime_type=mime_type,
        size=size,
        file_name=orig_filename,
        path=target,
    )
    return m

read_content(self, as_str=True, max_lines=-1)

Read the content of a file.

Source code in core/metadata_models.py
def read_content(
    self, as_str: bool = True, max_lines: int = -1
) -> typing.Union[str, bytes]:
    """Read the content of a file.

    If 'max_lines' is positive, at most that many lines are returned.
    (Previously, requesting more lines than the file contained raised a
    RuntimeError, because 'next()' inside a generator triggers PEP 479;
    and 'max_lines' combined with 'as_str=False' failed joining bytes
    lines with a str separator.)
    """

    from itertools import islice

    mode = "r" if as_str else "rb"

    with open(self.path, mode) as f:
        if max_lines <= 0:
            content = f.read()
        else:
            # islice stops cleanly at EOF; pick a separator matching the mode
            sep = "" if as_str else b""
            content = sep.join(islice(f, max_lines))
    return content

KiaraFileBundle (MetadataModel) pydantic-model

Describes properties for the 'file_bundle' value type.

Source code in core/metadata_models.py
class KiaraFileBundle(MetadataModel):
    """Describes properties for the 'file_bundle' value type."""

    _metadata_key: typing.ClassVar[str] = "file_bundle"

    @classmethod
    def import_folder(
        cls,
        source: str,
        target: typing.Optional[str] = None,
        import_config: typing.Union[
            None, typing.Mapping[str, typing.Any], FolderImportConfig
        ] = None,
        incl_orig_path: bool = False,
    ):

        if not source:
            raise ValueError("No source path provided.")

        if not os.path.exists(os.path.realpath(source)):
            raise ValueError(f"Path does not exist: {source}")

        if not os.path.isdir(os.path.realpath(source)):
            raise ValueError(f"Path is not a file: {source}")

        if target and os.path.exists(target):
            raise ValueError(f"Target path already exists: {target}")

        if source.endswith(os.path.sep):
            source = source[0:-1]

        if target and target.endswith(os.path.sep):
            target = target[0:-1]

        if import_config is None:
            _import_config = FolderImportConfig()
        elif isinstance(import_config, typing.Mapping):
            _import_config = FolderImportConfig(**import_config)
        elif isinstance(import_config, FolderImportConfig):
            _import_config = import_config
        else:
            raise TypeError(
                f"Invalid type for folder import config: {type(import_config)}."
            )

        included_files: typing.Dict[str, KiaraFile] = {}
        exclude_dirs = _import_config.exclude_dirs
        invalid_extensions = _import_config.exclude_files

        valid_extensions = _import_config.include_files

        sum_size = 0

        def include_file(filename: str) -> bool:

            if invalid_extensions and any(
                filename.endswith(ext) for ext in invalid_extensions
            ):
                return False
            if not valid_extensions:
                return True
            else:
                return any(filename.endswith(ext) for ext in valid_extensions)

        for root, dirnames, filenames in os.walk(source, topdown=True):

            if exclude_dirs:
                dirnames[:] = [d for d in dirnames if d not in exclude_dirs]

            for filename in [
                f
                for f in filenames
                if os.path.isfile(os.path.join(root, f)) and include_file(f)
            ]:

                full_path = os.path.join(root, filename)
                rel_path = os.path.relpath(full_path, source)
                if target:
                    target_path: typing.Optional[str] = os.path.join(target, rel_path)
                else:
                    target_path = None

                file_model = KiaraFile.load_file(
                    full_path, target_path, incl_orig_path=incl_orig_path
                )
                sum_size = sum_size + file_model.size
                included_files[rel_path] = file_model

        orig_bundle_name = os.path.basename(source)
        if incl_orig_path:
            orig_path: typing.Optional[str] = source
        else:
            orig_path = None

        if target:
            path = target
        else:
            path = source

        return KiaraFileBundle.create_from_file_models(
            files=included_files,
            orig_bundle_name=orig_bundle_name,
            orig_path=orig_path,
            path=path,
            sum_size=sum_size,
        )

    @classmethod
    def create_from_file_models(
        self,
        files: typing.Mapping[str, KiaraFile],
        orig_bundle_name: str,
        orig_path: typing.Optional[str],
        path: str,
        sum_size: typing.Optional[int] = None,
        is_onboarded: bool = False,
    ):

        result: typing.Dict[str, typing.Any] = {}

        result["included_files"] = files

        result["orig_path"] = orig_path
        result["path"] = path
        result["import_time"] = datetime.datetime.now().isoformat()
        result["number_of_files"] = len(files)
        result["bundle_name"] = os.path.basename(result["path"])
        result["orig_bundle_name"] = orig_bundle_name
        result["is_onboarded"] = is_onboarded

        if sum_size is None:
            sum_size = 0
            for f in files.values():
                sum_size = sum_size + f.size
        result["size"] = sum_size

        return KiaraFileBundle(**result)

    # Cached sha3-256 hash over the bundle's files; computed lazily by the
    # 'file_bundle_hash' property, not part of the serialized model.
    _file_bundle_hash: typing.Optional[str] = PrivateAttr(default=None)

    # -- model fields -------------------------------------------------------
    orig_bundle_name: str = Field(
        description="The original name of this folder at the time of import."
    )
    bundle_name: str = Field(description="The name of this bundle.")
    orig_path: typing.Optional[str] = Field(
        description="The original path to this folder at the time of import.",
        default=None,
    )
    import_time: str = Field(description="The time when the file was imported.")
    number_of_files: int = Field(
        description="How many files are included in this bundle."
    )
    included_files: typing.Dict[str, KiaraFile] = Field(
        description="A map of all the included files, incl. their properties. Uses the relative path of each file as key."
    )
    size: int = Field(description="The size of all files in this folder, combined.")
    path: str = Field(description="The archive path of the folder.")
    is_onboarded: bool = Field(
        description="Whether this bundle is imported into the kiara data store.",
        default=False,
    )

    def get_id(self) -> str:
        """Return this bundle's identifier, which is its archive path."""
        bundle_id = self.path
        return bundle_id

    def get_category_alias(self) -> str:
        """Return the fixed category alias for file-bundle metadata instances."""
        alias = "instance.metadata.file_bundle"
        return alias

    def get_relative_path(self, file: KiaraFile):
        """Return *file*'s path relative to this bundle's root path."""

        bundle_root = self.path
        return os.path.relpath(file.path, bundle_root)

    def read_text_file_contents(
        self, ignore_errors: bool = False
    ) -> typing.Mapping[str, str]:
        """Read every file in the bundle as UTF-8 text.

        Returns a map of relative file path -> file content. If ``ignore_errors``
        is set, unreadable files are logged and skipped instead of aborting.

        Raises:
            Exception: if a file can't be opened/decoded and ``ignore_errors``
                is False.
        """

        content_dict: typing.Dict[str, str] = {}

        def read_file(rel_path: str, fm: "KiaraFile"):
            # 'open' is inside the try block so 'ignore_errors' also covers
            # files that can't be opened (missing/permission), not just files
            # that fail to decode -- previously open() errors always raised.
            try:
                with open(fm.path, encoding="utf-8") as f:
                    content = f.read()
            except Exception as e:
                if ignore_errors:
                    log_message(f"Can't read file: {e}")
                    log.warning(f"Ignoring file: {fm.path}")
                    return
                # fixed: closing quote was missing after the file path
                raise Exception(f"Can't read file (as text) '{fm.path}': {e}")
            content_dict[rel_path] = content

        # TODO: common ignore files and folders
        for f in self.included_files.values():
            rel_path = self.get_relative_path(f)
            read_file(rel_path=rel_path, fm=f)

        return content_dict

    @property
    def file_bundle_hash(self):
        """A stable sha3-256 hash over all included files (cached after first access).

        The hash input is the concatenation of '_<rel_path>_<file_hash>' segments
        in sorted path order, so it depends on both file paths and file contents.
        """

        if self._file_bundle_hash is not None:
            return self._file_bundle_hash

        # Build the hash input in a single join instead of quadratic '+' string
        # concatenation; the resulting byte sequence is identical.
        hash_input = "".join(
            f"_{path}_{self.included_files[path].file_hash}"
            for path in sorted(self.included_files.keys())
        )

        self._file_bundle_hash = hashlib.sha3_256(
            hash_input.encode("utf-8")
        ).hexdigest()
        return self._file_bundle_hash

    def copy_bundle(
        self, target_path: str, incl_orig_path: bool = False, is_onboarded: bool = False
    ) -> "KiaraFileBundle":
        """Copy every file of this bundle below *target_path* and return the new bundle."""

        if target_path == self.path:
            raise Exception(f"Target path and current path are the same: {target_path}")

        # Copy each included file to its corresponding location under the target,
        # keyed by the same relative path.
        copied_files = {
            rel_path: item.copy_file(
                os.path.join(target_path, rel_path), is_onboarded=is_onboarded
            )
            for rel_path, item in self.included_files.items()
        }

        new_orig_path = self.orig_path if incl_orig_path else None

        new_bundle = KiaraFileBundle.create_from_file_models(
            copied_files,
            orig_bundle_name=self.orig_bundle_name,
            orig_path=new_orig_path,
            path=target_path,
            sum_size=self.size,
            is_onboarded=is_onboarded,
        )

        # Copying does not change file contents, so a previously computed hash
        # stays valid for the copy.
        if self._file_bundle_hash is not None:
            new_bundle._file_bundle_hash = self._file_bundle_hash

        return new_bundle

    def __repr__(self):
        """Return a short debug representation containing the bundle name."""
        return f"FileBundle(name={self.bundle_name})"

    def __str__(self):
        """Delegate to ``__repr__`` so str() and repr() render identically."""
        return self.__repr__()

bundle_name: str pydantic-field required

The name of this bundle.

import_time: str pydantic-field required

The time when the file was imported.

included_files: Dict[str, kiara_modules.core.metadata_models.KiaraFile] pydantic-field required

A map of all the included files, incl. their properties. Uses the relative path of each file as key.

is_onboarded: bool pydantic-field

Whether this bundle is imported into the kiara data store.

number_of_files: int pydantic-field required

How many files are included in this bundle.

orig_bundle_name: str pydantic-field required

The original name of this folder at the time of import.

orig_path: str pydantic-field

The original path to this folder at the time of import.

path: str pydantic-field required

The archive path of the folder.

size: int pydantic-field required

The size of all files in this folder, combined.

__repr__(self) special

Return repr(self).

Source code in core/metadata_models.py
def __repr__(self):
    return f"FileBundle(name={self.bundle_name})"

__str__(self) special

Return str(self).

Source code in core/metadata_models.py
def __str__(self):
    return self.__repr__()

TableMetadata (HashedMetadataModel) pydantic-model

Describes properties for the 'table' data type.

Source code in core/metadata_models.py
class TableMetadata(HashedMetadataModel):
    """Describes properties for the 'table' data type."""

    # Key under which this metadata is stored/retrieved for a value.
    _metadata_key: typing.ClassVar[str] = "table"

    column_names: typing.List[str] = Field(
        description="The name of the columns of the table."
    )
    column_schema: typing.Dict[str, ColumnSchema] = Field(
        description="The schema description of the table."
    )
    rows: int = Field(description="The number of rows the table contains.")
    size: typing.Optional[int] = Field(
        description="The tables size in bytes.", default=None
    )

    def _obj_to_hash(self) -> typing.Any:

        # Stable dict representation used as the input of this model's hash.
        # NOTE(review): the key 'column_schemas' (plural) differs from the field
        # name 'column_schema' -- presumably intentional (part of the persisted
        # hash input); confirm before renaming.
        return {
            "column_names": self.column_names,
            "column_schemas": {k: v.dict() for k, v in self.column_schema.items()},
            "rows": self.rows,
            "size": self.size,
        }

    def get_category_alias(self) -> str:
        # Category identifier used to classify this metadata instance.
        return "instance.metadata.table"
column_names: List[str] pydantic-field required

The name of the columns of the table.

column_schema: Dict[str, kiara_modules.core.metadata_models.ColumnSchema] pydantic-field required

The schema description of the table.

rows: int pydantic-field required

The number of rows the table contains.

size: int pydantic-field

The tables size in bytes.

pipelines special

Virtual module that is used as base for PipelineModule classes that are auto-generated from pipeline descriptions under this folder.

string

DeserializeStringModule (KiaraModule)

Source code in core/string.py
class DeserializeStringModule(KiaraModule):
    # Deserializes a previously serialized string value. Only the 'json'
    # serialization type is accepted (see DeserializeStringModuleConfig).

    _module_type_name = "deserialize"
    _config_cls = DeserializeStringModuleConfig

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # Single input: the serialized payload.
        serialized_field = {
            "type": "string",
            "doc": "The serialized form of the string.",
        }
        return {"serialized": serialized_field}

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # Single output: the deserialized string itself.
        return {"value_item": {"type": "string", "doc": "The string data."}}

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:

        supported_types = ["json"]
        serialization_type = self.get_config_value("serialization_type")
        if serialization_type not in supported_types:
            raise KiaraProcessingException(
                f"Can't deserialize string: serialisation type '{serialization_type}' not supported."
            )

        outputs.set_value("value_item", inputs.get_value_data("serialized"))

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/string.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    return {
        "serialized": {
            "type": "string",
            "doc": "The serialized form of the string.",
        }
    }

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/string.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    return {"value_item": {"type": "string", "doc": "The string data."}}

DeserializeStringModuleConfig (ModuleTypeConfigSchema) pydantic-model

Source code in core/string.py
class DeserializeStringModuleConfig(ModuleTypeConfigSchema):
    # Configuration for DeserializeStringModule.

    # Name of the serialization format the value was stored with; the module
    # currently only accepts 'json'.
    serialization_type: str = Field(
        description="The serialization type that was used to serialize the value."
    )

serialization_type: str pydantic-field required

The serialization type that was used to serialize the value.

RegexModule (KiaraModule)

Match a string using a regular expression.

Source code in core/string.py
class RegexModule(KiaraModule):
    """Match a string using a regular expression."""

    _config_cls = RegexModuleConfig
    _module_type_name = "match_regex"

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # One input: the text the configured regex is applied to.
        schema = {"text": {"type": "string", "doc": "The text to match."}}
        return schema

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # Only the single-match variant is implemented so far.
        if not self.get_config_value("only_first_match"):
            raise NotImplementedError()

        return {"text": {"type": "string", "doc": "The first match."}}

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:

        input_text = inputs.get_value_data("text")
        pattern = self.get_config_value("regex")
        found = re.findall(pattern, input_text)

        if not found:
            raise KiaraProcessingException(f"No match for regex: {pattern}")

        first_only = self.get_config_value("only_first_match")
        outputs.set_value("text", found[0] if first_only else found)

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/string.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {"text": {"type": "string", "doc": "The text to match."}}

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/string.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    if self.get_config_value("only_first_match"):
        output_schema = {"text": {"type": "string", "doc": "The first match."}}
    else:
        raise NotImplementedError()

    return output_schema

RegexModuleConfig (ModuleTypeConfigSchema) pydantic-model

Source code in core/string.py
class RegexModuleConfig(ModuleTypeConfigSchema):
    # Configuration for RegexModule.

    # The regular expression (Python 're' syntax) matched against the input text.
    regex: str = Field(description="The regex to apply.")
    # If True, only the first match is returned. Note: RegexModule currently
    # raises NotImplementedError for the all-matches output schema.
    only_first_match: bool = Field(
        description="Whether to only return the first match, or all matches.",
        default=False,
    )

only_first_match: bool pydantic-field

Whether to only return the first match, or all matches.

regex: str pydantic-field required

The regex to apply.

ReplaceModuleConfig (ModuleTypeConfigSchema) pydantic-model

Source code in core/string.py
class ReplaceModuleConfig(ModuleTypeConfigSchema):
    # Configuration for ReplaceStringModule.

    # Maps exact input strings to their replacements.
    replacement_map: typing.Dict[str, str] = Field(
        description="A map, containing the strings to be replaced as keys, and the replacements as values."
    )
    # Used when the input string is not a key of 'replacement_map'; None means
    # the input string is passed through unchanged.
    default_value: typing.Optional[str] = Field(
        description="The default value to use if the string to be replaced is not in the replacement map. By default, this just returns the string itself.",
        default=None,
    )

default_value: str pydantic-field

The default value to use if the string to be replaced is not in the replacement map. By default, this just returns the string itself.

replacement_map: Dict[str, str] pydantic-field required

A map, containing the strings to be replaced as keys, and the replacements as values.

ReplaceStringModule (KiaraModule)

Replace a string if it matches a key in a mapping dictionary.

Source code in core/string.py
class ReplaceStringModule(KiaraModule):
    """Replace a string if it matches a key in a mapping dictionary."""

    _config_cls = ReplaceModuleConfig
    _module_type_name = "replace"

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # One input: the string to (maybe) replace.
        return {"text": {"type": "string", "doc": "The input string."}}

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # One output: the (possibly replaced) string.
        return {"text": {"type": "string", "doc": "The replaced string."}}

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Look up the input text in the replacement map and emit the result.

        If the text is not a key in the map, the configured default value is
        used; if no default is configured, the text is passed through unchanged.
        """

        text = inputs.get_value_data("text")
        repl_map = self.get_config_value("replacement_map")
        default = self.get_config_value("default_value")

        # dict.get covers both the found/not-found cases in one lookup (the
        # original tested membership via '.keys()' and branched manually).
        fallback = text if default is None else default
        outputs.set_value("text", repl_map.get(text, fallback))

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/string.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    return {"text": {"type": "string", "doc": "The input string."}}

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/string.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {"text": {"type": "string", "doc": "The replaced string."}}

StringManipulationModule (KiaraModule)

Base module to simplify creating other modules that do string manipulation.

Source code in core/string.py
class StringManipulationModule(KiaraModule):
    """Base module to simplify creating other modules that do string manipulation."""

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # Every string-manipulation module takes a single 'text' input ...
        text_field = {"type": "string", "doc": "The input string."}
        return {"text": text_field}

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # ... and produces a single 'text' output.
        return {"text": {"type": "string", "doc": "The processed string."}}

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        # Delegate the actual transformation to the subclass hook.
        transformed = self.process_string(inputs.get_value_data("text"))
        outputs.set_value("text", transformed)

    @abstractmethod
    def process_string(self, text: str) -> str:
        """Transform the input string; implemented by subclasses."""
        pass

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/string.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    return {"text": {"type": "string", "doc": "The input string."}}

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/string.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {"text": {"type": "string", "doc": "The processed string."}}

table special

ConvertToTableModule (CreateValueModule)

Create an Arrow table from files, file_bundles, etc.

This module supports two conversion targets currently:

  • bytes: a memoryview of the byte-representation of the Table
  • string: the base64-encoded byte-representation of the Table
Source code in core/table/__init__.py
class ConvertToTableModule(CreateValueModule):
    """Create an Arrow table from files, file_bundles, etc.

    This module supports two conversion targets currently:

     - bytes: a memoryview of the byte-representation of the Table
     - string: the base64-encoded byte-representation of the Table
    """

    # NOTE(review): the 'bytes'/'string' targets mentioned in the docstring are
    # implemented only in the commented-out methods below -- the docstring may
    # be stale; confirm before relying on it.

    _module_type_name = "create"
    _config_cls = TableConversionModuleConfig

    @classmethod
    def get_target_value_type(cls) -> str:
        # All conversions implemented here produce a 'table' value.
        return "table"

    # def to_bytes(self, value: Value) -> bytes:
    #
    #     import pyarrow as pa
    #
    #     table_val: Value = value
    #     table: pa.Table = table_val.get_value_data()
    #
    #     batches = table.to_batches()
    #
    #     sink = pa.BufferOutputStream()
    #     writer = pa.ipc.new_stream(sink, batches[0].schema)
    #
    #     for batch in batches:
    #         writer.write_batch(batch)
    #     writer.close()
    #
    #     buf: pa.Buffer = sink.getvalue()
    #     return memoryview(buf)
    #
    # def to_string(self, value: Value):
    #
    #     _bytes: bytes = self.to_bytes(value)
    #     string = base64.b64encode(_bytes)
    #     return string.decode()

    # def from_bytes(self, value: Value):
    #     raise NotImplementedError()
    #
    # def from_string(self, value: Value):
    #     raise NotImplementedError()

    def from_csv_file(self, value: Value):
        """Read a CSV file value into a pyarrow Table."""

        from pyarrow import csv

        input_file: KiaraFile = value.get_value_data()
        imported_data = csv.read_csv(input_file.path)
        return imported_data

    def from_text_file_bundle(self, value: Value):
        """Build a table with one row per text file in the bundle.

        Columns come from FILE_BUNDLE_IMPORT_AVAILABLE_COLUMNS: 'content' holds
        the file text, 'id' a running index, 'rel_path' the relative path; any
        other column name is read from the file model attribute of that name.
        """

        import pyarrow as pa

        bundle: KiaraFileBundle = value.get_value_data()

        columns = FILE_BUNDLE_IMPORT_AVAILABLE_COLUMNS

        ignore_errors = self.get_config_value("ignore_errors")
        file_dict = bundle.read_text_file_contents(ignore_errors=ignore_errors)

        # Hoisted out of the column loop: the row order is identical for every
        # column, so sort the paths once.
        sorted_rel_paths = sorted(file_dict.keys())

        tabular: typing.Dict[str, typing.List[typing.Any]] = {}
        for column in columns:
            for index, rel_path in enumerate(sorted_rel_paths):

                if column == "content":
                    _value: typing.Any = file_dict[rel_path]
                elif column == "id":
                    _value = index
                elif column == "rel_path":
                    _value = rel_path
                else:
                    # Any other column maps to an attribute of the file model.
                    file_model = bundle.included_files[rel_path]
                    _value = getattr(file_model, column)

                tabular.setdefault(column, []).append(_value)

        table = pa.Table.from_pydict(tabular)
        return table

CutColumnModule (KiaraModule)

Cut off one column from a table, returning an array.

Source code in core/table/__init__.py
class CutColumnModule(KiaraModule):
    """Cut off one column from a table, returning an array."""

    _module_type_name = "cut_column"

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # Inputs: the source table plus the name of the column to extract.
        schema: typing.Mapping[str, typing.Any] = {
            "table": {"type": "table", "doc": "A table."},
            "column_name": {
                "type": "string",
                "doc": "The name of the column to extract.",
            },
        }
        return schema

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # Output: the extracted column as an array value.
        return {"array": {"type": "array", "doc": "The column."}}

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:

        import pyarrow as pa

        table_value = inputs.get_value_obj("table")
        column_name: str = inputs.get_value_data("column_name")

        # Validate the requested column against the table metadata before
        # touching the (potentially large) table data itself.
        available = table_value.get_metadata("table")["table"]["column_names"]
        if column_name not in available:
            raise KiaraProcessingException(
                f"Invalid column name '{column_name}'. Available column names: {available}"
            )

        table: pa.Table = inputs.get_value_data("table")
        outputs.set_value("array", table.column(column_name))

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/table/__init__.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    inputs: typing.Mapping[str, typing.Any] = {
        "table": {"type": "table", "doc": "A table."},
        "column_name": {
            "type": "string",
            "doc": "The name of the column to extract.",
        },
    }
    return inputs

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/table/__init__.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    outputs: typing.Mapping[str, typing.Any] = {
        "array": {"type": "array", "doc": "The column."}
    }
    return outputs

ExportArrowTable (KiaraModule)

Export a table object to disk.

Source code in core/table/__init__.py
class ExportArrowTable(KiaraModule):
    """Export a table object to disk."""

    _module_type_name = "export"

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # Inputs: the table, the target path, and write options.
        inputs: typing.Mapping[str, typing.Any] = {
            "table": {"type": "table", "doc": "The table object."},
            "path": {
                "type": "string",
                "doc": "The path to the file to write.",
            },
            "format": {
                "type": "string",
                "doc": "The format of the table file ('feather' or 'parquet').",
                "default": "feather",
            },
            "force_overwrite": {
                "type": "boolean",
                "doc": "Whether to overwrite an existing file.",
                "default": False,
            },
            "compression": {
                "type": "string",
                "doc": "The compression to use. Use either: 'zstd' (default), 'lz4', or 'uncompressed'.",
                "default": "zstd",
            },
        }
        return inputs

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # Output: a load config so kiara can re-load the written file later.
        outputs: typing.Mapping[str, typing.Any] = {
            "load_config": {
                "type": "load_config",
                "doc": "The configuration to use with kiara to load the saved value.",
            }
        }

        return outputs

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Write the table as a feather file and emit a matching load config.

        Raises:
            KiaraProcessingException: for an invalid compression or format, or
                when the target file exists and 'force_overwrite' is not set.
        """

        import pyarrow as pa
        from pyarrow import feather

        table: pa.Table = inputs.get_value_data("table")
        full_path: str = inputs.get_value_data("path")
        force_overwrite = inputs.get_value_data("force_overwrite")
        # renamed local (was 'format') to avoid shadowing the builtin
        file_format: str = inputs.get_value_data("format")
        compression = inputs.get_value_data("compression")

        if compression not in ["zstd", "lz4", "uncompressed"]:
            raise KiaraProcessingException(
                f"Invalid compression format '{compression}'. Allowed: 'zstd', 'lz4', 'uncompressed'."
            )

        if file_format != "feather":
            raise KiaraProcessingException(
                f"Can't export table to format '{file_format}': only 'feather' supported at the moment."
            )

        if os.path.exists(full_path) and not force_overwrite:
            raise KiaraProcessingException(
                f"Can't write table to file, file already exists: {full_path}"
            )

        # fixed: os.makedirs("") raises FileNotFoundError when the target is a
        # bare file name without a directory component.
        parent_dir = os.path.dirname(full_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        feather.write_feather(table, full_path, compression=compression)

        result = {
            "module_type": "table.load",
            "base_path_input_name": "base_path",
            "inputs": {
                "base_path": parent_dir,
                "rel_path": os.path.basename(full_path),
                "format": file_format,
            },
            "value_id": NO_VALUE_ID_MARKER,
            "output_name": "table",
        }
        outputs.set_value("load_config", result)

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/table/__init__.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    inputs: typing.Mapping[str, typing.Any] = {
        "table": {"type": "table", "doc": "The table object."},
        "path": {
            "type": "string",
            "doc": "The path to the file to write.",
        },
        "format": {
            "type": "string",
            "doc": "The format of the table file ('feather' or 'parquet').",
            "default": "feather",
        },
        "force_overwrite": {
            "type": "boolean",
            "doc": "Whether to overwrite an existing file.",
            "default": False,
        },
        "compression": {
            "type": "string",
            "doc": "The compression to use. Use either: 'zstd' (default), 'lz4', or 'uncompressed'.",
            "default": "zstd",
        },
    }
    return inputs

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/table/__init__.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    outputs: typing.Mapping[str, typing.Any] = {
        "load_config": {
            "type": "load_config",
            "doc": "The configuration to use with kiara to load the saved value.",
        }
    }

    return outputs

LoadArrowTable (KiaraModule)

Load a table object from disk.

Source code in core/table/__init__.py
class LoadArrowTable(KiaraModule):
    """Load a table object from disk."""

    _module_type_name = "load"

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # Inputs locate the table file via base_path + rel_path.
        schema: typing.Mapping[str, typing.Any] = {
            "base_path": {
                "type": "string",
                "doc": "The path to the folder that contains the table file.",
            },
            "rel_path": {
                "type": "string",
                "doc": "The relative path to the table file within base_path.",
            },
            "format": {
                "type": "string",
                "doc": "The format of the table file ('feather' or 'parquet').",
                "default": "feather",
            },
        }
        return schema

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # Output: the loaded table.
        return {"table": {"type": "table", "doc": "The pyarrow table object."}}

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:

        from pyarrow import feather

        base_dir = inputs.get_value_data("base_path")
        rel = inputs.get_value_data("rel_path")
        fmt = inputs.get_value_data("format")

        # Only feather is supported for loading at the moment.
        if fmt != "feather":
            raise NotImplementedError()

        table_path = os.path.join(base_dir, rel)
        outputs.set_value("table", feather.read_table(table_path))

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/table/__init__.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    inputs: typing.Mapping[str, typing.Any] = {
        "base_path": {
            "type": "string",
            "doc": "The path to the folder that contains the table file.",
        },
        "rel_path": {
            "type": "string",
            "doc": "The relative path to the table file within base_path.",
        },
        "format": {
            "type": "string",
            "doc": "The format of the table file ('feather' or 'parquet').",
            "default": "feather",
        },
    }
    return inputs

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{
    "[output_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this output]"
    },
    "[other_output_field_name]": {
        "type": ...
    },
    ...
}

Source code in core/table/__init__.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    outputs: typing.Mapping[str, typing.Any] = {
        "table": {"type": "table", "doc": "The pyarrow table object."}
    }
    return outputs

MapColumnModule (KiaraModule)

Map the items of one column of a table onto an array, using another module.

Source code in core/table/__init__.py
class MapColumnModule(KiaraModule):
    """Map the items of one column of a table onto an array, using another module."""

    _config_cls = MapColumnsModuleConfig
    _module_type_name = "map_column"

    def module_instance_doc(self) -> str:
        """Render instance documentation that names (and links to) the configured child module."""

        config: MapColumnsModuleConfig = self.config  # type: ignore

        module_type = config.module_type
        module_config = config.module_config

        m = self._kiara.create_module(
            id="map_column_child", module_type=module_type, module_config=module_config
        )
        type_md = m.get_type_metadata()
        doc = type_md.documentation.full_doc
        link = type_md.context.get_url_for_reference("module_doc")
        # Fall back to a plain code span if no documentation url is available.
        if not link:
            link_str = f"``{module_type}``"
        else:
            link_str = f"[``{module_type}``]({link})"

        result = f"""Map the values of the rows of an input table onto a new array of the same length, using the {link_str} module."""

        if doc and doc != "-- n/a --":
            result = result + f"\n\n``{module_type}`` documentation:\n\n{doc}"
        return result

    def __init__(self, *args, **kwargs):

        # Lazily resolved child module and its input/output field names;
        # computed on first access of the corresponding properties below.
        self._child_module: typing.Optional[KiaraModule] = None
        self._module_input_name: typing.Optional[str] = None
        self._module_output_name: typing.Optional[str] = None
        super().__init__(*args, **kwargs)

    @property
    def child_module(self) -> KiaraModule:
        """The configured child module instance (created on first access, then cached)."""

        if self._child_module is not None:
            return self._child_module

        module_name = self.get_config_value("module_type")
        module_config = self.get_config_value("module_config")
        self._child_module = self._kiara.create_module(
            module_type=module_name, module_config=module_config
        )
        return self._child_module

    @property
    def module_input_name(self) -> str:
        """The child-module input field that receives each column item.

        Taken from the 'input_name' config value; if that is not set and the
        child module has exactly one input, that input is used automatically.

        Raises:
            KiaraProcessingException: if no 'input_name' is configured and the
                child module has more than one input.
        """

        if self._module_input_name is not None:
            return self._module_input_name

        self._module_input_name = self.get_config_value("input_name")
        if self._module_input_name is None:
            if len(list(self.child_module.input_names)) == 1:
                self._module_input_name = next(iter(self.child_module.input_names))
            else:
                raise KiaraProcessingException(
                    f"No 'input_name' specified, and configured module has more than one inputs. Please specify an 'input_name' value in your module config, pick one of: {', '.join(self.child_module.input_names)}"
                )

        return self._module_input_name

    @property
    def module_output_name(self) -> str:
        """The child-module output field the result items are read from.

        Taken from the 'output_name' config value; if that is not set and the
        child module has exactly one output, that output is used automatically.

        Raises:
            KiaraProcessingException: if no 'output_name' is configured and the
                child module has more than one output.
        """

        if self._module_output_name is not None:
            return self._module_output_name

        self._module_output_name = self.get_config_value("output_name")
        if self._module_output_name is None:
            if len(list(self.child_module.output_names)) == 1:
                self._module_output_name = next(iter(self.child_module.output_names))
            else:
                raise KiaraProcessingException(
                    f"No 'output_name' specified, and configured module has more than one outputs. Please specify an 'output_name' value in your module config, pick one of: {', '.join(self.child_module.output_names)}"
                )

        return self._module_output_name

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Declare the inputs: the table, the column name, plus all child-module inputs
        except the one that receives the mapped column items."""

        inputs: typing.Dict[
            str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
        ] = {
            "table": {
                "type": "table",
                "doc": "The table to use as input.",
            },
            "column_name": {
                "type": "string",
                "doc": "The name of the table column to run the mapping operation on.",
            },
        }
        # Forward the child module's remaining inputs; they must not clash
        # with our own reserved field names.
        for input_name, schema in self.child_module.input_schemas.items():
            assert input_name != "table"
            assert input_name != "column_name"
            if input_name == self.module_input_name:
                continue
            inputs[input_name] = schema
        return inputs

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Declare the single 'array' output holding the mapped values."""

        outputs = {
            "array": {
                "type": "array",
                "doc": "An array of equal length to the input array, containing the 'mapped' values.",
            }
        }
        return outputs

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Run the child module once per item of the selected column and collect the results.

        Raises:
            KiaraProcessingException: if the requested column is not present in the table.
        """

        import pyarrow as pa

        table: pa.Table = inputs.get_value_data("table")
        column_name = inputs.get_value_data("column_name")

        if column_name not in table.column_names:
            raise KiaraProcessingException(
                f"Table column '{column_name}' not available in table. Available columns: {', '.join(table.column_names)}."
            )

        input_array: pa.Array = table.column(column_name)

        # Collect all constant (non-mapped) inputs to pass to every child-module run.
        init_data: typing.Dict[str, typing.Any] = {}
        for input_name in self.input_schemas.keys():
            if input_name in ["table", "column_name", self.module_input_name]:
                continue

            init_data[input_name] = inputs.get_value_obj(input_name)

        result_list = map_with_module(
            input_array,
            module_input_name=self.module_input_name,
            module_obj=self.child_module,
            init_data=init_data,
            module_output_name=self.module_output_name,
        )
        outputs.set_value("array", pa.array(result_list))

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/table/__init__.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    inputs: typing.Dict[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ] = {
        "table": {
            "type": "table",
            "doc": "The table to use as input.",
        },
        "column_name": {
            "type": "string",
            "doc": "The name of the table column to run the mapping operation on.",
        },
    }
    for input_name, schema in self.child_module.input_schemas.items():
        assert input_name != "table"
        assert input_name != "column_name"
        if input_name == self.module_input_name:
            continue
        inputs[input_name] = schema
    return inputs

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/table/__init__.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    outputs = {
        "array": {
            "type": "array",
            "doc": "An array of equal length to the input array, containing the 'mapped' values.",
        }
    }
    return outputs

module_instance_doc(self)

Return documentation for this instance of the module.

If not overwritten, will return this class' method doc().

Source code in core/table/__init__.py
def module_instance_doc(self) -> str:

    config: MapColumnsModuleConfig = self.config  # type: ignore

    module_type = config.module_type
    module_config = config.module_config

    m = self._kiara.create_module(
        id="map_column_child", module_type=module_type, module_config=module_config
    )
    type_md = m.get_type_metadata()
    doc = type_md.documentation.full_doc
    link = type_md.context.get_url_for_reference("module_doc")
    if not link:
        link_str = f"``{module_type}``"
    else:
        link_str = f"[``{module_type}``]({link})"

    result = f"""Map the values of the rows of an input table onto a new array of the same length, using the {link_str} module."""

    if doc and doc != "-- n/a --":
        result = result + f"\n\n``{module_type}`` documentation:\n\n{doc}"
    return result

MapColumnsModuleConfig (ModuleTypeConfigSchema) pydantic-model

Source code in core/table/__init__.py
class MapColumnsModuleConfig(ModuleTypeConfigSchema):
    """Configuration for the 'table.map_column' module: which child module to run,
    and which of its input/output fields carry the mapped items."""

    module_type: str = Field(
        description="The name of the kiara module to use to filter the input data."
    )
    module_config: typing.Optional[typing.Dict[str, typing.Any]] = Field(
        description="The config for the kiara filter module.", default_factory=dict
    )
    input_name: typing.Optional[str] = Field(
        description="The name of the input name of the module which will receive the rows from our input table. Can be omitted if the configured module only has a single input.",
        default=None,
    )
    output_name: typing.Optional[str] = Field(
        description="The name of the output name of the module which will receive the items from our input array. Can be omitted if the configured module only has a single output.",
        default=None,
    )

input_name: str pydantic-field

The name of the input name of the module which will receive the rows from our input table. Can be omitted if the configured module only has a single input.

module_config: Dict[str, Any] pydantic-field

The config for the kiara filter module.

module_type: str pydantic-field required

The name of the kiara module to use to filter the input data.

output_name: str pydantic-field

The name of the output name of the module which will receive the items from our input array. Can be omitted if the configured module only has a single output.

MergeTableModule (KiaraModule)

Create a table from other tables and/or arrays.

Source code in core/table/__init__.py
class MergeTableModule(KiaraModule):
    """Create a table from other tables and/or arrays.

    All sources must contain the same number of rows. Columns of source tables
    keep their original names; a plain array becomes a column named after the
    input field it arrived in.
    """

    _module_type_name = "merge"
    _config_cls = MergeTableModuleConfig

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Return the input schema, taken verbatim from the module config."""

        input_schema_dict = self.get_config_value("input_schema")
        return input_schema_dict

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Declare the single 'table' output holding the merged table."""

        outputs = {
            "table": {
                "type": "table",
                "doc": "The merged table, including all source tables and columns.",
            }
        }
        return outputs

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Merge all configured source tables/arrays into a single table.

        Raises:
            KiaraProcessingException: if a source has an unsupported type, or
                if the sources do not all have the same number of rows.
        """

        import pyarrow as pa

        input_schema: typing.Dict[str, typing.Any] = self.get_config_value(
            "input_schema"
        )

        sources = {}
        for field_name in input_schema.keys():
            sources[field_name] = inputs.get_value_data(field_name)

        len_dict = {}
        arrays = []
        column_names = []
        for source_key, table_or_column in sources.items():

            if isinstance(table_or_column, pa.Table):
                rows = table_or_column.num_rows
                # A table contributes all its columns under their own names.
                for name in table_or_column.schema.names:
                    column = table_or_column.column(name)
                    arrays.append(column)
                    column_names.append(name)

            elif isinstance(table_or_column, (pa.Array, pa.ChunkedArray)):
                rows = len(table_or_column)
                # An array becomes a single column, named after the input field.
                arrays.append(table_or_column)
                column_names.append(source_key)
            else:
                raise KiaraProcessingException(
                    f"Can't merge table: invalid type '{type(table_or_column)}' for source '{source_key}'."
                )

            len_dict[source_key] = rows

        # Verify that all sources have the same number of rows; 'all_rows' ends
        # up None if any two lengths differ.
        all_rows = None
        for source_key, rows in len_dict.items():
            if all_rows is None:
                all_rows = rows
            else:
                if all_rows != rows:
                    all_rows = None
                    break

        if all_rows is None:
            len_str = ""
            for name, rows in len_dict.items():
                # BUG FIX: accumulate every source length into the message;
                # previously 'len_str =' kept only the last item.
                len_str += f" {name} ({rows})"

            raise KiaraProcessingException(
                f"Can't merge table, sources have different lengths:{len_str}"
            )

        table = pa.Table.from_arrays(arrays=arrays, names=column_names)

        outputs.set_value("table", table)

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/table/__init__.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    input_schema_dict = self.get_config_value("input_schema")
    return input_schema_dict

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/table/__init__.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    outputs = {
        "table": {
            "type": "table",
            "doc": "The merged table, including all source tables and columns.",
        }
    }
    return outputs

MergeTableModuleConfig (ModuleTypeConfigSchema) pydantic-model

Source code in core/table/__init__.py
class MergeTableModuleConfig(ModuleTypeConfigSchema):
    """Configuration for the 'table.merge' module."""

    input_schema: typing.Dict[str, typing.Any] = Field(
        description="A dict describing the inputs for this merge process."
    )

input_schema: Dict[str, Any] pydantic-field required

A dict describing the inputs for this merge process.

SampleTableModule (SampleValueModule)

Sample a table.

Samples are used to randomly select a subset of a dataset, which helps test queries and workflows on smaller versions of the original data, to adjust parameters before a full run.

Source code in core/table/__init__.py
class SampleTableModule(SampleValueModule):
    """Sample a table.

    Samples are used to randomly select a subset of a dataset, which helps test queries and workflows on smaller versions
    of the original data, to adjust parameters before a full run.
    """

    _module_type_name = "sample"

    @classmethod
    def get_value_type(cls) -> str:
        """Return the value type this sample module operates on."""
        return "table"

    def sample_percent(self, value: Value, sample_size: int):
        """Randomly sample ``sample_size`` percent of the table's rows.

        Uses duckdb bernoulli sampling; returns the full table unchanged when
        ``sample_size`` is 100 or more.
        """

        import duckdb
        import pyarrow as pa

        table: pa.Table = value.get_value_data()

        if sample_size >= 100:
            return table

        query = f"SELECT * FROM data USING SAMPLE {sample_size} PERCENT (bernoulli);"

        rel_from_arrow = duckdb.arrow(table)
        result: duckdb.DuckDBPyResult = rel_from_arrow.query("data", query)

        result_table: pa.Table = result.fetch_arrow_table()
        return result_table

    def sample_rows(self, value: Value, sample_size: int):
        """Randomly sample ``sample_size`` rows from the table.

        Returns the full table unchanged when ``sample_size`` is not smaller
        than the table's row count.
        """

        import duckdb
        import pyarrow as pa

        table: pa.Table = value.get_value_data()

        if sample_size >= len(table):
            return table

        query = f"SELECT * FROM data USING SAMPLE {sample_size};"

        rel_from_arrow = duckdb.arrow(table)
        result: duckdb.DuckDBPyResult = rel_from_arrow.query("data", query)

        result_table: pa.Table = result.fetch_arrow_table()
        return result_table

    def sample_rows_from_start(self, value: Value, sample_size: int):
        """Return the first ``sample_size`` rows of the table (or the full table if smaller)."""

        import pyarrow as pa

        table: pa.Table = value.get_value_data()

        if sample_size >= len(table):
            return table

        return table.slice(0, sample_size)

    def sample_rows_to_end(self, value: Value, sample_size: int):
        """Return the last ``sample_size`` rows of the table (or the full table if smaller)."""

        import pyarrow as pa

        table: pa.Table = value.get_value_data()

        if sample_size >= len(table):
            return table

        return table.slice(len(table) - sample_size)

get_value_type() classmethod

Return the value type for this sample module.

Source code in core/table/__init__.py
@classmethod
def get_value_type(cls) -> str:
    return "table"

SaveArrowTableConfig (StoreValueModuleConfig) pydantic-model

Source code in core/table/__init__.py
class SaveArrowTableConfig(StoreValueModuleConfig):
    """Configuration for the 'table.store' module."""

    compression: str = Field(
        description="The compression to use when saving the table.", default="zstd"
    )

compression: str pydantic-field

The compression to use when saving the table.

StoreArrowTable (StoreValueTypeModule)

Source code in core/table/__init__.py
class StoreArrowTable(StoreValueTypeModule):
    """Store a table value to disk as a feather file, so it can be loaded again later."""

    _config_cls = SaveArrowTableConfig
    _module_type_name = "store"

    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        """Return the value type(s) this store module supports."""
        return "table"

    def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
        """Save the value, and return the load config needed to load it again.

        Raises:
            KiaraProcessingException: if the target file already exists.
        """

        import pyarrow as pa
        from pyarrow import feather

        table: pa.Table = value.get_value_data()
        full_path: str = os.path.join(base_path, DEFAULT_SAVE_TABLE_FILE_NAME)

        # Refuse to overwrite: a stored value is expected to be immutable.
        if os.path.exists(full_path):
            raise KiaraProcessingException(
                f"Can't save table, file already exists: {full_path}"
            )

        os.makedirs(os.path.dirname(full_path), exist_ok=True)

        compression = self.get_config_value("compression")

        feather.write_feather(table, full_path, compression=compression)

        # Load config: tells kiara how to re-create this value via 'table.load'.
        result = {
            "module_type": "table.load",
            "base_path_input_name": "base_path",
            "inputs": {
                "base_path": os.path.dirname(full_path),
                "rel_path": os.path.basename(full_path),
                "format": "feather",
            },
            "output_name": "table",
        }
        return result

store_value(self, value, base_path)

Save the value, and return the load config needed to load it again.

Source code in core/table/__init__.py
def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:

    import pyarrow as pa
    from pyarrow import feather

    table: pa.Table = value.get_value_data()
    full_path: str = os.path.join(base_path, DEFAULT_SAVE_TABLE_FILE_NAME)

    if os.path.exists(full_path):
        raise KiaraProcessingException(
            f"Can't save table, file already exists: {full_path}"
        )

    os.makedirs(os.path.dirname(full_path), exist_ok=True)

    compression = self.get_config_value("compression")

    feather.write_feather(table, full_path, compression=compression)

    result = {
        "module_type": "table.load",
        "base_path_input_name": "base_path",
        "inputs": {
            "base_path": os.path.dirname(full_path),
            "rel_path": os.path.basename(full_path),
            "format": "feather",
        },
        "output_name": "table",
    }
    return result

TableConversionModuleConfig (CreateValueModuleConfig) pydantic-model

Source code in core/table/__init__.py
class TableConversionModuleConfig(CreateValueModuleConfig):
    """Configuration for modules that create a table from another value type."""

    ignore_errors: bool = Field(
        description="Whether to ignore convert errors and omit the failed items.",
        default=False,
    )

ignore_errors: bool pydantic-field

Whether to ignore convert errors and omit the failed items.

TableMetadataModule (ExtractMetadataModule)

Extract metadata from a table object.

Source code in core/table/__init__.py
class TableMetadataModule(ExtractMetadataModule):
    """Extract metadata from a table object."""

    _module_type_name = "metadata"

    @classmethod
    def _get_supported_types(cls) -> str:
        return "table"

    @classmethod
    def get_metadata_key(cls) -> str:
        return "table"

    def _get_metadata_schema(
        self, type: str
    ) -> typing.Union[str, typing.Type[BaseModel]]:
        return TableMetadata

    def extract_metadata(self, value: Value) -> typing.Mapping[str, typing.Any]:
        """Collect column schema, column names, row count and byte size for the table."""

        import pyarrow as pa

        table: pa.Table = value.get_value_data()

        column_schemas = {}
        for column_name in table.schema.names:
            field = table.schema.field(column_name)
            field_metadata = field.metadata
            # Fall back to the arrow type id when the field carries no metadata.
            if not field_metadata:
                field_metadata = {"arrow_type_id": field.type.id}
            column_schemas[column_name] = {
                "type_name": str(field.type),
                "metadata": field_metadata,
            }

        return {
            "column_names": table.column_names,
            "column_schema": column_schemas,
            "rows": table.num_rows,
            "size": table.nbytes,
        }

filter

CreateFilteredTableModule (KiaraModule)

Filter a table using a mask array.

Source code in core/table/filter.py
class CreateFilteredTableModule(KiaraModule):
    """Filter a table using a mask array."""

    _module_type_name = "with_mask"

    _config_cls = TableFilterModuleConfig

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Declare the inputs: the table to filter, and a boolean mask array."""

        inputs = {
            "table": {"type": "table", "doc": "The table to filter."},
            "mask": {
                "type": "array",
                # typo fix: "An mask" -> "A mask"
                "doc": "A mask array of booleans of the same length as the table.",
            },
        }
        return inputs

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Declare the single 'table' output holding the filtered table."""

        outputs = {"table": {"type": "table", "doc": "The filtered table."}}
        return outputs

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Apply the boolean mask to the table, keeping only rows where the mask is true."""

        import pyarrow as pa

        input_table: pa.Table = inputs.get_value_data("table")
        filter_array: pa.Array = inputs.get_value_data("mask")

        filtered = input_table.filter(filter_array)

        outputs.set_value("table", filtered)
create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/table/filter.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    inputs = {
        "table": {"type": "table", "doc": "The table to filter."},
        "mask": {
            "type": "array",
            "doc": "An mask array of booleans of the same length as the table.",
        },
    }
    return inputs
create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/table/filter.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    outputs = {"table": {"type": "table", "doc": "The filtered table."}}
    return outputs

query

QueryTableGraphQL (KiaraModule)

Execute a graphql aggregation query against an (Arrow) table.

References

  • https://vaex.io/docs/example_graphql.html

Examples:

An example for a query could be:

{
  df(where: {
    Language: {_eq: "German"}
  } ) {

    row(limit: 10) {
      Label
      City
    }
  }
}
Source code in core/table/query.py
class QueryTableGraphQL(KiaraModule):
    """Execute a graphql aggregation query against an (Arrow) table.

    References:
        - https://vaex.io/docs/example_graphql.html

    Examples:
        An example for a query could be:

            {
              df(where: {
                Language: {_eq: "German"}
              } ) {

                row(limit: 10) {
                  Label
                  City
                }
              }
            }

    """

    _module_type_name = "graphql"

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Declare the inputs: the table to query, and the graphql query string."""

        inputs: typing.Mapping[str, typing.Any] = {
            "table": {"type": "table", "doc": "The table to query."},
            "query": {"type": "string", "doc": "The query."},
        }

        return inputs

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Declare the single 'query_result' output (a dict)."""

        outputs: typing.Mapping[str, typing.Any] = {
            "query_result": {"type": "dict", "doc": "The query result."}
        }

        return outputs

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Run the graphql query against the table via a vaex dataframe."""

        import vaex

        table = inputs.get_value_data("table")
        query = inputs.get_value_data("query")

        df = vaex.from_arrow_table(table)

        result = df.graphql.execute(query)
        # Only the 'data' portion of the graphql execution result is returned.
        outputs.set_value("query_result", result.to_dict()["data"])
create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/table/query.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    inputs: typing.Mapping[str, typing.Any] = {
        "table": {"type": "table", "doc": "The table to query."},
        "query": {"type": "string", "doc": "The query."},
    }

    return inputs
create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/table/query.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    outputs: typing.Mapping[str, typing.Any] = {
        "query_result": {"type": "dict", "doc": "The query result."}
    }

    return outputs

QueryTableSQL (KiaraModule)

Execute a sql query against an (Arrow) table.

Source code in core/table/query.py
class QueryTableSQL(KiaraModule):
    """Execute a sql query against an (Arrow) table."""

    _module_type_name = "sql"
    _config_cls = QueryTableSQLModuleConfig

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Declare the inputs.

        The 'query' and 'relation_name' inputs are only exposed when no query
        was pre-configured in the module config.
        """

        inputs = {
            "table": {
                "type": "table",
                "doc": "The table to query",
            }
        }

        if self.get_config_value("query") is None:
            inputs["query"] = {"type": "string", "doc": "The query."}
            inputs["relation_name"] = {
                "type": "string",
                "doc": "The name the table is referred to in the sql query.",
                "default": "data",
            }

        return inputs

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Declare the single 'query_result' output (a table)."""

        return {"query_result": {"type": "table", "doc": "The query result."}}

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Execute the sql query (from user input or module config) against the table.

        Raises:
            KiaraProcessingException: if the relation name is a reserved sql keyword.
        """

        import duckdb

        # The query/relation name come from the user inputs unless a query was
        # pre-configured, in which case both are read from the config.
        if self.get_config_value("query") is None:
            _query: str = inputs.get_value_data("query")
            _relation_name: str = inputs.get_value_data("relation_name")
        else:
            _query = self.get_config_value("query")
            _relation_name = self.get_config_value("relation_name")

        # Reject relation names that would clash with sql keywords.
        if _relation_name.upper() in RESERVED_SQL_KEYWORDS:
            raise KiaraProcessingException(
                f"Invalid relation name '{_relation_name}': this is a reserved sql keyword, please select a different name."
            )

        _table = inputs.get_value_data("table")
        rel_from_arrow = duckdb.arrow(_table)
        result: duckdb.DuckDBPyResult = rel_from_arrow.query(_relation_name, _query)

        outputs.set_value("query_result", result.fetch_arrow_table())
create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/table/query.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Describe the inputs of this module.

    A 'table' input is always present; 'query' and 'relation_name' are only
    added when no query was fixed in the module configuration.
    """

    schema: typing.Dict[str, typing.Mapping[str, typing.Any]] = {
        "table": {
            "type": "table",
            "doc": "The table to query",
        }
    }

    if self.get_config_value("query") is None:
        # Query not pre-configured -> let the user supply it (and the alias).
        schema["query"] = {"type": "string", "doc": "The query."}
        schema["relation_name"] = {
            "type": "string",
            "doc": "The name the table is referred to in the sql query.",
            "default": "data",
        }

    return schema
create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]": { "type": "[value_type]", "doc*": "[a description of this output]" }, "[other_output_field_name]": { "type": ..., ... }, ... }

Source code in core/table/query.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Describe the single output of this module: the query result table."""

    result_schema = {"query_result": {"type": "table", "doc": "The query result."}}
    return result_schema

QueryTableSQLModuleConfig (ModuleTypeConfigSchema) pydantic-model

Source code in core/table/query.py
class QueryTableSQLModuleConfig(ModuleTypeConfigSchema):
    """Configuration for the 'query table with SQL' module.

    Leaving 'query' unset turns it (and 'relation_name') into user inputs instead.
    """

    query: typing.Optional[str] = Field(
        description="The query to execute. If not specified, the user will be able to provide their own.",
        default=None,
    )
    relation_name: typing.Optional[str] = Field(
        description="The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.",
        default="data",
    )
query: str pydantic-field

The query to execute. If not specified, the user will be able to provide their own.

relation_name: str pydantic-field

The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.

utils

create_sqlite_schema_data_from_arrow_table(table, column_map=None, index_columns=None, extra_column_info=None)

Create a sql schema statement from an Arrow table object.

Parameters:

Name Type Description Default
table pa.Table

the Arrow table object

required
column_map Optional[Mapping[str, str]]

a map that contains column names that should be changed in the new table

None
index_columns Optional[Iterable[str]]

a list of column names (after mapping) to create module_indexes for

None
extra_column_info Optional[Mapping[str, Iterable[str]]]

a list of extra schema instructions per column name (after mapping)

None
Source code in core/table/utils.py
def create_sqlite_schema_data_from_arrow_table(
    table: "pa.Table",
    column_map: typing.Optional[typing.Mapping[str, str]] = None,
    index_columns: typing.Optional[typing.Iterable[str]] = None,
    extra_column_info: typing.Optional[
        typing.Mapping[str, typing.Iterable[str]]
    ] = None,
) -> SqliteTableSchema:
    """Create a sql schema statement from an Arrow table object.

    Arguments:
        table: the Arrow table object
        column_map: a map that contains column names that should be changed in the new table
        index_columns: a list of column names (after mapping) to create module_indexes for
        extra_column_info: a list of extra schema instructions per column name (after mapping)

    Raises:
        Exception: if the resulting schema has no columns, or a requested index
            column does not exist after mapping.
    """

    columns = convert_arrow_column_types_to_sqlite(table=table)

    if column_map is None:
        column_map = {}

    if extra_column_info is None:
        extra_column_info = {}

    # materialize so membership tests / re-iteration work for generator inputs too
    index_columns = [] if index_columns is None else list(index_columns)

    temp: typing.Dict[str, typing.Dict[str, typing.Any]] = {}

    for cn, data in columns.items():
        # apply the (optional) column rename map
        new_key = column_map.get(cn, cn)
        temp_data = dict(data)
        if new_key in extra_column_info.keys():
            temp_data["extra_column_info"] = extra_column_info[new_key]
        else:
            temp_data["extra_column_info"] = [""]
        # BUG FIX: index membership must be checked against the *mapped* name --
        # 'index_columns' is documented as containing post-mapping names, and the
        # validation loop below also checks against the mapped columns. The
        # previous code tested the pre-mapping name, so renamed index columns
        # never received 'create_index'.
        if new_key in index_columns:
            temp_data["create_index"] = True
        temp[new_key] = temp_data

    columns = temp
    if not columns:
        raise Exception("Resulting table schema has no columns.")
    else:
        for ic in index_columns:
            if ic not in columns.keys():
                raise Exception(
                    f"Can't create schema, requested index column name not available: {ic}"
                )

    return SqliteTableSchema(columns=columns, column_map=column_map)

value

DataProfilerModule (KiaraModule)

Generate a data profile report for a dataset.

This uses the DataProfiler Python library, check out its documentation for more details.

Source code in core/value.py
class DataProfilerModule(KiaraModule):
    """Generate a data profile report for a dataset.

    This uses the [DataProfiler](https://capitalone.github.io/DataProfiler/docs/0.7.0/html/index.html) Python library,
    check out its documentation for more details.
    """

    _module_type_name = "data_profile"
    _config_cls = DataProfilerModuleConfig

    @classmethod
    def retrieve_module_profiles(
        cls, kiara: "Kiara"
    ) -> typing.Mapping[str, typing.Union[typing.Mapping[str, typing.Any], Operation]]:
        """Pre-set module configs: one 'profile.<type>.data' operation per supported source type."""

        supported_source_types = ["table", "file"]

        doc = cls.get_type_metadata().documentation
        all_profiles = {}
        for sup_type in supported_source_types:

            # each profile pins this module type to one concrete 'value_type'
            op_config = {
                "module_type": cls._module_type_id,  # type: ignore
                "module_config": {"value_type": sup_type},
                "doc": doc,
            }
            all_profiles[f"profile.{sup_type}.data"] = op_config

        return all_profiles

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Single 'item' input, typed from the 'value_type' module config."""

        inputs: typing.Mapping[str, typing.Mapping[str, typing.Any]] = {
            "item": {
                "type": self.get_config_value("value_type"),
                "doc": f"The {self.get_config_value('value_type')} to profile.",
            }
        }
        return inputs

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Single output: the profiling 'report' dict."""

        outputs: typing.Mapping[str, typing.Mapping[str, typing.Any]] = {
            "report": {"type": "dict", "doc": "Statistics/details about the dataset."}
        }
        return outputs

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Profile the input item and set the 'report' output.

        Raises:
            KiaraProcessingException: if the configured value type is not supported.
        """

        import pyarrow as pa
        from dataprofiler import Data, Profiler, ProfilerOptions, set_verbosity

        # dataprofiler logs verbosely by default
        set_verbosity(logging.WARNING)

        value_type = self.get_config_value("value_type")

        # data labeling (entity recognition) disabled for both profile kinds --
        # presumably to avoid loading the heavyweight labeler models; TODO confirm
        profile_options = ProfilerOptions()
        profile_options.structured_options.data_labeler.is_enabled = False
        profile_options.unstructured_options.data_labeler.is_enabled = False

        if value_type == "table":
            table_item: pa.Table = inputs.get_value_data("item")
            pd = table_item.to_pandas()
            profile = Profiler(
                pd, options=profile_options
            )  # Calculate Statistics, Entity Recognition, etc
            report = profile.report()

        elif value_type == "file":
            file_item: KiaraFile = inputs.get_value_data("item")
            data = Data(file_item.path)
            profile = Profiler(data, options=profile_options)
            report = profile.report()
        else:
            raise KiaraProcessingException(
                f"Data profiling of value type '{value_type}' not supported."
            )

        outputs.set_value("report", report)

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]": { "type": "[value_type]", "doc*": "[a description of this input]", "optional*": "[boolean: whether this input is optional or required (defaults to 'False')]" }, "[other_input_field_name]": { "type": ..., ... }, ... }

Source code in core/value.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Declare a single 'item' input whose type comes from the module config."""

    value_type = self.get_config_value("value_type")
    return {
        "item": {
            "type": value_type,
            "doc": f"The {value_type} to profile.",
        }
    }

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/value.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Declare the single 'report' output."""

    report_schema = {"type": "dict", "doc": "Statistics/details about the dataset."}
    return {"report": report_schema}

retrieve_module_profiles(kiara) classmethod

Retrieve a collection of profiles (pre-set module configs) for this kiara module type.

This is used to automatically create generally useful operations (incl. their ids).

Source code in core/value.py
@classmethod
def retrieve_module_profiles(
    cls, kiara: "Kiara"
) -> typing.Mapping[str, typing.Union[typing.Mapping[str, typing.Any], Operation]]:
    """Build one 'profile.<source type>.data' operation config per supported source type."""

    doc = cls.get_type_metadata().documentation
    return {
        f"profile.{source_type}.data": {
            "module_type": cls._module_type_id,  # type: ignore
            "module_config": {"value_type": source_type},
            "doc": doc,
        }
        for source_type in ["table", "file"]
    }

DataProfilerModuleConfig (ModuleTypeConfigSchema) pydantic-model

Source code in core/value.py
class DataProfilerModuleConfig(ModuleTypeConfigSchema):
    """Configuration for the data-profiling module: which value type to profile."""

    value_type: str = Field(description="The value type to profile.")

value_type: str pydantic-field required

The value type to profile.

value_types

This module contains the value type classes that are used in the kiara_modules.core package.

ArrayType (AnyType)

An Apache arrow array.

Source code in core/value_types.py
class ArrayType(AnyType):
    """An Apache arrow array."""

    _value_type_name = "array"

    @classmethod
    def backing_python_type(cls) -> typing.Type:
        import pyarrow as pa

        return pa.Array

    @classmethod
    def candidate_python_types(cls) -> typing.Optional[typing.Iterable[typing.Type]]:
        import pyarrow as pa

        return [pa.ChunkedArray, pa.Table]

    def parse_value(self, value: typing.Any) -> typing.Any:
        """Convert a single-column Arrow Table into its (chunked) column.

        Returns None for any other input, meaning 'no parsing done, use as-is'.
        """

        import pyarrow as pa

        if isinstance(value, pa.Table):
            if len(value.columns) != 1:
                raise Exception(
                    f"Invalid type, only Arrow Arrays or single-column Tables allowed. This value is a table with {len(value.columns)} columns."
                )
            return value.column(0)
        return None

    def validate(cls, value: typing.Any) -> None:
        import pyarrow as pa

        # BUG FIX: the backing type is 'pa.Array' and single-column tables are
        # parsed into 'pa.ChunkedArray' -- but the previous check only accepted
        # ChunkedArray while its error message demanded 'pa.Array'. Accept both.
        if isinstance(value, (pa.Array, pa.ChunkedArray)):
            return value
        raise Exception(
            f"invalid type '{type(value).__name__}', must be '{pa.Array.__name__}' or '{pa.ChunkedArray.__name__}'."
        )

    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        """Render the array as a single-column table, truncated per the print config."""

        max_rows = print_config.get("max_no_rows")
        max_row_height = print_config.get("max_row_height")
        max_cell_length = print_config.get("max_cell_length")

        # show head/tail halves when a row limit is set
        half_lines: typing.Optional[int] = None
        if max_rows:
            half_lines = int(max_rows / 2)

        array = value.get_value_data()
        import pyarrow as pa

        temp_table = pa.Table.from_arrays(arrays=[array], names=["array"])
        atw = ArrowTabularWrap(temp_table)
        result = [
            atw.pretty_print(
                rows_head=half_lines,
                rows_tail=half_lines,
                max_row_height=max_row_height,
                max_cell_length=max_cell_length,
            )
        ]
        return result

parse_value(self, value)

Parse a value into a supported python type.

This exists to make it easier to do trivial conversions (e.g. from a date string to a datetime object). If you choose to overwrite this method, make 100% sure that you don't change the meaning of the value, and try to avoid adding or removing information from the data (e.g. by changing the resolution of a date).

Parameters:

Name Type Description Default
v

the value

required

Returns:

Type Description
Any

'None', if no parsing was done and the original value should be used, otherwise return the parsed Python object

Source code in core/value_types.py
def parse_value(self, value: typing.Any) -> typing.Any:
    """Convert a single-column Arrow Table into its (chunked) column; parse nothing else."""

    import pyarrow as pa

    if not isinstance(value, pa.Table):
        return None
    if len(value.columns) != 1:
        raise Exception(
            f"Invalid type, only Arrow Arrays or single-column Tables allowed. This value is a table with {len(value.columns)} columns."
        )
    return value.column(0)

validate(cls, value)

Validate the value. This expects an instance of the defined Python class (from 'backing_python_type').

Source code in core/value_types.py
def validate(cls, value: typing.Any) -> None:
    import pyarrow as pa

    # BUG FIX: the backing type is 'pa.Array' and single-column tables are parsed
    # into 'pa.ChunkedArray' -- but the previous check only accepted ChunkedArray
    # while its error message demanded 'pa.Array'. Accept both.
    if isinstance(value, (pa.Array, pa.ChunkedArray)):
        return value
    raise Exception(
        f"invalid type '{type(value).__name__}', must be '{pa.Array.__name__}' or '{pa.ChunkedArray.__name__}'."
    )

BooleanType (AnyType)

A boolean.

Source code in core/value_types.py
class BooleanType(AnyType):
    "A boolean."

    _value_type_name = "boolean"

    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return bool

    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        """Hash via Python's builtin 'hash', rendered as a string."""
        return str(hash(value))

    def validate(cls, value: typing.Any):
        """Ensure the value is a Python bool."""
        if isinstance(value, bool):
            return
        raise ValueError(f"Invalid type '{type(value)}' for boolean: {value}")

    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        """Render the boolean as plain text."""
        return [str(value.get_value_data())]

calculate_value_hash(value, hash_type) classmethod

Calculate the hash of this value.

If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.

Source code in core/value_types.py
@classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    """Hash via Python's builtin 'hash', rendered as a string."""
    hashed = hash(value)
    return str(hashed)

validate(cls, value)

Validate the value. This expects an instance of the defined Python class (from 'backing_python_type).

Source code in core/value_types.py
def validate(cls, value: typing.Any):
    """Ensure the value is a Python bool."""

    if isinstance(value, bool):
        return
    raise ValueError(f"Invalid type '{type(value)}' for boolean: {value}")

BytesType (AnyType)

An array of bytes.

Source code in core/value_types.py
class BytesType(AnyType):
    """An array of bytes."""

    _value_type_name = "bytes"

    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return bytes

    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        """Hash via Python's builtin 'hash', rendered as a string."""
        return str(hash(value))

    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        """Render the bytes decoded as text (default utf-8)."""

        raw: bytes = value.get_value_data()
        return [raw.decode()]

calculate_value_hash(value, hash_type) classmethod

Calculate the hash of this value.

If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.

Source code in core/value_types.py
@classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    """Hash via Python's builtin 'hash', rendered as a string."""
    hashed = hash(value)
    return str(hashed)

DatabaseType (ComplexModelType)

A database, containing one or several tables.

This is backed by sqlite databases.

Source code in core/value_types.py
class DatabaseType(ComplexModelType[KiaraDatabase]):
    """A database, containing one or several tables.

    This is backed by sqlite databases.
    """

    _value_type_name = "database"

    @classmethod
    def backing_model_type(cls) -> typing.Type[KiaraDatabase]:
        # FIX: first parameter of this classmethod was named 'self'; renamed to
        # 'cls' for correctness and consistency with the sibling value types.
        return KiaraDatabase

    @classmethod
    def candidate_python_types(cls) -> typing.Optional[typing.Iterable[typing.Type]]:
        return [KiaraDatabase, str]

    def parse_value(self, value: typing.Any) -> typing.Any:
        """Interpret a string as a path to an sqlite db file; pass everything else through."""

        if isinstance(value, str):
            # TODO: check path exists
            return KiaraDatabase(db_file_path=value)

        return value

    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        """Render every table of the database, truncated per the print config."""

        max_rows = print_config.get("max_no_rows")
        max_row_height = print_config.get("max_row_height")
        max_cell_length = print_config.get("max_cell_length")

        # show head/tail halves when a row limit is set
        half_lines: typing.Optional[int] = None
        if max_rows:
            half_lines = int(max_rows / 2)

        db: KiaraDatabase = value.get_value_data()

        from sqlalchemy import inspect

        inspector = inspect(db.get_sqlalchemy_engine())
        result: typing.List[typing.Any] = [""]
        for table_name in inspector.get_table_names():
            atw = SqliteTabularWrap(db=db, table_name=table_name)
            pretty = atw.pretty_print(
                rows_head=half_lines,
                rows_tail=half_lines,
                max_row_height=max_row_height,
                max_cell_length=max_cell_length,
            )
            result.append(f"[b]Table[/b]: [i]{table_name}[/i]")
            result.append(pretty)

        return result

parse_value(self, value)

Parse a value into a supported python type.

This exists to make it easier to do trivial conversions (e.g. from a date string to a datetime object). If you choose to overwrite this method, make 100% sure that you don't change the meaning of the value, and try to avoid adding or removing information from the data (e.g. by changing the resolution of a date).

Parameters:

Name Type Description Default
v

the value

required

Returns:

Type Description
Any

'None', if no parsing was done and the original value should be used, otherwise return the parsed Python object

Source code in core/value_types.py
def parse_value(self, value: typing.Any) -> typing.Any:
    """Wrap a string path into a KiaraDatabase; pass anything else through unchanged."""

    if not isinstance(value, str):
        return value

    # TODO: check path exists
    return KiaraDatabase(db_file_path=value)

DateType (AnyType)

A date.

Internally, this will always be represented as a Python datetime object. If provided as input, it can also be given as a string, in which case the dateutil.parser.parse method will be used to parse the string into a datetime object.

Source code in core/value_types.py
class DateType(AnyType):
    """A date.

    Internally, this will always be represented as a Python ``datetime`` object. If provided as input, it can also
    be given as a string, in which case the [``dateutil.parser.parse``](https://dateutil.readthedocs.io/en/stable/parser.html#dateutil.parser.parse) method will be used to parse the string into a datetime object.
    """

    _value_type_name = "date"

    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        # FIX: parameter was misspelled 'hash_typpe'; renamed for consistency
        # with every other value type (positional callers are unaffected).
        return str(hash(value))

    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return datetime.datetime

    def parse_value(self, v: typing.Any) -> typing.Any:
        """Parse strings (via dateutil) and plain dates into datetime objects.

        Returns None when no parsing is needed (the value is already a datetime)
        or the input is of an unhandled type.
        """

        from dateutil import parser

        if isinstance(v, str):
            return parser.parse(v)
        if isinstance(v, datetime.datetime):
            # FIX: 'datetime' is a subclass of 'date', so this previously fell
            # into the date branch below and silently dropped the time component,
            # violating the no-information-loss contract of 'parse_value'.
            return None
        if isinstance(v, datetime.date):
            return datetime.datetime(year=v.year, month=v.month, day=v.day)

        return None

    def validate(cls, value: typing.Any):
        # NOTE(review): 'assert' is stripped under 'python -O'; siblings raise
        # ValueError -- consider aligning.
        assert isinstance(value, datetime.datetime)

    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        """Render the datetime as plain text."""

        data = value.get_value_data()
        return [str(data)]

calculate_value_hash(value, hash_typpe) classmethod

Calculate the hash of this value.

If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.

Source code in core/value_types.py
@classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    # FIX: parameter was misspelled 'hash_typpe'; renamed for consistency with
    # every other value type (positional callers are unaffected).
    return str(hash(value))

parse_value(self, v)

Parse a value into a supported python type.

This exists to make it easier to do trivial conversions (e.g. from a date string to a datetime object). If you choose to overwrite this method, make 100% sure that you don't change the meaning of the value, and try to avoid adding or removing information from the data (e.g. by changing the resolution of a date).

Parameters:

Name Type Description Default
v Any

the value

required

Returns:

Type Description
Any

'None', if no parsing was done and the original value should be used, otherwise return the parsed Python object

Source code in core/value_types.py
def parse_value(self, v: typing.Any) -> typing.Any:
    """Parse strings (via dateutil) and plain dates into datetime objects.

    Returns None when no parsing is needed or the input type is unhandled.
    """

    from dateutil import parser

    if isinstance(v, str):
        return parser.parse(v)
    if isinstance(v, datetime.datetime):
        # FIX: 'datetime' is a subclass of 'date', so this previously fell into
        # the date branch below and silently dropped the time component.
        return None
    if isinstance(v, datetime.date):
        return datetime.datetime(year=v.year, month=v.month, day=v.day)

    return None

validate(cls, value)

Validate the value. This expects an instance of the defined Python class (from 'backing_python_type).

Source code in core/value_types.py
def validate(cls, value: typing.Any):
    # Ensure the value is a Python datetime.
    # NOTE(review): 'assert' is stripped under 'python -O'; the sibling value
    # types raise ValueError instead -- consider aligning.
    assert isinstance(value, datetime.datetime)

DictType (AnyType)

A dict-like object.

Source code in core/value_types.py
class DictType(AnyType):
    """A dict-like object."""

    _value_type_name = "dict"

    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return dict

    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        """Content-based hash via deepdiff's DeepHash."""

        from deepdiff import DeepHash

        return str(DeepHash(value)[value])

    def validate(cls, value: typing.Any) -> None:
        """Ensure the value is a mapping."""

        if isinstance(value, typing.Mapping):
            return
        raise ValueError(f"Invalid type '{type(value)}', not a mapping.")

    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        """Render the mapping via pprint."""
        return [pprint.pformat(value.get_value_data())]

calculate_value_hash(value, hash_type) classmethod

Calculate the hash of this value.

If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.

Source code in core/value_types.py
@classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    """Content-based hash via deepdiff's DeepHash."""

    from deepdiff import DeepHash

    return str(DeepHash(value)[value])

validate(cls, value)

Validate the value. This expects an instance of the defined Python class (from 'backing_python_type).

Source code in core/value_types.py
def validate(cls, value: typing.Any) -> None:
    """Ensure the value is a mapping."""

    if isinstance(value, typing.Mapping):
        return
    raise ValueError(f"Invalid type '{type(value)}', not a mapping.")

FileBundleType (AnyType)

A representation of a set of files (folder, archive, etc.).

It is recommended to 'onboard' files before working with them, otherwise metadata consistency can not be guaranteed.

Source code in core/value_types.py
class FileBundleType(AnyType):
    """A representation of a set of files (folder, archive, etc.).

    It is recommended to 'onboard' files before working with them, otherwise metadata consistency can not be guaranteed.
    """

    _value_type_name = "file_bundle"

    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return KiaraFileBundle

    @classmethod
    def candidate_python_types(cls) -> typing.Optional[typing.Iterable[typing.Type]]:
        return [KiaraFileBundle]

    @classmethod
    def get_supported_hash_types(cls) -> typing.Iterable[str]:
        return ["sha3_256"]

    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        """Return the pre-computed bundle hash (only 'sha3_256' is supported)."""

        assert hash_type == "sha3_256"
        assert isinstance(value, KiaraFileBundle)

        return value.file_bundle_hash

    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        """Render bundle metadata as JSON, truncating long file listings."""

        file_limit = print_config.get("max_no_files", 40)

        bundle: KiaraFileBundle = value.get_value_data()
        rendered = bundle.dict(exclude={"included_files"})
        file_names = list(bundle.included_files.keys())
        if file_limit >= 0 and len(file_names) > file_limit:
            keep = int((file_limit - 1) / 2)
            # NOTE(review): when keep == 0 the tail slice below is the whole
            # list -- quirk preserved from the original implementation.
            head = file_names[0:keep]
            tail = file_names[-1 * keep :]  # noqa
            markers = ["..... output skipped .....", "..... output skipped ....."]
            file_names = head + markers + tail
        rendered["included_files"] = file_names
        return [json.dumps(rendered, indent=2)]

calculate_value_hash(value, hash_type) classmethod

Calculate the hash of this value.

If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.

Source code in core/value_types.py
@classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    """Return the pre-computed bundle hash (only 'sha3_256' is supported)."""

    assert hash_type == "sha3_256"
    assert isinstance(value, KiaraFileBundle)

    return value.file_bundle_hash

FileType (AnyType)

A representation of a file.

It is recommended to 'onboard' files before working with them, otherwise metadata consistency can not be guaranteed.

Source code in core/value_types.py
class FileType(AnyType):
    """A representation of a file.

    It is recommended to 'onboard' files before working with them, otherwise metadata consistency can not be guaranteed.
    """

    _value_type_name = "file"

    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return KiaraFile

    @classmethod
    def candidate_python_types(cls) -> typing.Optional[typing.Iterable[typing.Type]]:
        return [KiaraFile]

    @classmethod
    def get_supported_hash_types(cls) -> typing.Iterable[str]:
        return ["sha3_256"]

    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        """Return the pre-computed file hash (only 'sha3_256' is supported)."""

        assert hash_type == "sha3_256"
        assert isinstance(value, KiaraFile)
        return value.file_hash

    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        """Render the file head as text, or its metadata if it is not valid utf-8."""

        data: KiaraFile = value.get_value_data()
        max_lines = print_config.get("max_lines", 34)
        try:
            lines = []
            # FIX: open explicitly as utf-8 -- the fallback message below promises
            # utf-8 semantics, but the previous code used the locale default encoding.
            with open(data.path, "r", encoding="utf-8") as f:
                for idx, line in enumerate(f):
                    if idx > max_lines:
                        lines.append("...\n")
                        lines.append("...")
                        break
                    lines.append(line)

            # TODO: syntax highlighting
            return ["".join(lines)]
        except UnicodeDecodeError:
            # found non-text data
            return [
                # FIX: typo 'enconding' in the user-facing message
                "Binary file or non-utf8 encoding, not printing content...",
                "",
                "[b]File metadata:[/b]",
                "",
                data.json(indent=2),
            ]

calculate_value_hash(value, hash_type) classmethod

Calculate the hash of this value.

If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.

Source code in core/value_types.py
@classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    """Return the pre-computed file hash (only 'sha3_256' is supported)."""

    assert hash_type == "sha3_256"
    assert isinstance(value, KiaraFile)

    return value.file_hash

FloatType (AnyType)

A float.

Source code in core/value_types.py
class FloatType(AnyType):
    "A float."

    _value_type_name = "float"

    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return float

    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        """Hash via Python's builtin 'hash', rendered as a string."""
        return str(hash(value))

    def validate(cls, value: typing.Any) -> typing.Any:
        """Ensure the value is a Python float (ints are not accepted)."""
        if isinstance(value, float):
            return
        raise ValueError(f"Invalid type '{type(value)}' for float: {value}")

    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        """Render the float as plain text."""
        return [str(value.get_value_data())]

calculate_value_hash(value, hash_type) classmethod

Calculate the hash of this value.

If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.

Source code in core/value_types.py
@classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    """Hash via Python's builtin 'hash', rendered as a string."""
    hashed = hash(value)
    return str(hashed)

validate(cls, value)

Validate the value. This expects an instance of the defined Python class (from 'backing_python_type).

Source code in core/value_types.py
def validate(cls, value: typing.Any) -> typing.Any:
    """Ensure the value is a Python float (ints are not accepted)."""

    if isinstance(value, float):
        return
    raise ValueError(f"Invalid type '{type(value)}' for float: {value}")

IntegerType (AnyType)

An integer.

Source code in core/value_types.py
class IntegerType(AnyType):
    """An integer."""

    _value_type_name = "integer"

    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return int

    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        """Hash via Python's builtin 'hash', rendered as a string."""
        return str(hash(value))

    def validate(cls, value: typing.Any) -> None:
        """Ensure the value is a Python int.

        NOTE(review): 'bool' is a subclass of 'int', so True/False pass this
        check -- confirm whether that is intended.
        """
        if isinstance(value, int):
            return
        raise ValueError(f"Invalid type '{type(value)}' for integer: {value}")

    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        """Render the integer as plain text."""
        return [str(value.get_value_data())]

calculate_value_hash(value, hash_type) classmethod

Calculate the hash of this value.

If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.

Source code in core/value_types.py
@classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    """Return the builtin ``hash()`` of *value* as a string."""
    result = hash(value)
    return str(result)

validate(cls, value)

Validate the value. This expects an instance of the defined Python class (from 'backing_python_type').

Source code in core/value_types.py
def validate(cls, value: typing.Any) -> None:
    """Validate that *value* is an ``int``.

    Strings are rejected outright — there is no implicit parsing.  Note
    that ``bool`` is a subclass of ``int`` and therefore passes.

    Raises:
        ValueError: if *value* is not an ``int``.
    """
    # Dead commented-out string->int coercion code removed.
    if not isinstance(value, int):
        raise ValueError(f"Invalid type '{type(value)}' for integer: {value}")

ListType (AnyType)

A list-like object.

Source code in core/value_types.py
class ListType(AnyType):
    """A list-like object."""

    _value_type_name = "list"

    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return list

    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        """Return a deep, content-based hash of *value* via ``deepdiff.DeepHash``."""
        # Fixed: first parameter of a classmethod is conventionally 'cls', not 'self'.
        from deepdiff import DeepHash

        dh = DeepHash(value)
        return str(dh[value])

    def validate(cls, value: typing.Any) -> None:
        """Validate that *value* is iterable.

        Uses an explicit raise instead of ``assert`` so validation still runs
        under ``python -O`` and matches the ValueError convention of the other
        value types in this module.

        Raises:
            ValueError: if *value* is not iterable.
        """
        if not isinstance(value, typing.Iterable):
            raise ValueError(f"Invalid type '{type(value)}' for list: {value}")

    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        """Render the value as a single pretty-formatted item."""
        data = value.get_value_data()
        return [pprint.pformat(data)]

calculate_value_hash(value, hash_type) classmethod

Calculate the hash of this value.

If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.

Source code in core/value_types.py
@classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    """Return a deep, content-based hash of *value* via ``deepdiff.DeepHash``."""
    # Fixed: first parameter of a classmethod is conventionally 'cls', not 'self'.
    from deepdiff import DeepHash

    dh = DeepHash(value)
    return str(dh[value])

validate(cls, value)

Validate the value. This expects an instance of the defined Python class (from 'backing_python_type').

Source code in core/value_types.py
def validate(cls, value: typing.Any) -> None:
    """Validate that *value* is iterable.

    Uses an explicit raise instead of ``assert`` so validation still runs
    under ``python -O`` (asserts are stripped with optimization enabled)
    and matches the ValueError convention of the other value types.

    Raises:
        ValueError: if *value* is not iterable.
    """
    if not isinstance(value, typing.Iterable):
        raise ValueError(f"Invalid type '{type(value)}' for list: {value}")

RenderablesType (ValueType)

A list of renderable objects, used in the 'rich' Python library, to print to the terminal or in Jupyter.

Internally, the result list items can be either a string, a 'rich.console.ConsoleRenderable', or a 'rich.console.RichCast'.

Source code in core/value_types.py
class RenderablesType(ValueType[object, ValueTypeConfigSchema]):
    """A list of renderable objects, used in the 'rich' Python library, to print to the terminal or in Jupyter.

    Internally, the result list items can be either a string, a 'rich.console.ConsoleRenderable', or a 'rich.console.RichCast'.
    """

    _value_type_name = "renderables"

    @classmethod
    def backing_python_type(cls) -> typing.Type:
        # Anything renderable is allowed; there is no narrower common base.
        return object

    @classmethod
    def candidate_python_types(cls) -> typing.Optional[typing.Iterable[typing.Type]]:
        candidates = [str, ConsoleRenderable, RichCast]
        return candidates

    @classmethod
    def type_config_cls(cls) -> typing.Type[ValueTypeConfigSchema]:
        return ValueTypeConfigSchema

StringType (AnyType)

A string.

Source code in core/value_types.py
class StringType(AnyType):
    """A string."""

    _value_type_name = "string"

    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return str

    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        """Return a deterministic hash of the string *value*.

        The builtin ``hash()`` is randomized per interpreter process for
        strings (PYTHONHASHSEED), so it is unsuitable for a value hash that
        may be compared across runs; a stable sha256 digest is used instead.
        The *hash_type* argument is currently ignored (as it was before).
        """
        import hashlib

        return hashlib.sha256(str(value).encode("utf-8")).hexdigest()

    def validate(cls, value: typing.Any) -> None:
        """Validate that *value* is a ``str``.

        Raises:
            ValueError: if *value* is not a ``str``.
        """
        if not isinstance(value, str):
            raise ValueError(f"Invalid type '{type(value)}': string required")

    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        """Return the raw string as the single renderable item."""
        data = value.get_value_data()
        return [data]

calculate_value_hash(value, hash_type) classmethod

Calculate the hash of this value.

If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.

Source code in core/value_types.py
@classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    """Return a deterministic hash of the string *value*.

    The builtin ``hash()`` is randomized per interpreter process for strings
    (PYTHONHASHSEED), so it is unsuitable for a value hash that may be
    compared across runs; a stable sha256 digest is used instead.  The
    *hash_type* argument is currently ignored (as it was before).
    """
    import hashlib

    return hashlib.sha256(str(value).encode("utf-8")).hexdigest()

validate(cls, value)

Validate the value. This expects an instance of the defined Python class (from 'backing_python_type').

Source code in core/value_types.py
def validate(cls, value: typing.Any) -> None:
    """Validate that *value* is a ``str``; raise ``ValueError`` otherwise."""
    if isinstance(value, str):
        return
    raise ValueError(f"Invalid type '{type(value)}': string required")

TableType (AnyType)

A table.

Internally, this is backed by the Apache Arrow Table class.

Source code in core/value_types.py
class TableType(AnyType):
    """A table.

    Internally, this is backed by the [Apache Arrow](https://arrow.apache.org) [``Table``](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html) class.
    """

    _value_type_name = "table"

    @classmethod
    def backing_python_type(cls) -> typing.Type:
        # pyarrow is imported lazily so it is only required when tables are used.
        import pyarrow as pa

        return pa.Table

    @classmethod
    def candidate_python_types(cls) -> typing.Optional[typing.Iterable[typing.Type]]:
        import pyarrow as pa

        return [pa.Table]

    @classmethod
    def check_data(cls, data: typing.Any) -> typing.Optional["ValueType"]:
        """Return a ``TableType`` instance if *data* is a pyarrow Table, else ``None``."""
        import pyarrow as pa

        if isinstance(data, pa.Table):
            return TableType()

        return None

    # NOTE: a commented-out, pandas-based experimental hash implementation was
    # removed here; value hashing for tables is not implemented yet.

    def validate(cls, value: typing.Any) -> None:
        """Validate that *value* is a ``pyarrow.Table``.

        Raises:
            Exception: if *value* is not a ``pyarrow.Table``.
        """
        import pyarrow as pa

        if not isinstance(value, pa.Table):
            raise Exception(
                f"invalid type '{type(value).__name__}', must be '{pa.Table.__name__}'."
            )

    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        """Render the table for display via ``ArrowTabularWrap``.

        Recognized *print_config* keys: ``max_no_rows``, ``max_row_height``,
        ``max_cell_length``.  When ``max_no_rows`` is set, half the row budget
        is taken from the head and half from the tail of the table.
        """
        max_rows = print_config.get("max_no_rows")
        max_row_height = print_config.get("max_row_height")
        max_cell_length = print_config.get("max_cell_length")

        half_lines: typing.Optional[int] = None
        if max_rows:
            half_lines = int(max_rows / 2)

        atw = ArrowTabularWrap(value.get_value_data())
        result = [
            atw.pretty_print(
                rows_head=half_lines,
                rows_tail=half_lines,
                max_row_height=max_row_height,
                max_cell_length=max_cell_length,
            )
        ]
        return result

check_data(data) classmethod

Check whether the provided input matches this value type.

If it does, return a ValueType object (with the appropriate type configuration).

Source code in core/value_types.py
@classmethod
def check_data(cls, data: typing.Any) -> typing.Optional["ValueType"]:
    """Return a ``TableType`` instance if *data* is a pyarrow Table, else ``None``."""
    import pyarrow as pa

    return TableType() if isinstance(data, pa.Table) else None

validate(cls, value)

Validate the value. This expects an instance of the defined Python class (from 'backing_python_type').

Source code in core/value_types.py
def validate(cls, value: typing.Any) -> None:
    """Validate that *value* is a ``pyarrow.Table``; raise otherwise."""
    import pyarrow as pa

    if isinstance(value, pa.Table):
        return
    raise Exception(
        f"invalid type '{type(value).__name__}', must be '{pa.Table.__name__}'."
    )

yaml

ToYamlModuleOld (OldTypeConversionModule)

Convert arbitrary types into YAML format.

Early days for this module, it doesn't support a whole lot of types yet.

Source code in core/yaml.py
class ToYamlModuleOld(OldTypeConversionModule):
    """Convert arbitrary types into YAML format.

    Early days for this module, it doesn't support a whole lot of types yet.
    """

    _module_type_name = "to_yaml"

    @classmethod
    def _get_supported_source_types(cls) -> typing.Union[typing.Iterable[str], str]:
        # Fixed: first parameter of a classmethod is conventionally 'cls', not 'self'.
        return YAML_SUPPORTED_SOURCE_TYPES

    @classmethod
    def _get_target_types(cls) -> typing.Union[typing.Iterable[str], str]:
        return ["yaml"]

    def convert(
        self, value: Value, config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        """Render the value's data as YAML via ``convert_to_yaml``.

        Returns the converted string produced by ``convert_to_yaml``.
        """
        input_value: typing.Any = value.get_value_data()

        input_value_str = convert_to_yaml(
            self._kiara, data=input_value, convert_config=config
        )
        return input_value_str