
kiara.module

KiaraModule

The base class that every custom module in Kiara needs to inherit from.

The core of every KiaraModule is a process method, which should be a 'pure', idempotent function that transforms a set of inputs into one or several output values.

Every module can be configured. The configuration schema can differ between modules, but every configuration class needs to subclass ModuleTypeConfigSchema and be set as the value of the _config_cls attribute of the module class. This is useful because it allows a single module to serve a much larger variety of use cases than a non-configurable module could. Without it, there would be more code duplication from very similar, but slightly different, module types.

Each module class (type) has a unique -- within a kiara context -- module type id which can be accessed via the _module_type_id class attribute.

Examples:

A simple example would be an 'addition' module, with a and b configured as inputs, and z as the output field name.

An implementing class would look something like this:

TODO
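As a rough illustration of the addition module described above, here is a minimal, self-contained sketch. It does not import kiara: `ModuleSketch` is a hypothetical stand-in for `KiaraModule`, plain dictionaries stand in for the `ValueSet` objects that kiara passes to `process`, and the `"integer"` type name is an assumption.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Mapping


class ModuleSketch(ABC):
    """Hypothetical stand-in for KiaraModule; not the real base class."""

    @abstractmethod
    def create_input_schema(self) -> Mapping[str, Mapping[str, Any]]:
        ...

    @abstractmethod
    def create_output_schema(self) -> Mapping[str, Mapping[str, Any]]:
        ...

    @abstractmethod
    def process(self, inputs: Mapping[str, Any], outputs: Dict[str, Any]) -> None:
        ...


class AdditionModule(ModuleSketch):
    """Adds the inputs 'a' and 'b' and writes the sum to the output 'z'."""

    def create_input_schema(self) -> Mapping[str, Mapping[str, Any]]:
        return {
            "a": {"type": "integer", "doc": "The first operand."},
            "b": {"type": "integer", "doc": "The second operand."},
        }

    def create_output_schema(self) -> Mapping[str, Mapping[str, Any]]:
        return {"z": {"type": "integer", "doc": "The sum of a and b."}}

    def process(self, inputs: Mapping[str, Any], outputs: Dict[str, Any]) -> None:
        # Pure and idempotent: the result depends only on the inputs.
        outputs["z"] = inputs["a"] + inputs["b"]


module = AdditionModule()
outputs: Dict[str, Any] = {}
module.process(inputs={"a": 1, "b": 2}, outputs=outputs)
print(outputs)
```

In a real module the schema methods and `process` would have the same roles, but the values would be read from and written to kiara value sets rather than dictionaries.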

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| id | str | the id for this module (needs to be unique within a pipeline) | required |
| parent_id | Optional[str] | the id of the parent, in case this module is part of a pipeline | required |
| module_config | Any | the configuration for this module | required |
| metadata | Mapping[str, Any] | metadata for this module (not implemented yet) | required |

config: ~KIARA_CONFIG property readonly

Retrieve the configuration object for this module.

Returns:

| Type | Description |
| --- | --- |
| ~KIARA_CONFIG | the module-class-specific config object |

full_id: str property readonly

The full id for this module.

id: str property readonly

The id of this module.

This is only unique within a pipeline.

info: KiaraModuleInstanceMetadata property readonly

Return an info wrapper class for this module.

input_names: Iterable[str] property readonly

A list of input field names for this module.

input_schemas: Mapping[str, kiara.data.values.ValueSchema] property readonly

The input schema for this module.

module_instance_doc: str property readonly

Return documentation for this instance of the module.

If not overridden, this returns the result of this class's doc() method.

module_instance_hash: int property readonly

Return this module's 'module_hash'.

If the module_instance_hash values of two module instances are the same, it is guaranteed that their process methods will return the same output, given the same inputs (except if that processing step uses randomness). It can also be assumed that the two instances have the same input and output fields, with the same schemas.

Note

This implementation is preliminary, since it's not yet 100% clear to me how much it will be needed, and in which situations. Module versioning also needs to be implemented before this can work reliably. For now, it is assumed that a module configuration is not changed once set. This might change in the future.

Returns:

| Type | Description |
| --- | --- |
| int | this module's 'module_instance_hash' |
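To make the guarantee above more concrete, here is one way such a hash could be derived from a module type id and its configuration. This is only a sketch under assumptions, not kiara's actual implementation: the `instance_hash` helper is hypothetical, and it assumes the configuration is JSON-serializable and never changes after creation.

```python
import hashlib
import json
from typing import Any, Mapping


def instance_hash(module_type_id: str, config: Mapping[str, Any]) -> int:
    """Hypothetical helper: derive a stable integer from type id and config."""
    # sort_keys makes the serialization independent of key insertion order.
    payload = json.dumps({"type": module_type_id, "config": config}, sort_keys=True)
    digest = hashlib.sha256(payload.encode("utf-8")).digest()
    # Use the first 8 bytes of the digest as an unsigned integer.
    return int.from_bytes(digest[:8], "big")


same_a = instance_hash("add", {"x": 1, "y": 2})
same_b = instance_hash("add", {"y": 2, "x": 1})
different = instance_hash("add", {"x": 1, "y": 3})
print(same_a == same_b, same_a == different)
```

Two instances with the same type id and an equal configuration get the same value regardless of key order, while a changed configuration value yields a different one.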

output_names: Iterable[str] property readonly

A list of output field names for this module.

output_schemas: Mapping[str, kiara.data.values.ValueSchema] property readonly

The output schema for this module.

parent_id: Optional[str] property readonly

The id of the parent of this module (if part of a pipeline).

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

    {
      "[input_field_name]": {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*": [boolean whether this input is optional or required (defaults to 'False')]
      },
      "[other_input_field_name]": {
          "type": ...
          ...
      }
    }

Source code in kiara/module.py
@abstractmethod
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Abstract method to implement by child classes, returns a description of the input schema of this module.

    If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

    ```
    {
      "[input_field_name]": {
          "type": "[value_type]",
          "doc*": "[a description of this input]",
          "optional*": [boolean whether this input is optional or required (defaults to 'False')]
      },
      "[other_input_field_name]": {
          "type": ...
          ...
      }
    }
    ```
    """
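The dictionary-of-dictionaries format for input schemas can be illustrated with a concrete value. The sketch below also shows how the optional keys might be filled with defaults. The `normalize_schema` helper is hypothetical and not part of kiara, the `"integer"` type name is an assumption, and the empty-string default for `doc` is chosen only for this illustration. The `False` default for `optional` follows the description above.

```python
from typing import Any, Dict, Mapping


def normalize_schema(
    schema: Mapping[str, Mapping[str, Any]]
) -> Dict[str, Dict[str, Any]]:
    """Hypothetical helper: check required keys and fill in optional ones."""
    normalized: Dict[str, Dict[str, Any]] = {}
    for field_name, spec in schema.items():
        if "type" not in spec:
            raise ValueError(f"Field '{field_name}' is missing the required 'type' key.")
        normalized[field_name] = {
            "type": spec["type"],
            "doc": spec.get("doc", ""),  # assumed default, for illustration only
            "optional": spec.get("optional", False),  # defaults to False
        }
    return normalized


input_schema = {
    "a": {"type": "integer", "doc": "The first operand."},
    "b": {"type": "integer", "optional": True},
}
normalized = normalize_schema(input_schema)
print(normalized["a"]["optional"], normalized["b"]["optional"])
```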

create_instance(module_type=None, module_config=None, kiara=None) classmethod

Create an instance of a kiara module.

This class method is overloaded so that you can either provide the module_type argument, in which case the relevant sub-class will be looked up in the kiara context, or call this method directly on any of the inheriting sub-classes. You can't do both, though.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| module_type | Optional[str] | must be None if called on the KiaraModule base class, otherwise the module or operation id | None |
| module_config | Optional[Mapping[str, Any]] | the configuration of the module instance | None |
| kiara | Optional[Kiara] | the kiara context | None |

Source code in kiara/module.py
@classmethod
def create_instance(
    cls,
    module_type: typing.Optional[str] = None,
    module_config: typing.Optional[typing.Mapping[str, typing.Any]] = None,
    kiara: typing.Optional["Kiara"] = None,
) -> "KiaraModule":
    """Create an instance of a *kiara* module.

    This class method is overloaded in a way that you can either provide the `module_type` argument, in which case
    the relevant sub-class will be queried from the *kiara* context, or you can call this method directly on any of the
    inheriting sub-classes. You can't do both, though.

    Arguments:
        module_type: must be None if called on the ``KiaraModule`` base class, otherwise the module or operation id
        module_config: the configuration of the module instance
        kiara: the *kiara* context
    """

    if cls == KiaraModule:
        if not module_type:
            raise Exception(
                "This method must be either called on a subclass of KiaraModule, not KiaraModule itself, or it needs the 'module_type' argument specified."
            )
    else:
        if module_type:
            raise Exception(
                "This method must be either called without the 'module_type' argument specified, or on a subclass of the KiaraModule class, but not both."
            )

    if cls == KiaraModule:
        assert module_type is not None
        module_conf = ModuleConfig.create_module_config(
            config=module_type, module_config=module_config, kiara=kiara
        )
    else:
        module_conf = ModuleConfig.create_module_config(
            config=cls, module_config=module_config, kiara=kiara
        )

    return module_conf.create_module(kiara=kiara)
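The two calling styles of create_instance, and the rule that they cannot be combined, can be sketched without kiara as follows. `BaseModule`, `AddModule` and the `_registry` dictionary are hypothetical stand-ins: the registry plays the role of the lookup that kiara performs through its context, and `ValueError` is used here instead of the plain `Exception` raised in the listing above.

```python
from typing import Any, Dict, Mapping, Optional, Type


class BaseModule:
    # Hypothetical registry standing in for the lookup via the kiara context.
    _registry: Dict[str, Type["BaseModule"]] = {}

    def __init__(self, module_config: Optional[Mapping[str, Any]] = None) -> None:
        self.module_config = dict(module_config or {})

    @classmethod
    def create_instance(
        cls,
        module_type: Optional[str] = None,
        module_config: Optional[Mapping[str, Any]] = None,
    ) -> "BaseModule":
        if cls is BaseModule:
            # Called on the base class: the type id selects the sub-class.
            if not module_type:
                raise ValueError("'module_type' is required on the base class.")
            target = cls._registry[module_type]
        else:
            # Called on a sub-class: the class itself is the target.
            if module_type:
                raise ValueError("'module_type' must not be set on a sub-class.")
            target = cls
        return target(module_config=module_config)


class AddModule(BaseModule):
    pass


BaseModule._registry["add"] = AddModule

via_base = BaseModule.create_instance(module_type="add")
via_subclass = AddModule.create_instance(module_config={"constant": 1})
print(type(via_base).__name__, type(via_subclass).__name__)
```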

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

    {
      "[output_field_name]": {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      },
      "[other_output_field_name]": {
          "type": ...
          ...
      }
    }

Source code in kiara/module.py
@abstractmethod
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Abstract method to implement by child classes, returns a description of the output schema of this module.

    If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

    ```
    {
      "[output_field_name]": {
          "type": "[value_type]",
          "doc*": "[a description of this output]"
      },
      "[other_output_field_name]": {
          "type": ...
          ...
      }
    }
    ```
    """

get_config_value(self, key)

Retrieve the value for a specific configuration option.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | str | the config key | required |

Returns:

| Type | Description |
| --- | --- |
| Any | the value for the provided key |

Source code in kiara/module.py
def get_config_value(self, key: str) -> typing.Any:
    """Retrieve the value for a specific configuration option.

    Arguments:
        key: the config key

    Returns:
        the value for the provided key
    """

    try:
        return self.config.get(key)
    except Exception:
        raise Exception(
            f"Error accessing config value '{key}' in module {self.__class__._module_type_id}."  # type: ignore
        )

get_type_metadata() classmethod

Return all metadata associated with this module type.

Source code in kiara/module.py
@classmethod
def get_type_metadata(cls) -> KiaraModuleTypeMetadata:
    """Return all metadata associated with this module type."""

    return KiaraModuleTypeMetadata.from_module_class(cls)

is_pipeline() classmethod

Check whether this module type is a pipeline, or not.

Source code in kiara/module.py
@classmethod
def is_pipeline(cls) -> bool:
    """Check whether this module type is a pipeline, or not."""
    return False

process_step(self, inputs, outputs, job_log)

Kick off processing for a specific set of input/outputs.

This method calls the implemented [process][kiara.module.KiaraModule.process] method of the inheriting class, as well as wrapping input/output-data related functionality.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| inputs | ValueSet | the input value set | required |
| outputs | ValueSet | the output value set | required |

Source code in kiara/module.py
def process_step(
    self, inputs: ValueSet, outputs: ValueSet, job_log: JobLog
) -> None:
    """Kick off processing for a specific set of input/outputs.

    This method calls the implemented [process][kiara.module.KiaraModule.process] method of the inheriting class,
    as well as wrapping input/output-data related functionality.

    Arguments:
        inputs: the input value set
        outputs: the output value set
    """

    signature = inspect.signature(self.process)  # type: ignore

    if "job_log" not in signature.parameters.keys():

        try:
            self.process(inputs=inputs, outputs=outputs)  # type: ignore
        except Exception as e:
            if is_debug():
                try:
                    import traceback

                    traceback.print_exc()
                except Exception:
                    pass
            raise e

    else:

        try:
            self.process(inputs=inputs, outputs=outputs, job_log=job_log)  # type: ignore
        except Exception as e:
            if is_debug():
                try:
                    import traceback

                    traceback.print_exc()
                except Exception:
                    pass
            raise e
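The signature check in process_step lets a module implement process either with or without a job_log parameter. Below is a minimal, self-contained sketch of that dispatch pattern. The `call_process` function is a hypothetical stand-in, plain dictionaries replace the value sets, and a list replaces the JobLog object.

```python
import inspect
from typing import Any, Callable, Dict, List, Mapping


def call_process(
    process: Callable[..., None],
    inputs: Mapping[str, Any],
    outputs: Dict[str, Any],
    job_log: List[str],
) -> None:
    """Pass 'job_log' only if the process callable declares that parameter."""
    if "job_log" in inspect.signature(process).parameters:
        process(inputs=inputs, outputs=outputs, job_log=job_log)
    else:
        process(inputs=inputs, outputs=outputs)


def process_plain(inputs: Mapping[str, Any], outputs: Dict[str, Any]) -> None:
    outputs["z"] = inputs["a"] + inputs["b"]


def process_logged(
    inputs: Mapping[str, Any], outputs: Dict[str, Any], job_log: List[str]
) -> None:
    job_log.append("adding a and b")
    outputs["z"] = inputs["a"] + inputs["b"]


log: List[str] = []
plain_out: Dict[str, Any] = {}
logged_out: Dict[str, Any] = {}
call_process(process_plain, {"a": 1, "b": 2}, plain_out, log)
call_process(process_logged, {"a": 1, "b": 2}, logged_out, log)
print(plain_out, logged_out, log)
```

Checking the signature once keeps older process implementations working while newer ones can opt in to logging.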

retrieve_module_profiles(kiara) classmethod

Retrieve a collection of profiles (pre-set module configs) for this kiara module type.

This is used to automatically create generally useful operations (incl. their ids).

Source code in kiara/module.py
@classmethod
def retrieve_module_profiles(
    cls, kiara: "Kiara"
) -> typing.Mapping[str, typing.Union[typing.Mapping[str, typing.Any], Operation]]:
    """Retrieve a collection of profiles (pre-set module configs) for this *kiara* module type.

    This is used to automatically create generally useful operations (incl. their ids).
    """

run(self, _attach_lineage=True, **inputs)

Execute the module with the provided inputs directly.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| inputs | Any | a map of the input values (as described by the input schema) | {} |

Returns:

| Type | Description |
| --- | --- |
| ValueSet | a map of the output values (as described by the output schema) |

Source code in kiara/module.py
def run(self, _attach_lineage: bool = True, **inputs: typing.Any) -> ValueSet:
    """Execute the module with the provided inputs directly.

    Arguments:
        inputs: a map of the input values (as described by the input schema)
    Returns:
        a map of the output values (as described by the output schema)
    """

    resolved_inputs = self.create_full_inputs(**inputs)

    # TODO: introduce a 'temp' value set implementation and use that here
    input_value_set = SlottedValueSet.from_schemas(
        kiara=self._kiara,
        schemas=self.full_input_schemas,
        read_only=True,
        initial_values=resolved_inputs,
        title=f"module_inputs_{self.id}",
    )

    if not input_value_set.items_are_valid():

        invalid_details = input_value_set.check_invalid()
        raise Exception(
            f"Can't process module '{self._module_type_name}', input field(s) not valid: {', '.join(invalid_details.keys())}"  # type: ignore
        )

    output_value_set = SlottedValueSet.from_schemas(
        kiara=self._kiara,
        schemas=self.output_schemas,
        read_only=False,
        title=f"{self._module_type_name}_module_outputs_{self.id}",  # type: ignore
        default_value=SpecialValue.NOT_SET,
    )

    self.process(inputs=input_value_set, outputs=output_value_set)  # type: ignore

    result_outputs: typing.MutableMapping[str, Value] = {}
    if _attach_lineage:
        input_infos = {k: v.get_info() for k, v in resolved_inputs.items()}
        for field_name, output in output_value_set.items():
            value_lineage = ValueLineage.from_module_and_inputs(
                module=self, output_name=field_name, inputs=input_infos
            )
            # value_lineage = None
            output_val = self._kiara.data_registry.register_data(
                value_data=output, lineage=value_lineage
            )
            result_outputs[field_name] = output_val
    else:
        result_outputs = output_value_set

    result_set = SlottedValueSet.from_schemas(
        kiara=self._kiara,
        schemas=self.output_schemas,
        read_only=True,
        initial_values=result_outputs,
        title=f"{self._module_type_name}_module_outputs_{self.id}",  # type: ignore
    )

    return result_set

    # result = output_value_set.get_all_value_objects()
    # return output_value_set
    # return ValueSetImpl(items=result, read_only=True)

StepInputs

Wrapper class to hold a set of inputs for a pipeline processing step.

This is necessary because we can't assume the processing will be done on the same machine (or in the same process) as the pipeline controller. By disconnecting the value from the processing code, we can react appropriately to those circumstances.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| inputs | ValueSet | the input values of a pipeline step | required |

get_all_field_names(self)

All field names included in this ValueSet.

Source code in kiara/module.py
def get_all_field_names(self) -> typing.Iterable[str]:
    """All field names included in this ValueSet."""

    return self._inputs.keys()

StepOutputs

Wrapper class to hold a set of outputs for a pipeline processing step.

This is necessary because we can't assume the processing will be done on the same machine (or in the same process) as the pipeline controller. By disconnecting the value from the processing code, we can react appropriately to those circumstances.

Internally, this class stores two sets of its values: the 'actual', up-to-date values, and the referenced (original) ones that were used when creating an object of this class. It's not a good idea to keep both synced all the time, because that could potentially involve unnecessary data transfer and I/O.

Also, in some cases a developer might want to avoid events that could be triggered by a changed value.

Both value sets can be synced manually using the 'sync()' method.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| outputs | ValueSet | the output values of a pipeline step | required |

get_all_field_names(self)

All field names included in this ValueSet.

Source code in kiara/module.py
def get_all_field_names(self) -> typing.Iterable[str]:
    """All field names included in this ValueSet."""

    return self._outputs.get_all_field_names()

sync(self, lineage=None, **metadata)

Sync this value set's 'shadow' values with the ones a user would retrieve.

Source code in kiara/module.py
def sync(
    self, lineage: typing.Optional[ValueLineage] = None, **metadata: MetadataModel
):
    """Sync this value set's 'shadow' values with the ones a user would retrieve."""

    self._outputs.set_values(lineage=lineage, metadata=metadata, **self._outputs_staging)  # type: ignore
    self._outputs_staging.clear()  # type: ignore
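The staging behaviour of StepOutputs, where values are collected locally and only published when sync() is called, can be sketched as follows. `StagedOutputs` is a hypothetical stand-in: a plain dictionary replaces the wrapped ValueSet, and lineage and metadata handling are left out.

```python
from typing import Any, Dict


class StagedOutputs:
    """Hypothetical stand-in for StepOutputs, backed by plain dictionaries."""

    def __init__(self, target: Dict[str, Any]) -> None:
        self._target = target  # the values a user would retrieve
        self._staging: Dict[str, Any] = {}  # the up-to-date 'shadow' values

    def set_value(self, key: str, value: Any) -> None:
        # Writes only touch the staging area; nothing is published yet.
        self._staging[key] = value

    def sync(self) -> None:
        # Publish all staged values in one step, then empty the staging area.
        self._target.update(self._staging)
        self._staging.clear()


published: Dict[str, Any] = {}
outputs = StagedOutputs(published)
outputs.set_value("z", 3)
before_sync = dict(published)
outputs.sync()
print(before_sync, published)
```

Batching the writes this way avoids transferring data, or triggering change events, on every intermediate update.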