kiara.module
KiaraModule
The base class that every custom module in kiara needs to inherit from.
The core of every `KiaraModule` is a `process` method, which should be a 'pure', idempotent function that creates one or several output values from the given input(s); its purpose is to transform a set of inputs into a set of outputs.
Every module can be configured. The configuration schema can differ between module types, but every such configuration class needs to subclass the `ModuleTypeConfigSchema` class and be set as the value of the module class's `_config_cls` attribute. This is useful because it allows a single configurable module to serve a much larger variety of use-cases than a non-configurable one could. Without configuration, many very similar but slightly different module types would be needed, which would mean more code duplication.
Each module class (type) has a module type id, unique within a kiara context, which can be accessed via the `_module_type_id` class attribute.
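A minimal sketch of this configuration pattern, assuming nothing about kiara's real classes: `SketchModule`, `BaseConfig`, `ArithmeticConfig` and `ArithmeticModule` are hypothetical names, and frozen dataclasses stand in for `ModuleTypeConfigSchema` subclasses. It shows a config class attached via `_config_cls`, a per-class `_module_type_id`, and a `get_config_value`-style lookup that names the module type id when a key is missing.

```python
from dataclasses import asdict, dataclass
from typing import Any, ClassVar, Type


@dataclass(frozen=True)
class BaseConfig:
    """Hypothetical stand-in for a ModuleTypeConfigSchema subclass."""


@dataclass(frozen=True)
class ArithmeticConfig(BaseConfig):
    # One configurable module type can cover several similar use-cases.
    operation: str = "add"


class SketchModule:
    # Hypothetical stand-ins for the class attributes described above.
    _config_cls: ClassVar[Type[BaseConfig]] = BaseConfig
    _module_type_id: ClassVar[str] = "sketch"

    def __init__(self, **module_config: Any) -> None:
        # Unknown keys make the dataclass constructor raise a TypeError.
        self.config = self._config_cls(**module_config)

    def get_config_value(self, key: str) -> Any:
        try:
            return asdict(self.config)[key]
        except KeyError:
            raise Exception(
                f"Error accessing config value '{key}' in module {self._module_type_id}."
            ) from None


class ArithmeticModule(SketchModule):
    _config_cls = ArithmeticConfig
    _module_type_id = "arithmetic"


print(ArithmeticModule(operation="multiply").get_config_value("operation"))  # multiply
```

Because the config type is a class attribute, each subclass only has to swap `_config_cls` to get its own validated configuration.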
Examples:
A simple example would be an 'addition' module, with `a` and `b` configured as inputs, and `z` as the output field name.
An implementing class would look something like this:
TODO
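The page leaves the implementation as a TODO, so the following is only a hedged, self-contained sketch of what such an 'addition' module could look like. `MiniModule` is a hypothetical stand-in for `KiaraModule`: it keeps the three hooks described on this page (`create_input_schema`, `create_output_schema`, `process`) but uses plain dicts instead of kiara's `ValueSet` and a much simplified `run`. The value type name `"integer"` is also an assumption.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Mapping


class MiniModule(ABC):
    """Hypothetical stand-in for KiaraModule; plain dicts replace ValueSet."""

    @abstractmethod
    def create_input_schema(self) -> Mapping[str, Mapping[str, Any]]: ...

    @abstractmethod
    def create_output_schema(self) -> Mapping[str, Mapping[str, Any]]: ...

    @abstractmethod
    def process(self, inputs: Mapping[str, Any], outputs: Dict[str, Any]) -> None: ...

    def run(self, **inputs: Any) -> Dict[str, Any]:
        # Simplification: every declared input is treated as required.
        missing = sorted(set(self.create_input_schema()) - set(inputs))
        if missing:
            raise ValueError(f"missing input(s): {', '.join(missing)}")
        outputs: Dict[str, Any] = {}
        self.process(inputs=inputs, outputs=outputs)
        return outputs


class AdditionModule(MiniModule):
    def create_input_schema(self):
        # "integer" is an assumed value type name, for illustration only.
        return {
            "a": {"type": "integer", "doc": "The first summand."},
            "b": {"type": "integer", "doc": "The second summand."},
        }

    def create_output_schema(self):
        return {"z": {"type": "integer", "doc": "The sum of a and b."}}

    def process(self, inputs, outputs):
        # Pure and idempotent: the same a and b always yield the same z.
        outputs["z"] = inputs["a"] + inputs["b"]


print(AdditionModule().run(a=1, b=2))  # {'z': 3}
```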
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| id | str | the id for this module (needs to be unique within a pipeline) | required |
| parent_id | Optional[str] | the id of the parent, in case this module is part of a pipeline | required |
| module_config | Any | the configuration for this module | required |
| metadata | Mapping[str, Any] | metadata for this module (not implemented yet) | required |
config: ~KIARA_CONFIG
property
readonly
Retrieve the configuration object for this module.
Returns:
| Type | Description |
|---|---|
| ~KIARA_CONFIG | the module-class-specific config object |
full_id: str
property
readonly
The full id for this module.
id: str
property
readonly
The id of this module.
This is only unique within a pipeline.
info: KiaraModuleInstanceMetadata
property
readonly
Return an info wrapper class for this module.
input_names: Iterable[str]
property
readonly
A list of input field names for this module.
input_schemas: Mapping[str, kiara.data.values.ValueSchema]
property
readonly
The input schema for this module.
module_instance_doc: str
property
readonly
Return documentation for this instance of the module.
If not overridden, this returns the output of this class's `doc()` method.
module_instance_hash: int
property
readonly
Return this module's 'module_instance_hash'.
If two module instances have the same `module_instance_hash` value, it is guaranteed that their `process` methods will return the same output given the same inputs (unless that processing step uses randomness). It can also be assumed that the two instances have the same input and output fields, with the same schemas.
Note
This implementation is preliminary, since it's not yet 100% clear to me how much it will be needed, and in which situations. Module versioning also needs to be implemented before this can work reliably. Finally, it is currently assumed that a module configuration does not change once set; this might also change in the future.
Returns:
| Type | Description |
|---|---|
| int | this module's 'module_instance_hash' |
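The page does not say how this hash is computed. As a sketch only, assuming the hash depends on nothing but the module type id and a configuration that no longer changes once set, a stable value could be derived like this. `sketch_instance_hash` is a hypothetical helper, and the config must be JSON-serialisable. `hashlib` is used instead of the built-in `hash()` because `hash()` of strings is salted per interpreter process (see `PYTHONHASHSEED`), so it would not be reproducible across runs.

```python
import hashlib
import json
from typing import Any, Mapping


def sketch_instance_hash(module_type_id: str, module_config: Mapping[str, Any]) -> int:
    """Hypothetical: a stable int derived from module type id + (frozen) config."""
    # sort_keys makes the JSON text independent of dict insertion order.
    payload = json.dumps({"type": module_type_id, "config": module_config}, sort_keys=True)
    digest = hashlib.sha256(payload.encode("utf-8")).digest()
    # Keep 8 bytes so the result is a plain, reproducible int.
    return int.from_bytes(digest[:8], "big")


h1 = sketch_instance_hash("arithmetic", {"operation": "add", "precision": 2})
h2 = sketch_instance_hash("arithmetic", {"precision": 2, "operation": "add"})
h3 = sketch_instance_hash("arithmetic", {"operation": "multiply", "precision": 2})
print(h1 == h2, h1 == h3)  # True False
```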
output_names: Iterable[str]
property
readonly
A list of output field names for this module.
output_schemas: Mapping[str, kiara.data.values.ValueSchema]
property
readonly
The output schema for this module.
parent_id: Optional[str]
property
readonly
The id of the parent of this module (if part of a pipeline).
create_input_schema(self)
Abstract method, to be implemented by child classes, that returns a description of the input schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
    "[input_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this input]",
        "optional*": [boolean whether this input is optional or required (defaults to 'False')]
    },
    "[other_input_field_name]": {
        "type": ...,
        ...
    }
}
Source code in kiara/module.py
@abstractmethod
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Abstract method to implement by child classes, returns a description of the input schema of this module.
    If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
    ```
    {
        "[input_field_name]": {
            "type": "[value_type]",
            "doc*": "[a description of this input]",
            "optional*": [boolean whether this input is optional or required (defaults to 'False')]
        },
        "[other_input_field_name]": {
            "type": ...,
            ...
        }
    }
    ```
    """
create_instance(module_type=None, module_config=None, kiara=None)
classmethod
Create an instance of a kiara module.
This class method is overloaded in a way that you can either provide the `module_type` argument, in which case the relevant sub-class will be queried from the kiara context, or you can call this method directly on any of the inheriting sub-classes. You can't do both, though.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| module_type | Optional[str] | must be None if called on a subclass of `KiaraModule`; if called on the `KiaraModule` base class itself, the module or operation id | None |
| module_config | Optional[Mapping[str, Any]] | the configuration of the module instance | None |
| kiara | Optional[Kiara] | the kiara context | None |
Source code in kiara/module.py
@classmethod
def create_instance(
    cls,
    module_type: typing.Optional[str] = None,
    module_config: typing.Optional[typing.Mapping[str, typing.Any]] = None,
    kiara: typing.Optional["Kiara"] = None,
) -> "KiaraModule":
    """Create an instance of a *kiara* module.
    This class method is overloaded in a way that you can either provide the `module_type` argument, in which case
    the relevant sub-class will be queried from the *kiara* context, or you can call this method directly on any of the
    inheriting sub-classes. You can't do both, though.
    Arguments:
        module_type: must be None if called on a subclass of ``KiaraModule``; if called on the ``KiaraModule`` base class itself, the module or operation id
        module_config: the configuration of the module instance
        kiara: the *kiara* context
    """
    if cls == KiaraModule:
        if not module_type:
            raise Exception(
                "This method must be either called on a subclass of KiaraModule, not KiaraModule itself, or it needs the 'module_type' argument specified."
            )
    else:
        if module_type:
            raise Exception(
                "This method must be either called without the 'module_type' argument specified, or on a subclass of the KiaraModule class, but not both."
            )
    if cls == KiaraModule:
        assert module_type is not None
        module_conf = ModuleConfig.create_module_config(
            config=module_type, module_config=module_config, kiara=kiara
        )
    else:
        module_conf = ModuleConfig.create_module_config(
            config=cls, module_config=module_config, kiara=kiara
        )
    return module_conf.create_module(kiara=kiara)
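The overloading rule (call on the base class with `module_type`, or on a subclass without it, never both) can be sketched without kiara. Here a plain dict registry, filled by `__init_subclass__`, stands in for the kiara-context lookup that the real method delegates to `ModuleConfig.create_module_config`; `SketchBase`, `EchoModule` and the registry are hypothetical.

```python
from typing import ClassVar, Dict, Optional, Type


class SketchBase:
    # Hypothetical registry standing in for the kiara context lookup.
    _registry: ClassVar[Dict[str, Type["SketchBase"]]] = {}
    _module_type_id: ClassVar[str] = ""

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)
        SketchBase._registry[cls._module_type_id] = cls

    @classmethod
    def create_instance(cls, module_type: Optional[str] = None) -> "SketchBase":
        if cls is SketchBase:
            # Base class: the module type id is mandatory.
            if not module_type:
                raise Exception("Call on a subclass, or specify 'module_type'.")
            target = SketchBase._registry[module_type]
        else:
            # Subclass: the class itself is the module type.
            if module_type:
                raise Exception("Specify 'module_type' or call on a subclass, not both.")
            target = cls
        return target()


class EchoModule(SketchBase):
    _module_type_id = "echo"


print(type(SketchBase.create_instance("echo")).__name__)  # EchoModule
print(type(EchoModule.create_instance()).__name__)  # EchoModule
```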
create_output_schema(self)
Abstract method, to be implemented by child classes, that returns a description of the output schema of this module.
If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
{
    "[output_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this output]"
    },
    "[other_output_field_name]": {
        "type": ...,
        ...
    }
}
Source code in kiara/module.py
@abstractmethod
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Abstract method to implement by child classes, returns a description of the output schema of this module.
    If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):
    ```
    {
        "[output_field_name]": {
            "type": "[value_type]",
            "doc*": "[a description of this output]"
        },
        "[other_output_field_name]": {
            "type": ...,
            ...
        }
    }
    ```
    """
get_config_value(self, key)
Retrieve the value for a specific configuration option.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | the config key | required |
Returns:
| Type | Description |
|---|---|
| Any | the value for the provided key |
Source code in kiara/module.py
def get_config_value(self, key: str) -> typing.Any:
    """Retrieve the value for a specific configuration option.
    Arguments:
        key: the config key
    Returns:
        the value for the provided key
    """
    try:
        return self.config.get(key)
    except Exception:
        raise Exception(
            f"Error accessing config value '{key}' in module {self.__class__._module_type_id}."  # type: ignore
        )
get_type_metadata()
classmethod
Return all metadata associated with this module type.
Source code in kiara/module.py
@classmethod
def get_type_metadata(cls) -> KiaraModuleTypeMetadata:
    """Return all metadata associated with this module type."""
    return KiaraModuleTypeMetadata.from_module_class(cls)
is_pipeline()
classmethod
Check whether this module type is a pipeline, or not.
Source code in kiara/module.py
@classmethod
def is_pipeline(cls) -> bool:
    """Check whether this module type is a pipeline, or not."""
    return False
process_step(self, inputs, outputs, job_log)
Kick off processing for a specific set of inputs/outputs.
This method calls the `process` method implemented by the inheriting class, and wraps input/output-data related functionality around it. If the signature of `process` includes a `job_log` parameter, the job log is passed on as well.
Parameters:
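| Name | Type | Description | Default |
|---|---|---|---|
| inputs | ValueSet | the input value set | required |
| outputs | ValueSet | the output value set | required |
| job_log | JobLog | the job log; only passed on to `process` if its signature has a `job_log` parameter | required |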
Source code in kiara/module.py
def process_step(
    self, inputs: ValueSet, outputs: ValueSet, job_log: JobLog
) -> None:
    """Kick off processing for a specific set of input/outputs.
    This method calls the implemented [process][kiara.module.KiaraModule.process] method of the inheriting class,
    as well as wrapping input/output-data related functionality.
    Arguments:
        inputs: the input value set
        outputs: the output value set
    """
    signature = inspect.signature(self.process)  # type: ignore
    if "job_log" not in signature.parameters.keys():
        try:
            self.process(inputs=inputs, outputs=outputs)  # type: ignore
        except Exception as e:
            if is_debug():
                try:
                    import traceback
                    traceback.print_exc()
                except Exception:
                    pass
            raise e
    else:
        try:
            self.process(inputs=inputs, outputs=outputs, job_log=job_log)  # type: ignore
        except Exception as e:
            if is_debug():
                try:
                    import traceback
                    traceback.print_exc()
                except Exception:
                    pass
            raise e
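`process_step` only passes `job_log` to `process` implementations whose signature declares that parameter, presumably so that modules written without a job log keep working. A minimal sketch of the same dispatch with `inspect.signature`, using plain functions, and a list as a hypothetical stand-in for `JobLog`:

```python
import inspect
from typing import Any, Callable, Dict, List


def call_process(
    process: Callable[..., None],
    inputs: Dict[str, Any],
    outputs: Dict[str, Any],
    job_log: List[str],
) -> None:
    """Pass job_log only to process() implementations that accept it."""
    if "job_log" in inspect.signature(process).parameters:
        process(inputs=inputs, outputs=outputs, job_log=job_log)
    else:
        process(inputs=inputs, outputs=outputs)


def plain_process(inputs, outputs):
    outputs["z"] = inputs["a"] + inputs["b"]


def logging_process(inputs, outputs, job_log):
    job_log.append("adding a and b")
    outputs["z"] = inputs["a"] + inputs["b"]


log: List[str] = []
out: Dict[str, Any] = {}
call_process(plain_process, {"a": 1, "b": 2}, out, log)
call_process(logging_process, {"a": 1, "b": 2}, out, log)
print(out, log)  # {'z': 3} ['adding a and b']
```

For a bound method such as `self.process`, `inspect.signature` already omits `self`, so the same membership check works unchanged.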
retrieve_module_profiles(kiara)
classmethod
Retrieve a collection of profiles (pre-set module configs) for this kiara module type.
This is used to automatically create generally useful operations (incl. their ids).
Source code in kiara/module.py
@classmethod
def retrieve_module_profiles(
    cls, kiara: "Kiara"
) -> typing.Mapping[str, typing.Union[typing.Mapping[str, typing.Any], Operation]]:
    """Retrieve a collection of profiles (pre-set module configs) for this *kiara* module type.
    This is used to automatically create generally useful operations (incl. their ids).
    """
run(self, _attach_lineage=True, **inputs)
Execute the module with the provided inputs directly.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| inputs | Any | a map of the input values (as described by the input schema) | {} |
Returns:
| Type | Description |
|---|---|
| ValueSet | a map of the output values (as described by the output schema) |
Source code in kiara/module.py
def run(self, _attach_lineage: bool = True, **inputs: typing.Any) -> ValueSet:
    """Execute the module with the provided inputs directly.
    Arguments:
        inputs: a map of the input values (as described by the input schema)
    Returns:
        a map of the output values (as described by the output schema)
    """
    resolved_inputs = self.create_full_inputs(**inputs)
    # TODO: introduce a 'temp' value set implementation and use that here
    input_value_set = SlottedValueSet.from_schemas(
        kiara=self._kiara,
        schemas=self.full_input_schemas,
        read_only=True,
        initial_values=resolved_inputs,
        title=f"module_inputs_{self.id}",
    )
    if not input_value_set.items_are_valid():
        invalid_details = input_value_set.check_invalid()
        raise Exception(
            f"Can't process module '{self._module_type_name}', input field(s) not valid: {', '.join(invalid_details.keys())}"  # type: ignore
        )
    output_value_set = SlottedValueSet.from_schemas(
        kiara=self._kiara,
        schemas=self.output_schemas,
        read_only=False,
        title=f"{self._module_type_name}_module_outputs_{self.id}",  # type: ignore
        default_value=SpecialValue.NOT_SET,
    )
    self.process(inputs=input_value_set, outputs=output_value_set)  # type: ignore
    result_outputs: typing.MutableMapping[str, Value] = {}
    if _attach_lineage:
        input_infos = {k: v.get_info() for k, v in resolved_inputs.items()}
        for field_name, output in output_value_set.items():
            value_lineage = ValueLineage.from_module_and_inputs(
                module=self, output_name=field_name, inputs=input_infos
            )
            # value_lineage = None
            output_val = self._kiara.data_registry.register_data(
                value_data=output, lineage=value_lineage
            )
            result_outputs[field_name] = output_val
    else:
        result_outputs = output_value_set
    result_set = SlottedValueSet.from_schemas(
        kiara=self._kiara,
        schemas=self.output_schemas,
        read_only=True,
        initial_values=result_outputs,
        title=f"{self._module_type_name}_module_outputs_{self.id}",  # type: ignore
    )
    return result_set
    # result = output_value_set.get_all_value_objects()
    # return output_value_set
    # return ValueSetImpl(items=result, read_only=True)
StepInputs
Wrapper class to hold a set of inputs for a pipeline processing step.
This is necessary because we can't assume the processing will be done on the same machine (or in the same process) as the pipeline controller. By disconnecting the values from the processing code, we can react appropriately to those circumstances.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| inputs | ValueSet | the input values of a pipeline step | required |
get_all_field_names(self)
All field names included in this ValueSet.
Source code in kiara/module.py
def get_all_field_names(self) -> typing.Iterable[str]:
    """All field names included in this ValueSet."""
    return self._inputs.keys()
StepOutputs
Wrapper class to hold a set of outputs for a pipeline processing step.
This is necessary because we can't assume the processing will be done on the same machine (or in the same process) as the pipeline controller. By disconnecting the values from the processing code, we can react appropriately to those circumstances.
Internally, this class stores two sets of its values: the 'actual', up-to-date values, and the referenced (original) ones that were used when creating an object of this class. It's not a good idea to keep both synced all the time, because that could potentially involve unnecessary data transfer and I/O.
Also, in some cases a developer might want to avoid events that could be triggered by a changed value.
Both value sets can be synced manually using the 'sync()' method.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| outputs | ValueSet | the output values of a pipeline step | required |
get_all_field_names(self)
All field names included in this ValueSet.
Source code in kiara/module.py
def get_all_field_names(self) -> typing.Iterable[str]:
    """All field names included in this ValueSet."""
    return self._outputs.get_all_field_names()
sync(self, lineage=None, **metadata)
Sync this value set's 'shadow' values with the ones a user would retrieve.
Source code in kiara/module.py
def sync(
    self, lineage: typing.Optional[ValueLineage] = None, **metadata: MetadataModel
):
    """Sync this value set's 'shadow' values with the ones a user would retrieve."""
    self._outputs.set_values(lineage=lineage, metadata=metadata, **self._outputs_staging)  # type: ignore
    self._outputs_staging.clear()  # type: ignore
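The 'staging plus `sync()`' idea behind `StepOutputs` can be illustrated with a small hypothetical class: writes only go to a staging dict (no I/O, no change events), readers keep seeing the last synced values, and `sync()` flushes all pending writes at once. This is a sketch of the pattern, not kiara's implementation, which also forwards lineage and metadata when syncing.

```python
from typing import Any, Dict, Iterable


class StagedOutputs:
    """Hypothetical sketch of the staging/sync pattern described for StepOutputs."""

    def __init__(self, field_names: Iterable[str]) -> None:
        self._synced: Dict[str, Any] = {name: None for name in field_names}
        self._staging: Dict[str, Any] = {}

    def set_value(self, field_name: str, value: Any) -> None:
        if field_name not in self._synced:
            raise KeyError(field_name)
        # Writes only touch the staging area: no I/O, no change events yet.
        self._staging[field_name] = value

    def get_value(self, field_name: str) -> Any:
        # Readers see the last synced value, not pending writes.
        return self._synced[field_name]

    def sync(self) -> None:
        # Flush all pending writes in one go, then clear the staging area.
        self._synced.update(self._staging)
        self._staging.clear()


outputs = StagedOutputs(["z"])
outputs.set_value("z", 3)
print(outputs.get_value("z"))  # None
outputs.sync()
print(outputs.get_value("z"))  # 3
```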