core
Top-level package for kiara_modules.core.
        array
  
      special
  
¶
    
        
ArrayMetadataModule            (ExtractMetadataModule)
        
¶
    Extract metadata from an 'array' value.
Source code in core/array/__init__.py
          class ArrayMetadataModule(ExtractMetadataModule):
    """Extract metadata from an 'array' value."""
    _module_type_name = "metadata"
    @classmethod
    def _get_supported_types(cls) -> str:
        return "array"
    @classmethod
    def get_metadata_key(cls) -> str:
        return "array"
    def _get_metadata_schema(
        self, type: str
    ) -> typing.Union[str, typing.Type[BaseModel]]:
        return ArrayMetadata
    def extract_metadata(self, value: Value) -> typing.Mapping[str, typing.Any]:
        import pyarrow as pa
        array: pa.Array = value.get_value_data()
        return {
            "length": len(array),
            "size": array.nbytes,
        }
        
MapModule            (KiaraModule)
        
¶
    Map a list of values into another list of values.
This module must be configured with the type (and, optionally, the configuration) of another kiara module. This 'child' module will then be used to compute the array items of the result.
Source code in core/array/__init__.py
class MapModule(KiaraModule):
    """Map a list of values into another list of values.
    This module must be configured with the type (and optional) configuration of another *kiara* module. This 'child'
    module will then be used to compute the array items of the result.
    """
    # Pydantic schema used to validate this module's configuration.
    _config_cls = MapModuleConfig
    def module_instance_doc(self) -> str:
        # Build instance-specific documentation that names (and, where
        # possible, links to) the configured child module.
        config: MapModuleConfig = self.config  # type: ignore
        module_type = config.module_type
        module_config = config.module_config
        m = self._kiara.create_module(
            module_type=module_type, module_config=module_config
        )
        type_md = m.get_type_metadata()
        doc = type_md.documentation.full_doc
        link = type_md.context.get_url_for_reference("module_doc")
        if not link:
            # No URL available for the module doc: plain inline-code reference.
            link_str = f"``{module_type}``"
        else:
            link_str = f"[``{module_type}``]({link})"
        result = f"""Map the values of the input list onto a new list of the same length, using the {link_str} module."""
        if doc and doc != "-- n/a --":
            # Append the child module's own documentation, if it has any.
            result = result + f"\n\n``{module_type}`` documentation:\n\n{doc}"
        return result
    def __init__(self, *args, **kwargs):
        # Lazily-resolved child module and its input/output names; all three
        # are computed (and cached) on first property access below.
        self._child_module: typing.Optional[KiaraModule] = None
        self._module_input_name: typing.Optional[str] = None
        self._module_output_name: typing.Optional[str] = None
        super().__init__(*args, **kwargs)
    @property
    def child_module(self) -> KiaraModule:
        # The configured module instance used to compute each array item
        # (created once, then cached on the instance).
        if self._child_module is not None:
            return self._child_module
        module_name = self.get_config_value("module_type")
        module_config = self.get_config_value("module_config")
        self._child_module = self._kiara.create_module(
            id="map_module_child", module_type=module_name, module_config=module_config
        )
        return self._child_module
    @property
    def module_input_name(self) -> str:
        # Name of the child-module input that receives each array item; falls
        # back to the child's single input when not configured explicitly.
        if self._module_input_name is not None:
            return self._module_input_name
        self._module_input_name = self.get_config_value("input_name")
        if self._module_input_name is None:
            if len(list(self.child_module.input_names)) == 1:
                self._module_input_name = next(iter(self.child_module.input_names))
            else:
                # Ambiguous: the child has several inputs and none was chosen.
                raise KiaraProcessingException(
                    f"No 'input_name' specified, and configured module has more than one inputs. Please specify an 'input_name' value in your module config, pick one of: {', '.join(self.child_module.input_names)}"
                )
        return self._module_input_name
    @property
    def module_output_name(self) -> str:
        # Name of the child-module output that yields each result item; falls
        # back to the child's single output when not configured explicitly.
        if self._module_output_name is not None:
            return self._module_output_name
        self._module_output_name = self.get_config_value("output_name")
        if self._module_output_name is None:
            if len(list(self.child_module.output_names)) == 1:
                self._module_output_name = next(iter(self.child_module.output_names))
            else:
                # Ambiguous: the child has several outputs and none was chosen.
                raise KiaraProcessingException(
                    f"No 'output_name' specified, and configured module has more than one outputs. Please specify an 'output_name' value in your module config, pick one of: {', '.join(self.child_module.output_names)}"
                )
        return self._module_output_name
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # Inputs: the 'array' itself, plus every child-module input except
        # the one that is fed from the array items.
        inputs: typing.Dict[
            str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
        ] = {
            "array": {
                "type": "array",
                "doc": "The array containing the values the filter is applied on.",
            }
        }
        for input_name, schema in self.child_module.input_schemas.items():
            # 'array' is reserved for this module's own input.
            assert input_name != "array"
            if input_name == self.module_input_name:
                continue
            inputs[input_name] = schema
        return inputs
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # Single output: the mapped array.
        outputs = {
            "array": {
                "type": "array",
                "doc": "An array of equal length to the input array, containing the 'mapped' values.",
            }
        }
        return outputs
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        # Import lazily so pyarrow is only required when the module runs.
        import pyarrow as pa
        input_array: pa.Array = inputs.get_value_data("array")
        # Collect the constant (non-mapped) inputs to initialize the child
        # module's other inputs with.
        init_data: typing.Dict[str, typing.Any] = {}
        for input_name in self.input_schemas.keys():
            if input_name in ["array", self.module_input_name]:
                continue
            init_data[input_name] = inputs.get_value_obj(input_name)
        result_list = map_with_module(
            input_array,
            module_input_name=self.module_input_name,
            module_obj=self.child_module,
            init_data=init_data,
            module_output_name=self.module_output_name,
        )
        outputs.set_value("array", pa.array(result_list))
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/array/__init__.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Return the input schema: the 'array' input plus every child-module
    input except the one that is fed from the array items."""
    inputs: typing.Dict[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ] = {
        "array": {
            "type": "array",
            "doc": "The array containing the values the filter is applied on.",
        }
    }
    for input_name, schema in self.child_module.input_schemas.items():
        # 'array' is reserved for this module's own input.
        assert input_name != "array"
        if input_name == self.module_input_name:
            continue
        inputs[input_name] = schema
    return inputs
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/array/__init__.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Return the output schema: a single 'array' output holding the mapped
    values, one per input item."""
    outputs = {
        "array": {
            "type": "array",
            "doc": "An array of equal length to the input array, containing the 'mapped' values.",
        }
    }
    return outputs
module_instance_doc(self)
¶
    Return documentation for this instance of the module.
If not overwritten, will return this class' method doc().
Source code in core/array/__init__.py
def module_instance_doc(self) -> str:
    """Return documentation for this instance, naming (and, where possible,
    linking to) the configured child module."""
    config: MapModuleConfig = self.config  # type: ignore
    module_type = config.module_type
    module_config = config.module_config
    m = self._kiara.create_module(
        module_type=module_type, module_config=module_config
    )
    type_md = m.get_type_metadata()
    doc = type_md.documentation.full_doc
    link = type_md.context.get_url_for_reference("module_doc")
    if not link:
        # No URL available for the module doc: plain inline-code reference.
        link_str = f"``{module_type}``"
    else:
        link_str = f"[``{module_type}``]({link})"
    result = f"""Map the values of the input list onto a new list of the same length, using the {link_str} module."""
    if doc and doc != "-- n/a --":
        # Append the child module's own documentation, if it has any.
        result = result + f"\n\n``{module_type}`` documentation:\n\n{doc}"
    return result
        
MapModuleConfig            (ModuleTypeConfigSchema)
        
  
      pydantic-model
  
¶
    Source code in core/array/__init__.py
class MapModuleConfig(ModuleTypeConfigSchema):
    # The kiara module (type) that computes each item of the result array.
    module_type: str = Field(
        description="The name of the kiara module to use to filter the input data."
    )
    # Optional configuration passed to that child module.
    module_config: typing.Optional[typing.Dict[str, typing.Any]] = Field(
        description="The config for the kiara filter module.", default_factory=dict
    )
    # Which child-module input receives the array items; may be omitted when
    # the child module has exactly one input.
    input_name: typing.Optional[str] = Field(
        description="The name of the input name of the module which will receive the items from our input array. Can be omitted if the configured module only has a single input.",
        default=None,
    )
    # Which child-module output provides the result items; may be omitted
    # when the child module has exactly one output.
    output_name: typing.Optional[str] = Field(
        description="The name of the output name of the module which will receive the items from our input array. Can be omitted if the configured module only has a single output.",
        default=None,
    )
input_name: str
  
      pydantic-field
  
¶
    The name of the input name of the module which will receive the items from our input array. Can be omitted if the configured module only has a single input.
module_config: Dict[str, Any]
  
      pydantic-field
  
¶
    The config for the kiara filter module.
module_type: str
  
      pydantic-field
      required
  
¶
    The name of the kiara module to use to filter the input data.
output_name: str
  
      pydantic-field
  
¶
    The name of the output name of the module which will receive the items from our input array. Can be omitted if the configured module only has a single output.
        
SampleArrayModule            (SampleValueModule)
        
¶
    Sample an array.
Samples are used to randomly select a subset of a dataset, which helps test queries and workflows on smaller versions of the original data, to adjust parameters before a full run.
Source code in core/array/__init__.py
          class SampleArrayModule(SampleValueModule):
    """Sample an array.
    Samples are used to randomly select a subset of a dataset, which helps test queries and workflows on smaller versions
    of the original data, to adjust parameters before a full run.
    """
    _module_type_name = "sample"
    @classmethod
    def get_value_type(cls) -> str:
        return "array"
    # def create_input_schema(
    #     self,
    # ) -> typing.Mapping[
    #     str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    # ]:
    #
    #     return {
    #         "table": {"type": "table", "doc": "The table to sample data from."},
    #         "sample_size": {
    #             "type": "integer",
    #             "doc": "The percentage or number of rows to sample (depending on 'sample_unit' input).",
    #         }
    #     }
    #
    # def create_output_schema(
    #     self,
    # ) -> typing.Mapping[
    #     str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    # ]:
    #
    #     return {"sampled_table": {"type": "table", "doc": "A sampled table."}}
    def sample_percent(self, value: Value, sample_size: int):
        import duckdb
        import pyarrow as pa
        array: pa.Array = value.get_value_data()
        if sample_size >= 100:
            return array
        table = pa.Table.from_arrays([array], names=["column"])
        query = f"SELECT * FROM data USING SAMPLE {sample_size} PERCENT (bernoulli);"
        rel_from_arrow = duckdb.arrow(table)
        result: duckdb.DuckDBPyResult = rel_from_arrow.query("data", query)
        result_table: pa.Table = result.fetch_arrow_table()
        return result_table.column("column")
    def sample_rows(self, value: Value, sample_size: int):
        import duckdb
        import pyarrow as pa
        array: pa.Array = value.get_value_data()
        if sample_size >= len(array):
            return array
        table = pa.Table.from_arrays([array], names=["column"])
        query = f"SELECT * FROM data USING SAMPLE {sample_size};"
        rel_from_arrow = duckdb.arrow(table)
        result: duckdb.DuckDBPyResult = rel_from_arrow.query("data", query)
        result_table: pa.Table = result.fetch_arrow_table()
        return result_table.column("column")
    def sample_rows_from_start(self, value: Value, sample_size: int):
        import pyarrow as pa
        array: pa.Array = value.get_value_data()
        if sample_size >= len(array):
            return array
        result_array = array.slice(0, sample_size)
        return result_array
    def sample_rows_to_end(self, value: Value, sample_size: int):
        import pyarrow as pa
        array: pa.Array = value.get_value_data()
        if sample_size >= len(array):
            return array
        result_array = array.slice(len(array) - sample_size)
        return result_array
get_value_type()
  
      classmethod
  
¶
    Return the value type for this sample module.
Source code in core/array/__init__.py
@classmethod
def get_value_type(cls) -> str:
    # Sampling operates on 'array' values.
    return "array"
        
StoreArrayTypeModule            (StoreValueTypeModule)
        
¶
    Save an Arrow array to a file.
This module wraps the input array into an Arrow Table, and saves this table as a feather file.
The output of this module is a dictionary representing the configuration to be used with kiara to re-assemble the array object from disk.
Source code in core/array/__init__.py
          class StoreArrayTypeModule(StoreValueTypeModule):
    """Save an Arrow array to a file.
    This module wraps the input array into an Arrow Table, and saves this table as a feather file.
    The output of this module is a dictionary representing the configuration to be used with *kira* to re-assemble
    the array object from disk.
    """
    _module_type_name = "store"
    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return "array"
    def store_value(self, value: Value, base_path: str):
        import pyarrow as pa
        from pyarrow import feather
        array: pa.Array = value.get_value_data()
        # folder = inputs.get_value_data("folder_path")
        # file_name = inputs.get_value_data("file_name")
        # column_name = inputs.get_value_data("column_name")
        path = os.path.join(base_path, ARRAY_SAVE_FILE_NAME)
        if os.path.exists(path):
            raise KiaraProcessingException(
                f"Can't write file, path already exists: {path}"
            )
        os.makedirs(os.path.dirname(path))
        table = pa.Table.from_arrays([array], names=[ARRAY_SAVE_COLUM_NAME])
        feather.write_feather(table, path)
        load_config = {
            "module_type": "array.restore",
            "inputs": {
                "base_path": base_path,
                "rel_path": ARRAY_SAVE_FILE_NAME,
                "format": "feather",
                "column_name": ARRAY_SAVE_COLUM_NAME,
            },
            "output_name": "array",
        }
        return load_config
store_value(self, value, base_path)
¶
    Save the value, and return the load config needed to load it again.
Source code in core/array/__init__.py
          def store_value(self, value: Value, base_path: str):
    import pyarrow as pa
    from pyarrow import feather
    array: pa.Array = value.get_value_data()
    # folder = inputs.get_value_data("folder_path")
    # file_name = inputs.get_value_data("file_name")
    # column_name = inputs.get_value_data("column_name")
    path = os.path.join(base_path, ARRAY_SAVE_FILE_NAME)
    if os.path.exists(path):
        raise KiaraProcessingException(
            f"Can't write file, path already exists: {path}"
        )
    os.makedirs(os.path.dirname(path))
    table = pa.Table.from_arrays([array], names=[ARRAY_SAVE_COLUM_NAME])
    feather.write_feather(table, path)
    load_config = {
        "module_type": "array.restore",
        "inputs": {
            "base_path": base_path,
            "rel_path": ARRAY_SAVE_FILE_NAME,
            "format": "feather",
            "column_name": ARRAY_SAVE_COLUM_NAME,
        },
        "output_name": "array",
    }
    return load_config
        bytes
  
      special
  
¶
    
        
LoadBytesModule            (KiaraModule)
        
¶
    Source code in core/bytes/__init__.py
          class LoadBytesModule(KiaraModule):
    _module_type_name = "load"
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "base_path": {
                "type": "string",
                "doc": "The base path to the file to read.",
            },
            "rel_path": {
                "type": "string",
                "doc": "The relative path of the file, within the base path.",
            },
        }
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {"bytes": {"type": "bytes", "doc": "The content of the file."}}
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        path = inputs.get_value_data("path")
        if not os.path.exists(path):
            raise KiaraProcessingException(
                f"Can't read file, path does not exist: {path}"
            )
        with open(path, "rb") as f:
            content = f.read()
        outputs.set_value("bytes", content)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/bytes/__init__.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Declare the inputs: the base path, and the file's path relative to it."""
    return {
        "base_path": {
            "type": "string",
            "doc": "The base path to the file to read.",
        },
        "rel_path": {
            "type": "string",
            "doc": "The relative path of the file, within the base path.",
        },
    }
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/bytes/__init__.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Declare the single 'bytes' output holding the file content."""
    return {"bytes": {"type": "bytes", "doc": "The content of the file."}}
        
StoreBytesTypeModule            (StoreValueTypeModule)
        
¶
    Source code in core/bytes/__init__.py
          class StoreBytesTypeModule(StoreValueTypeModule):
    _module_type_name = "store"
    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return "bytes"
    def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
        path = os.path.join(base_path, BYTES_SAVE_FILE_NAME)
        if os.path.exists(path):
            raise KiaraProcessingException(
                f"Can't write bytes, target path already exists: {path}"
            )
        os.makedirs(os.path.dirname(path), exist_ok=True)
        bytes = value.get_value_data()
        with open(path, "wb") as f:
            f.write(bytes)
        load_config = {
            "module_type": "bytes.load",
            "inputs": {"base_path": base_path, "rel_path": BYTES_SAVE_FILE_NAME},
            "output_name": "bytes",
        }
        return load_config
store_value(self, value, base_path)
¶
    Save the value, and return the load config needed to load it again.
Source code in core/bytes/__init__.py
          def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
    path = os.path.join(base_path, BYTES_SAVE_FILE_NAME)
    if os.path.exists(path):
        raise KiaraProcessingException(
            f"Can't write bytes, target path already exists: {path}"
        )
    os.makedirs(os.path.dirname(path), exist_ok=True)
    bytes = value.get_value_data()
    with open(path, "wb") as f:
        f.write(bytes)
    load_config = {
        "module_type": "bytes.load",
        "inputs": {"base_path": base_path, "rel_path": BYTES_SAVE_FILE_NAME},
        "output_name": "bytes",
    }
    return load_config
        msgpack
¶
    
        
DeserializeFromMsgPackModule            (KiaraModule)
        
¶
    Source code in core/bytes/msgpack.py
          class DeserializeFromMsgPackModule(KiaraModule):
    _module_type_name = "to_value"
    _config_cls = SerializeToMsgPackModuleConfig
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {"bytes": {"type": "bytes", "doc": "The msgpack-serialized value."}}
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "value_type": {"type": "string", "doc": "The type of the value."},
            "value_data": {
                "type": "any",
                "doc": f"The {self.get_config_value('value_type')} value.",
            },
            "value_metadata": {
                "type": "dict",
                "doc": "A dictionary with metadata of the serialized table. The result dict has the metadata key as key, and two sub-values under each key: 'metadata_item' (the actual metadata) and 'metadata_item_schema' (the schema for the metadata).",
            },
        }
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        import msgpack
        msg = inputs.get_value_data("bytes")
        unpacked = msgpack.unpackb(msg, raw=False)
        value_type = unpacked["value_type"]
        outputs.set_value("value_type", value_type)
        metadata = unpacked["metadata"]
        outputs.set_value("value_metadata", metadata)
        new_data = unpacked["data"]
        if not hasattr(self, f"to_{value_type}"):
            raise KiaraProcessingException(
                f"Value type not supported for msgpack deserialization: {value_type}"
            )
        func = getattr(self, f"to_{value_type}")
        obj = func(data=new_data)
        outputs.set_value("value_data", obj)
    def to_table(self, data: bytes) -> typing.Any:
        import pyarrow as pa
        reader = pa.ipc.open_stream(data)
        batches = [b for b in reader]
        new_table = pa.Table.from_batches(batches)
        return new_table
    def to_boolean(self, data: bytes):
        return data
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/bytes/msgpack.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Declare the single 'bytes' input holding the msgpack payload."""
    return {"bytes": {"type": "bytes", "doc": "The msgpack-serialized value."}}
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/bytes/msgpack.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Declare the outputs: the value's type name, its data, and its metadata."""
    return {
        "value_type": {"type": "string", "doc": "The type of the value."},
        "value_data": {
            "type": "any",
            "doc": f"The {self.get_config_value('value_type')} value.",
        },
        "value_metadata": {
            "type": "dict",
            "doc": "A dictionary with metadata of the serialized table. The result dict has the metadata key as key, and two sub-values under each key: 'metadata_item' (the actual metadata) and 'metadata_item_schema' (the schema for the metadata).",
        },
    }
        
SerializeToMsgPackModule            (KiaraModule)
        
¶
    Source code in core/bytes/msgpack.py
class SerializeToMsgPackModule(KiaraModule):
    # Serialize a value of the configured type into msgpack bytes, together
    # with its type name and metadata.
    _module_type_name = "from_value"
    _config_cls = SerializeToMsgPackModuleConfig
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # The input type is taken from the module configuration.
        return {
            "value_item": {
                "type": self.config.get("value_type"),
                "doc": f"A {self.get_config_value('value_type')} value.",
            }
        }
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "bytes": {
                "type": "bytes",
                "doc": f"The msgpack-serialized {self.get_config_value('value_type')} value.",
            }
        }
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        import msgpack
        type_name: str = self.get_config_value("value_type")
        # Dispatch to a type-specific 'from_<type_name>' converter method.
        if not hasattr(self, f"from_{type_name}"):
            raise KiaraProcessingException(
                f"Value type not supported for msgpack serialization: {type_name}"
            )
        func = getattr(self, f"from_{type_name}")
        value = inputs.get_value_obj("value_item")
        metadata = value.get_metadata(also_return_schema=True)
        msg = func(value=value)
        # Envelope containing the type name and metadata alongside the data,
        # so the deserializer can reconstruct the full value.
        data = {"value_type": value.type_name, "metadata": metadata, "data": msg}
        msg = msgpack.packb(data, use_bin_type=True)
        outputs.set_value("bytes", msg)
    def from_table(self, value: Value) -> bytes:
        # Serialize the table via Arrow's IPC stream format.
        import pyarrow as pa
        table_val: Value = value
        table: pa.Table = table_val.get_value_data()
        sink = pa.BufferOutputStream()
        writer = pa.ipc.new_stream(sink, table.schema)
        writer.write(table)
        writer.close()
        buf: pa.Buffer = sink.getvalue()
        # NOTE(review): returns a memoryview (bytes-like), not bytes, despite
        # the annotation -- msgpack accepts buffer objects; confirm intended.
        return memoryview(buf)
    def from_boolean(self, value: Value) -> bytes:
        # Booleans are natively supported by msgpack; pass the data through.
        return value.get_value_data()
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/bytes/msgpack.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Declare the single input: a value of the configured type."""
    return {
        "value_item": {
            "type": self.config.get("value_type"),
            "doc": f"A {self.get_config_value('value_type')} value.",
        }
    }
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/bytes/msgpack.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Declare the single 'bytes' output holding the msgpack payload."""
    return {
        "bytes": {
            "type": "bytes",
            "doc": f"The msgpack-serialized {self.get_config_value('value_type')} value.",
        }
    }
        
SerializeToMsgPackModuleConfig            (ModuleTypeConfigSchema)
        
  
      pydantic-model
  
¶
    Source code in core/bytes/msgpack.py
class SerializeToMsgPackModuleConfig(ModuleTypeConfigSchema):
    # The value type the (de)serialization modules are configured for.
    value_type: str = Field(description="The value type to serialize/deserialize.")
value_type: str
  
      pydantic-field
      required
  
¶
    The value type to serialize/deserialize.
        database
  
      special
  
¶
    
        
BaseDatabaseInfoMetadataModule            (ExtractMetadataModule)
        
¶
    Extract extended metadata (like tables, schemas) from a database object.
Source code in core/database/__init__.py
class BaseDatabaseInfoMetadataModule(ExtractMetadataModule):
    """Extract extended metadata (like tables, schemas) from a database object."""
    def _get_metadata_schema(
        self, type: str
    ) -> typing.Union[str, typing.Type[BaseModel]]:
        # NOTE(review): returns KiaraDatabase although extract_metadata builds
        # a KiaraDatabaseInfo -- confirm which schema model is intended.
        return KiaraDatabase
    def extract_metadata(self, value: Value) -> BaseModel:
        from sqlalchemy import inspect, text
        database: KiaraDatabase = value.get_value_data()
        inspector: Inspector = inspect(database.get_sqlalchemy_engine())
        table_names = inspector.get_table_names()
        view_names = inspector.get_view_names()
        table_infos = {}
        for table in table_names:
            columns = inspector.get_columns(table_name=table)
            columns_info = {}
            with database.get_sqlalchemy_engine().connect() as con:
                # Table names come from the inspector, not user input, so the
                # f-string interpolation into SQL is tolerable here.
                result = con.execute(text(f"SELECT count(*) from {table}"))
                num_rows = result.fetchone()[0]
                try:
                    # Best-effort size lookup; presumably 'dbstat' is sqlite's
                    # optional virtual table and may be unavailable, hence the
                    # broad fallback to None below.
                    # NOTE(review): name="{table}" uses double quotes, which
                    # sqlite may parse as an identifier -- confirm this
                    # matches rows as intended.
                    result = con.execute(
                        text(f'SELECT SUM("pgsize") FROM "dbstat" WHERE name="{table}"')
                    )
                    table_size = result.fetchone()[0]
                except Exception:
                    table_size = None
            for column in columns:
                column_name = column["name"]
                # Per-column schema: sqlalchemy type name plus key/null flags.
                cs = ColumnSchema(
                    type_name=column["type"].__visit_name__,
                    metadata={
                        "is_primary_key": column["primary_key"] == 1,
                        "nullable": column["nullable"],
                    },
                )
                columns_info[column_name] = cs
            table_infos[table] = TableMetadata(
                column_names=list(columns_info.keys()),
                column_schema=columns_info,
                rows=num_rows,
                size=table_size,
            )
        # Size of the database file on disk, in bytes.
        file_stats = os.stat(database.db_file_path)
        size = file_stats.st_size
        kdi = KiaraDatabaseInfo(
            table_names=table_names,
            view_names=view_names,
            tables=table_infos,
            size=size,
        )
        return kdi
        
BaseDatabaseMetadataModule            (ExtractMetadataModule)
        
¶
    Extract basic metadata from a database object.
Source code in core/database/__init__.py
          class BaseDatabaseMetadataModule(ExtractMetadataModule):
    """Extract basic metadata from a database object."""
    def _get_metadata_schema(
        self, type: str
    ) -> typing.Union[str, typing.Type[BaseModel]]:
        return KiaraDatabase
    def extract_metadata(self, value: Value) -> KiaraDatabase:
        database: KiaraDatabase = value.get_value_data()
        return database
        
ConvertToDatabaseModule            (CreateValueModule)
        
¶
    Create a database from files, file_bundles, etc.
Source code in core/database/__init__.py
          class ConvertToDatabaseModule(CreateValueModule):
    """Create a database from files, file_bundles, etc."""
    _module_type_name = "create"
    _config_cls = DatabaseConversionModuleConfig
    @classmethod
    def get_target_value_type(cls) -> str:
        return "database"
    def from_table(self, value: Value):
        import pyarrow as pa
        from sqlalchemy import MetaData, Table
        from kiara_modules.core.table.utils import (
            create_sqlite_schema_data_from_arrow_table,
        )
        table: pa.Table = value.get_value_data()
        # maybe we could check the values lineage, to find the best table name?
        table_name = value.id.replace("-", "_")
        index_columns = []
        for cn in table.column_names:
            if cn.lower() == "id":
                index_columns.append(cn)
        column_info: SqliteTableSchema = create_sqlite_schema_data_from_arrow_table(
            table=table, index_columns=index_columns
        )
        init_sql = create_table_init_sql(
            table_name=table_name, table_schema=column_info
        )
        db = KiaraDatabase.create_in_temp_dir(init_sql=init_sql)
        nodes_column_map: typing.Dict[str, typing.Any] = {}
        for batch in table.to_batches(DEFAULT_DB_CHUNK_SIZE):
            batch_dict = batch.to_pydict()
            for k, v in nodes_column_map.items():
                if k in batch_dict.keys():
                    _data = batch_dict.pop(k)
                    if v in batch_dict.keys():
                        raise Exception("Duplicate column name after mapping: {v}")
                    batch_dict[v] = _data
            data = [dict(zip(batch_dict, t)) for t in zip(*batch_dict.values())]
            engine = db.get_sqlalchemy_engine()
            _metadata_obj = MetaData()
            sqlite_table = Table(table_name, _metadata_obj, autoload_with=engine)
            with engine.connect() as conn:
                with conn.begin():
                    conn.execute(sqlite_table.insert(), data)
        return db
    def from_csv_file(self, value: Value):
        f = tempfile.mkdtemp()
        db_path = os.path.join(f, "db.sqlite")
        def cleanup():
            shutil.rmtree(f, ignore_errors=True)
        atexit.register(cleanup)
        create_sqlite_table_from_file(
            target_db_file=db_path, file_item=value.get_value_data()
        )
        return db_path
    def from_csv_file_bundle(self, value: Value):
        include_file_information: bool = True
        include_raw_content_in_file_info: bool = False
        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")
        def cleanup():
            shutil.rmtree(db_path, ignore_errors=True)
        atexit.register(cleanup)
        db = KiaraDatabase(db_file_path=db_path)
        db.create_if_not_exists()
        bundle: KiaraFileBundle = value.get_value_data()
        table_names: typing.List[str] = []
        for rel_path in sorted(bundle.included_files.keys()):
            file_item = bundle.included_files[rel_path]
            table_name = find_free_id(
                stem=file_item.file_name_without_extension, current_ids=table_names
            )
            try:
                table_names.append(table_name)
                create_sqlite_table_from_file(
                    target_db_file=db_path, file_item=file_item, table_name=table_name
                )
            except Exception as e:
                if self.get_config_value("ignore_errors") is True or True:
                    log_message(
                        f"Ignoring file '{rel_path}': could not import data from file -- {e}"
                    )
                    continue
                raise KiaraProcessingException(e)
        if include_file_information:
            create_table_from_file_bundle(
                file_bundle=value.get_value_data(),
                db_file_path=db_path,
                table_name="source_files_metadata",
                include_content=include_raw_content_in_file_info,
            )
        return db_path
    def from_text_file_bundle(self, value: Value):
        return create_table_from_file_bundle(
            file_bundle=value.get_value_data(), include_content=True
        )
        
DatabaseConversionModuleConfig            (CreateValueModuleConfig)
        
  
      pydantic-model
  
¶
    Source code in core/database/__init__.py
          class DatabaseConversionModuleConfig(CreateValueModuleConfig):
    """Configuration for modules that create a database from other values."""

    # When True, items that fail to convert are logged and skipped instead of
    # aborting the whole conversion.
    ignore_errors: bool = Field(
        description="Whether to ignore convert errors and omit the failed items.",
        default=False,
    )
ignore_errors: bool
  
      pydantic-field
  
¶
    Whether to ignore convert errors and omit the failed items.
        
DatabaseInfoMetadataModule            (BaseDatabaseInfoMetadataModule)
        
¶
    Extract extended metadata (like tables, schemas) from a database object.
Source code in core/database/__init__.py
          class DatabaseInfoMetadataModule(BaseDatabaseInfoMetadataModule):
    """Extract extended metadata (like tables, schemas) from a database object."""
    _module_type_name = "info"
    @classmethod
    def _get_supported_types(cls) -> str:
        return "database"
    @classmethod
    def get_metadata_key(cls) -> str:
        return "database_info"
        
DatabaseMetadataModule            (BaseDatabaseMetadataModule)
        
¶
    Extract basic metadata from a database object.
Source code in core/database/__init__.py
          class DatabaseMetadataModule(BaseDatabaseMetadataModule):
    """Extract basic metadata from a database object."""
    _module_type_name = "metadata"
    @classmethod
    def _get_supported_types(cls) -> str:
        return "database"
    @classmethod
    def get_metadata_key(cls) -> str:
        return "database"
        
LoadDatabaseConfig            (ModuleTypeConfigSchema)
        
  
      pydantic-model
  
¶
    Source code in core/database/__init__.py
          class LoadDatabaseConfig(ModuleTypeConfigSchema):
    """Configuration for the 'database.load' module."""

    # Lets database sub-types be loaded under their own type name instead of
    # the generic 'database'.
    value_type: str = Field(
        description="The type of the value to be stored (if database sub-type).",
        default="database",
    )
value_type: str
  
      pydantic-field
  
¶
    The type of the value to be stored (if database sub-type).
        
LoadDatabaseModule            (KiaraModule)
        
¶
    Source code in core/database/__init__.py
          class LoadDatabaseModule(KiaraModule):
    _module_type_name = "load"
    _config_cls = LoadDatabaseConfig
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "base_path": {
                "type": "string",
                "doc": "The path to the base directory where the database file is stored.",
            },
            "rel_path": {
                "type": "string",
                "doc": "The relative path of the database file within the base directory.",
            },
        }
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        value_type = self.get_config_value("value_type")
        if value_type != "database":
            msg = f" (as '{value_type}')"
        else:
            msg = ""
        outputs: typing.Mapping[str, typing.Any] = {
            "database": {"type": value_type, "doc": f"The database value object{msg}."}
        }
        return outputs
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        base_path = inputs.get_value_data("base_path")
        rel_path = inputs.get_value_data("rel_path")
        path = os.path.join(base_path, rel_path)
        outputs.set_value("database", path)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/database/__init__.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {
        "base_path": {
            "type": "string",
            "doc": "The path to the base directory where the database file is stored.",
        },
        "rel_path": {
            "type": "string",
            "doc": "The relative path of the database file within the base directory.",
        },
    }
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/database/__init__.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    value_type = self.get_config_value("value_type")
    if value_type != "database":
        msg = f" (as '{value_type}')"
    else:
        msg = ""
    outputs: typing.Mapping[str, typing.Any] = {
        "database": {"type": value_type, "doc": f"The database value object{msg}."}
    }
    return outputs
        
StoreDatabaseTypeModule            (StoreValueTypeModule)
        
¶
    Save an sqlite database to a file.
Source code in core/database/__init__.py
          class StoreDatabaseTypeModule(StoreValueTypeModule):
    """Save an sqlite database to a file."""
    _module_type_name = "store"
    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return "database"
    def store_value(self, value: Value, base_path: str):
        value_type = value.type_name
        # TODO: assert type inherits from database
        database: KiaraDatabase = value.get_value_data()
        path = os.path.join(base_path, DEFAULT_DATABASE_SAVE_FILE_NAME)
        if os.path.exists(path):
            raise KiaraProcessingException(
                f"Can't write file, path already exists: {path}"
            )
        new_db = database.copy_database_file(path)
        load_config = {
            "module_type": "database.load",
            "module_config": {"value_type": value_type},
            "inputs": {
                "base_path": base_path,
                "rel_path": DEFAULT_DATABASE_SAVE_FILE_NAME,
            },
            "output_name": "database",
        }
        return (load_config, new_db)
store_value(self, value, base_path)
¶
    Save the value, and return the load config needed to load it again.
Source code in core/database/__init__.py
          def store_value(self, value: Value, base_path: str):
    """Save the value, and return the load config needed to load it again.

    Copies the backing database file into 'base_path' and returns a
    (load_config, new_database) tuple; refuses to overwrite an existing file.
    """
    value_type = value.type_name
    # TODO: assert type inherits from database
    database: KiaraDatabase = value.get_value_data()
    path = os.path.join(base_path, DEFAULT_DATABASE_SAVE_FILE_NAME)
    # Refuse to overwrite an existing file at the target location.
    if os.path.exists(path):
        raise KiaraProcessingException(
            f"Can't write file, path already exists: {path}"
        )
    new_db = database.copy_database_file(path)
    # The load config re-creates this value later via the 'database.load'
    # module; the original (possibly more specific) value type is preserved.
    load_config = {
        "module_type": "database.load",
        "module_config": {"value_type": value_type},
        "inputs": {
            "base_path": base_path,
            "rel_path": DEFAULT_DATABASE_SAVE_FILE_NAME,
        },
        "output_name": "database",
    }
    return (load_config, new_db)
        query
¶
    
        
QueryDatabaseSQLModuleConfig            (ModuleTypeConfigSchema)
        
  
      pydantic-model
  
¶
    Source code in core/database/query.py
          class QueryDatabaseSQLModuleConfig(ModuleTypeConfigSchema):
    """Configuration for the sql query module."""

    # If set, the query is fixed at module-configuration time and no 'query'
    # input is offered to the user.
    query: typing.Optional[str] = Field(
        description="The query to execute. If not specified, the user will be able to provide their own.",
        default=None,
    )
query: str
  
      pydantic-field
  
¶
    The query to execute. If not specified, the user will be able to provide their own.
        
QueryTableSQL            (KiaraModule)
        
¶
    Execute a sql query against an (Arrow) table.
Source code in core/database/query.py
          class QueryTableSQL(KiaraModule):
    """Execute a sql query against an (Arrow) table."""
    _module_type_name = "sql"
    _config_cls = QueryDatabaseSQLModuleConfig
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        inputs = {
            "database": {
                "type": "database",
                "doc": "The database to query",
            }
        }
        if self.get_config_value("query") is None:
            inputs["query"] = {"type": "string", "doc": "The query."}
        return inputs
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {"query_result": {"type": "table", "doc": "The query result."}}
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        import pandas as pd
        import pyarrow as pa
        if self.get_config_value("query") is None:
            _query: str = inputs.get_value_data("query")
        else:
            _query = self.get_config_value("query")
        _database: KiaraDatabase = inputs.get_value_data("database")
        # can't re-use the default engine, because pandas does not support having the 'future' flag set to 'True'
        engine = create_engine(_database.db_url)
        df = pd.read_sql(_query, con=engine)
        table = pa.Table.from_pandas(df)
        outputs.set_value("query_result", table)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/database/query.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    inputs = {
        "database": {
            "type": "database",
            "doc": "The database to query",
        }
    }
    if self.get_config_value("query") is None:
        inputs["query"] = {"type": "string", "doc": "The query."}
    return inputs
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/database/query.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {"query_result": {"type": "table", "doc": "The query result."}}
        utils
¶
    
        
SqliteColumnAttributes            (BaseModel)
        
  
      pydantic-model
  
¶
    Source code in core/database/utils.py
          class SqliteColumnAttributes(BaseModel):
    """Attributes describing a single column of an sqlite table."""

    # Sqlite column type; defaults to the catch-all 'ANY'.
    data_type: str = Field(
        description="The type of the data in this column.", default="ANY"
    )
    # Extra tokens appended after the type in the CREATE TABLE column line.
    extra_column_info: typing.List[str] = Field(
        description="Additional init information for the column.", default_factory=list
    )
    create_index: bool = Field(
        description="Whether to create an index for this column or not.", default=False
    )
        
SqliteTableSchema            (BaseModel)
        
  
      pydantic-model
  
¶
    Source code in core/database/utils.py
          class SqliteTableSchema(BaseModel):
    """Schema description of a single sqlite table (columns, extra DDL, column mapping)."""

    columns: typing.Dict[str, SqliteColumnAttributes] = Field(
        description="The table columns and their attributes."
    )
    # Extra schema lines appended verbatim after the column definitions.
    extra_schema: typing.List[str] = Field(
        description="Extra schema information for this table.", default_factory=list
    )
    column_map: typing.Dict[str, str] = Field(
        description="A dictionary describing how to map incoming data column names. Values in this dict point to keys in this models 'columns' attribute.",
        default_factory=dict,
    )
column_map: Dict[str, str]
  
      pydantic-field
  
¶
    A dictionary describing how to map incoming data column names. Values in this dict point to keys in this models 'columns' attribute.
columns: Dict[str, kiara_modules.core.database.utils.SqliteColumnAttributes]
  
      pydantic-field
      required
  
¶
    The table columns and their attributes.
extra_schema: List[str]
  
      pydantic-field
  
¶
    Extra schema information for this table.
create_table_init_sql(table_name, table_schema, schema_template_str=None)
¶
    Create an sql script to initialize a table.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| table_schema | SqliteTableSchema | a map with the column name as key, and column details ('type', 'extra_column_info', 'create_index') as values | required |
Source code in core/database/utils.py
          def create_table_init_sql(
    table_name: str,
    table_schema: SqliteTableSchema,
    schema_template_str: typing.Optional[str] = None,
):
    """Create an sql script to initialize a table.
    Arguments:
        column_attrs: a map with the column name as key, and column details ('type', 'extra_column_info', 'create_index') as values
    """
    if schema_template_str is None:
        template_path = Path(TEMPLATES_FOLDER) / "sqlite_schama.sql.j2"
        schema_template_str = template_path.read_text()
    template = Environment(loader=BaseLoader()).from_string(schema_template_str)
    edges_columns = []
    edge_indexes = []
    lines = []
    for cn, details in table_schema.columns.items():
        cn_type = details.data_type
        cn_extra = details.extra_column_info
        line = f"    {cn}    {cn_type}"
        if cn_extra:
            line = f"{line}    {' '.join(cn_extra)}"
        edges_columns.append(line)
        if details.create_index:
            edge_indexes.append(cn)
        lines.append(line)
    lines.extend(table_schema.extra_schema)
    rendered = template.render(
        table_name=table_name, column_info=lines, index_columns=edge_indexes
    )
    return rendered
        date
¶
    A collection of date related modules.
Most of those are very bare-bones, not really dealing with more advanced (but very important) concepts like timezones and resolution yet.
        
DateRangeCheckModule            (KiaraModule)
        
¶
    Check whether a date falls within a specified date range.
If neither of the inputs 'earliest' nor 'latest' is set, this module will always return 'True'.
Return True if that's the case, otherwise False.
Source code in core/date.py
          class DateRangeCheckModule(KiaraModule):
    """Check whether a date falls within a specified date range.

    If neither of the inputs 'earliest' nor 'latest' is set, this module will always return 'True'.

    Return ``True`` if the date falls within the range, otherwise ``False``.
    """
    _module_type_name = "range_check"
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        inputs: typing.Dict[str, typing.Dict[str, typing.Any]] = {
            "date": {"type": "date", "doc": "The date to check."},
            "earliest": {
                "type": "date",
                "doc": "The earliest date that is allowed.",
                "optional": True,
            },
            "latest": {
                "type": "date",
                "doc": "The latest date that is allowed.",
                "optional": True,
            },
        }
        return inputs
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        outputs = {
            "within_range": {
                "type": "boolean",
                "doc": "A boolean indicating whether the provided date was within the allowed range ('true'), or not ('false')",
            }
        }
        return outputs
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        from dateutil import parser
        d = inputs.get_value_data("date")
        earliest: typing.Optional[datetime.datetime] = inputs.get_value_data("earliest")
        latest: typing.Optional[datetime.datetime] = inputs.get_value_data("latest")
        # No bounds set at all: trivially within range.
        if not earliest and not latest:
            outputs.set_value("within_range", True)
            return
        # Unwrap objects exposing 'as_py' (pyarrow scalars do, presumably the
        # source here -- TODO confirm) into plain Python values.
        if hasattr(d, "as_py"):
            d = d.as_py()
        # Strings are parsed leniently via dateutil.
        if isinstance(d, str):
            d = parser.parse(d)
        if earliest and latest:
            matches = earliest <= d <= latest
        elif earliest:
            matches = earliest <= d
        else:
            matches = d <= latest
        outputs.set_value("within_range", matches)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/date.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    inputs: typing.Dict[str, typing.Dict[str, typing.Any]] = {
        "date": {"type": "date", "doc": "The date to check."},
        "earliest": {
            "type": "date",
            "doc": "The earliest date that is allowed.",
            "optional": True,
        },
        "latest": {
            "type": "date",
            "doc": "The latest date that is allowed.",
            "optional": True,
        },
    }
    return inputs
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/date.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    outputs = {
        "within_range": {
            "type": "boolean",
            "doc": "A boolean indicating whether the provided date was within the allowed range ('true'), or not ('false')",
        }
    }
    return outputs
        
ExtractDateModule            (KiaraModule)
        
¶
    Extract a date object from a string.
This module is not really smart yet, currently it uses the following regex to extract a date (which might fail in a lot of cases):
r"_(\d{4}-\d{2}-\d{2})_"
Source code in core/date.py
          class ExtractDateModule(KiaraModule):
    """Extract a date object from a string.
    This module is not really smart yet, currently it uses the following regex to extract a date (which might fail in a lot of cases):
        r"_(\d{4}-\d{2}-\d{2})_"
    """
    _module_type_name = "extract_from_string"
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {"text": {"type": "string", "doc": "The input string."}}
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "date": {"type": "date", "doc": "The date extracted from the input string."}
        }
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        from dateutil import parser
        text = inputs.get_value_data("text")
        date_match = re.findall(r"_(\d{4}-\d{2}-\d{2})_", text)
        assert date_match
        d_obj = parser.parse(date_match[0])  # type: ignore
        outputs.set_value("date", d_obj)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/date.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {"text": {"type": "string", "doc": "The input string."}}
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/date.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {
        "date": {"type": "date", "doc": "The date extracted from the input string."}
    }
        defaults
¶
    
  
        dev
¶
    Modules that are useful for kiara as well as pipeline-development, as well as testing.
        
DummyModule            (KiaraModule)
        
¶
    Module that simulates processing, but uses hard-coded outputs as a result.
Source code in core/dev.py
          class DummyModule(KiaraModule):
    """Module that simulates processing, but uses hard-coded outputs as a result."""
    _config_cls = DummyProcessingModuleConfig
    def create_input_schema(self) -> typing.Mapping[str, ValueSchema]:
        """The input schema for the ``dummy`` module is created at object creation time from the ``input_schemas`` config parameter."""
        result = {}
        for k, v in self.config.get("input_schema").items():  # type: ignore
            schema = ValueSchema(**v)
            schema.validate_types(self._kiara)
            result[k] = schema
        return result
    def create_output_schema(self) -> typing.Mapping[str, ValueSchema]:
        """The output schema for the ``dummy`` module is created at object creation time from the ``output_schemas`` config parameter."""
        result = {}
        for k, v in self.config.get("output_schema").items():  # type: ignore
            schema = ValueSchema(**v)
            schema.validate_types(self._kiara)
            result[k] = schema
        return result
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Returns the hardcoded output values that are set in the ``outputs`` config field.
        Optionally, this module can simulate processing by waiting a configured amount of time (seconds -- specified in the ``delay`` config parameter).
        """
        time.sleep(self.config.get("delay"))  # type: ignore
        output_values: typing.Mapping = self.config.get("outputs")  # type: ignore
        value_dict = {}
        for output_name in self.output_names:
            if output_name not in output_values.keys():
                raise NotImplementedError()
                # v = self.output_schemas[output_name].type_obj.fake_value()
                # value_dict[output_name] = v
            else:
                value_dict[output_name] = output_values[output_name]
        outputs.set_values(**value_dict)
    # def _get_doc(self) -> str:
    #
    #     doc = self.config.get("doc", None)
    #
    #     if doc:
    #         return self.config["doc"]
    #     else:
    #         return super()._get_doc()
create_input_schema(self)
¶
    The input schema for the dummy module is created at object creation time from the input_schemas config parameter.
Source code in core/dev.py
          def create_input_schema(self) -> typing.Mapping[str, ValueSchema]:
    """The input schema for the ``dummy`` module is created at object creation time from the ``input_schemas`` config parameter."""
    result = {}
    for k, v in self.config.get("input_schema").items():  # type: ignore
        schema = ValueSchema(**v)
        schema.validate_types(self._kiara)
        result[k] = schema
    return result
create_output_schema(self)
¶
    The output schema for the dummy module is created at object creation time from the output_schemas config parameter.
Source code in core/dev.py
          def create_output_schema(self) -> typing.Mapping[str, ValueSchema]:
    """The output schema for the ``dummy`` module is created at object creation time from the ``output_schema`` config parameter."""
    result = {}
    # every raw (dict) schema from the config is parsed into a ValueSchema model
    for k, v in self.config.get("output_schema").items():  # type: ignore
        schema = ValueSchema(**v)
        # make sure the declared value types actually exist in this kiara instance
        schema.validate_types(self._kiara)
        result[k] = schema
    return result
process(self, inputs, outputs)
¶
    Returns the hardcoded output values that are set in the outputs config field.
Optionally, this module can simulate processing by waiting a configured amount of time (seconds -- specified in the delay config parameter).
Source code in core/dev.py
          def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
    """Returns the hardcoded output values that are set in the ``outputs`` config field.
    Optionally, this module can simulate processing by waiting a configured amount of time (seconds -- specified in the ``delay`` config parameter).
    """
    time.sleep(self.config.get("delay"))  # type: ignore
    output_values: typing.Mapping = self.config.get("outputs")  # type: ignore
    value_dict = {}
    for output_name in self.output_names:
        if output_name not in output_values.keys():
            raise NotImplementedError()
            # v = self.output_schemas[output_name].type_obj.fake_value()
            # value_dict[output_name] = v
        else:
            value_dict[output_name] = output_values[output_name]
    outputs.set_values(**value_dict)
        
DummyProcessingModuleConfig            (ModuleTypeConfigSchema)
        
  
      pydantic-model
  
¶
    Configuration for the 'dummy' processing module.
Source code in core/dev.py
          class DummyProcessingModuleConfig(ModuleTypeConfigSchema):
    """Configuration for the 'dummy' processing module."""

    # optional free-form documentation for the configured module instance
    documentation: typing.Optional[str] = None
    input_schema: typing.Dict[str, typing.Dict] = Field(
        description="The input schema for this module."
    )
    output_schema: typing.Dict[str, typing.Dict] = Field(
        description="The output schema for this module."
    )
    # values that 'process' returns verbatim for the matching output fields
    outputs: typing.Dict[str, typing.Any] = Field(
        description="The (dummy) output for this module.", default_factory=dict
    )
    delay: float = Field(
        description="The delay in seconds from processing start to when the (dummy) outputs are returned.",
        default=0,
    )
delay: float
  
      pydantic-field
  
¶
    The delay in seconds from processing start to when the (dummy) outputs are returned.
input_schema: Dict[str, Dict]
  
      pydantic-field
      required
  
¶
    The input schema for this module.
output_schema: Dict[str, Dict]
  
      pydantic-field
      required
  
¶
    The output schema for this module.
outputs: Dict[str, Any]
  
      pydantic-field
  
¶
    The (dummy) output for this module.
        dict
¶
    
        
SaveDictModule            (StoreValueTypeModule)
        
¶
    Source code in core/dict.py
          class SaveDictModule(StoreValueTypeModule):
    _config_cls = JsonSerializationConfig
    _module_type_name = "store"
    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return "dict"
    def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
        import orjson
        options = self.get_config_value("options")
        file_name = self.get_config_value("file_name")
        json_str = orjson.dumps(value.get_value_data(), option=options)
        bp = Path(base_path)
        bp.mkdir(parents=True, exist_ok=True)
        full_path = bp / file_name
        full_path.write_bytes(json_str)
        load_config = {
            "module_type": "generic.restore_from_json",
            "base_path_input_name": "base_path",
            "inputs": {
                "base_path": base_path,
                "file_name": self.get_config_value("file_name"),
            },
            "output_name": "value_item",
        }
        return load_config
store_value(self, value, base_path)
¶
    Save the value, and return the load config needed to load it again.
Source code in core/dict.py
          def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
    import orjson
    options = self.get_config_value("options")
    file_name = self.get_config_value("file_name")
    json_str = orjson.dumps(value.get_value_data(), option=options)
    bp = Path(base_path)
    bp.mkdir(parents=True, exist_ok=True)
    full_path = bp / file_name
    full_path.write_bytes(json_str)
    load_config = {
        "module_type": "generic.restore_from_json",
        "base_path_input_name": "base_path",
        "inputs": {
            "base_path": base_path,
            "file_name": self.get_config_value("file_name"),
        },
        "output_name": "value_item",
    }
    return load_config
        file
¶
    
        
DefaultFileImportModule            (FileImportModule)
        
¶
    Import an external file into a kiara session.
Source code in core/file.py
          class DefaultFileImportModule(FileImportModule):
    """Import an external file into a kiara session."""
    _module_type_name = "import"
    def import_from__file_path__string(self, source: str) -> KiaraFile:
        file_model = KiaraFile.load_file(source)
        return file_model
        
LoadLocalFileModule            (KiaraModule)
        
¶
    Load a file and its metadata.
This module does not read or load the content of a file, but contains the path to the local representation/version of the file so it can be read by a subsequent process.
Source code in core/file.py
          class LoadLocalFileModule(KiaraModule):
    """Load a file and its metadata.
    This module does not read or load the content of a file, but contains the path to the local representation/version of the
    file so it can be read by a subsequent process.
    """
    # _config_cls = ImportLocalPathConfig
    _module_type_name = "load"
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "base_path": {
                "type": "string",
                "doc": "The path to the base directory where the file is stored.",
            },
            "rel_path": {
                "type": "string",
                "doc": "The relative path of the file within the base directory.",
            },
        }
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "file": {
                "type": "file",
                "doc": "A representation of the original file content in the kiara data registry.",
            }
        }
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        base_path = inputs.get_value_data("base_path")
        rel_path = inputs.get_value_data("rel_path")
        path = os.path.join(base_path, rel_path)
        file_model = KiaraFile.load_file(path)
        outputs.set_value("file", file_model)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/file.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {
        "base_path": {
            "type": "string",
            "doc": "The path to the base directory where the file is stored.",
        },
        "rel_path": {
            "type": "string",
            "doc": "The relative path of the file within the base directory.",
        },
    }
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/file.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {
        "file": {
            "type": "file",
            "doc": "A representation of the original file content in the kiara data registry.",
        }
    }
        
StoreFileTypeModule            (StoreValueTypeModule)
        
¶
    Save a file to disk.
Source code in core/file.py
          class StoreFileTypeModule(StoreValueTypeModule):
    """Save a file to disk."""
    _module_type_name = "store"
    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return "file"
    def store_value(
        self, value: Value, base_path: str
    ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Any]:
        file_obj = value.get_value_data()
        file_name = file_obj.file_name
        full_target = os.path.join(base_path, file_name)
        os.makedirs(os.path.dirname(full_target), exist_ok=True)
        if os.path.exists(full_target):
            raise KiaraProcessingException(
                f"Can't save file, path already exists: {full_target}"
            )
        fm = file_obj.copy_file(full_target, is_onboarded=True)
        load_config = {
            "module_type": "file.load",
            "base_path_input_name": "base_path",
            "inputs": {"base_path": base_path, "rel_path": file_name},
            "output_name": "file",
        }
        return (load_config, fm)
store_value(self, value, base_path)
¶
    Save the value, and return the load config needed to load it again.
Source code in core/file.py
          def store_value(
    self, value: Value, base_path: str
) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Any]:
    file_obj = value.get_value_data()
    file_name = file_obj.file_name
    full_target = os.path.join(base_path, file_name)
    os.makedirs(os.path.dirname(full_target), exist_ok=True)
    if os.path.exists(full_target):
        raise KiaraProcessingException(
            f"Can't save file, path already exists: {full_target}"
        )
    fm = file_obj.copy_file(full_target, is_onboarded=True)
    load_config = {
        "module_type": "file.load",
        "base_path_input_name": "base_path",
        "inputs": {"base_path": base_path, "rel_path": file_name},
        "output_name": "file",
    }
    return (load_config, fm)
        file_bundle
¶
    
        
DefaultFileBundleImportModule            (FileBundleImportModule)
        
¶
    Import a file bundle into the kiara data store.
This module will support multiple source types and profiles in the future, but at the moment only import from local folder is supported. Thus, requiring the config value 'local' for 'source_profile', and 'folder_path' for 'source_type'.
Source code in core/file_bundle.py
          class DefaultFileBundleImportModule(FileBundleImportModule):
    """Import a file bundle into the kiara data store.
    This module will support multiple source types and profiles in the future, but at the moment only import from
    local folder is supported. Thus, requiring the config value 'local' for 'source_profile', and 'folder_path' for 'source_type'.
    """
    _module_type_name = "import"
    def import_from__folder_path__string(self, source: str) -> KiaraFileBundle:
        file_bundle_model = KiaraFileBundle.import_folder(source)
        return file_bundle_model
        
LoadFileBundleModule            (KiaraModule)
        
¶
    Load a file bundle and its metadata.
This module does not read or load the content of all included files, but contains the path to the local representation/version of them so they can be read by a subsequent process.
Source code in core/file_bundle.py
          class LoadFileBundleModule(KiaraModule):
    """Load a file bundle and its metadata.
    This module does not read or load the content of all included files, but contains the path to the local representation/version of them
    so they can be read by a subsequent process.
    """
    _module_type_name = "load"
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "base_path": {
                "type": "string",
                "doc": "The base path where the folder lives.",
            },
            "rel_path": {
                "type": "string",
                "doc": "The relative path of the folder, within the base path location.",
            },
            "include_files": {
                "type": "list",
                "doc": "A list of strings, include all files where the filename ends with one of those strings.\n\nOnly full string matches are supported at the moment, globs and regex might be in the future.",
                "optional": True,
            },
            "exclude_dirs": {
                "type": "list",
                "doc": f"A list of strings, exclude all folders whose name ends with that string. Defaults to: {DEFAULT_EXCLUDE_DIRS}.\n\nOnly full string matches are supported at the moment, globs and regex might be in the future.",
                "default": DEFAULT_EXCLUDE_DIRS,
                "optional": True,
            },
            "exclude_files": {
                "type": "list",
                "doc": f"A list of strings, exclude all files that end with that one of those strings (takes precedence over 'include_files'). Defaults to: {DEFAULT_EXCLUDE_FILES}\n\nOnly full string matches are supported at the moment, globs and regex might be in the future.",
                "default": DEFAULT_EXCLUDE_FILES,
                "optional": True,
            },
        }
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "file_bundle": {
                "type": "file_bundle",
                "doc": "The collection of files contained in the bundle.",
            }
        }
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        base_path = inputs.get_value_data("base_path")
        rel_path = inputs.get_value_data("rel_path")
        path = os.path.join(base_path, rel_path)
        included_files = inputs.get_value_data("include_files")
        excluded_dirs = inputs.get_value_data("exclude_dirs")
        excluded_files = inputs.get_value_data("exclude_files")
        import_config = FolderImportConfig(
            include_files=included_files,
            exclude_dirs=excluded_dirs,
            excluded_files=excluded_files,
        )
        bundle = KiaraFileBundle.import_folder(source=path, import_config=import_config)
        outputs.set_values(file_bundle=bundle)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/file_bundle.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {
        "base_path": {
            "type": "string",
            "doc": "The base path where the folder lives.",
        },
        "rel_path": {
            "type": "string",
            "doc": "The relative path of the folder, within the base path location.",
        },
        "include_files": {
            "type": "list",
            "doc": "A list of strings, include all files where the filename ends with one of those strings.\n\nOnly full string matches are supported at the moment, globs and regex might be in the future.",
            "optional": True,
        },
        "exclude_dirs": {
            "type": "list",
            "doc": f"A list of strings, exclude all folders whose name ends with that string. Defaults to: {DEFAULT_EXCLUDE_DIRS}.\n\nOnly full string matches are supported at the moment, globs and regex might be in the future.",
            "default": DEFAULT_EXCLUDE_DIRS,
            "optional": True,
        },
        "exclude_files": {
            "type": "list",
            "doc": f"A list of strings, exclude all files that end with that one of those strings (takes precedence over 'include_files'). Defaults to: {DEFAULT_EXCLUDE_FILES}\n\nOnly full string matches are supported at the moment, globs and regex might be in the future.",
            "default": DEFAULT_EXCLUDE_FILES,
            "optional": True,
        },
    }
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/file_bundle.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {
        "file_bundle": {
            "type": "file_bundle",
            "doc": "The collection of files contained in the bundle.",
        }
    }
        
StoreFileBundleType            (StoreValueTypeModule)
        
¶
    Save a file bundle to disk.
Source code in core/file_bundle.py
          class StoreFileBundleType(StoreValueTypeModule):
    """Save a file bundle to disk."""
    _module_type_name = "store"
    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return "file_bundle"
    def store_value(
        self, value: Value, base_path: str
    ) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Any]:
        bundle: KiaraFileBundle = value.get_value_data()
        rel_path = bundle.bundle_name
        target_path = os.path.join(base_path, rel_path)
        fb = bundle.copy_bundle(target_path, is_onboarded=True)
        # # the following changes the input value, which is usually not allowed, but the file_bundle type is a special case
        # bundle.included_files = fb.included_files
        # bundle.is_onboarded = True
        # bundle.path = fb.path
        # for path, f in bundle.included_files.items():
        #     f.is_onboarded = True
        load_config = {
            "module_type": "file_bundle.load",
            "base_path_input_name": "base_path",
            "inputs": {"base_path": base_path, "rel_path": rel_path},
            "output_name": "file_bundle",
        }
        return (load_config, fb)
store_value(self, value, base_path)
¶
    Save the value, and return the load config needed to load it again.
Source code in core/file_bundle.py
          def store_value(
    self, value: Value, base_path: str
) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Any]:
    bundle: KiaraFileBundle = value.get_value_data()
    rel_path = bundle.bundle_name
    target_path = os.path.join(base_path, rel_path)
    fb = bundle.copy_bundle(target_path, is_onboarded=True)
    # # the following changes the input value, which is usually not allowed, but the file_bundle type is a special case
    # bundle.included_files = fb.included_files
    # bundle.is_onboarded = True
    # bundle.path = fb.path
    # for path, f in bundle.included_files.items():
    #     f.is_onboarded = True
    load_config = {
        "module_type": "file_bundle.load",
        "base_path_input_name": "base_path",
        "inputs": {"base_path": base_path, "rel_path": rel_path},
        "output_name": "file_bundle",
    }
    return (load_config, fb)
        generic
¶
    
        
JsonSerializationConfig            (StoreValueModuleConfig)
        
  
      pydantic-model
  
¶
    Source code in core/generic.py
          class JsonSerializationConfig(StoreValueModuleConfig):
    """Configuration for store modules that serialize a value to a json file."""

    # bitwise OR of orjson option flags, passed through to ``orjson.dumps``
    options: int = Field(
        description="The options to use for the json serialization. Check https://github.com/ijl/orjson#quickstart for details.",
        default=orjson.OPT_NAIVE_UTC | orjson.OPT_SERIALIZE_NUMPY,
    )
    file_name: str = Field(
        description="The name of the serialized file.", default="dict.json"
    )
        
RestoreFromJsonDictModule            (KiaraModule)
        
¶
    Source code in core/generic.py
          class RestoreFromJsonDictModule(KiaraModule):
    _module_type_name = "restore_from_json"
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "base_path": {
                "type": "string",
                "doc": "The folder that contains the serialized dict.",
            },
            "file_name": {
                "type": "string",
                "doc": "The file name of the serialized dict.",
            },
        }
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {"value_item": {"type": "dict", "doc": "The deserialized dict value."}}
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        base_path = inputs.get_value_data("base_path")
        file_name = inputs.get_value_data("file_name")
        full_path = os.path.join(base_path, file_name)
        if not os.path.exists(full_path):
            raise KiaraProcessingException(
                f"Can't deserialize dict, path to file does not exist: {full_path}"
            )
        if not os.path.isfile(os.path.realpath(full_path)):
            raise KiaraProcessingException(
                f"Can't deserialize dict, path is not a file: {full_path}"
            )
        with open(full_path, "r") as f:
            content = f.read()
        data = orjson.loads(content)
        outputs.set_value("value_item", data)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/generic.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {
        "base_path": {
            "type": "string",
            "doc": "The folder that contains the serialized dict.",
        },
        "file_name": {
            "type": "string",
            "doc": "The file name of the serialized dict.",
        },
    }
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/generic.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {"value_item": {"type": "dict", "doc": "The deserialized dict value."}}
        
RestoreScalarModule            (KiaraModule)
        
¶
    Utility module, only used internally.
Source code in core/generic.py
          class RestoreScalarModule(KiaraModule):
    """Utility module, only used internally."""
    _module_type_name = "restore_scalar"
    _config_cls = RestoreScalarModuleConfig
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "scalar_data": {
                "type": self.get_config_value("value_type"),
                "doc": "The scalar value.",
            }
        }
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "value_item": {
                "type": self.get_config_value("value_type"),
                "doc": "The loaded item.",
            }
        }
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        data = inputs.get_value_obj("scalar_data")
        outputs.set_value("value_item", data)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/generic.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {
        "scalar_data": {
            "type": self.get_config_value("value_type"),
            "doc": "The scalar value.",
        }
    }
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/generic.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {
        "value_item": {
            "type": self.get_config_value("value_type"),
            "doc": "The loaded item.",
        }
    }
        
RestoreScalarModuleConfig            (ModuleTypeConfigSchema)
        
  
      pydantic-model
  
¶
    Source code in core/generic.py
          class RestoreScalarModuleConfig(ModuleTypeConfigSchema):
    """Configuration for the 'generic.restore_scalar' module."""

    value_type: str = Field(description="The value type of the scalar to load.")
value_type: str
  
      pydantic-field
      required
  
¶
    The value type of the scalar to load.
        
StoreScalarModule            (StoreValueTypeModule)
        
¶
    Source code in core/generic.py
          class StoreScalarModule(StoreValueTypeModule):
    _module_type_name = "store"
    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return ["boolean", "integer", "float", "string"]
    def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
        data = value.get_value_data()
        load_config = {
            "module_type": "generic.restore_scalar",
            "module_config": {"value_type": self.get_config_value("value_type")},
            "base_path_input_name": None,
            "inputs": {"scalar_data": data},
            "output_name": "value_item",
        }
        return load_config
store_value(self, value, base_path)
¶
    Save the value, and return the load config needed to load it again.
Source code in core/generic.py
          def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
    data = value.get_value_data()
    load_config = {
        "module_type": "generic.restore_scalar",
        "module_config": {"value_type": self.get_config_value("value_type")},
        "base_path_input_name": None,
        "inputs": {"scalar_data": data},
        "output_name": "value_item",
    }
    return load_config
        
StoreScalarModuleConfig            (ModuleTypeConfigSchema)
        
  
      pydantic-model
  
¶
    Source code in core/generic.py
          class StoreScalarModuleConfig(ModuleTypeConfigSchema):
    """Configuration for the scalar 'store' module."""

    value_type: str = Field(description="The value type of the scalar to store.")
value_type: str
  
      pydantic-field
      required
  
¶
    The value type of the scalar to store.
        json
¶
    
        
ToJsonModuleOld            (OldTypeConversionModule)
        
¶
    Convert arbitrary types into json.
Very early days for this module, it doesn't support a lot of types yet.
Source code in core/json.py
          class ToJsonModuleOld(OldTypeConversionModule):
    """Convert arbitrary types into json.
    Very early days for this module, it doesn't support a lot of types yet.
    """
    _module_type_name = "to_json"
    @classmethod
    def _get_supported_source_types(self) -> typing.Union[typing.Iterable[str], str]:
        return JSON_SUPPORTED_SOURCE_TYPES
    @classmethod
    def _get_target_types(self) -> typing.Union[typing.Iterable[str], str]:
        return ["json"]
    def convert(
        self, value: Value, config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        input_value: typing.Any = value.get_value_data()
        input_value_str = convert_to_json(
            self._kiara, data=input_value, convert_config=config
        )
        return input_value_str
        list
¶
    
        
IncludedInListCheckModule            (KiaraModule)
        
¶
    Check whether an element is in a list.
Source code in core/list.py
          class IncludedInListCheckModule(KiaraModule):
    """Check whether an element is in a list."""
    _module_type_name = "contains"
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        inputs = {
            "list": {"type": "list", "doc": "The list."},
            "item": {
                "type": "any",
                "doc": "The element to check for inclusion in the list.",
            },
        }
        return inputs
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        outputs = {
            "is_included": {
                "type": "boolean",
                "doc": "Whether the element is in the list, or not.",
            }
        }
        return outputs
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        item_list = inputs.get_value_data("list")
        item = inputs.get_value_data("item")
        outputs.set_value("is_included", item in item_list)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/list.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    inputs = {
        "list": {"type": "list", "doc": "The list."},
        "item": {
            "type": "any",
            "doc": "The element to check for inclusion in the list.",
        },
    }
    return inputs
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/list.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    outputs = {
        "is_included": {
            "type": "boolean",
            "doc": "Whether the element is in the list, or not.",
        }
    }
    return outputs
        
StoreDictModule            (StoreValueTypeModule)
        
¶
    Source code in core/list.py
          class StoreDictModule(StoreValueTypeModule):
    _config_cls = JsonSerializationConfig
    _module_type_name = "store"
    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return "list"
    def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
        import orjson
        options = self.get_config_value("options")
        file_name = self.get_config_value("file_name")
        json_str = orjson.dumps(value.get_value_data(), option=options)
        bp = Path(base_path)
        bp.mkdir(parents=True, exist_ok=True)
        full_path = bp / file_name
        full_path.write_bytes(json_str)
        load_config = {
            "module_type": "generic.restore_from_json",
            "base_path_input_name": "base_path",
            "inputs": {
                "base_path": base_path,
                "file_name": self.get_config_value("file_name"),
            },
            "output_name": "value_item",
        }
        return load_config
store_value(self, value, base_path)
¶
    Save the value, and return the load config needed to load it again.
Source code in core/list.py
          def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
    import orjson
    options = self.get_config_value("options")
    file_name = self.get_config_value("file_name")
    json_str = orjson.dumps(value.get_value_data(), option=options)
    bp = Path(base_path)
    bp.mkdir(parents=True, exist_ok=True)
    full_path = bp / file_name
    full_path.write_bytes(json_str)
    load_config = {
        "module_type": "generic.restore_from_json",
        "base_path_input_name": "base_path",
        "inputs": {
            "base_path": base_path,
            "file_name": self.get_config_value("file_name"),
        },
        "output_name": "value_item",
    }
    return load_config
        logic
¶
    
        
AndModule            (LogicProcessingModule)
        
¶
    Returns 'True' if both inputs are 'True'.
Source code in core/logic.py
          class AndModule(LogicProcessingModule):
    """Returns 'True' if both inputs are 'True'."""
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "a": {"type": "boolean", "doc": "A boolean describing this input state."},
            "b": {"type": "boolean", "doc": "A boolean describing this input state."},
        }
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "y": {
                "type": "boolean",
                "doc": "A boolean describing the module output state.",
            }
        }
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        time.sleep(self.config.delay)  # type: ignore
        outputs.set_value(
            "y", inputs.get_value_data("a") and inputs.get_value_data("b")
        )
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/logic.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {
        "a": {"type": "boolean", "doc": "A boolean describing this input state."},
        "b": {"type": "boolean", "doc": "A boolean describing this input state."},
    }
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/logic.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {
        "y": {
            "type": "boolean",
            "doc": "A boolean describing the module output state.",
        }
    }
        
LogicProcessingModuleConfig            (ModuleTypeConfigSchema)
        
  
      pydantic-model
  
¶
    Config class for all the 'logic'-related modules.
Source code in core/logic.py
          class LogicProcessingModuleConfig(ModuleTypeConfigSchema):
    """Config class for all the 'logic'-related modules."""
    # Artificial processing delay applied before each module emits its
    # output; defaults to no delay.
    delay: float = Field(
        default=0,
        description="the delay in seconds from processing start to when the output is returned.",
    )
delay: float
  
      pydantic-field
  
¶
    the delay in seconds from processing start to when the output is returned.
        
NotModule            (LogicProcessingModule)
        
¶
    Negates the input.
Source code in core/logic.py
          class NotModule(LogicProcessingModule):
    """Negates the input."""
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """The not module only has one input, a boolean that will be negated by the module."""
        return {
            "a": {"type": "boolean", "doc": "A boolean describing this input state."}
        }
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """The output of this module is a single boolean, the negated input."""
        return {
            "y": {
                "type": "boolean",
                "doc": "A boolean describing the module output state.",
            }
        }
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Negates the input boolean."""
        time.sleep(self.config.get("delay"))  # type: ignore
        outputs.set_value("y", not inputs.get_value_data("a"))
create_input_schema(self)
¶
    The not module only has one input, a boolean that will be negated by the module.
Source code in core/logic.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """The not module only has one input, a boolean that will be negated by the module."""
    return {
        "a": {"type": "boolean", "doc": "A boolean describing this input state."}
    }
create_output_schema(self)
¶
    The output of this module is a single boolean, the negated input.
Source code in core/logic.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """The output of this module is a single boolean, the negated input."""
    return {
        "y": {
            "type": "boolean",
            "doc": "A boolean describing the module output state.",
        }
    }
process(self, inputs, outputs)
¶
    Negates the input boolean.
Source code in core/logic.py
          def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
    """Negates the input boolean."""
    time.sleep(self.config.get("delay"))  # type: ignore
    outputs.set_value("y", not inputs.get_value_data("a"))
        
OrModule            (LogicProcessingModule)
        
¶
    Returns 'True' if one of the inputs is 'True'.
Source code in core/logic.py
          class OrModule(LogicProcessingModule):
    """Returns 'True' if one of the inputs is 'True'."""
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "a": {"type": "boolean", "doc": "A boolean describing this input state."},
            "b": {"type": "boolean", "doc": "A boolean describing this input state."},
        }
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "y": {
                "type": "boolean",
                "doc": "A boolean describing the module output state.",
            }
        }
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        time.sleep(self.config.get("delay"))  # type: ignore
        outputs.set_value("y", inputs.get_value_data("a") or inputs.get_value_data("b"))
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/logic.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {
        "a": {"type": "boolean", "doc": "A boolean describing this input state."},
        "b": {"type": "boolean", "doc": "A boolean describing this input state."},
    }
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/logic.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {
        "y": {
            "type": "boolean",
            "doc": "A boolean describing the module output state.",
        }
    }
        metadata_models
¶
    This module contains the metadata models that are used in the kiara_modules.core package.
Metadata models are convenience wrappers that make it easier for kiara to find, create, manage and version metadata that is attached to data, as well as kiara modules. It is possible to register metadata using a JSON schema string, but it is recommended to create a metadata model, because it is much easier overall.
Metadata models must be a sub-class of kiara.metadata.MetadataModel.
        
ArrayMetadata            (HashedMetadataModel)
        
  
      pydantic-model
  
¶
    Describes properties of the 'array' type.
Source code in core/metadata_models.py
          class ArrayMetadata(HashedMetadataModel):
    """Describes properties of the 'array' type."""
    _metadata_key: typing.ClassVar[str] = "array"
    # Number of elements in the array.
    length: int = Field(description="The number of elements the array contains.")
    # Total bytes consumed by the array's elements.
    size: int = Field(
        description="Total number of bytes consumed by the elements of the array."
    )
    def _obj_to_hash(self) -> typing.Any:
        # Only the two declared fields participate in the model hash.
        return {"length": self.length, "size": self.size}
    def get_category_alias(self) -> str:
        return "instance.metadata.array"
        
ColumnSchema            (BaseModel)
        
  
      pydantic-model
  
¶
    Describes properties of a single column of the 'table' data type.
Source code in core/metadata_models.py
          class ColumnSchema(BaseModel):
    """Describes properties of a single column of the 'table' data type."""
    _metadata_key: typing.ClassVar[str] = "column"
    # The column's type name as reported by the backing implementation.
    type_name: str = Field(
        description="The type name of the column (backend-specific)."
    )
    # Free-form extra metadata; default_factory gives each instance its own dict.
    metadata: typing.Dict[str, typing.Any] = Field(
        description="Other metadata for the column.", default_factory=dict
    )
        
FolderImportConfig            (BaseModel)
        
  
      pydantic-model
  
¶
    Source code in core/metadata_models.py
          class FolderImportConfig(BaseModel):
    """File/folder filters applied when importing a folder."""
    # Suffix whitelist for files; None means include everything.
    include_files: typing.Optional[typing.List[str]] = Field(
        description="A list of strings, include all files where the filename ends with that string.",
        default=None,
    )
    # Suffix blacklist for folder names; None means exclude nothing.
    exclude_dirs: typing.Optional[typing.List[str]] = Field(
        description="A list of strings, exclude all folders whose name ends with that string.",
        default=None,
    )
    # Exclusions win over 'include_files'.
    exclude_files: typing.Optional[typing.List[str]] = Field(
        description=f"A list of strings, exclude all files that match those (takes precedence over 'include_files'). Defaults to: {DEFAULT_EXCLUDE_FILES}.",
        default=DEFAULT_EXCLUDE_FILES,
    )
exclude_dirs: List[str]
  
      pydantic-field
  
¶
    A list of strings, exclude all folders whose name ends with that string.
exclude_files: List[str]
  
      pydantic-field
  
¶
    A list of strings, exclude all files that match those (takes precedence over 'include_files'). Defaults to: ['.DS_Store'].
include_files: List[str]
  
      pydantic-field
  
¶
    A list of strings, include all files where the filename ends with that string.
        
KiaraDatabase            (MetadataModel)
        
  
      pydantic-model
  
¶
    Source code in core/metadata_models.py
          class KiaraDatabase(MetadataModel):
    _metadata_key: typing.ClassVar[str] = "database"
    @classmethod
    def create_in_temp_dir(cls, init_sql: typing.Optional[str] = None):
        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")
        def cleanup():
            shutil.rmtree(db_path, ignore_errors=True)
        atexit.register(cleanup)
        db = cls(db_file_path=db_path)
        db.create_if_not_exists()
        if init_sql:
            db.execute_sql(sql_script=init_sql, invalidate=True)
        return db
    db_file_path: str = Field(description="The path to the sqlite database file.")
    _cached_engine = PrivateAttr(default=None)
    _cached_inspector = PrivateAttr(default=None)
    _table_names = PrivateAttr(default=None)
    _table_schemas = PrivateAttr(default=None)
    def get_id(self) -> str:
        return self.db_file_path
    def get_category_alias(self) -> str:
        return "instance.metadata.database"
    @validator("db_file_path", allow_reuse=True)
    def ensure_absolute_path(cls, path: str):
        path = os.path.abspath(path)
        if not os.path.exists(os.path.dirname(path)):
            raise ValueError(f"Parent folder for database file does not exist: {path}")
        return path
    @property
    def db_url(self) -> str:
        return f"sqlite:///{self.db_file_path}"
    def get_sqlalchemy_engine(self) -> "Engine":
        if self._cached_engine is not None:
            return self._cached_engine
        from sqlalchemy import create_engine
        self._cached_engine = create_engine(self.db_url, future=True)
        # with self._cached_engine.connect() as con:
        #     con.execute(text("PRAGMA query_only = ON"))
        return self._cached_engine
    def create_if_not_exists(self):
        from sqlalchemy_utils import create_database, database_exists
        if not database_exists(self.db_url):
            create_database(self.db_url)
    def execute_sql(self, sql_script: str, invalidate: bool = False):
        """Execute an sql script.
        Arguments:
          sql_script: the sql script
          invalidate: whether to invalidate cached values within this object
        """
        self.create_if_not_exists()
        conn = self.get_sqlalchemy_engine().raw_connection()
        cursor = conn.cursor()
        cursor.executescript(sql_script)
        conn.commit()
        conn.close()
        if invalidate:
            self._cached_inspector = None
            self._table_names = None
            self._table_schemas = None
    def copy_database_file(self, target: str):
        os.makedirs(os.path.dirname(target))
        shutil.copy2(self.db_file_path, target)
        new_db = KiaraDatabase(db_file_path=target)
        return new_db
    def get_sqlalchemy_inspector(self) -> "Inspector":
        if self._cached_inspector is not None:
            return self._cached_inspector
        self._cached_inspector = inspect(self.get_sqlalchemy_engine())
        return self._cached_inspector
    @property
    def table_names(self) -> typing.Iterable[str]:
        if self._table_names is not None:
            return self._table_names
        self._table_names = self.get_sqlalchemy_inspector().get_table_names()
        return self._table_names
    def get_schema_for_table(self, table_name: str):
        if self._table_schemas is not None:
            if table_name not in self._table_schemas.keys():
                raise Exception(
                    f"Can't get table schema, database does not contain table with name '{table_name}'."
                )
            return self._table_schemas[table_name]
        ts: typing.Dict[str, typing.Dict[str, typing.Any]] = {}
        inspector = self.get_sqlalchemy_inspector()
        for tn in inspector.get_table_names():
            columns = self.get_sqlalchemy_inspector().get_columns(tn)
            ts[tn] = {}
            for c in columns:
                ts[tn][c["name"]] = c
        self._table_schemas = ts
        if table_name not in self._table_schemas.keys():
            raise Exception(
                f"Can't get table schema, database does not contain table with name '{table_name}'."
            )
        return self._table_schemas[table_name]
db_file_path: str
  
      pydantic-field
      required
  
¶
    The path to the sqlite database file.
execute_sql(self, sql_script, invalidate=False)
¶
    Execute an sql script.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
sql_script | 
        str | 
        the sql script  | 
        required | 
invalidate | 
        bool | 
        whether to invalidate cached values within this object  | 
        False | 
      
Source code in core/metadata_models.py
          def execute_sql(self, sql_script: str, invalidate: bool = False):
    """Execute an sql script.
    Arguments:
      sql_script: the sql script
      invalidate: whether to invalidate cached values within this object
    """
    self.create_if_not_exists()
    # Use the raw DBAPI connection: executescript() accepts multi-statement
    # scripts, which the higher-level sqlalchemy API does not expose.
    conn = self.get_sqlalchemy_engine().raw_connection()
    cursor = conn.cursor()
    cursor.executescript(sql_script)
    conn.commit()
    conn.close()
    if invalidate:
        # Drop cached introspection results; the cached engine stays valid.
        self._cached_inspector = None
        self._table_names = None
        self._table_schemas = None
        
KiaraDatabaseInfo            (HashedMetadataModel)
        
  
      pydantic-model
  
¶
    Source code in core/metadata_models.py
          class KiaraDatabaseInfo(HashedMetadataModel):
    """Hashed metadata describing a database's tables, views and file size."""
    _metadata_key: typing.ClassVar[str] = "database_info"
    table_names: typing.List[str] = Field(
        description="The names of all tables in this database."
    )
    view_names: typing.List[str] = Field(
        description="The names of all views in this database."
    )
    tables: typing.Dict[str, TableMetadata] = Field(
        description="Information about the tables within this database."
    )
    size: int = Field(description="The size of the database file.")
    def _obj_to_hash(self) -> typing.Any:
        # All four declared fields participate in the model hash.
        return {
            "table_names": self.table_names,
            "view_names": self.view_names,
            "tables": self.tables,
            "size": self.size,
        }
    def get_category_alias(self) -> str:
        return "instance.metadata.database_info"
size: int
  
      pydantic-field
      required
  
¶
    The size of the database file.
table_names: List[str]
  
      pydantic-field
      required
  
¶
    The names of all tables in this database.
tables: Dict[str, kiara_modules.core.metadata_models.TableMetadata]
  
      pydantic-field
      required
  
¶
    Information about the tables within this database.
view_names: List[str]
  
      pydantic-field
      required
  
¶
    The names of all views in this database.
        
KiaraFile            (MetadataModel)
        
  
      pydantic-model
  
¶
    Describes properties for the 'file' value type.
Source code in core/metadata_models.py
          class KiaraFile(MetadataModel):
    """Describes properties for the 'file' value type."""
    _metadata_key: typing.ClassVar[str] = "file"
    @classmethod
    def load_file(
        cls,
        source: str,
        target: typing.Optional[str] = None,
        incl_orig_path: bool = False,
    ):
        """Utility method to read metadata of a file from disk and optionally move it into a data archive location."""
        import mimetypes
        import filetype
        if not source:
            raise ValueError("No source path provided.")
        if not os.path.exists(os.path.realpath(source)):
            raise ValueError(f"Path does not exist: {source}")
        if not os.path.isfile(os.path.realpath(source)):
            raise ValueError(f"Path is not a file: {source}")
        orig_filename = os.path.basename(source)
        orig_path: str = os.path.abspath(source)
        file_import_time = datetime.datetime.now().isoformat()  # TODO: timezone
        file_stats = os.stat(orig_path)
        size = file_stats.st_size
        if target:
            if os.path.exists(target):
                raise ValueError(f"Target path exists: {target}")
            os.makedirs(os.path.dirname(target), exist_ok=True)
            shutil.copy2(source, target)
        else:
            target = orig_path
        r = mimetypes.guess_type(target)
        if r[0] is not None:
            mime_type = r[0]
        else:
            _mime_type = filetype.guess(target)
            if not _mime_type:
                mime_type = "application/octet-stream"
            else:
                mime_type = _mime_type.MIME
        if not incl_orig_path:
            _orig_path: typing.Optional[str] = None
        else:
            _orig_path = orig_path
        m = KiaraFile(
            orig_filename=orig_filename,
            orig_path=_orig_path,
            import_time=file_import_time,
            mime_type=mime_type,
            size=size,
            file_name=orig_filename,
            path=target,
        )
        return m
    _file_hash: typing.Optional[str] = PrivateAttr(default=None)
    orig_filename: str = Field(
        description="The original filename of this file at the time of import."
    )
    orig_path: typing.Optional[str] = Field(
        description="The original path to this file at the time of import.",
        default=None,
    )
    import_time: str = Field(description="The time when the file was imported.")
    mime_type: str = Field(description="The mime type of the file.")
    file_name: str = Field("The name of the file.")
    size: int = Field(description="The size of the file.")
    path: str = Field(description="The archive path of the file.")
    is_onboarded: bool = Field(
        description="Whether the file is imported into the kiara data store.",
        default=False,
    )
    def get_id(self) -> str:
        return self.path
    def get_category_alias(self) -> str:
        return "instance.metadata.file"
    def copy_file(
        self, target: str, incl_orig_path: bool = False, is_onboarded: bool = False
    ):
        fm = KiaraFile.load_file(self.path, target)
        if incl_orig_path:
            fm.orig_path = self.orig_path
        else:
            fm.orig_path = None
        fm.orig_filename = self.orig_filename
        fm.import_time = self.import_time
        if self._file_hash is not None:
            fm._file_hash = self._file_hash
        fm.is_onboarded = is_onboarded
        return fm
    @property
    def file_hash(self):
        if self._file_hash is not None:
            return self._file_hash
        sha256_hash = hashlib.sha3_256()
        with open(self.path, "rb") as f:
            # Read and update hash string value in blocks of 4K
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        self._file_hash = sha256_hash.hexdigest()
        return self._file_hash
    @property
    def file_name_without_extension(self) -> str:
        return self.file_name.split(".")[0]
    @property
    def import_time_as_datetime(self) -> datetime.datetime:
        from dateutil import parser
        return parser.parse(self.import_time)
    def read_content(
        self, as_str: bool = True, max_lines: int = -1
    ) -> typing.Union[str, bytes]:
        """Read the content of a file."""
        mode = "r" if as_str else "rb"
        with open(self.path, mode) as f:
            if max_lines <= 0:
                content = f.read()
            else:
                content = "".join((next(f) for x in range(max_lines)))
        return content
    def __repr__(self):
        return f"FileMetadata(name={self.file_name})"
    def __str__(self):
        return self.__repr__()
import_time: str
  
      pydantic-field
      required
  
¶
    The time when the file was imported.
is_onboarded: bool
  
      pydantic-field
  
¶
    Whether the file is imported into the kiara data store.
mime_type: str
  
      pydantic-field
      required
  
¶
    The mime type of the file.
orig_filename: str
  
      pydantic-field
      required
  
¶
    The original filename of this file at the time of import.
orig_path: str
  
      pydantic-field
  
¶
    The original path to this file at the time of import.
path: str
  
      pydantic-field
      required
  
¶
    The archive path of the file.
size: int
  
      pydantic-field
      required
  
¶
    The size of the file.
__repr__(self)
  
      special
  
¶
    Return repr(self).
Source code in core/metadata_models.py
          def __repr__(self):
    return f"FileMetadata(name={self.file_name})"
__str__(self)
  
      special
  
¶
    Return str(self).
Source code in core/metadata_models.py
          def __str__(self):
    return self.__repr__()
load_file(source, target=None, incl_orig_path=False)
  
      classmethod
  
¶
    Utility method to read metadata of a file from disk and optionally move it into a data archive location.
Source code in core/metadata_models.py
          @classmethod
def load_file(
    cls,
    source: str,
    target: typing.Optional[str] = None,
    incl_orig_path: bool = False,
):
    """Utility method to read metadata of a file from disk and optionally move it into a data archive location."""
    import mimetypes
    import filetype
    if not source:
        raise ValueError("No source path provided.")
    if not os.path.exists(os.path.realpath(source)):
        raise ValueError(f"Path does not exist: {source}")
    if not os.path.isfile(os.path.realpath(source)):
        raise ValueError(f"Path is not a file: {source}")
    orig_filename = os.path.basename(source)
    orig_path: str = os.path.abspath(source)
    file_import_time = datetime.datetime.now().isoformat()  # TODO: timezone
    file_stats = os.stat(orig_path)
    size = file_stats.st_size
    if target:
        # Refuse to overwrite an existing archive target; copy preserves metadata.
        if os.path.exists(target):
            raise ValueError(f"Target path exists: {target}")
        os.makedirs(os.path.dirname(target), exist_ok=True)
        shutil.copy2(source, target)
    else:
        target = orig_path
    # Prefer the extension-based mime guess; fall back to content sniffing.
    r = mimetypes.guess_type(target)
    if r[0] is not None:
        mime_type = r[0]
    else:
        _mime_type = filetype.guess(target)
        if not _mime_type:
            mime_type = "application/octet-stream"
        else:
            mime_type = _mime_type.MIME
    # Only record the original path when explicitly requested.
    if not incl_orig_path:
        _orig_path: typing.Optional[str] = None
    else:
        _orig_path = orig_path
    m = KiaraFile(
        orig_filename=orig_filename,
        orig_path=_orig_path,
        import_time=file_import_time,
        mime_type=mime_type,
        size=size,
        file_name=orig_filename,
        path=target,
    )
    return m
read_content(self, as_str=True, max_lines=-1)
¶
    Read the content of a file.
Source code in core/metadata_models.py
          def read_content(
    self, as_str: bool = True, max_lines: int = -1
) -> typing.Union[str, bytes]:
    """Read the content of a file.

    If 'max_lines' is positive, only (up to) that many lines are read.
    """
    # NOTE(review): when as_str=False and max_lines > 0, the ""-join below
    # receives bytes lines and raises TypeError (a b"" joiner would be needed);
    # also, a file shorter than max_lines makes next(f) raise StopIteration
    # inside the generator, which PEP 479 turns into RuntimeError. Confirm and
    # fix (e.g. with itertools.islice and a mode-matched joiner).
    mode = "r" if as_str else "rb"
    with open(self.path, mode) as f:
        if max_lines <= 0:
            content = f.read()
        else:
            content = "".join((next(f) for x in range(max_lines)))
    return content
        
KiaraFileBundle            (MetadataModel)
        
  
      pydantic-model
  
¶
    Describes properties for the 'file_bundle' value type.
Source code in core/metadata_models.py
          class KiaraFileBundle(MetadataModel):
    """Describes properties for the 'file_bundle' value type."""
    _metadata_key: typing.ClassVar[str] = "file_bundle"
    @classmethod
    def import_folder(
        cls,
        source: str,
        target: typing.Optional[str] = None,
        import_config: typing.Union[
            None, typing.Mapping[str, typing.Any], FolderImportConfig
        ] = None,
        incl_orig_path: bool = False,
    ):
        if not source:
            raise ValueError("No source path provided.")
        if not os.path.exists(os.path.realpath(source)):
            raise ValueError(f"Path does not exist: {source}")
        if not os.path.isdir(os.path.realpath(source)):
            raise ValueError(f"Path is not a file: {source}")
        if target and os.path.exists(target):
            raise ValueError(f"Target path already exists: {target}")
        if source.endswith(os.path.sep):
            source = source[0:-1]
        if target and target.endswith(os.path.sep):
            target = target[0:-1]
        if import_config is None:
            _import_config = FolderImportConfig()
        elif isinstance(import_config, typing.Mapping):
            _import_config = FolderImportConfig(**import_config)
        elif isinstance(import_config, FolderImportConfig):
            _import_config = import_config
        else:
            raise TypeError(
                f"Invalid type for folder import config: {type(import_config)}."
            )
        included_files: typing.Dict[str, KiaraFile] = {}
        exclude_dirs = _import_config.exclude_dirs
        invalid_extensions = _import_config.exclude_files
        valid_extensions = _import_config.include_files
        sum_size = 0
        def include_file(filename: str) -> bool:
            if invalid_extensions and any(
                filename.endswith(ext) for ext in invalid_extensions
            ):
                return False
            if not valid_extensions:
                return True
            else:
                return any(filename.endswith(ext) for ext in valid_extensions)
        for root, dirnames, filenames in os.walk(source, topdown=True):
            if exclude_dirs:
                dirnames[:] = [d for d in dirnames if d not in exclude_dirs]
            for filename in [
                f
                for f in filenames
                if os.path.isfile(os.path.join(root, f)) and include_file(f)
            ]:
                full_path = os.path.join(root, filename)
                rel_path = os.path.relpath(full_path, source)
                if target:
                    target_path: typing.Optional[str] = os.path.join(target, rel_path)
                else:
                    target_path = None
                file_model = KiaraFile.load_file(
                    full_path, target_path, incl_orig_path=incl_orig_path
                )
                sum_size = sum_size + file_model.size
                included_files[rel_path] = file_model
        orig_bundle_name = os.path.basename(source)
        if incl_orig_path:
            orig_path: typing.Optional[str] = source
        else:
            orig_path = None
        if target:
            path = target
        else:
            path = source
        return KiaraFileBundle.create_from_file_models(
            files=included_files,
            orig_bundle_name=orig_bundle_name,
            orig_path=orig_path,
            path=path,
            sum_size=sum_size,
        )
    @classmethod
    def create_from_file_models(
        self,
        files: typing.Mapping[str, KiaraFile],
        orig_bundle_name: str,
        orig_path: typing.Optional[str],
        path: str,
        sum_size: typing.Optional[int] = None,
        is_onboarded: bool = False,
    ):
        result: typing.Dict[str, typing.Any] = {}
        result["included_files"] = files
        result["orig_path"] = orig_path
        result["path"] = path
        result["import_time"] = datetime.datetime.now().isoformat()
        result["number_of_files"] = len(files)
        result["bundle_name"] = os.path.basename(result["path"])
        result["orig_bundle_name"] = orig_bundle_name
        result["is_onboarded"] = is_onboarded
        if sum_size is None:
            sum_size = 0
            for f in files.values():
                sum_size = sum_size + f.size
        result["size"] = sum_size
        return KiaraFileBundle(**result)
    _file_bundle_hash: typing.Optional[str] = PrivateAttr(default=None)
    orig_bundle_name: str = Field(
        description="The original name of this folder at the time of import."
    )
    bundle_name: str = Field(description="The name of this bundle.")
    orig_path: typing.Optional[str] = Field(
        description="The original path to this folder at the time of import.",
        default=None,
    )
    import_time: str = Field(description="The time when the file was imported.")
    number_of_files: int = Field(
        description="How many files are included in this bundle."
    )
    included_files: typing.Dict[str, KiaraFile] = Field(
        description="A map of all the included files, incl. their properties. Uses the relative path of each file as key."
    )
    size: int = Field(description="The size of all files in this folder, combined.")
    path: str = Field(description="The archive path of the folder.")
    is_onboarded: bool = Field(
        description="Whether this bundle is imported into the kiara data store.",
        default=False,
    )
    def get_id(self) -> str:
        return self.path
    def get_category_alias(self) -> str:
        return "instance.metadata.file_bundle"
    def get_relative_path(self, file: KiaraFile):
        return os.path.relpath(file.path, self.path)
    def read_text_file_contents(
        self, ignore_errors: bool = False
    ) -> typing.Mapping[str, str]:
        content_dict: typing.Dict[str, str] = {}
        def read_file(rel_path: str, fm: KiaraFile):
            with open(fm.path, encoding="utf-8") as f:
                try:
                    content = f.read()
                    content_dict[rel_path] = content  # type: ignore
                except Exception as e:
                    if ignore_errors:
                        log_message(f"Can't read file: {e}")
                        log.warning(f"Ignoring file: {fm.path}")
                    else:
                        raise Exception(f"Can't read file (as text) '{fm.path}: {e}")
        # TODO: common ignore files and folders
        for f in self.included_files.values():
            rel_path = self.get_relative_path(f)
            read_file(rel_path=rel_path, fm=f)
        return content_dict
    @property
    def file_bundle_hash(self):
        if self._file_bundle_hash is not None:
            return self._file_bundle_hash
        # hash_format ="sha3-256"
        hashes = ""
        for path in sorted(self.included_files.keys()):
            fm = self.included_files[path]
            hashes = hashes + "_" + path + "_" + fm.file_hash
        self._file_bundle_hash = hashlib.sha3_256(hashes.encode("utf-8")).hexdigest()
        return self._file_bundle_hash
    def copy_bundle(
        self, target_path: str, incl_orig_path: bool = False, is_onboarded: bool = False
    ) -> "KiaraFileBundle":
        if target_path == self.path:
            raise Exception(f"Target path and current path are the same: {target_path}")
        result = {}
        for rel_path, item in self.included_files.items():
            _target_path = os.path.join(target_path, rel_path)
            new_fm = item.copy_file(_target_path, is_onboarded=is_onboarded)
            result[rel_path] = new_fm
        if incl_orig_path:
            orig_path = self.orig_path
        else:
            orig_path = None
        fb = KiaraFileBundle.create_from_file_models(
            result,
            orig_bundle_name=self.orig_bundle_name,
            orig_path=orig_path,
            path=target_path,
            sum_size=self.size,
            is_onboarded=is_onboarded,
        )
        if self._file_bundle_hash is not None:
            fb._file_bundle_hash = self._file_bundle_hash
        return fb
    def __repr__(self):
        return f"FileBundle(name={self.bundle_name})"
    def __str__(self):
        return self.__repr__()
bundle_name: str
  
      pydantic-field
      required
  
¶
    The name of this bundle.
import_time: str
  
      pydantic-field
      required
  
¶
    The time when the file was imported.
included_files: Dict[str, kiara_modules.core.metadata_models.KiaraFile]
  
      pydantic-field
      required
  
¶
    A map of all the included files, incl. their properties. Uses the relative path of each file as key.
is_onboarded: bool
  
      pydantic-field
  
¶
    Whether this bundle is imported into the kiara data store.
number_of_files: int
  
      pydantic-field
      required
  
¶
    How many files are included in this bundle.
orig_bundle_name: str
  
      pydantic-field
      required
  
¶
    The original name of this folder at the time of import.
orig_path: str
  
      pydantic-field
  
¶
    The original path to this folder at the time of import.
path: str
  
      pydantic-field
      required
  
¶
    The archive path of the folder.
size: int
  
      pydantic-field
      required
  
¶
    The size of all files in this folder, combined.
__repr__(self)
  
      special
  
¶
    Return repr(self).
Source code in core/metadata_models.py
          def __repr__(self):
    return f"FileBundle(name={self.bundle_name})"
__str__(self)
  
      special
  
¶
    Return str(self).
Source code in core/metadata_models.py
          def __str__(self):
    return self.__repr__()
        
TableMetadata            (HashedMetadataModel)
        
  
      pydantic-model
  
¶
    Describes properties for the 'table' data type.
Source code in core/metadata_models.py
          class TableMetadata(HashedMetadataModel):
    """Describes properties for the 'table' data type."""
    _metadata_key: typing.ClassVar[str] = "table"
    column_names: typing.List[str] = Field(
        description="The name of the columns of the table."
    )
    column_schema: typing.Dict[str, ColumnSchema] = Field(
        description="The schema description of the table."
    )
    rows: int = Field(description="The number of rows the table contains.")
    size: typing.Optional[int] = Field(
        description="The tables size in bytes.", default=None
    )
    def _obj_to_hash(self) -> typing.Any:
        return {
            "column_names": self.column_names,
            "column_schemas": {k: v.dict() for k, v in self.column_schema.items()},
            "rows": self.rows,
            "size": self.size,
        }
    def get_category_alias(self) -> str:
        return "instance.metadata.table"
column_names: List[str]
  
      pydantic-field
      required
  
¶
    The name of the columns of the table.
column_schema: Dict[str, kiara_modules.core.metadata_models.ColumnSchema]
  
      pydantic-field
      required
  
¶
    The schema description of the table.
rows: int
  
      pydantic-field
      required
  
¶
    The number of rows the table contains.
size: int
  
      pydantic-field
  
¶
    The tables size in bytes.
        pipelines
  
      special
  
¶
    Virtual module that is used as base for PipelineModule classes that are auto-generated from pipeline descriptions under this folder.
        string
¶
    
        
DeserializeStringModule            (KiaraModule)
        
¶
    Source code in core/string.py
          class DeserializeStringModule(KiaraModule):
    _module_type_name = "deserialize"
    _config_cls = DeserializeStringModuleConfig
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {
            "serialized": {
                "type": "string",
                "doc": "The serialized form of the string.",
            }
        }
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {"value_item": {"type": "string", "doc": "The string data."}}
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        serialization_type = self.get_config_value("serialization_type")
        if serialization_type not in ["json"]:
            raise KiaraProcessingException(
                f"Can't deserialize string: serialisation type '{serialization_type}' not supported."
            )
        serialized = inputs.get_value_data("serialized")
        outputs.set_value("value_item", serialized)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/string.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {
        "serialized": {
            "type": "string",
            "doc": "The serialized form of the string.",
        }
    }
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/string.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {"value_item": {"type": "string", "doc": "The string data."}}
        
DeserializeStringModuleConfig            (ModuleTypeConfigSchema)
        
  
      pydantic-model
  
¶
    Source code in core/string.py
          class DeserializeStringModuleConfig(ModuleTypeConfigSchema):
    serialization_type: str = Field(
        description="The serialization type that was used to serialize the value."
    )
serialization_type: str
  
      pydantic-field
      required
  
¶
    The serialization type that was used to serialize the value.
        
RegexModule            (KiaraModule)
        
¶
    Match a string using a regular expression.
Source code in core/string.py
          class RegexModule(KiaraModule):
    """Match a string using a regular expression."""
    _config_cls = RegexModuleConfig
    _module_type_name = "match_regex"
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {"text": {"type": "string", "doc": "The text to match."}}
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        if self.get_config_value("only_first_match"):
            output_schema = {"text": {"type": "string", "doc": "The first match."}}
        else:
            raise NotImplementedError()
        return output_schema
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        text = inputs.get_value_data("text")
        regex = self.get_config_value("regex")
        matches = re.findall(regex, text)
        if not matches:
            raise KiaraProcessingException(f"No match for regex: {regex}")
        if self.get_config_value("only_first_match"):
            result = matches[0]
        else:
            result = matches
        outputs.set_value("text", result)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/string.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {"text": {"type": "string", "doc": "The text to match."}}
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/string.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    if self.get_config_value("only_first_match"):
        output_schema = {"text": {"type": "string", "doc": "The first match."}}
    else:
        raise NotImplementedError()
    return output_schema
        
RegexModuleConfig            (ModuleTypeConfigSchema)
        
  
      pydantic-model
  
¶
    Source code in core/string.py
          class RegexModuleConfig(ModuleTypeConfigSchema):
    regex: str = Field(description="The regex to apply.")
    only_first_match: bool = Field(
        description="Whether to only return the first match, or all matches.",
        default=False,
    )
        
ReplaceModuleConfig            (ModuleTypeConfigSchema)
        
  
      pydantic-model
  
¶
    Source code in core/string.py
          class ReplaceModuleConfig(ModuleTypeConfigSchema):
    replacement_map: typing.Dict[str, str] = Field(
        description="A map, containing the strings to be replaced as keys, and the replacements as values."
    )
    default_value: typing.Optional[str] = Field(
        description="The default value to use if the string to be replaced is not in the replacement map. By default, this just returns the string itself.",
        default=None,
    )
default_value: str
  
      pydantic-field
  
¶
    The default value to use if the string to be replaced is not in the replacement map. By default, this just returns the string itself.
replacement_map: Dict[str, str]
  
      pydantic-field
      required
  
¶
    A map, containing the strings to be replaced as keys, and the replacements as values.
        
ReplaceStringModule            (KiaraModule)
        
¶
    Replace a string if it matches a key in a mapping dictionary.
Source code in core/string.py
          class ReplaceStringModule(KiaraModule):
    """Replace a string if it matches a key in a mapping dictionary."""
    _config_cls = ReplaceModuleConfig
    _module_type_name = "replace"
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {"text": {"type": "string", "doc": "The input string."}}
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {"text": {"type": "string", "doc": "The replaced string."}}
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        text = inputs.get_value_data("text")
        repl_map = self.get_config_value("replacement_map")
        default = self.get_config_value("default_value")
        if text not in repl_map.keys():
            if default is None:
                result = text
            else:
                result = default
        else:
            result = repl_map[text]
        outputs.set_value("text", result)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/string.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {"text": {"type": "string", "doc": "The input string."}}
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/string.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {"text": {"type": "string", "doc": "The replaced string."}}
        
StringManipulationModule            (KiaraModule)
        
¶
    Base module to simplify creating other modules that do string manipulation.
Source code in core/string.py
          class StringManipulationModule(KiaraModule):
    """Base module to simplify creating other modules that do string manipulation."""
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {"text": {"type": "string", "doc": "The input string."}}
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {"text": {"type": "string", "doc": "The processed string."}}
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        input_string = inputs.get_value_data("text")
        result = self.process_string(input_string)
        outputs.set_value("text", result)
    @abstractmethod
    def process_string(self, text: str) -> str:
        pass
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/string.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {"text": {"type": "string", "doc": "The input string."}}
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/string.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {"text": {"type": "string", "doc": "The processed string."}}
        table
  
      special
  
¶
    
        
ConvertToTableModule            (CreateValueModule)
        
¶
    Create an Arrow table from files, file_bundles, etc.
This module supports two conversion targets currently:
- bytes: a memoryview of the byte-representation of the Table
 - string: the base64-encoded byte-representation of the Table
 
Source code in core/table/__init__.py
          class ConvertToTableModule(CreateValueModule):
    """Create an Arrow table from files, file_bundles, etc.
    This module supportes two conversion targets currently:
     - bytes: a memoryview of the byte-representation of the Table
     - string: the base64-encoded byte-representation of the Table
    """
    _module_type_name = "create"
    _config_cls = TableConversionModuleConfig
    @classmethod
    def get_target_value_type(cls) -> str:
        return "table"
    # def to_bytes(self, value: Value) -> bytes:
    #
    #     import pyarrow as pa
    #
    #     table_val: Value = value
    #     table: pa.Table = table_val.get_value_data()
    #
    #     batches = table.to_batches()
    #
    #     sink = pa.BufferOutputStream()
    #     writer = pa.ipc.new_stream(sink, batches[0].schema)
    #
    #     for batch in batches:
    #         writer.write_batch(batch)
    #     writer.close()
    #
    #     buf: pa.Buffer = sink.getvalue()
    #     return memoryview(buf)
    #
    # def to_string(self, value: Value):
    #
    #     _bytes: bytes = self.to_bytes(value)
    #     string = base64.b64encode(_bytes)
    #     return string.decode()
    # def from_bytes(self, value: Value):
    #     raise NotImplementedError()
    #
    # def from_string(self, value: Value):
    #     raise NotImplementedError()
    def from_csv_file(self, value: Value):
        from pyarrow import csv
        input_file: KiaraFile = value.get_value_data()
        imported_data = csv.read_csv(input_file.path)
        return imported_data
    def from_text_file_bundle(self, value: Value):
        import pyarrow as pa
        bundle: KiaraFileBundle = value.get_value_data()
        columns = FILE_BUNDLE_IMPORT_AVAILABLE_COLUMNS
        ignore_errors = self.get_config_value("ignore_errors")
        file_dict = bundle.read_text_file_contents(ignore_errors=ignore_errors)
        tabular: typing.Dict[str, typing.List[typing.Any]] = {}
        for column in columns:
            for index, rel_path in enumerate(sorted(file_dict.keys())):
                if column == "content":
                    _value: typing.Any = file_dict[rel_path]
                elif column == "id":
                    _value = index
                elif column == "rel_path":
                    _value = rel_path
                else:
                    file_model = bundle.included_files[rel_path]
                    _value = getattr(file_model, column)
                tabular.setdefault(column, []).append(_value)
        table = pa.Table.from_pydict(tabular)
        return table
        
CutColumnModule            (KiaraModule)
        
¶
    Cut off one column from a table, returning an array.
Source code in core/table/__init__.py
          class CutColumnModule(KiaraModule):
    """Cut off one column from a table, returning an array."""
    _module_type_name = "cut_column"
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        inputs: typing.Mapping[str, typing.Any] = {
            "table": {"type": "table", "doc": "A table."},
            "column_name": {
                "type": "string",
                "doc": "The name of the column to extract.",
            },
        }
        return inputs
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        outputs: typing.Mapping[str, typing.Any] = {
            "array": {"type": "array", "doc": "The column."}
        }
        return outputs
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        import pyarrow as pa
        table_value = inputs.get_value_obj("table")
        column_name: str = inputs.get_value_data("column_name")
        available = table_value.get_metadata("table")["table"]["column_names"]
        if column_name not in available:
            raise KiaraProcessingException(
                f"Invalid column name '{column_name}'. Available column names: {available}"
            )
        table: pa.Table = inputs.get_value_data("table")
        column = table.column(column_name)
        outputs.set_value("array", column)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/table/__init__.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    inputs: typing.Mapping[str, typing.Any] = {
        "table": {"type": "table", "doc": "A table."},
        "column_name": {
            "type": "string",
            "doc": "The name of the column to extract.",
        },
    }
    return inputs
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]": {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      },
      "[other_output_field_name]": {
          "type": ...,
          ...
      }
}
Source code in core/table/__init__.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    outputs: typing.Mapping[str, typing.Any] = {
        "array": {"type": "array", "doc": "The column."}
    }
    return outputs
        
ExportArrowTable            (KiaraModule)
        
¶
    Export a table object to disk.
Source code in core/table/__init__.py
          class ExportArrowTable(KiaraModule):
    """Export a table object to disk."""
    _module_type_name = "export"
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        inputs: typing.Mapping[str, typing.Any] = {
            "table": {"type": "table", "doc": "The table object."},
            "path": {
                "type": "string",
                "doc": "The path to the file to write.",
            },
            "format": {
                "type": "string",
                "doc": "The format of the table file ('feather' or 'parquet').",
                "default": "feather",
            },
            "force_overwrite": {
                "type": "boolean",
                "doc": "Whether to overwrite an existing file.",
                "default": False,
            },
            "compression": {
                "type": "string",
                "doc": "The compression to use. Use either: 'zstd' (default), 'lz4', or 'uncompressed'.",
                "default": "zstd",
            },
        }
        return inputs
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        outputs: typing.Mapping[str, typing.Any] = {
            "load_config": {
                "type": "load_config",
                "doc": "The configuration to use with kiara to load the saved value.",
            }
        }
        return outputs
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        import pyarrow as pa
        from pyarrow import feather
        table: pa.Table = inputs.get_value_data("table")
        full_path: str = inputs.get_value_data("path")
        force_overwrite = inputs.get_value_data("force_overwrite")
        format: str = inputs.get_value_data("format")
        compression = inputs.get_value_data("compression")
        if compression not in ["zstd", "lz4", "uncompressed"]:
            raise KiaraProcessingException(
                f"Invalid compression format '{compression}'. Allowed: 'zstd', 'lz4', 'uncompressed'."
            )
        if format != "feather":
            raise KiaraProcessingException(
                f"Can't export table to format '{format}': only 'feather' supported at the moment."
            )
        if os.path.exists(full_path) and not force_overwrite:
            raise KiaraProcessingException(
                f"Can't write table to file, file already exists: {full_path}"
            )
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        feather.write_feather(table, full_path, compression=compression)
        result = {
            "module_type": "table.load",
            "base_path_input_name": "base_path",
            "inputs": {
                "base_path": os.path.dirname(full_path),
                "rel_path": os.path.basename(full_path),
                "format": format,
            },
            "value_id": NO_VALUE_ID_MARKER,
            "output_name": "table",
        }
        outputs.set_value("load_config", result)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/table/__init__.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    inputs: typing.Mapping[str, typing.Any] = {
        "table": {"type": "table", "doc": "The table object."},
        "path": {
            "type": "string",
            "doc": "The path to the file to write.",
        },
        "format": {
            "type": "string",
            "doc": "The format of the table file ('feather' or 'parquet').",
            "default": "feather",
        },
        "force_overwrite": {
            "type": "boolean",
            "doc": "Whether to overwrite an existing file.",
            "default": False,
        },
        "compression": {
            "type": "string",
            "doc": "The compression to use. Use either: 'zstd' (default), 'lz4', or 'uncompressed'.",
            "default": "zstd",
        },
    }
    return inputs
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/table/__init__.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    outputs: typing.Mapping[str, typing.Any] = {
        "load_config": {
            "type": "load_config",
            "doc": "The configuration to use with kiara to load the saved value.",
        }
    }
    return outputs
        
LoadArrowTable            (KiaraModule)
        
¶
    Load a table object from disk.
Source code in core/table/__init__.py
          class LoadArrowTable(KiaraModule):
    """Load a table object from disk."""
    _module_type_name = "load"
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        inputs: typing.Mapping[str, typing.Any] = {
            "base_path": {
                "type": "string",
                "doc": "The path to the folder that contains the table file.",
            },
            "rel_path": {
                "type": "string",
                "doc": "The relative path to the table file within base_path.",
            },
            "format": {
                "type": "string",
                "doc": "The format of the table file ('feather' or 'parquet').",
                "default": "feather",
            },
        }
        return inputs
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        outputs: typing.Mapping[str, typing.Any] = {
            "table": {"type": "table", "doc": "The pyarrow table object."}
        }
        return outputs
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        from pyarrow import feather
        base_path = inputs.get_value_data("base_path")
        rel_path = inputs.get_value_data("rel_path")
        format = inputs.get_value_data("format")
        if format != "feather":
            raise NotImplementedError()
        path = os.path.join(base_path, rel_path)
        table = feather.read_table(path)
        outputs.set_value("table", table)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/table/__init__.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    inputs: typing.Mapping[str, typing.Any] = {
        "base_path": {
            "type": "string",
            "doc": "The path to the folder that contains the table file.",
        },
        "rel_path": {
            "type": "string",
            "doc": "The relative path to the table file within base_path.",
        },
        "format": {
            "type": "string",
            "doc": "The format of the table file ('feather' or 'parquet').",
            "default": "feather",
        },
    }
    return inputs
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/table/__init__.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    outputs: typing.Mapping[str, typing.Any] = {
        "table": {"type": "table", "doc": "The pyarrow table object."}
    }
    return outputs
        
MapColumnModule            (KiaraModule)
        
¶
    Map the items of one column of a table onto an array, using another module.
Source code in core/table/__init__.py
          class MapColumnModule(KiaraModule):
    """Map the items of one column of a table onto an array, using another module."""
    # Configuration schema and registered type name for this module.
    _config_cls = MapColumnsModuleConfig
    _module_type_name = "map_column"
    def module_instance_doc(self) -> str:
        """Build instance documentation naming (and, when possible, linking to) the configured child module."""
        config: MapColumnsModuleConfig = self.config  # type: ignore
        module_type = config.module_type
        module_config = config.module_config
        # The child module is instantiated here only to read its type metadata.
        m = self._kiara.create_module(
            id="map_column_child", module_type=module_type, module_config=module_config
        )
        type_md = m.get_type_metadata()
        doc = type_md.documentation.full_doc
        link = type_md.context.get_url_for_reference("module_doc")
        if not link:
            link_str = f"``{module_type}``"
        else:
            link_str = f"[``{module_type}``]({link})"
        result = f"""Map the values of the rows of an input table onto a new array of the same length, using the {link_str} module."""
        # '-- n/a --' appears to be the placeholder for "no documentation"; skip it.
        if doc and doc != "-- n/a --":
            result = result + f"\n\n``{module_type}`` documentation:\n\n{doc}"
        return result
    def __init__(self, *args, **kwargs):
        # Lazily-populated caches, filled on first access of the properties below.
        self._child_module: typing.Optional[KiaraModule] = None
        self._module_input_name: typing.Optional[str] = None
        self._module_output_name: typing.Optional[str] = None
        super().__init__(*args, **kwargs)
    @property
    def child_module(self) -> KiaraModule:
        """The configured child module instance (created on first access, then cached)."""
        if self._child_module is not None:
            return self._child_module
        module_name = self.get_config_value("module_type")
        module_config = self.get_config_value("module_config")
        self._child_module = self._kiara.create_module(
            module_type=module_name, module_config=module_config
        )
        return self._child_module
    @property
    def module_input_name(self) -> str:
        """The child-module input that receives each item of the mapped column.

        Taken from the 'input_name' config value; when unset, it can only be
        inferred if the child module has exactly one input.

        Raises:
            KiaraProcessingException: if no 'input_name' is configured and the
                child module has more than one input.
        """
        if self._module_input_name is not None:
            return self._module_input_name
        self._module_input_name = self.get_config_value("input_name")
        if self._module_input_name is None:
            if len(list(self.child_module.input_names)) == 1:
                self._module_input_name = next(iter(self.child_module.input_names))
            else:
                raise KiaraProcessingException(
                    f"No 'input_name' specified, and configured module has more than one inputs. Please specify an 'input_name' value in your module config, pick one of: {', '.join(self.child_module.input_names)}"
                )
        return self._module_input_name
    @property
    def module_output_name(self) -> str:
        """The child-module output collected into the result array (mirrors 'module_input_name').

        Raises:
            KiaraProcessingException: if no 'output_name' is configured and the
                child module has more than one output.
        """
        if self._module_output_name is not None:
            return self._module_output_name
        self._module_output_name = self.get_config_value("output_name")
        if self._module_output_name is None:
            if len(list(self.child_module.output_names)) == 1:
                self._module_output_name = next(iter(self.child_module.output_names))
            else:
                raise KiaraProcessingException(
                    f"No 'output_name' specified, and configured module has more than one outputs. Please specify an 'output_name' value in your module config, pick one of: {', '.join(self.child_module.output_names)}"
                )
        return self._module_output_name
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Combine this module's own inputs ('table', 'column_name') with the child module's extra inputs."""
        inputs: typing.Dict[
            str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
        ] = {
            "table": {
                "type": "table",
                "doc": "The table to use as input.",
            },
            "column_name": {
                "type": "string",
                "doc": "The name of the table column to run the mapping operation on.",
            },
        }
        # Forward every child input except the one that is fed from the table
        # column; the child's input names must not clash with our reserved ones.
        for input_name, schema in self.child_module.input_schemas.items():
            assert input_name != "table"
            assert input_name != "column_name"
            if input_name == self.module_input_name:
                continue
            inputs[input_name] = schema
        return inputs
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Describe the single 'array' output of this module."""
        outputs = {
            "array": {
                "type": "array",
                "doc": "An array of equal length to the input array, containing the 'mapped' values.",
            }
        }
        return outputs
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Run the child module once per item of the selected column and collect its outputs.

        Raises:
            KiaraProcessingException: if the requested column is not part of the table.
        """
        import pyarrow as pa
        table: pa.Table = inputs.get_value_data("table")
        column_name = inputs.get_value_data("column_name")
        if column_name not in table.column_names:
            raise KiaraProcessingException(
                f"Table column '{column_name}' not available in table. Available columns: {', '.join(table.column_names)}."
            )
        input_array: pa.Array = table.column(column_name)
        # Constant (non-mapped) inputs, passed to the child module unchanged.
        init_data: typing.Dict[str, typing.Any] = {}
        for input_name in self.input_schemas.keys():
            if input_name in ["table", "column_name", self.module_input_name]:
                continue
            init_data[input_name] = inputs.get_value_obj(input_name)
        result_list = map_with_module(
            input_array,
            module_input_name=self.module_input_name,
            module_obj=self.child_module,
            init_data=init_data,
            module_output_name=self.module_output_name,
        )
        outputs.set_value("array", pa.array(result_list))
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/table/__init__.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    inputs: typing.Dict[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ] = {
        "table": {
            "type": "table",
            "doc": "The table to use as input.",
        },
        "column_name": {
            "type": "string",
            "doc": "The name of the table column to run the mapping operation on.",
        },
    }
    for input_name, schema in self.child_module.input_schemas.items():
        assert input_name != "table"
        assert input_name != "column_name"
        if input_name == self.module_input_name:
            continue
        inputs[input_name] = schema
    return inputs
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/table/__init__.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    outputs = {
        "array": {
            "type": "array",
            "doc": "An array of equal length to the input array, containing the 'mapped' values.",
        }
    }
    return outputs
module_instance_doc(self)
¶
    Return documentation for this instance of the module.
If not overwritten, will return this class' method doc().
Source code in core/table/__init__.py
          def module_instance_doc(self) -> str:
    config: MapColumnsModuleConfig = self.config  # type: ignore
    module_type = config.module_type
    module_config = config.module_config
    m = self._kiara.create_module(
        id="map_column_child", module_type=module_type, module_config=module_config
    )
    type_md = m.get_type_metadata()
    doc = type_md.documentation.full_doc
    link = type_md.context.get_url_for_reference("module_doc")
    if not link:
        link_str = f"``{module_type}``"
    else:
        link_str = f"[``{module_type}``]({link})"
    result = f"""Map the values of the rows of an input table onto a new array of the same length, using the {link_str} module."""
    if doc and doc != "-- n/a --":
        result = result + f"\n\n``{module_type}`` documentation:\n\n{doc}"
    return result
        
MapColumnsModuleConfig            (ModuleTypeConfigSchema)
        
  
      pydantic-model
  
¶
    Source code in core/table/__init__.py
          class MapColumnsModuleConfig(ModuleTypeConfigSchema):
    """Configuration for the 'table.map_column' module: which child module to run, and which of its inputs/outputs to wire to the mapped table column."""
    # The child module (and optional config) applied to each column item.
    module_type: str = Field(
        description="The name of the kiara module to use to filter the input data."
    )
    module_config: typing.Optional[typing.Dict[str, typing.Any]] = Field(
        description="The config for the kiara filter module.", default_factory=dict
    )
    # Which child input/output to connect; both optional when unambiguous.
    input_name: typing.Optional[str] = Field(
        description="The name of the input name of the module which will receive the rows from our input table. Can be omitted if the configured module only has a single input.",
        default=None,
    )
    output_name: typing.Optional[str] = Field(
        description="The name of the output name of the module which will receive the items from our input array. Can be omitted if the configured module only has a single output.",
        default=None,
    )
input_name: str
  
      pydantic-field
  
¶
    The name of the input name of the module which will receive the rows from our input table. Can be omitted if the configured module only has a single input.
module_config: Dict[str, Any]
  
      pydantic-field
  
¶
    The config for the kiara filter module.
module_type: str
  
      pydantic-field
      required
  
¶
    The name of the kiara module to use to filter the input data.
output_name: str
  
      pydantic-field
  
¶
    The name of the output name of the module which will receive the items from our input array. Can be omitted if the configured module only has a single output.
        
MergeTableModule            (KiaraModule)
        
¶
    Create a table from other tables and/or arrays.
Source code in core/table/__init__.py
          class MergeTableModule(KiaraModule):
    """Create a table from other tables and/or arrays."""
    _module_type_name = "merge"
    _config_cls = MergeTableModuleConfig
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        input_schema_dict = self.get_config_value("input_schema")
        return input_schema_dict
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        outputs = {
            "table": {
                "type": "table",
                "doc": "The merged table, including all source tables and columns.",
            }
        }
        return outputs
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        import pyarrow as pa
        input_schema: typing.Dict[str, typing.Any] = self.get_config_value(
            "input_schema"
        )
        sources = {}
        for field_name in input_schema.keys():
            sources[field_name] = inputs.get_value_data(field_name)
        len_dict = {}
        arrays = []
        column_names = []
        for source_key, table_or_column in sources.items():
            if isinstance(table_or_column, pa.Table):
                rows = table_or_column.num_rows
                for name in table_or_column.schema.names:
                    column = table_or_column.column(name)
                    arrays.append(column)
                    column_names.append(name)
            elif isinstance(table_or_column, (pa.Array, pa.ChunkedArray)):
                rows = len(table_or_column)
                arrays.append(table_or_column)
                column_names.append(source_key)
            else:
                raise KiaraProcessingException(
                    f"Can't merge table: invalid type '{type(table_or_column)}' for source '{source_key}'."
                )
            len_dict[source_key] = rows
        all_rows = None
        for source_key, rows in len_dict.items():
            if all_rows is None:
                all_rows = rows
            else:
                if all_rows != rows:
                    all_rows = None
                    break
        if all_rows is None:
            len_str = ""
            for name, rows in len_dict.items():
                len_str = f" {name} ({rows})"
            raise KiaraProcessingException(
                f"Can't merge table, sources have different lengths:{len_str}"
            )
        table = pa.Table.from_arrays(arrays=arrays, names=column_names)
        outputs.set_value("table", table)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/table/__init__.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    input_schema_dict = self.get_config_value("input_schema")
    return input_schema_dict
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/table/__init__.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    outputs = {
        "table": {
            "type": "table",
            "doc": "The merged table, including all source tables and columns.",
        }
    }
    return outputs
        
MergeTableModuleConfig            (ModuleTypeConfigSchema)
        
  
      pydantic-model
  
¶
    Source code in core/table/__init__.py
          class MergeTableModuleConfig(ModuleTypeConfigSchema):
    """Configuration for the 'table.merge' module."""
    # One entry per merge source; each value is a field schema (type/doc/...).
    input_schema: typing.Dict[str, typing.Any] = Field(
        description="A dict describing the inputs for this merge process."
    )
input_schema: Dict[str, Any]
  
      pydantic-field
      required
  
¶
    A dict describing the inputs for this merge process.
        
SampleTableModule            (SampleValueModule)
        
¶
    Sample a table.
Samples are used to randomly select a subset of a dataset, which helps test queries and workflows on smaller versions of the original data, to adjust parameters before a full run.
Source code in core/table/__init__.py
          class SampleTableModule(SampleValueModule):
    """Sample a table.
    Samples are used to randomly select a subset of a dataset, which helps test queries and workflows on smaller versions
    of the original data, to adjust parameters before a full run.
    """
    _module_type_name = "sample"
    @classmethod
    def get_value_type(cls) -> str:
        return "table"
    # def create_input_schema(
    #     self,
    # ) -> typing.Mapping[
    #     str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    # ]:
    #
    #     return {
    #         "table": {"type": "table", "doc": "The table to sample data from."},
    #         "sample_size": {
    #             "type": "integer",
    #             "doc": "The percentage or number of rows to sample (depending on 'sample_unit' input).",
    #         }
    #     }
    #
    # def create_output_schema(
    #     self,
    # ) -> typing.Mapping[
    #     str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    # ]:
    #
    #     return {"sampled_table": {"type": "table", "doc": "A sampled table."}}
    def sample_percent(self, value: Value, sample_size: int):
        import duckdb
        import pyarrow as pa
        table: pa.Table = value.get_value_data()
        if sample_size >= 100:
            return table
        query = f"SELECT * FROM data USING SAMPLE {sample_size} PERCENT (bernoulli);"
        rel_from_arrow = duckdb.arrow(table)
        result: duckdb.DuckDBPyResult = rel_from_arrow.query("data", query)
        result_table: pa.Table = result.fetch_arrow_table()
        return result_table
    def sample_rows(self, value: Value, sample_size: int):
        import duckdb
        import pyarrow as pa
        table: pa.Table = value.get_value_data()
        if sample_size >= len(table):
            return table
        query = f"SELECT * FROM data USING SAMPLE {sample_size};"
        rel_from_arrow = duckdb.arrow(table)
        result: duckdb.DuckDBPyResult = rel_from_arrow.query("data", query)
        result_table: pa.Table = result.fetch_arrow_table()
        return result_table
    def sample_rows_from_start(self, value: Value, sample_size: int):
        import pyarrow as pa
        table: pa.Table = value.get_value_data()
        if sample_size >= len(table):
            return table
        return table.slice(0, sample_size)
    def sample_rows_to_end(self, value: Value, sample_size: int):
        import pyarrow as pa
        table: pa.Table = value.get_value_data()
        if sample_size >= len(table):
            return table
        return table.slice(len(table) - sample_size)
get_value_type()
  
      classmethod
  
¶
    Return the value type for this sample module.
Source code in core/table/__init__.py
          @classmethod
def get_value_type(cls) -> str:
    return "table"
        
SaveArrowTableConfig            (StoreValueModuleConfig)
        
  
      pydantic-model
  
¶
    Source code in core/table/__init__.py
          class SaveArrowTableConfig(StoreValueModuleConfig):
    """Configuration for storing an Arrow table to disk."""

    # compression codec name handed to the feather writer; defaults to "zstd"
    compression: str = Field(
        description="The compression to use when saving the table.", default="zstd"
    )
compression: str
  
      pydantic-field
  
¶
    The compression to use when saving the table.
        
StoreArrowTable            (StoreValueTypeModule)
        
¶
    Source code in core/table/__init__.py
          class StoreArrowTable(StoreValueTypeModule):
    """Store an Arrow table as a feather file and return the config to re-load it."""

    _config_cls = SaveArrowTableConfig
    _module_type_name = "store"

    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        # this store module only handles 'table' values
        return "table"

    def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
        """Save the table under ``base_path`` and return the matching load config.

        Arguments:
            value: the value holding the (Arrow) table to store
            base_path: the directory to store the table file in

        Raises:
            KiaraProcessingException: if the target file already exists
        """
        import pyarrow as pa
        from pyarrow import feather
        table: pa.Table = value.get_value_data()
        full_path: str = os.path.join(base_path, DEFAULT_SAVE_TABLE_FILE_NAME)
        # refuse to overwrite: raising here avoids clobbering a previously saved table
        if os.path.exists(full_path):
            raise KiaraProcessingException(
                f"Can't save table, file already exists: {full_path}"
            )
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        compression = self.get_config_value("compression")
        feather.write_feather(table, full_path, compression=compression)
        # load config: instructions for the 'table.load' module to re-create the value
        result = {
            "module_type": "table.load",
            "base_path_input_name": "base_path",
            "inputs": {
                "base_path": os.path.dirname(full_path),
                "rel_path": os.path.basename(full_path),
                "format": "feather",
            },
            "output_name": "table",
        }
        return result
store_value(self, value, base_path)
¶
    Save the value, and return the load config needed to load it again.
Source code in core/table/__init__.py
          def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:
    import pyarrow as pa
    from pyarrow import feather
    table: pa.Table = value.get_value_data()
    full_path: str = os.path.join(base_path, DEFAULT_SAVE_TABLE_FILE_NAME)
    if os.path.exists(full_path):
        raise KiaraProcessingException(
            f"Can't save table, file already exists: {full_path}"
        )
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    compression = self.get_config_value("compression")
    feather.write_feather(table, full_path, compression=compression)
    result = {
        "module_type": "table.load",
        "base_path_input_name": "base_path",
        "inputs": {
            "base_path": os.path.dirname(full_path),
            "rel_path": os.path.basename(full_path),
            "format": "feather",
        },
        "output_name": "table",
    }
    return result
        
TableConversionModuleConfig            (CreateValueModuleConfig)
        
  
      pydantic-model
  
¶
    Source code in core/table/__init__.py
          class TableConversionModuleConfig(CreateValueModuleConfig):
    """Configuration for modules that create a table from another value."""

    # when True, items that fail to convert are omitted instead of raising
    ignore_errors: bool = Field(
        description="Whether to ignore convert errors and omit the failed items.",
        default=False,
    )
ignore_errors: bool
  
      pydantic-field
  
¶
    Whether to ignore convert errors and omit the failed items.
        
TableMetadataModule            (ExtractMetadataModule)
        
¶
    Extract metadata from a table object.
Source code in core/table/__init__.py
          class TableMetadataModule(ExtractMetadataModule):
    """Extract metadata from a table object."""

    _module_type_name = "metadata"

    @classmethod
    def _get_supported_types(cls) -> str:
        return "table"

    @classmethod
    def get_metadata_key(cls) -> str:
        return "table"

    def _get_metadata_schema(
        self, type: str
    ) -> typing.Union[str, typing.Type[BaseModel]]:
        # pydantic model describing the metadata dict produced by extract_metadata
        return TableMetadata

    def extract_metadata(self, value: Value) -> typing.Mapping[str, typing.Any]:
        """Collect per-column schema info plus row count and byte size for the table."""
        import pyarrow as pa
        table: pa.Table = value.get_value_data()
        table_schema = {}
        for name in table.schema.names:
            field = table.schema.field(name)
            md = field.metadata
            _type = field.type
            # fall back to the arrow type id when a column carries no field metadata
            if not md:
                md = {
                    "arrow_type_id": _type.id,
                }
            _d = {
                "type_name": str(_type),
                "metadata": md,
            }
            table_schema[name] = _d
        return {
            "column_names": table.column_names,
            "column_schema": table_schema,
            "rows": table.num_rows,
            "size": table.nbytes,
        }
        filter
¶
    
        
CreateFilteredTableModule            (KiaraModule)
        
¶
    Filter a table using a mask array.
Source code in core/table/filter.py
          class CreateFilteredTableModule(KiaraModule):
    """Filter a table using a mask array."""
    _module_type_name = "with_mask"
    _config_cls = TableFilterModuleConfig
    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        inputs = {
            "table": {"type": "table", "doc": "The table to filter."},
            "mask": {
                "type": "array",
                "doc": "An mask array of booleans of the same length as the table.",
            },
        }
        return inputs
    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        outputs = {"table": {"type": "table", "doc": "The filtered table."}}
        return outputs
    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        import pyarrow as pa
        input_table: pa.Table = inputs.get_value_data("table")
        filter_array: pa.Array = inputs.get_value_data("mask")
        filtered = input_table.filter(filter_array)
        outputs.set_value("table", filtered)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/table/filter.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    inputs = {
        "table": {"type": "table", "doc": "The table to filter."},
        "mask": {
            "type": "array",
            "doc": "A mask array of booleans of the same length as the table.",
        },
    }
    return inputs
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/table/filter.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    outputs = {"table": {"type": "table", "doc": "The filtered table."}}
    return outputs
        query
¶
    
        
QueryTableGraphQL            (KiaraModule)
        
¶
    Execute a graphql aggregation query against an (Arrow) table.
References
- https://vaex.io/docs/example_graphql.html
 
Examples:
An example for a query could be:
{
  df(where: {
    Language: {_eq: "German"}
  } ) {
    row(limit: 10) {
      Label
      City
    }
  }
}
Source code in core/table/query.py
          class QueryTableGraphQL(KiaraModule):
    """Execute a graphql aggregation query against an (Arrow) table.

    References:
        - https://vaex.io/docs/example_graphql.html

    Examples:
        An example for a query could be:

            {
              df(where: {
                Language: {_eq: "German"}
              } ) {
                row(limit: 10) {
                  Label
                  City
                }
              }
            }
    """

    _module_type_name = "graphql"

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # inputs: the table to run the query against, and the graphql query string
        inputs: typing.Mapping[str, typing.Any] = {
            "table": {"type": "table", "doc": "The table to query."},
            "query": {"type": "string", "doc": "The query."},
        }
        return inputs

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        outputs: typing.Mapping[str, typing.Any] = {
            "query_result": {"type": "dict", "doc": "The query result."}
        }
        return outputs

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Run the query via vaex's graphql extension and set the result dict."""
        import vaex
        table = inputs.get_value_data("table")
        query = inputs.get_value_data("query")
        # vaex wraps the arrow table in a dataframe that exposes a graphql executor
        df = vaex.from_arrow_table(table)
        result = df.graphql.execute(query)
        # only the 'data' portion of the graphql response is exposed as output
        outputs.set_value("query_result", result.to_dict()["data"])
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/table/query.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    inputs: typing.Mapping[str, typing.Any] = {
        "table": {"type": "table", "doc": "The table to query."},
        "query": {"type": "string", "doc": "The query."},
    }
    return inputs
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/table/query.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    outputs: typing.Mapping[str, typing.Any] = {
        "query_result": {"type": "dict", "doc": "The query result."}
    }
    return outputs
        
QueryTableSQL            (KiaraModule)
        
¶
    Execute a sql query against an (Arrow) table.
Source code in core/table/query.py
          class QueryTableSQL(KiaraModule):
    """Execute a sql query against an (Arrow) table."""

    _module_type_name = "sql"
    _config_cls = QueryTableSQLModuleConfig

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Declare the inputs; 'query'/'relation_name' only appear when not pre-configured."""
        inputs = {
            "table": {
                "type": "table",
                "doc": "The table to query",
            }
        }
        # when no query is fixed in the module config, the user supplies it (and
        # the relation name the table is registered under) at runtime
        if self.get_config_value("query") is None:
            inputs["query"] = {"type": "string", "doc": "The query."}
            inputs["relation_name"] = {
                "type": "string",
                "doc": "The name the table is referred to in the sql query.",
                "default": "data",
            }
        return inputs

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        return {"query_result": {"type": "table", "doc": "The query result."}}

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Run the (configured or user-provided) query against the table via duckdb."""
        import duckdb
        # query/relation_name come either from inputs or from the module config,
        # mirroring the conditional schema built in create_input_schema
        if self.get_config_value("query") is None:
            _query: str = inputs.get_value_data("query")
            _relation_name: str = inputs.get_value_data("relation_name")
        else:
            _query = self.get_config_value("query")
            _relation_name = self.get_config_value("relation_name")
        # reject relation names that would collide with reserved sql keywords
        if _relation_name.upper() in RESERVED_SQL_KEYWORDS:
            raise KiaraProcessingException(
                f"Invalid relation name '{_relation_name}': this is a reserved sql keyword, please select a different name."
            )
        _table = inputs.get_value_data("table")
        rel_from_arrow = duckdb.arrow(_table)
        result: duckdb.DuckDBPyResult = rel_from_arrow.query(_relation_name, _query)
        outputs.set_value("query_result", result.fetch_arrow_table())
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/table/query.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    inputs = {
        "table": {
            "type": "table",
            "doc": "The table to query",
        }
    }
    if self.get_config_value("query") is None:
        inputs["query"] = {"type": "string", "doc": "The query."}
        inputs["relation_name"] = {
            "type": "string",
            "doc": "The name the table is referred to in the sql query.",
            "default": "data",
        }
    return inputs
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/table/query.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    return {"query_result": {"type": "table", "doc": "The query result."}}
        
QueryTableSQLModuleConfig            (ModuleTypeConfigSchema)
        
  
      pydantic-model
  
¶
    Source code in core/table/query.py
          class QueryTableSQLModuleConfig(ModuleTypeConfigSchema):
    """Configuration for the sql table-query module.

    Leaving 'query' unset turns both the query and the relation name into
    runtime user inputs instead of fixed configuration.
    """

    query: typing.Optional[str] = Field(
        description="The query to execute. If not specified, the user will be able to provide their own.",
        default=None,
    )
    relation_name: typing.Optional[str] = Field(
        description="The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.",
        default="data",
    )
        utils
¶
    
create_sqlite_schema_data_from_arrow_table(table, column_map=None, index_columns=None, extra_column_info=None)
¶
    Create a sql schema statement from an Arrow table object.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
table | 
        pa.Table | 
        the Arrow table object  | 
        required | 
column_map | 
        Optional[Mapping[str, str]] | 
        a map that contains column names that should be changed in the new table  | 
        None | 
      
index_columns | 
        Optional[Iterable[str]] | 
        a list of column names (after mapping) to create module_indexes for  | 
        None | 
      
extra_column_info | 
        Optional[Mapping[str, Iterable[str]]] | 
        a list of extra schema instructions per column name (after mapping)  | 
        None | 
      
Source code in core/table/utils.py
          def create_sqlite_schema_data_from_arrow_table(
    table: "pa.Table",
    column_map: typing.Optional[typing.Mapping[str, str]] = None,
    index_columns: typing.Optional[typing.Iterable[str]] = None,
    extra_column_info: typing.Optional[
        typing.Mapping[str, typing.Iterable[str]]
    ] = None,
) -> SqliteTableSchema:
    """Create a sql schema statement from an Arrow table object.
    Arguments:
        table: the Arrow table object
        column_map: a map that contains column names that should be changed in the new table
        index_columns: a list of column names (after mapping) to create module_indexes for
        extra_column_info: a list of extra schema instructions per column name (after mapping)
    """
    columns = convert_arrow_column_types_to_sqlite(table=table)
    if column_map is None:
        column_map = {}
    if extra_column_info is None:
        extra_column_info = {}
    temp: typing.Dict[str, typing.Dict[str, typing.Any]] = {}
    if index_columns is None:
        index_columns = []
    for cn, data in columns.items():
        if cn in column_map.keys():
            new_key = column_map[cn]
        else:
            new_key = cn
        temp_data = dict(data)
        if new_key in extra_column_info.keys():
            temp_data["extra_column_info"] = extra_column_info[new_key]
        else:
            temp_data["extra_column_info"] = [""]
        if cn in index_columns:
            temp_data["create_index"] = True
        temp[new_key] = temp_data
    columns = temp
    if not columns:
        raise Exception("Resulting table schema has no columns.")
    else:
        for ic in index_columns:
            if ic not in columns.keys():
                raise Exception(
                    f"Can't create schema, requested index column name not available: {ic}"
                )
    return SqliteTableSchema(columns=columns, column_map=column_map)
        value
¶
    
        
DataProfilerModule            (KiaraModule)
        
¶
    Generate a data profile report for a dataset.
This uses the DataProfiler Python library, check out its documentation for more details.
Source code in core/value.py
          class DataProfilerModule(KiaraModule):
    """Generate a data profile report for a dataset.

    This uses the [DataProfiler](https://capitalone.github.io/DataProfiler/docs/0.7.0/html/index.html) Python library,
    check out its documentation for more details.
    """

    _module_type_name = "data_profile"
    _config_cls = DataProfilerModuleConfig

    @classmethod
    def retrieve_module_profiles(
        cls, kiara: "Kiara"
    ) -> typing.Mapping[str, typing.Union[typing.Mapping[str, typing.Any], Operation]]:
        """Create one operation profile per supported source type ('table', 'file')."""
        supported_source_types = ["table", "file"]
        doc = cls.get_type_metadata().documentation
        all_profiles = {}
        for sup_type in supported_source_types:
            op_config = {
                "module_type": cls._module_type_id,  # type: ignore
                "module_config": {"value_type": sup_type},
                "doc": doc,
            }
            # resulting operation ids: 'profile.table.data' / 'profile.file.data'
            all_profiles[f"profile.{sup_type}.data"] = op_config
        return all_profiles

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        # single input, typed according to the configured 'value_type'
        inputs: typing.Mapping[str, typing.Mapping[str, typing.Any]] = {
            "item": {
                "type": self.get_config_value("value_type"),
                "doc": f"The {self.get_config_value('value_type')} to profile.",
            }
        }
        return inputs

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        outputs: typing.Mapping[str, typing.Mapping[str, typing.Any]] = {
            "report": {"type": "dict", "doc": "Statistics/details about the dataset."}
        }
        return outputs

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Profile the input item and set the resulting report dict as output.

        Raises:
            KiaraProcessingException: if the configured value type is unsupported
        """
        import pyarrow as pa
        from dataprofiler import Data, Profiler, ProfilerOptions, set_verbosity
        set_verbosity(logging.WARNING)
        value_type = self.get_config_value("value_type")
        profile_options = ProfilerOptions()
        # disable the data labeler for both structured and unstructured profiling
        profile_options.structured_options.data_labeler.is_enabled = False
        profile_options.unstructured_options.data_labeler.is_enabled = False
        if value_type == "table":
            table_item: pa.Table = inputs.get_value_data("item")
            pd = table_item.to_pandas()
            profile = Profiler(
                pd, options=profile_options
            )  # Calculate Statistics, Entity Recognition, etc
            report = profile.report()
        elif value_type == "file":
            file_item: KiaraFile = inputs.get_value_data("item")
            # let DataProfiler auto-detect the file format from its path
            data = Data(file_item.path)
            profile = Profiler(data, options=profile_options)
            report = profile.report()
        else:
            raise KiaraProcessingException(
                f"Data profiling of value type '{value_type}' not supported."
            )
        outputs.set_value("report", report)
create_input_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[input_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*': [boolean whether this input is optional or required (defaults to 'False')]
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/value.py
          def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    inputs: typing.Mapping[str, typing.Mapping[str, typing.Any]] = {
        "item": {
            "type": self.get_config_value("value_type"),
            "doc": f"The {self.get_config_value('value_type')} to profile.",
        }
    }
    return inputs
create_output_schema(self)
¶
    Abstract method to implement by child classes, returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
      "[output_field_name]: {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      "[other_input_field_name]: {
          "type: ...
          ...
      }
Source code in core/value.py
          def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    outputs: typing.Mapping[str, typing.Mapping[str, typing.Any]] = {
        "report": {"type": "dict", "doc": "Statistics/details about the dataset."}
    }
    return outputs
retrieve_module_profiles(kiara)
  
      classmethod
  
¶
    Retrieve a collection of profiles (pre-set module configs) for this kiara module type.
This is used to automatically create generally useful operations (incl. their ids).
Source code in core/value.py
          @classmethod
def retrieve_module_profiles(
    cls, kiara: "Kiara"
) -> typing.Mapping[str, typing.Union[typing.Mapping[str, typing.Any], Operation]]:
    supported_source_types = ["table", "file"]
    doc = cls.get_type_metadata().documentation
    all_profiles = {}
    for sup_type in supported_source_types:
        op_config = {
            "module_type": cls._module_type_id,  # type: ignore
            "module_config": {"value_type": sup_type},
            "doc": doc,
        }
        all_profiles[f"profile.{sup_type}.data"] = op_config
    return all_profiles
        
DataProfilerModuleConfig            (ModuleTypeConfigSchema)
        
  
      pydantic-model
  
¶
    Source code in core/value.py
          class DataProfilerModuleConfig(ModuleTypeConfigSchema):
    """Configuration for the data profiling module."""

    # the kiara value type of the dataset to profile (e.g. 'table' or 'file')
    value_type: str = Field(description="The value type to profile.")
value_type: str
  
      pydantic-field
      required
  
¶
    The value type to profile.
        value_types
¶
    This module contains the value type classes that are used in the kiara_modules.core package.
        
ArrayType            (AnyType)
        
¶
    An Apache arrow array.
Source code in core/value_types.py
          class ArrayType(AnyType):
    """An Apache arrow array."""
    _value_type_name = "array"
    @classmethod
    def backing_python_type(cls) -> typing.Type:
        import pyarrow as pa
        return pa.Array
    @classmethod
    def candidate_python_types(cls) -> typing.Optional[typing.Iterable[typing.Type]]:
        import pyarrow as pa
        return [pa.ChunkedArray, pa.Table]
    def parse_value(self, value: typing.Any) -> typing.Any:
        import pyarrow as pa
        if isinstance(value, pa.Table):
            if len(value.columns) != 1:
                raise Exception(
                    f"Invalid type, only Arrow Arrays or single-column Tables allowed. This value is a table with {len(value.columns)} columns."
                )
            return value.column(0)
    def validate(cls, value: typing.Any) -> None:
        import pyarrow as pa
        if isinstance(value, pa.ChunkedArray):
            return value
        else:
            raise Exception(
                f"invalid type '{type(value).__name__}', must be '{pa.Array.__name__}'."
            )
    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        max_rows = print_config.get("max_no_rows")
        max_row_height = print_config.get("max_row_height")
        max_cell_length = print_config.get("max_cell_length")
        half_lines: typing.Optional[int] = None
        if max_rows:
            half_lines = int(max_rows / 2)
        array = value.get_value_data()
        import pyarrow as pa
        temp_table = pa.Table.from_arrays(arrays=[array], names=["array"])
        atw = ArrowTabularWrap(temp_table)
        result = [
            atw.pretty_print(
                rows_head=half_lines,
                rows_tail=half_lines,
                max_row_height=max_row_height,
                max_cell_length=max_cell_length,
            )
        ]
        return result
parse_value(self, value)
¶
    Parse a value into a supported python type.
    This exists to make it easier to do trivial conversions (e.g. from a date string to a datetime object). If you choose to override this method, make 100% sure that you don't change the meaning of the value, and try to avoid adding or removing information from the data (e.g. by changing the resolution of a date).
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
v | 
        the value  | 
        required | 
Returns:
| Type | Description | 
|---|---|
Any | 
      'None', if no parsing was done and the original value should be used, otherwise return the parsed Python object  | 
    
Source code in core/value_types.py
          def parse_value(self, value: typing.Any) -> typing.Any:
    import pyarrow as pa
    if isinstance(value, pa.Table):
        if len(value.columns) != 1:
            raise Exception(
                f"Invalid type, only Arrow Arrays or single-column Tables allowed. This value is a table with {len(value.columns)} columns."
            )
        return value.column(0)
validate(cls, value)
¶
    Validate the value. This expects an instance of the defined Python class (from 'backing_python_type').
Source code in core/value_types.py
          def validate(cls, value: typing.Any) -> None:
    import pyarrow as pa
    if isinstance(value, pa.ChunkedArray):
        return value
    else:
        raise Exception(
            f"invalid type '{type(value).__name__}', must be '{pa.Array.__name__}'."
        )
        
BooleanType            (AnyType)
        
¶
    A boolean.
Source code in core/value_types.py
          class BooleanType(AnyType):
    "A boolean."
    _value_type_name = "boolean"
    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return bool
    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        return str(hash(value))
    def validate(cls, value: typing.Any):
        if not isinstance(value, bool):
            # if isinstance(v, str):
            #     if v.lower() in ["true", "yes"]:
            #         v = True
            #     elif v.lower() in ["false", "no"]:
            #         v = False
            #     else:
            #         raise ValueError(f"Can't parse string into boolean: {v}")
            # else:
            raise ValueError(f"Invalid type '{type(value)}' for boolean: {value}")
    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        data = value.get_value_data()
        return [str(data)]
calculate_value_hash(value, hash_type)
  
      classmethod
  
¶
    Calculate the hash of this value.
If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.
Source code in core/value_types.py
          @classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    return str(hash(value))
validate(cls, value)
¶
    Validate the value. This expects an instance of the defined Python class (from 'backing_python_type').
Source code in core/value_types.py
          def validate(cls, value: typing.Any):
    if not isinstance(value, bool):
        # if isinstance(v, str):
        #     if v.lower() in ["true", "yes"]:
        #         v = True
        #     elif v.lower() in ["false", "no"]:
        #         v = False
        #     else:
        #         raise ValueError(f"Can't parse string into boolean: {v}")
        # else:
        raise ValueError(f"Invalid type '{type(value)}' for boolean: {value}")
        
BytesType            (AnyType)
        
¶
    An array of bytes.
Source code in core/value_types.py
          class BytesType(AnyType):
    """An array of bytes."""
    _value_type_name = "bytes"
    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return bytes
    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        return str(hash(value))
    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        data: bytes = value.get_value_data()
        return [data.decode()]
    # @classmethod
    # def get_operations(
    #     cls,
    # ) -> typing.Mapping[str, typing.Mapping[str, typing.Mapping[str, typing.Any]]]:
    #
    #     return {
    #         "save_value": {
    #             "default": {
    #                 "module_type": "bytes.save",
    #                 "input_name": "bytes",
    #             }
    #         }
    #     }
calculate_value_hash(value, hash_type)
  
      classmethod
  
¶
    Calculate the hash of this value.
If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.
Source code in core/value_types.py
          @classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    return str(hash(value))
        
DatabaseType            (ComplexModelType)
        
¶
    A database, containing one or several tables.
This is backed by sqlite databases.
Source code in core/value_types.py
          class DatabaseType(ComplexModelType[KiaraDatabase]):
    """A database, containing one or several tables.
    This is backed by sqlite databases.
    """
    _value_type_name = "database"
    @classmethod
    def backing_model_type(self) -> typing.Type[KiaraDatabase]:
        return KiaraDatabase
    @classmethod
    def candidate_python_types(cls) -> typing.Optional[typing.Iterable[typing.Type]]:
        return [KiaraDatabase, str]
    def parse_value(self, value: typing.Any) -> typing.Any:
        if isinstance(value, str):
            # TODO: check path exists
            return KiaraDatabase(db_file_path=value)
        return value
    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        max_rows = print_config.get("max_no_rows")
        max_row_height = print_config.get("max_row_height")
        max_cell_length = print_config.get("max_cell_length")
        half_lines: typing.Optional[int] = None
        if max_rows:
            half_lines = int(max_rows / 2)
        db: KiaraDatabase = value.get_value_data()
        from sqlalchemy import inspect
        inspector = inspect(db.get_sqlalchemy_engine())
        result: typing.List[typing.Any] = [""]
        for table_name in inspector.get_table_names():
            atw = SqliteTabularWrap(db=db, table_name=table_name)
            pretty = atw.pretty_print(
                rows_head=half_lines,
                rows_tail=half_lines,
                max_row_height=max_row_height,
                max_cell_length=max_cell_length,
            )
            result.append(f"[b]Table[/b]: [i]{table_name}[/i]")
            result.append(pretty)
        return result
parse_value(self, value)
¶
    Parse a value into a supported python type.
This exists to make it easier to do trivial conversions (e.g. from a date string to a datetime object). If you choose to overwrite this method, make 100% sure that you don't change the meaning of the value, and try to avoid adding or removing information from the data (e.g. by changing the resolution of a date).
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
v | 
        Any | 
        the value  | 
        required | 
Returns:
| Type | Description | 
|---|---|
Any | 
      'None', if no parsing was done and the original value should be used, otherwise return the parsed Python object  | 
    
Source code in core/value_types.py
          def parse_value(self, value: typing.Any) -> typing.Any:
    if isinstance(value, str):
        # TODO: check path exists
        return KiaraDatabase(db_file_path=value)
    return value
        
DateType            (AnyType)
        
¶
    A date.
Internally, this will always be represented as a Python datetime object. Iff provided as input, it can also
be as string, in which case the dateutil.parser.parse method will be used to parse the string into a datetime object.
Source code in core/value_types.py
          class DateType(AnyType):
    """A date.
    Internally, this will always be represented as a Python ``datetime`` object. Iff provided as input, it can also
    be as string, in which case the [``dateutils.parser.parse``](https://dateutil.readthedocs.io/en/stable/parser.html#dateutil.parser.parse) method will be used to parse the string into a datetime object.
    """
    _value_type_name = "date"
    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return datetime.datetime
    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_typpe: str) -> str:
        return str(hash(value))
    def parse_value(self, v: typing.Any) -> typing.Any:
        from dateutil import parser
        if isinstance(v, str):
            d = parser.parse(v)
            return d
        elif isinstance(v, datetime.date):
            _d = datetime.datetime(year=v.year, month=v.month, day=v.day)
            return _d
        return None
    def validate(cls, value: typing.Any):
        assert isinstance(value, datetime.datetime)
    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        data = value.get_value_data()
        return [str(data)]
calculate_value_hash(value, hash_type)
  
      classmethod
  
¶
    Calculate the hash of this value.
If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.
Source code in core/value_types.py
          @classmethod
def calculate_value_hash(cls, value: typing.Any, hash_typpe: str) -> str:
    return str(hash(value))
parse_value(self, v)
¶
    Parse a value into a supported python type.
This exists to make it easier to do trivial conversions (e.g. from a date string to a datetime object). If you choose to overwrite this method, make 100% sure that you don't change the meaning of the value, and try to avoid adding or removing information from the data (e.g. by changing the resolution of a date).
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
v | 
        Any | 
        the value  | 
        required | 
Returns:
| Type | Description | 
|---|---|
Any | 
      'None', if no parsing was done and the original value should be used, otherwise return the parsed Python object  | 
    
Source code in core/value_types.py
          def parse_value(self, v: typing.Any) -> typing.Any:
    from dateutil import parser
    if isinstance(v, str):
        d = parser.parse(v)
        return d
    elif isinstance(v, datetime.date):
        _d = datetime.datetime(year=v.year, month=v.month, day=v.day)
        return _d
    return None
validate(cls, value)
¶
    Validate the value. This expects an instance of the defined Python class (from 'backing_python_type').
Source code in core/value_types.py
          def validate(cls, value: typing.Any):
    assert isinstance(value, datetime.datetime)
        
DictType            (AnyType)
        
¶
    A dict-like object.
Source code in core/value_types.py
          class DictType(AnyType):
    """A dict-like object."""
    _value_type_name = "dict"
    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return dict
    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        from deepdiff import DeepHash
        dh = DeepHash(value)
        return str(dh[value])
    def validate(cls, value: typing.Any) -> None:
        if not isinstance(value, typing.Mapping):
            raise ValueError(f"Invalid type '{type(value)}', not a mapping.")
    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        data = value.get_value_data()
        return [pprint.pformat(data)]
calculate_value_hash(value, hash_type)
  
      classmethod
  
¶
    Calculate the hash of this value.
If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.
Source code in core/value_types.py
          @classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    from deepdiff import DeepHash
    dh = DeepHash(value)
    return str(dh[value])
validate(cls, value)
¶
    Validate the value. This expects an instance of the defined Python class (from 'backing_python_type').
Source code in core/value_types.py
          def validate(cls, value: typing.Any) -> None:
    if not isinstance(value, typing.Mapping):
        raise ValueError(f"Invalid type '{type(value)}', not a mapping.")
        
FileBundleType            (AnyType)
        
¶
    A representation of a set of files (folder, archive, etc.).
It is recommended to 'onboard' files before working with them, otherwise metadata consistency can not be guaranteed.
Source code in core/value_types.py
          class FileBundleType(AnyType):
    """A representation of a set of files (folder, archive, etc.).
    It is recommended to 'onboard' files before working with them, otherwise metadata consistency can not be guaranteed.
    """
    _value_type_name = "file_bundle"
    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return KiaraFileBundle
    @classmethod
    def candidate_python_types(cls) -> typing.Optional[typing.Iterable[typing.Type]]:
        return [KiaraFileBundle]
    @classmethod
    def get_supported_hash_types(cls) -> typing.Iterable[str]:
        return ["sha3_256"]
    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        assert hash_type == "sha3_256"
        assert isinstance(value, KiaraFileBundle)
        return value.file_bundle_hash
    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        max_no_included_files = print_config.get("max_no_files", 40)
        data: KiaraFileBundle = value.get_value_data()
        pretty = data.dict(exclude={"included_files"})
        files = list(data.included_files.keys())
        if max_no_included_files >= 0:
            if len(files) > max_no_included_files:
                half = int((max_no_included_files - 1) / 2)
                head = files[0:half]
                tail = files[-1 * half :]  # noqa
                files = (
                    head
                    + ["..... output skipped .....", "..... output skipped ....."]
                    + tail
                )
        pretty["included_files"] = files
        return [json.dumps(pretty, indent=2)]
        # return [data.json(indent=2)]
calculate_value_hash(value, hash_type)
  
      classmethod
  
¶
    Calculate the hash of this value.
If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.
Source code in core/value_types.py
          @classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    assert hash_type == "sha3_256"
    assert isinstance(value, KiaraFileBundle)
    return value.file_bundle_hash
        
FileType            (AnyType)
        
¶
    A representation of a file.
It is recommended to 'onboard' files before working with them, otherwise metadata consistency can not be guaranteed.
Source code in core/value_types.py
          class FileType(AnyType):
    """A representation of a file.
    It is recommended to 'onboard' files before working with them, otherwise metadata consistency can not be guaranteed.
    """
    _value_type_name = "file"
    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return KiaraFile
    @classmethod
    def candidate_python_types(cls) -> typing.Optional[typing.Iterable[typing.Type]]:
        return [KiaraFile]
    @classmethod
    def get_supported_hash_types(cls) -> typing.Iterable[str]:
        return ["sha3_256"]
    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        assert hash_type == "sha3_256"
        assert isinstance(value, KiaraFile)
        return value.file_hash
    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        data: KiaraFile = value.get_value_data()
        max_lines = print_config.get("max_lines", 34)
        try:
            lines = []
            with open(data.path, "r") as f:
                for idx, l in enumerate(f):
                    if idx > max_lines:
                        lines.append("...\n")
                        lines.append("...")
                        break
                    lines.append(l)
            # TODO: syntax highlighting
            return ["".join(lines)]
        except UnicodeDecodeError:
            # found non-text data
            return [
                "Binary file or non-utf8 enconding, not printing content...",
                "",
                "[b]File metadata:[/b]",
                "",
                data.json(indent=2),
            ]
calculate_value_hash(value, hash_type)
  
      classmethod
  
¶
    Calculate the hash of this value.
If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.
Source code in core/value_types.py
          @classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    assert hash_type == "sha3_256"
    assert isinstance(value, KiaraFile)
    return value.file_hash
        
FloatType            (AnyType)
        
¶
    A float.
Source code in core/value_types.py
          class FloatType(AnyType):
    "A float."
    _value_type_name = "float"
    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return float
    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        return str(hash(value))
    def validate(cls, value: typing.Any) -> typing.Any:
        if not isinstance(value, float):
            raise ValueError(f"Invalid type '{type(value)}' for float: {value}")
    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        data = value.get_value_data()
        return [str(data)]
calculate_value_hash(value, hash_type)
  
      classmethod
  
¶
    Calculate the hash of this value.
If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.
Source code in core/value_types.py
          @classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    return str(hash(value))
validate(cls, value)
¶
    Validate the value. This expects an instance of the defined Python class (from 'backing_python_type').
Source code in core/value_types.py
          def validate(cls, value: typing.Any) -> typing.Any:
    if not isinstance(value, float):
        raise ValueError(f"Invalid type '{type(value)}' for float: {value}")
        
IntegerType            (AnyType)
        
¶
    An integer.
Source code in core/value_types.py
          class IntegerType(AnyType):
    """An integer."""
    _value_type_name = "integer"
    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return int
    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        return str(hash(value))
    def validate(cls, value: typing.Any) -> None:
        if not isinstance(value, int):
            #     if isinstance(v, str):
            #         try:
            #             v = int(v)
            #         except Exception:
            #             raise ValueError(f"Can't parse string into integer: {v}")
            # else:
            raise ValueError(f"Invalid type '{type(value)}' for integer: {value}")
    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        data = value.get_value_data()
        return [str(data)]
calculate_value_hash(value, hash_type)
  
      classmethod
  
¶
    Calculate the hash of this value.
If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.
Source code in core/value_types.py
          @classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    return str(hash(value))
validate(cls, value)
¶
    Validate the value. This expects an instance of the defined Python class (from 'backing_python_type').
Source code in core/value_types.py
          def validate(cls, value: typing.Any) -> None:
    if not isinstance(value, int):
        #     if isinstance(v, str):
        #         try:
        #             v = int(v)
        #         except Exception:
        #             raise ValueError(f"Can't parse string into integer: {v}")
        # else:
        raise ValueError(f"Invalid type '{type(value)}' for integer: {value}")
        
ListType            (AnyType)
        
¶
    A list-like object.
Source code in core/value_types.py
          class ListType(AnyType):
    """A list-like object."""
    _value_type_name = "list"
    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return list
    @classmethod
    def calculate_value_hash(self, value: typing.Any, hash_type: str) -> str:
        from deepdiff import DeepHash
        dh = DeepHash(value)
        return str(dh[value])
    def validate(cls, value: typing.Any) -> None:
        assert isinstance(value, typing.Iterable)
    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        data = value.get_value_data()
        return [pprint.pformat(data)]
calculate_value_hash(value, hash_type)
  
      classmethod
  
¶
    Calculate the hash of this value.
If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.
Source code in core/value_types.py
          @classmethod
def calculate_value_hash(self, value: typing.Any, hash_type: str) -> str:
    from deepdiff import DeepHash
    dh = DeepHash(value)
    return str(dh[value])
validate(cls, value)
¶
    Validate the value. This expects an instance of the defined Python class (from 'backing_python_type').
Source code in core/value_types.py
          def validate(cls, value: typing.Any) -> None:
    assert isinstance(value, typing.Iterable)
        
RenderablesType            (ValueType)
        
¶
    A list of renderable objects, used in the 'rich' Python library, to print to the terminal or in Jupyter.
Internally, the result list items can be either a string, a 'rich.console.ConsoleRenderable', or a 'rich.console.RichCast'.
Source code in core/value_types.py
          class RenderablesType(ValueType[object, ValueTypeConfigSchema]):
    """A list of renderable objects, used in the 'rich' Python library, to print to the terminal or in Jupyter.
    Internally, the result list items can be either a string, a 'rich.console.ConsoleRenderable', or a 'rich.console.RichCast'.
    """
    _value_type_name = "renderables"
    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return object
    @classmethod
    def candidate_python_types(cls) -> typing.Optional[typing.Iterable[typing.Type]]:
        return [str, ConsoleRenderable, RichCast]
    @classmethod
    def type_config_cls(cls) -> typing.Type[ValueTypeConfigSchema]:
        return ValueTypeConfigSchema
        
StringType            (AnyType)
        
¶
    A string.
Source code in core/value_types.py
          class StringType(AnyType):
    """A string."""
    _value_type_name = "string"
    @classmethod
    def backing_python_type(cls) -> typing.Type:
        return str
    @classmethod
    def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
        return str(hash(value))
    def validate(cls, value: typing.Any) -> None:
        if not isinstance(value, str):
            raise ValueError(f"Invalid type '{type(value)}': string required")
    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        data = value.get_value_data()
        return [data]
calculate_value_hash(value, hash_type)
  
      classmethod
  
¶
    Calculate the hash of this value.
If a hash can't be calculated, or the calculation of a type is not implemented (yet), this will return None.
Source code in core/value_types.py
          @classmethod
def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    return str(hash(value))
validate(cls, value)
¶
    Validate the value. This expects an instance of the defined Python class (from 'backing_python_type').
Source code in core/value_types.py
          def validate(cls, value: typing.Any) -> None:
    if not isinstance(value, str):
        raise ValueError(f"Invalid type '{type(value)}': string required")
        
TableType            (AnyType)
        
¶
    A table.
Internally, this is backed by the Apache Arrow Table class.
Source code in core/value_types.py
          class TableType(AnyType):
    """A table.
    Internally, this is backed by the [Apache Arrow](https://arrow.apache.org) [``Table``](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html) class.
    """
    _value_type_name = "table"
    @classmethod
    def backing_python_type(cls) -> typing.Type:
        import pyarrow as pa
        return pa.Table
    @classmethod
    def candidate_python_types(cls) -> typing.Optional[typing.Iterable[typing.Type]]:
        import pyarrow as pa
        return [pa.Table]
    @classmethod
    def check_data(cls, data: typing.Any) -> typing.Optional["ValueType"]:
        import pyarrow as pa
        if isinstance(data, pa.Table):
            return TableType()
        return None
    # @classmethod
    # def get_supported_hash_types(cls) -> typing.Iterable[str]:
    #
    #     return ["pandas_df_hash"]
    #
    # @classmethod
    # def calculate_value_hash(cls, value: typing.Any, hash_type: str) -> str:
    #
    #     import pyarrow as pa
    #
    #     # this is only for testing, and will be replaced with a native arrow table hush function, once I figure out how to do that efficiently
    #     table: pa.Table = value
    #     from pandas.util import hash_pandas_object
    #
    #     hash_result = hash_pandas_object(table.to_pandas()).sum()
    #     return str(hash_result)
    def validate(cls, value: typing.Any) -> None:
        import pyarrow as pa
        if not isinstance(value, pa.Table):
            raise Exception(
                f"invalid type '{type(value).__name__}', must be '{pa.Table.__name__}'."
            )
    def pretty_print_as_renderables(
        self, value: "Value", print_config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        max_rows = print_config.get("max_no_rows")
        max_row_height = print_config.get("max_row_height")
        max_cell_length = print_config.get("max_cell_length")
        half_lines: typing.Optional[int] = None
        if max_rows:
            half_lines = int(max_rows / 2)
        atw = ArrowTabularWrap(value.get_value_data())
        result = [
            atw.pretty_print(
                rows_head=half_lines,
                rows_tail=half_lines,
                max_row_height=max_row_height,
                max_cell_length=max_cell_length,
            )
        ]
        return result
check_data(data)
  
      classmethod
  
¶
    Check whether the provided input matches this value type.
If it does, return a ValueType object (with the appropriate type configuration).
Source code in core/value_types.py
          @classmethod
def check_data(cls, data: typing.Any) -> typing.Optional["ValueType"]:
    import pyarrow as pa
    if isinstance(data, pa.Table):
        return TableType()
    return None
validate(cls, value)
¶
    Validate the value. This expects an instance of the defined Python class (from 'backing_python_type').
Source code in core/value_types.py
          def validate(cls, value: typing.Any) -> None:
    import pyarrow as pa
    if not isinstance(value, pa.Table):
        raise Exception(
            f"invalid type '{type(value).__name__}', must be '{pa.Table.__name__}'."
        )
        yaml
¶
    
        
ToYamlModuleOld            (OldTypeConversionModule)
        
¶
    Convert arbitrary types into YAML format.
Early days for this module, it doesn't support a whole lot of types yet.
Source code in core/yaml.py
          class ToYamlModuleOld(OldTypeConversionModule):
    """Convert arbitrary types into YAML format.
    Early days for this module, it doesn't support a whole lot of types yet.
    """
    _module_type_name = "to_yaml"
    @classmethod
    def _get_supported_source_types(self) -> typing.Union[typing.Iterable[str], str]:
        return YAML_SUPPORTED_SOURCE_TYPES
    @classmethod
    def _get_target_types(self) -> typing.Union[typing.Iterable[str], str]:
        return ["yaml"]
    def convert(
        self, value: Value, config: typing.Mapping[str, typing.Any]
    ) -> typing.Any:
        input_value: typing.Any = value.get_value_data()
        input_value_str = convert_to_yaml(
            self._kiara, data=input_value, convert_config=config
        )
        return input_value_str