pipeline
Classes¶
PipelineConfig (KiaraModuleConfig)
pydantic-model
¶
A class to hold the configuration for a [PipelineModule][kiara.pipeline.module.PipelineModule].
If you want to control the pipeline input and output names, you need to provide a map that uses the autogenerated field name ([step_id]__[alias] -- 2 underscores!) as key, and the desired field name as value. The reason this schema for autogenerated field names exists is that it's hard to ensure the uniqueness of each field: some steps can have the same input field names but need different input values, while in other cases inputs of different steps should share the same value. So, to make sure that we always use the right values, I chose to implement a conservative default approach, accepting that in some cases the user will be prompted for duplicate inputs for the same value.
To remedy that, the pipeline creator has the option to manually specify a mapping to rename some or all of the input/output fields.
Further, because in a lot of cases there won't be any overlapping fields, the creator can specify `auto`, in which case Kiara will automatically create a mapping that tries to map autogenerated field names to the shortest possible names for each case.
Examples:
Configuration for a pipeline module that functions as a `nand` logic gate (in Python):

``` python
and_step = PipelineStepConfig(module_type="and", step_id="and")
not_step = PipelineStepConfig(
    module_type="not", step_id="not", input_links={"a": ["and.y"]}
)
nand_p_conf = PipelineConfig(
    doc="Returns 'False' if both inputs are 'True'.",
    steps=[and_step, not_step],
    input_aliases={
        "and__a": "a",
        "and__b": "b",
    },
    output_aliases={
        "not__y": "y",
    },
)
```
Or, the same thing in JSON:

``` json
{
  "module_type_name": "nand",
  "doc": "Returns 'False' if both inputs are 'True'.",
  "steps": [
    {
      "module_type": "and",
      "step_id": "and"
    },
    {
      "module_type": "not",
      "step_id": "not",
      "input_links": {
        "a": "and.y"
      }
    }
  ],
  "input_aliases": {
    "and__a": "a",
    "and__b": "b"
  },
  "output_aliases": {
    "not__y": "y"
  }
}
```
Source code in kiara/models/module/pipeline/__init__.py
class PipelineConfig(KiaraModuleConfig):
"""A class to hold the configuration for a [PipelineModule][kiara.pipeline.module.PipelineModule].
If you want to control the pipeline input and output names, you need to provide a map that uses the
autogenerated field name ([step_id]__[alias] -- 2 underscores!!) as key, and the desired field name
as value. The reason this schema for autogenerated field names exists is that it's hard to ensure
the uniqueness of each field: some steps can have the same input field names but need different input
values, while in other cases inputs of different steps should share the same value.
So, to make sure that we always use the right values, I chose to implement a conservative default approach,
accepting that in some cases the user will be prompted for duplicate inputs for the same value.
To remedy that, the pipeline creator has the option to manually specify a mapping to rename some or all of
the input/output fields.
Further, because in a lot of cases there won't be any overlapping fields, the creator can specify ``auto``,
in which case *Kiara* will automatically create a mapping that tries to map autogenerated field names
to the shortest possible names for each case.
Examples:
Configuration for a pipeline module that functions as a ``nand`` logic gate (in Python):
``` python
and_step = PipelineStepConfig(module_type="and", step_id="and")
not_step = PipelineStepConfig(module_type="not", step_id="not", input_links={"a": ["and.y"]}
nand_p_conf = PipelineConfig(doc="Returns 'False' if both inputs are 'True'.",
steps=[and_step, not_step],
input_aliases={
"and__a": "a",
"and__b": "b"
},
output_aliases={
"not__y": "y"
})
```
Or, the same thing in json:
``` json
{
"module_type_name": "nand",
"doc": "Returns 'False' if both inputs are 'True'.",
"steps": [
{
"module_type": "and",
"step_id": "and"
},
{
"module_type": "not",
"step_id": "not",
"input_links": {
"a": "and.y"
}
}
],
"input_aliases": {
"and__a": "a",
"and__b": "b"
},
"output_aliases": {
"not__y": "y"
}
}
```
"""
_kiara_model_id = "instance.module_config.pipeline"
@classmethod
def from_file(
cls,
path: str,
kiara: Optional["Kiara"] = None,
# module_map: Optional[Mapping[str, Any]] = None,
):
data = get_data_from_file(path)
pipeline_name = data.pop("pipeline_name", None)
if pipeline_name is None:
pipeline_name = os.path.basename(path)
return cls.from_config(pipeline_name=pipeline_name, data=data, kiara=kiara)
@classmethod
def from_config(
cls,
pipeline_name: str,
data: Mapping[str, Any],
kiara: Optional["Kiara"] = None,
# module_map: Optional[Mapping[str, Any]] = None,
):
if kiara is None:
from kiara.context import Kiara
kiara = Kiara.instance()
if not kiara.operation_registry.is_initialized:
kiara.operation_registry.operations # noqa
config = cls._from_config(pipeline_name=pipeline_name, data=data, kiara=kiara)
return config
@classmethod
def _from_config(
cls,
pipeline_name: str,
data: Mapping[str, Any],
kiara: "Kiara",
module_map: Optional[Mapping[str, Any]] = None,
):
data = dict(data)
steps = data.pop("steps")
steps = PipelineStep.create_steps(*steps, kiara=kiara, module_map=module_map)
data["steps"] = steps
if not data.get("input_aliases"):
data["input_aliases"] = create_input_alias_map(steps)
if not data.get("output_aliases"):
data["output_aliases"] = create_output_alias_map(steps)
result = cls(pipeline_name=pipeline_name, **data)
return result
class Config:
extra = Extra.ignore
validate_assignment = True
pipeline_name: str = Field(description="The name of this pipeline.")
steps: List[PipelineStep] = Field(
description="A list of steps/modules of this pipeline, and their connections.",
)
input_aliases: Dict[str, str] = Field(
description="A map of input aliases, with the calculated (<step_id>__<input_name> -- double underscore!) name as key, and a string (the resulting workflow input alias) as value. Check the documentation for the config class for which marker strings can be used to automatically create this map if possible.",
)
output_aliases: Dict[str, str] = Field(
description="A map of output aliases, with the calculated (<step_id>__<output_name> -- double underscore!) name as key, and a string (the resulting workflow output alias) as value. Check the documentation for the config class for which marker strings can be used to automatically create this map if possible.",
)
doc: str = Field(
default="-- n/a --", description="Documentation about what the pipeline does."
)
context: Dict[str, Any] = Field(
default_factory=dict, description="Metadata for this workflow."
)
_structure: Optional["PipelineStructure"] = PrivateAttr(default=None)
@validator("steps", pre=True)
def _validate_steps(cls, v):
if not v:
raise ValueError(f"Invalid type for 'steps' value: {type(v)}")
steps = []
for step in v:
if not step:
raise ValueError("No step data provided.")
if isinstance(step, PipelineStep):
steps.append(step)
elif isinstance(step, Mapping):
steps.append(PipelineStep(**step))
else:
raise TypeError(step)
return steps
@property
def structure(self) -> "PipelineStructure":
if self._structure is not None:
return self._structure
from kiara.models.module.pipeline.structure import PipelineStructure
self._structure = PipelineStructure(pipeline_config=self)
return self._structure
def create_renderable(self, **config: Any) -> RenderableType:
return create_table_from_model_object(self, exclude_fields={"steps"})
# def create_input_alias_map(self) -> Dict[str, str]:
#
# aliases: Dict[str, List[str]] = {}
# for step in self.steps:
# field_names = step.module.input_names
# for field_name in field_names:
# aliases.setdefault(field_name, []).append(step.step_id)
#
# result: Dict[str, str] = {}
# for field_name, step_ids in aliases.items():
# for step_id in step_ids:
# generated = generate_pipeline_endpoint_name(step_id, field_name)
# result[generated] = generated
#
# return result
#
# def create_output_alias_map(self) -> Dict[str, str]:
#
# aliases: Dict[str, List[str]] = {}
# for step in self.steps:
# field_names = step.module.input_names
# for field_name in field_names:
# aliases.setdefault(field_name, []).append(step.step_id)
#
# result: Dict[str, str] = {}
# for field_name, step_ids in aliases.items():
# for step_id in step_ids:
# generated = generate_pipeline_endpoint_name(step_id, field_name)
# result[generated] = generated
#
# return result
Attributes¶
context: Dict[str, Any]
pydantic-field
¶
Metadata for this workflow.
doc: str
pydantic-field
¶
Documentation about what the pipeline does.
input_aliases: Dict[str, str]
pydantic-field
required
¶
A map of input aliases, with the calculated (<step_id>__<input_name> -- double underscore!) name as key, and a string (the resulting workflow input alias) as value. Check the documentation for the config class for which marker strings can be used to automatically create this map if possible.
output_aliases: Dict[str, str]
pydantic-field
required
¶
A map of output aliases, with the calculated (<step_id>__<output_name> -- double underscore!) name as key, and a string (the resulting workflow output alias) as value. Check the documentation for the config class for which marker strings can be used to automatically create this map if possible.
pipeline_name: str
pydantic-field
required
¶
The name of this pipeline.
steps: List[kiara.models.module.pipeline.PipelineStep]
pydantic-field
required
¶
A list of steps/modules of this pipeline, and their connections.
structure: PipelineStructure
property
readonly
¶
Config
¶
create_renderable(self, **config)
¶
Source code in kiara/models/module/pipeline/__init__.py
def create_renderable(self, **config: Any) -> RenderableType:
return create_table_from_model_object(self, exclude_fields={"steps"})
from_config(pipeline_name, data, kiara=None)
classmethod
¶
Source code in kiara/models/module/pipeline/__init__.py
@classmethod
def from_config(
cls,
pipeline_name: str,
data: Mapping[str, Any],
kiara: Optional["Kiara"] = None,
# module_map: Optional[Mapping[str, Any]] = None,
):
if kiara is None:
from kiara.context import Kiara
kiara = Kiara.instance()
if not kiara.operation_registry.is_initialized:
kiara.operation_registry.operations # noqa
config = cls._from_config(pipeline_name=pipeline_name, data=data, kiara=kiara)
return config
from_file(path, kiara=None)
classmethod
¶
Source code in kiara/models/module/pipeline/__init__.py
@classmethod
def from_file(
cls,
path: str,
kiara: Optional["Kiara"] = None,
# module_map: Optional[Mapping[str, Any]] = None,
):
data = get_data_from_file(path)
pipeline_name = data.pop("pipeline_name", None)
if pipeline_name is None:
pipeline_name = os.path.basename(path)
return cls.from_config(pipeline_name=pipeline_name, data=data, kiara=kiara)
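A hedged usage sketch: loading the `nand` pipeline above from a file. The file name `nand.json` is hypothetical; when the data contains no `pipeline_name` key, the name is derived from the file name.

``` python
# Minimal sketch, assuming "nand.json" contains the JSON example shown above.
from kiara.context import Kiara
from kiara.models.module.pipeline import PipelineConfig

kiara = Kiara.instance()
config = PipelineConfig.from_file("nand.json", kiara=kiara)

print(config.pipeline_name)                     # "nand.json", since the data sets no 'pipeline_name'
print([step.step_id for step in config.steps])  # ["and", "not"]
```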
PipelineStep (Manifest)
pydantic-model
¶
A step within a pipeline structure; includes information about its connection(s) and other metadata.
Source code in kiara/models/module/pipeline/__init__.py
class PipelineStep(Manifest):
"""A step within a pipeline-structure, includes information about it's connection(s) and other metadata."""
_kiara_model_id = "instance.pipeline_step"
class Config:
validate_assignment = True
extra = Extra.forbid
@classmethod
def create_steps(
cls,
*steps: Mapping[str, Any],
kiara: "Kiara",
module_map: Optional[Mapping[str, Any]] = None,
) -> List["PipelineStep"]:
if module_map is None:
module_map = {}
else:
module_map = dict(module_map)
if kiara.operation_registry.is_initialized:
for op_id, op in kiara.operation_registry.operations.items():
module_map[op_id] = {
"module_type": op.module_type,
"module_config": op.module_config,
}
result: List[PipelineStep] = []
for step in steps:
module_type = step.get("module_type", None)
if not module_type:
raise ValueError("Can't create step, no 'module_type' specified.")
module_config = step.get("module_config", {})
if module_type not in kiara.module_type_names:
if module_type in module_map.keys():
resolved_module_type = module_map[module_type]["module_type"]
resolved_module_config = module_map[module_type]["module_config"]
manifest = kiara.create_manifest(
module_or_operation=resolved_module_type,
config=resolved_module_config,
)
else:
raise Exception(f"Can't resolve module type: {module_type}")
else:
manifest = kiara.create_manifest(
module_or_operation=module_type, config=module_config
)
resolved_module_type = module_type
resolved_module_config = module_config
module = kiara.create_module(manifest=manifest)
step_id = step.get("step_id", None)
if not step_id:
raise ValueError("Can't create step, no 'step_id' specified.")
input_links = {}
for input_field, sources in step.get("input_links", {}).items():
if isinstance(sources, str):
sources = [sources]
input_links[input_field] = sources
# TODO: do we really need the deepcopy here?
_s = PipelineStep(
step_id=step_id,
module_type=resolved_module_type,
module_config=dict(resolved_module_config),
input_links=input_links, # type: ignore
module_details=KiaraModuleClass.from_module(module=module),
)
_s._module = module
result.append(_s)
return result
@validator("step_id")
def _validate_step_id(cls, v):
assert isinstance(v, str)
if "." in v:
raise ValueError("Step ids can't contain '.' characters.")
return v
step_id: str = Field(
description="Locally unique id (within a pipeline) of this step."
)
module_type: str = Field(description="The module type.")
module_config: Dict[str, Any] = Field(
description="The module config.", default_factory=dict
)
# required: bool = Field(
# description="Whether this step is required within the workflow.\n\nIn some cases, when none of the pipeline outputs have a required input that connects to a step, then it is not necessary for this step to have been executed, even if it is placed before a step in the execution hierarchy. This also means that the pipeline inputs that are connected to this step might not be required.",
# default=True,
# )
# processing_stage: Optional[int] = Field(
# default=None,
# description="The stage number this step is executed within the pipeline.",
# )
input_links: Mapping[str, List[StepValueAddress]] = Field(
description="The links that connect to inputs of the module.",
default_factory=dict,
)
module_details: KiaraModuleClass = Field(
description="The class of the underlying module."
)
_module: Optional["KiaraModule"] = PrivateAttr(default=None)
@root_validator(pre=True)
def create_step_id(cls, values):
if "module_type" not in values:
raise ValueError("No 'module_type' specified.")
if "step_id" not in values or not values["step_id"]:
values["step_id"] = slugify(values["module_type"])
return values
@validator("step_id")
def ensure_valid_id(cls, v):
# TODO: check with regex
if "." in v or " " in v:
raise ValueError(
f"Step id can't contain special characters or whitespaces: {v}"
)
return v
@validator("module_config", pre=True)
def ensure_dict(cls, v):
if v is None:
v = {}
return v
@validator("input_links", pre=True)
def ensure_input_links_valid(cls, v):
if v is None:
v = {}
result = {}
for input_name, output in v.items():
input_links = ensure_step_value_addresses(
default_field_name=input_name, link=output
)
result[input_name] = input_links
return result
@property
def module(self) -> "KiaraModule":
if self._module is None:
m_cls = self.module_details.get_class()
self._module = m_cls(module_config=self.module_config)
return self._module
def __repr__(self):
return f"{self.__class__.__name__}(step_id={self.step_id} module_type={self.module_type})"
def __str__(self):
return f"step: {self.step_id} (module: {self.module_type})"
Attributes¶
input_links: Mapping[str, List[kiara.models.module.pipeline.value_refs.StepValueAddress]]
pydantic-field
¶
The links that connect to inputs of the module.
module: KiaraModule
property
readonly
¶
module_details: KiaraModuleClass
pydantic-field
required
¶
The class of the underlying module.
step_id: str
pydantic-field
required
¶
Locally unique id (within a pipeline) of this step.
Config
¶
create_step_id(values)
classmethod
¶
Source code in kiara/models/module/pipeline/__init__.py
@root_validator(pre=True)
def create_step_id(cls, values):
if "module_type" not in values:
raise ValueError("No 'module_type' specified.")
if "step_id" not in values or not values["step_id"]:
values["step_id"] = slugify(values["module_type"])
return values
create_steps(*steps, kiara, module_map=None)
classmethod
¶
Source code in kiara/models/module/pipeline/__init__.py
@classmethod
def create_steps(
cls,
*steps: Mapping[str, Any],
kiara: "Kiara",
module_map: Optional[Mapping[str, Any]] = None,
) -> List["PipelineStep"]:
if module_map is None:
module_map = {}
else:
module_map = dict(module_map)
if kiara.operation_registry.is_initialized:
for op_id, op in kiara.operation_registry.operations.items():
module_map[op_id] = {
"module_type": op.module_type,
"module_config": op.module_config,
}
result: List[PipelineStep] = []
for step in steps:
module_type = step.get("module_type", None)
if not module_type:
raise ValueError("Can't create step, no 'module_type' specified.")
module_config = step.get("module_config", {})
if module_type not in kiara.module_type_names:
if module_type in module_map.keys():
resolved_module_type = module_map[module_type]["module_type"]
resolved_module_config = module_map[module_type]["module_config"]
manifest = kiara.create_manifest(
module_or_operation=resolved_module_type,
config=resolved_module_config,
)
else:
raise Exception(f"Can't resolve module type: {module_type}")
else:
manifest = kiara.create_manifest(
module_or_operation=module_type, config=module_config
)
resolved_module_type = module_type
resolved_module_config = module_config
module = kiara.create_module(manifest=manifest)
step_id = step.get("step_id", None)
if not step_id:
raise ValueError("Can't create step, no 'step_id' specified.")
input_links = {}
for input_field, sources in step.get("input_links", {}).items():
if isinstance(sources, str):
sources = [sources]
input_links[input_field] = sources
# TODO: do we really need the deepcopy here?
_s = PipelineStep(
step_id=step_id,
module_type=resolved_module_type,
module_config=dict(resolved_module_config),
input_links=input_links, # type: ignore
module_details=KiaraModuleClass.from_module(module=module),
)
_s._module = module
result.append(_s)
return result
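For illustration, a hedged sketch of calling `create_steps` directly with plain mappings; it assumes the module types `"and"` and `"not"` are registered in the context, as in the `nand` example above.

``` python
# Minimal sketch: build PipelineStep instances from plain mappings.
from kiara.context import Kiara
from kiara.models.module.pipeline import PipelineStep

kiara = Kiara.instance()
steps = PipelineStep.create_steps(
    {"module_type": "and", "step_id": "and"},
    {"module_type": "not", "step_id": "not", "input_links": {"a": "and.y"}},
    kiara=kiara,
)
for step in steps:
    print(step)  # e.g. "step: and (module: and)"
```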
ensure_dict(v)
classmethod
¶
Source code in kiara/models/module/pipeline/__init__.py
@validator("module_config", pre=True)
def ensure_dict(cls, v):
if v is None:
v = {}
return v
ensure_input_links_valid(v)
classmethod
¶
Source code in kiara/models/module/pipeline/__init__.py
@validator("input_links", pre=True)
def ensure_input_links_valid(cls, v):
if v is None:
v = {}
result = {}
for input_name, output in v.items():
input_links = ensure_step_value_addresses(
default_field_name=input_name, link=output
)
result[input_name] = input_links
return result
ensure_valid_id(v)
classmethod
¶
Source code in kiara/models/module/pipeline/__init__.py
@validator("step_id")
def ensure_valid_id(cls, v):
# TODO: check with regex
if "." in v or " " in v:
raise ValueError(
f"Step id can't contain special characters or whitespaces: {v}"
)
return v
StepStatus (Enum)
¶
Enum to describe the state of a workflow.
Source code in kiara/models/module/pipeline/__init__.py
class StepStatus(Enum):
"""Enum to describe the state of a workflow."""
INPUTS_INVALID = "inputs_invalid"
INPUTS_READY = "inputs_ready"
RESULTS_READY = "results_ready"
create_input_alias_map(steps)
¶
Source code in kiara/models/module/pipeline/__init__.py
def create_input_alias_map(steps: Iterable[PipelineStep]) -> Dict[str, str]:
aliases: Dict[str, str] = {}
for step in steps:
field_names = step.module.input_names
for field_name in field_names:
alias = generate_pipeline_endpoint_name(
step_id=step.step_id, value_name=field_name
)
assert alias not in aliases.keys()
aliases[f"{step.step_id}.{field_name}"] = alias
return aliases
create_output_alias_map(steps)
¶
Source code in kiara/models/module/pipeline/__init__.py
def create_output_alias_map(steps: Iterable[PipelineStep]) -> Dict[str, str]:
aliases: Dict[str, str] = {}
for step in steps:
field_names = step.module.output_names
for field_name in field_names:
alias = generate_pipeline_endpoint_name(
step_id=step.step_id, value_name=field_name
)
assert alias not in aliases.keys()
aliases[f"{step.step_id}.{field_name}"] = alias
return aliases
generate_pipeline_endpoint_name(step_id, value_name)
¶
Source code in kiara/models/module/pipeline/__init__.py
def generate_pipeline_endpoint_name(step_id: str, value_name: str):
return f"{step_id}__{value_name}"
Modules¶
controller
¶
logger
¶
Classes¶
PipelineController (PipelineListener)
¶
Source code in kiara/models/module/pipeline/controller.py
class PipelineController(PipelineListener):
pass
SinglePipelineBatchController (SinglePipelineController)
¶
A [PipelineController][kiara.models.module.pipeline.controller.PipelineController] that executes all pipeline steps non-interactively.
This is the default implementation of a `PipelineController`, and probably the simplest implementation of one. It waits until all inputs are set, after which it executes all pipeline steps in the required order.
Parameters:

Name | Type | Description | Default
---|---|---|---
`pipeline` | `Pipeline` | the pipeline to control | *required*
`auto_process` | `bool` | whether to automatically start processing the pipeline as soon as the input set is valid | `True`
Source code in kiara/models/module/pipeline/controller.py
class SinglePipelineBatchController(SinglePipelineController):
"""A [PipelineController][kiara.models.modules.pipeline.controller.PipelineController] that executes all pipeline steps non-interactively.
This is the default implementation of a ``PipelineController``, and probably the most simple implementation of one.
It waits until all inputs are set, after which it executes all pipeline steps in the required order.
Arguments:
pipeline: the pipeline to control
auto_process: whether to automatically start processing the pipeline as soon as the input set is valid
"""
def __init__(
self,
pipeline: Pipeline,
job_registry: JobRegistry,
auto_process: bool = True,
):
self._auto_process: bool = auto_process
self._is_running: bool = False
super().__init__(pipeline=pipeline, job_registry=job_registry)
@property
def auto_process(self) -> bool:
return self._auto_process
@auto_process.setter
def auto_process(self, auto_process: bool):
self._auto_process = auto_process
def process_pipeline(self):
log = logger.bind(pipeline_id=self.pipeline.pipeline_id)
if self._is_running:
log.debug(
"ignore.pipeline_process",
reason="Pipeline already running.",
)
raise Exception("Pipeline already running.")
log.debug("execute.pipeline")
self._is_running = True
try:
for idx, stage in enumerate(
self.pipeline.structure.processing_stages, start=1
):
log.debug(
"execute.pipeline.stage",
stage=idx,
)
job_ids = {}
for step_id in stage:
log.debug(
"execute.pipeline.step",
step_id=step_id,
)
try:
job_id = self.process_step(step_id)
job_ids[step_id] = job_id
except Exception as e:
# TODO: cancel running jobs?
if is_debug():
import traceback
traceback.print_exc()
log.error(
"error.processing.pipeline",
step_id=step_id,
error=e,
)
return False
self.set_processing_results(job_ids=job_ids)
log.debug(
"execute_finished.pipeline.stage",
stage=idx,
)
finally:
self._is_running = False
log.debug("execute_finished.pipeline")
auto_process: bool
property
writable
¶
process_pipeline(self)
¶
Source code in kiara/models/module/pipeline/controller.py
def process_pipeline(self):
log = logger.bind(pipeline_id=self.pipeline.pipeline_id)
if self._is_running:
log.debug(
"ignore.pipeline_process",
reason="Pipeline already running.",
)
raise Exception("Pipeline already running.")
log.debug("execute.pipeline")
self._is_running = True
try:
for idx, stage in enumerate(
self.pipeline.structure.processing_stages, start=1
):
log.debug(
"execute.pipeline.stage",
stage=idx,
)
job_ids = {}
for step_id in stage:
log.debug(
"execute.pipeline.step",
step_id=step_id,
)
try:
job_id = self.process_step(step_id)
job_ids[step_id] = job_id
except Exception as e:
# TODO: cancel running jobs?
if is_debug():
import traceback
traceback.print_exc()
log.error(
"error.processing.pipeline",
step_id=step_id,
error=e,
)
return False
self.set_processing_results(job_ids=job_ids)
log.debug(
"execute_finished.pipeline.stage",
stage=idx,
)
finally:
self._is_running = False
log.debug("execute_finished.pipeline")
SinglePipelineController (PipelineController)
¶
Source code in kiara/models/module/pipeline/controller.py
class SinglePipelineController(PipelineController):
def __init__(self, pipeline: Pipeline, job_registry: JobRegistry):
self._pipeline: Pipeline = pipeline
self._job_registry: JobRegistry = job_registry
self._pipeline.add_listener(self)
self._pipeline_details: Optional[PipelineDetails] = None
@property
def pipeline(self) -> Pipeline:
return self._pipeline
def current_pipeline_state(self) -> PipelineDetails:
if self._pipeline_details is None:
self._pipeline_details = self.pipeline.get_pipeline_details()
return self._pipeline_details
def can_be_processed(self, step_id: str) -> bool:
"""Check whether the step with the provided id is ready to be processed."""
pipeline_state = self.current_pipeline_state()
step_state = pipeline_state.step_states[step_id]
return not step_state.invalid_details
def can_be_skipped(self, step_id: str) -> bool:
"""Check whether the processing of a step can be skipped."""
required = self.pipeline.structure.step_is_required(step_id=step_id)
if required:
required = self.can_be_processed(step_id)
return required
def _pipeline_event_occurred(self, event: PipelineEvent):
self._pipeline_details = None
def set_processing_results(self, job_ids: Mapping[str, uuid.UUID]):
self._job_registry.wait_for(*job_ids.values())
combined_outputs = {}
for step_id, job_id in job_ids.items():
record = self._job_registry.get_job_record_in_session(job_id=job_id)
combined_outputs[step_id] = record.outputs
self.pipeline.set_multiple_step_outputs(
changed_outputs=combined_outputs, notify_listeners=True
)
def pipeline_is_ready(self) -> bool:
"""Return whether the pipeline is ready to be processed.
A ``True`` result means that all pipeline inputs are set with valid values, and therefore every step within the
pipeline can be processed.
Returns:
whether the pipeline can be processed as a whole (``True``) or not (``False``)
"""
pipeline_inputs = self.pipeline._all_values.get_alias("pipeline.inputs")
assert pipeline_inputs is not None
return pipeline_inputs.all_items_valid
def process_step(self, step_id: str, wait: bool = False) -> uuid.UUID:
"""Kick off processing for the step with the provided id.
Arguments:
step_id: the id of the step that should be started
"""
job_config = self.pipeline.create_job_config_for_step(step_id)
job_id = self._job_registry.execute_job(job_config=job_config)
# job_id = self._processor.create_job(job_config=job_config)
# self._processor.queue_job(job_id=job_id)
if wait:
self._job_registry.wait_for(job_id)
return job_id
pipeline: Pipeline
property
readonly
¶
Methods¶
can_be_processed(self, step_id)
¶
Check whether the step with the provided id is ready to be processed.
Source code in kiara/models/module/pipeline/controller.py
def can_be_processed(self, step_id: str) -> bool:
"""Check whether the step with the provided id is ready to be processed."""
pipeline_state = self.current_pipeline_state()
step_state = pipeline_state.step_states[step_id]
return not step_state.invalid_details
can_be_skipped(self, step_id)
¶
Check whether the processing of a step can be skipped.
Source code in kiara/models/module/pipeline/controller.py
def can_be_skipped(self, step_id: str) -> bool:
"""Check whether the processing of a step can be skipped."""
required = self.pipeline.structure.step_is_required(step_id=step_id)
if required:
required = self.can_be_processed(step_id)
return required
current_pipeline_state(self)
¶
Source code in kiara/models/module/pipeline/controller.py
def current_pipeline_state(self) -> PipelineDetails:
if self._pipeline_details is None:
self._pipeline_details = self.pipeline.get_pipeline_details()
return self._pipeline_details
pipeline_is_ready(self)
¶
Return whether the pipeline is ready to be processed.
A `True` result means that all pipeline inputs are set with valid values, and therefore every step within the pipeline can be processed.
Returns:

Type | Description
---|---
`bool` | whether the pipeline can be processed as a whole (`True`) or not (`False`)
Source code in kiara/models/module/pipeline/controller.py
def pipeline_is_ready(self) -> bool:
"""Return whether the pipeline is ready to be processed.
A ``True`` result means that all pipeline inputs are set with valid values, and therefore every step within the
pipeline can be processed.
Returns:
whether the pipeline can be processed as a whole (``True``) or not (``False``)
"""
pipeline_inputs = self.pipeline._all_values.get_alias("pipeline.inputs")
assert pipeline_inputs is not None
return pipeline_inputs.all_items_valid
process_step(self, step_id, wait=False)
¶
Kick off processing for the step with the provided id.
Parameters:

Name | Type | Description | Default
---|---|---|---
`step_id` | `str` | the id of the step that should be started | *required*
Source code in kiara/models/module/pipeline/controller.py
def process_step(self, step_id: str, wait: bool = False) -> uuid.UUID:
"""Kick off processing for the step with the provided id.
Arguments:
step_id: the id of the step that should be started
"""
job_config = self.pipeline.create_job_config_for_step(step_id)
job_id = self._job_registry.execute_job(job_config=job_config)
# job_id = self._processor.create_job(job_config=job_config)
# self._processor.queue_job(job_id=job_id)
if wait:
self._job_registry.wait_for(job_id)
return job_id
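Steps can also be driven one at a time; a hedged sketch mirroring a single iteration of `process_pipeline`:

``` python
# Minimal sketch: process one step and sync its outputs back into the pipeline.
if controller.can_be_processed("and"):
    job_id = controller.process_step("and", wait=True)
    controller.set_processing_results(job_ids={"and": job_id})
```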
set_processing_results(self, job_ids)
¶
Source code in kiara/models/module/pipeline/controller.py
def set_processing_results(self, job_ids: Mapping[str, uuid.UUID]):
self._job_registry.wait_for(*job_ids.values())
combined_outputs = {}
for step_id, job_id in job_ids.items():
record = self._job_registry.get_job_record_in_session(job_id=job_id)
combined_outputs[step_id] = record.outputs
self.pipeline.set_multiple_step_outputs(
changed_outputs=combined_outputs, notify_listeners=True
)
pipeline
¶
Classes¶
Pipeline
¶
An instance of a [PipelineStructure][kiara.pipeline.structure.PipelineStructure] that holds state for all of the inputs/outputs of the steps within.
Source code in kiara/models/module/pipeline/pipeline.py
class Pipeline(object):
"""An instance of a [PipelineStructure][kiara.pipeline.structure.PipelineStructure] that holds state for all of the inputs/outputs of the steps within."""
def __init__(self, structure: PipelineStructure, data_registry: DataRegistry):
self._id: uuid.UUID = uuid.uuid4()
self._structure: PipelineStructure = structure
self._value_refs: Mapping[AliasValueMap, Iterable[ValueRef]] = None # type: ignore
# self._status: StepStatus = StepStatus.STALE
self._steps_by_stage: Dict[int, Dict[str, PipelineStep]] = None # type: ignore
self._inputs_by_stage: Dict[int, List[str]] = None # type: ignore
self._outputs_by_stage: Dict[int, List[str]] = None # type: ignore
self._data_registry: DataRegistry = data_registry
self._all_values: AliasValueMap = None # type: ignore
self._init_values()
self._listeners: List[PipelineListener] = []
# self._update_status()
@property
def pipeline_id(self) -> uuid.UUID:
return self._id
@property
def kiara_id(self) -> uuid.UUID:
return self._data_registry.kiara_id
def _init_values(self):
"""Initialize this object. This should only be called once.
Basically, this goes through all the inputs and outputs of all steps, and 'allocates' a PipelineValueInfo object
for each of them. In cases where output/input or pipeline-input/input points are connected, only one
value item is allocated, since those refer to the same value.
"""
values = AliasValueMap(
alias=str(self.id), version=0, assoc_value=None, values_schema={}
)
values._data_registry = self._data_registry
for field_name, schema in self._structure.pipeline_inputs_schema.items():
values.set_alias_schema(f"pipeline.inputs.{field_name}", schema=schema)
for field_name, schema in self._structure.pipeline_outputs_schema.items():
values.set_alias_schema(f"pipeline.outputs.{field_name}", schema=schema)
for step_id in self.step_ids:
step = self.get_step(step_id)
for field_name, value_schema in step.module.inputs_schema.items():
values.set_alias_schema(
f"steps.{step_id}.inputs.{field_name}", schema=value_schema
)
for field_name, value_schema in step.module.outputs_schema.items():
values.set_alias_schema(
f"steps.{step_id}.outputs.{field_name}", schema=value_schema
)
self._all_values = values
def __eq__(self, other):
if not isinstance(other, Pipeline):
return False
return self._id == other._id
def __hash__(self):
return hash(self._id)
def add_listener(self, listener: PipelineListener):
self._listeners.append(listener)
@property
def id(self) -> uuid.UUID:
return self._id
@property
def structure(self) -> PipelineStructure:
return self._structure
def get_current_pipeline_inputs(self) -> Dict[str, uuid.UUID]:
"""All (pipeline) input values of this pipeline."""
alias_map = self._all_values.get_alias("pipeline.inputs")
return alias_map.get_all_value_ids() # type: ignore
def get_current_pipeline_outputs(self) -> Dict[str, uuid.UUID]:
"""All (pipeline) output values of this pipeline."""
alias_map = self._all_values.get_alias("pipeline.outputs")
return alias_map.get_all_value_ids() # type: ignore
def get_current_step_inputs(self, step_id) -> Dict[str, uuid.UUID]:
alias_map = self._all_values.get_alias(f"steps.{step_id}.inputs")
return alias_map.get_all_value_ids() # type: ignore
def get_current_step_outputs(self, step_id) -> Dict[str, uuid.UUID]:
alias_map = self._all_values.get_alias(f"steps.{step_id}.outputs")
return alias_map.get_all_value_ids() # type: ignore
def get_inputs_for_steps(self, *step_ids: str) -> Dict[str, Dict[str, uuid.UUID]]:
"""Retrieve value ids for the inputs of the specified steps (or all steps, if no argument provided."""
result = {}
for step_id in self._structure.step_ids:
if step_ids and step_id not in step_ids:
continue
ids = self.get_current_step_inputs(step_id=step_id)
result[step_id] = ids
return result
def get_outputs_for_steps(self, *step_ids: str) -> Dict[str, Dict[str, uuid.UUID]]:
"""Retrieve value ids for the outputs of the specified steps (or all steps, if no argument provided."""
result = {}
for step_id in self._structure.step_ids:
if step_ids and step_id not in step_ids:
continue
ids = self.get_current_step_outputs(step_id=step_id)
result[step_id] = ids
return result
def _notify_pipeline_listeners(self, event: PipelineEvent):
for listener in self._listeners:
listener._pipeline_event_occurred(event=event)
def get_pipeline_details(self) -> PipelineDetails:
pipeline_inputs = self._all_values.get_alias("pipeline.inputs")
pipeline_outputs = self._all_values.get_alias("pipeline.outputs")
assert pipeline_inputs is not None
assert pipeline_outputs is not None
invalid = pipeline_inputs.check_invalid()
if not invalid:
status = StepStatus.INPUTS_READY
step_outputs = self._all_values.get_alias("pipeline.outputs")
assert step_outputs is not None
invalid_outputs = step_outputs.check_invalid()
# TODO: also check that all the pedigrees match up with current inputs
if not invalid_outputs:
status = StepStatus.RESULTS_READY
else:
status = StepStatus.INPUTS_INVALID
step_states = {}
for step_id in self._structure.step_ids:
d = self.get_step_details(step_id)
step_states[step_id] = d
details = PipelineDetails.construct(
kiara_id=self._data_registry.kiara_id,
pipeline_id=self.pipeline_id,
pipeline_status=status,
pipeline_inputs=pipeline_inputs.get_all_value_ids(),
pipeline_outputs=pipeline_outputs.get_all_value_ids(),
invalid_details=invalid,
step_states=step_states,
)
return details
def get_step_details(self, step_id: str) -> StepDetails:
step_input_ids = self.get_current_step_inputs(step_id=step_id)
step_output_ids = self.get_current_step_outputs(step_id=step_id)
step_inputs = self._all_values.get_alias(f"steps.{step_id}.inputs")
assert step_inputs is not None
invalid = step_inputs.check_invalid()
processing_stage = self._structure.get_processing_stage(step_id)
if not invalid:
status = StepStatus.INPUTS_READY
step_outputs = self._all_values.get_alias(f"steps.{step_id}.outputs")
assert step_outputs is not None
invalid_outputs = step_outputs.check_invalid()
# TODO: also check that all the pedigrees match up with current inputs
if not invalid_outputs:
status = StepStatus.RESULTS_READY
else:
status = StepStatus.INPUTS_INVALID
details = StepDetails.construct(
kiara_id=self._data_registry.kiara_id,
pipeline_id=self.pipeline_id,
step_id=step_id,
status=status,
inputs=step_input_ids,
outputs=step_output_ids,
invalid_details=invalid,
processing_stage=processing_stage,
)
return details
def set_pipeline_inputs(
self,
inputs: Mapping[str, Any],
sync_to_step_inputs: bool = True,
notify_listeners: bool = True,
) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:
values_to_set: Dict[str, uuid.UUID] = {}
for k, v in inputs.items():
if v is None:
values_to_set[k] = NONE_VALUE_ID
else:
alias_map = self._all_values.get_alias("pipeline.inputs")
assert alias_map is not None
# dbg(alias_map.__dict__)
schema = alias_map.values_schema[k]
value = self._data_registry.register_data(
data=v, schema=schema, pedigree=ORPHAN, reuse_existing=True
)
values_to_set[k] = value.value_id
changed_pipeline_inputs = self._set_values("pipeline.inputs", **values_to_set)
changed_results = {"__pipeline__": {"inputs": changed_pipeline_inputs}}
if sync_to_step_inputs:
changed = self.sync_pipeline_inputs(notify_listeners=False)
dpath.util.merge(changed_results, changed) # type: ignore
if notify_listeners:
event = PipelineEvent.create_event(pipeline=self, changed=changed_results)
self._notify_pipeline_listeners(event)
return changed_results
def sync_pipeline_inputs(
self, notify_listeners: bool = True
) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:
pipeline_inputs = self.get_current_pipeline_inputs()
values_to_sync: Dict[str, Dict[str, Optional[uuid.UUID]]] = {}
for field_name, ref in self._structure.pipeline_input_refs.items():
for step_input in ref.connected_inputs:
step_inputs = self.get_current_step_inputs(step_input.step_id)
if step_inputs[step_input.value_name] != pipeline_inputs[field_name]:
values_to_sync.setdefault(step_input.step_id, {})[
step_input.value_name
] = pipeline_inputs[field_name]
results: Dict[str, Mapping[str, Mapping[str, ChangedValue]]] = {}
for step_id in values_to_sync.keys():
values = values_to_sync[step_id]
step_changed = self._set_step_inputs(step_id=step_id, inputs=values)
dpath.util.merge(results, step_changed) # type: ignore
if notify_listeners:
event = PipelineEvent.create_event(pipeline=self, changed=results)
self._notify_pipeline_listeners(event)
return results
def _set_step_inputs(
self, step_id: str, inputs: Mapping[str, Optional[uuid.UUID]]
) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:
changed_step_inputs = self._set_values(f"steps.{step_id}.inputs", **inputs)
if not changed_step_inputs:
return {}
result: Dict[str, Dict[str, Dict[str, ChangedValue]]] = {
step_id: {"inputs": changed_step_inputs}
}
step_outputs = self._structure.get_step_output_refs(step_id=step_id)
null_outputs = {k: None for k in step_outputs.keys()}
changed_outputs = self.set_step_outputs(
step_id=step_id, outputs=null_outputs, notify_listeners=False
)
assert step_id not in changed_outputs.keys()
result.update(changed_outputs) # type: ignore
return result
def set_multiple_step_outputs(
self,
changed_outputs: Mapping[str, Mapping[str, Optional[uuid.UUID]]],
notify_listeners: bool = True,
) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:
results: Dict[str, Dict[str, Dict[str, ChangedValue]]] = {}
for step_id, outputs in changed_outputs.items():
step_results = self.set_step_outputs(
step_id=step_id, outputs=outputs, notify_listeners=False
)
dpath.util.merge(results, step_results) # type: ignore
if notify_listeners:
event = PipelineEvent.create_event(pipeline=self, changed=results)
self._notify_pipeline_listeners(event)
return results
def set_step_outputs(
self,
step_id: str,
outputs: Mapping[str, Optional[uuid.UUID]],
notify_listeners: bool = True,
) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:
# make sure pedigrees match with respective inputs?
changed_step_outputs = self._set_values(f"steps.{step_id}.outputs", **outputs)
if not changed_step_outputs:
return {}
result: Dict[str, Dict[str, Dict[str, ChangedValue]]] = {
step_id: {"outputs": changed_step_outputs}
}
output_refs = self._structure.get_step_output_refs(step_id=step_id)
pipeline_outputs: Dict[str, Optional[uuid.UUID]] = {}
inputs_to_set: Dict[str, Dict[str, Optional[uuid.UUID]]] = {}
for field_name, ref in output_refs.items():
if ref.pipeline_output:
assert ref.pipeline_output not in pipeline_outputs.keys()
pipeline_outputs[ref.pipeline_output] = outputs[field_name]
for input_ref in ref.connected_inputs:
inputs_to_set.setdefault(input_ref.step_id, {})[
input_ref.value_name
] = outputs[field_name]
for step_id, step_inputs in inputs_to_set.items():
changed_step_fields = self._set_step_inputs(
step_id=step_id, inputs=step_inputs
)
dpath.util.merge(result, changed_step_fields) # type: ignore
if pipeline_outputs:
changed_pipeline_outputs = self._set_pipeline_outputs(**pipeline_outputs)
dpath.util.merge( # type: ignore
result, {"__pipeline__": {"outputs": changed_pipeline_outputs}}
)
if notify_listeners:
event = PipelineEvent.create_event(pipeline=self, changed=result)
self._notify_pipeline_listeners(event)
return result
def _set_pipeline_outputs(
self, **outputs: Optional[uuid.UUID]
) -> Mapping[str, ChangedValue]:
changed_pipeline_outputs = self._set_values("pipeline.outputs", **outputs)
return changed_pipeline_outputs
def _set_values(
self, alias: str, **values: Optional[uuid.UUID]
) -> Dict[str, ChangedValue]:
"""Set values (value-ids) for the sub-alias-map with the specified alias path."""
invalid = {}
for k in values.keys():
_alias = self._all_values.get_alias(alias)
assert _alias is not None
if k not in _alias.values_schema.keys():
invalid[
k
] = f"Invalid field '{k}'. Available fields: {', '.join(self.get_current_pipeline_inputs().keys())}"
if invalid:
raise InvalidValuesException(invalid_values=invalid)
alias_map: Optional[AliasValueMap] = self._all_values.get_alias(alias)
assert alias_map is not None
values_to_set: Dict[str, Optional[uuid.UUID]] = {}
current: Dict[str, Optional[uuid.UUID]] = {}
changed: Dict[str, ChangedValue] = {}
for field_name, new_value in values.items():
current_value = self._all_values.get_alias(f"{alias}.{field_name}")
if current_value is not None:
current_value_id = current_value.assoc_value
else:
current_value_id = None
current[field_name] = current_value_id
if current_value_id != new_value:
values_to_set[field_name] = new_value
changed[field_name] = ChangedValue(old=current_value_id, new=new_value)
_alias = self._all_values.get_alias(alias)
assert _alias is not None
_alias.set_aliases(**values_to_set)
return changed
@property
def step_ids(self) -> Iterable[str]:
"""Return all ids of the steps of this pipeline."""
return self._structure.step_ids
def get_step(self, step_id: str) -> PipelineStep:
"""Return the object representing a step in this workflow, identified by the step id."""
return self._structure.get_step(step_id)
def get_steps_by_stage(
self,
) -> Mapping[int, Mapping[str, PipelineStep]]:
"""Return a all pipeline steps, ordered by stage they belong to."""
if self._steps_by_stage is not None:
return self._steps_by_stage
result: Dict[int, Dict[str, PipelineStep]] = {}
for step_id in self.step_ids:
step = self.get_step(step_id)
stage = self._structure.get_processing_stage(step.step_id)
assert stage is not None
result.setdefault(stage, {})[step_id] = step
self._steps_by_stage = result
return self._steps_by_stage
def create_job_config_for_step(self, step_id: str) -> JobConfig:
step_inputs: Mapping[str, Optional[uuid.UUID]] = self.get_current_step_inputs(
step_id
)
step_details: StepDetails = self.get_step_details(step_id=step_id)
step: PipelineStep = self.get_step(step_id=step_id)
# if the inputs are not valid, ignore this step
if step_details.status == StepStatus.INPUTS_INVALID:
invalid_details = step_details.invalid_details
assert invalid_details is not None
msg = f"Can't execute step '{step_id}', invalid inputs: {', '.join(invalid_details.keys())}"
raise InvalidValuesException(msg=msg, invalid_values=invalid_details)
job_config = JobConfig.create_from_module(
data_registry=self._data_registry, module=step.module, inputs=step_inputs
)
return job_config
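A minimal construction sketch (hedged): `config` is the `PipelineConfig` from the earlier examples, and `kiara.data_registry` is an assumed attribute name on the context.

``` python
# Minimal sketch: instantiate a Pipeline and inspect its initial state.
pipeline = Pipeline(structure=config.structure, data_registry=kiara.data_registry)
print(list(pipeline.step_ids))  # ["and", "not"]
details = pipeline.get_pipeline_details()
print(details.pipeline_status)  # StepStatus.INPUTS_INVALID until inputs are set
```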
Attributes¶
id: UUID
property
readonly
¶
kiara_id: UUID
property
readonly
¶
pipeline_id: UUID
property
readonly
¶
step_ids: Iterable[str]
property
readonly
¶
Return all ids of the steps of this pipeline.
structure: PipelineStructure
property
readonly
¶
Methods¶
add_listener(self, listener)
¶
Source code in kiara/models/module/pipeline/pipeline.py
def add_listener(self, listener: PipelineListener):
self._listeners.append(listener)
create_job_config_for_step(self, step_id)
¶
Source code in kiara/models/module/pipeline/pipeline.py
def create_job_config_for_step(self, step_id: str) -> JobConfig:
step_inputs: Mapping[str, Optional[uuid.UUID]] = self.get_current_step_inputs(
step_id
)
step_details: StepDetails = self.get_step_details(step_id=step_id)
step: PipelineStep = self.get_step(step_id=step_id)
# if the inputs are not valid, ignore this step
if step_details.status == StepStatus.INPUTS_INVALID:
invalid_details = step_details.invalid_details
assert invalid_details is not None
msg = f"Can't execute step '{step_id}', invalid inputs: {', '.join(invalid_details.keys())}"
raise InvalidValuesException(msg=msg, invalid_values=invalid_details)
job_config = JobConfig.create_from_module(
data_registry=self._data_registry, module=step.module, inputs=step_inputs
)
return job_config
get_current_pipeline_inputs(self)
¶
All (pipeline) input values of this pipeline.
Source code in kiara/models/module/pipeline/pipeline.py
def get_current_pipeline_inputs(self) -> Dict[str, uuid.UUID]:
"""All (pipeline) input values of this pipeline."""
alias_map = self._all_values.get_alias("pipeline.inputs")
return alias_map.get_all_value_ids() # type: ignore
get_current_pipeline_outputs(self)
¶
All (pipeline) output values of this pipeline.
Source code in kiara/models/module/pipeline/pipeline.py
def get_current_pipeline_outputs(self) -> Dict[str, uuid.UUID]:
"""All (pipeline) output values of this pipeline."""
alias_map = self._all_values.get_alias("pipeline.outputs")
return alias_map.get_all_value_ids() # type: ignore
get_current_step_inputs(self, step_id)
¶
Source code in kiara/models/module/pipeline/pipeline.py
def get_current_step_inputs(self, step_id) -> Dict[str, uuid.UUID]:
alias_map = self._all_values.get_alias(f"steps.{step_id}.inputs")
return alias_map.get_all_value_ids() # type: ignore
get_current_step_outputs(self, step_id)
¶
Source code in kiara/models/module/pipeline/pipeline.py
def get_current_step_outputs(self, step_id) -> Dict[str, uuid.UUID]:
alias_map = self._all_values.get_alias(f"steps.{step_id}.outputs")
return alias_map.get_all_value_ids() # type: ignore
get_inputs_for_steps(self, *step_ids)
¶
Retrieve value ids for the inputs of the specified steps (or all steps, if no arguments are provided).
Source code in kiara/models/module/pipeline/pipeline.py
def get_inputs_for_steps(self, *step_ids: str) -> Dict[str, Dict[str, uuid.UUID]]:
"""Retrieve value ids for the inputs of the specified steps (or all steps, if no argument provided."""
result = {}
for step_id in self._structure.step_ids:
if step_ids and step_id not in step_ids:
continue
ids = self.get_current_step_inputs(step_id=step_id)
result[step_id] = ids
return result
get_outputs_for_steps(self, *step_ids)
¶
Retrieve value ids for the outputs of the specified steps (or all steps, if no arguments are provided).
Source code in kiara/models/module/pipeline/pipeline.py
def get_outputs_for_steps(self, *step_ids: str) -> Dict[str, Dict[str, uuid.UUID]]:
"""Retrieve value ids for the outputs of the specified steps (or all steps, if no argument provided."""
result = {}
for step_id in self._structure.step_ids:
if step_ids and step_id not in step_ids:
continue
ids = self.get_current_step_outputs(step_id=step_id)
result[step_id] = ids
return result
get_pipeline_details(self)
¶
Source code in kiara/models/module/pipeline/pipeline.py
def get_pipeline_details(self) -> PipelineDetails:
pipeline_inputs = self._all_values.get_alias("pipeline.inputs")
pipeline_outputs = self._all_values.get_alias("pipeline.outputs")
assert pipeline_inputs is not None
assert pipeline_outputs is not None
invalid = pipeline_inputs.check_invalid()
if not invalid:
status = StepStatus.INPUTS_READY
step_outputs = self._all_values.get_alias("pipeline.outputs")
assert step_outputs is not None
invalid_outputs = step_outputs.check_invalid()
# TODO: also check that all the pedigrees match up with current inputs
if not invalid_outputs:
status = StepStatus.RESULTS_READY
else:
status = StepStatus.INPUTS_INVALID
step_states = {}
for step_id in self._structure.step_ids:
d = self.get_step_details(step_id)
step_states[step_id] = d
details = PipelineDetails.construct(
kiara_id=self._data_registry.kiara_id,
pipeline_id=self.pipeline_id,
pipeline_status=status,
pipeline_inputs=pipeline_inputs.get_all_value_ids(),
pipeline_outputs=pipeline_outputs.get_all_value_ids(),
invalid_details=invalid,
step_states=step_states,
)
return details
get_step(self, step_id)
¶
Return the object representing a step in this workflow, identified by the step id.
Source code in kiara/models/module/pipeline/pipeline.py
def get_step(self, step_id: str) -> PipelineStep:
"""Return the object representing a step in this workflow, identified by the step id."""
return self._structure.get_step(step_id)
get_step_details(self, step_id)
¶
Source code in kiara/models/module/pipeline/pipeline.py
def get_step_details(self, step_id: str) -> StepDetails:
step_input_ids = self.get_current_step_inputs(step_id=step_id)
step_output_ids = self.get_current_step_outputs(step_id=step_id)
step_inputs = self._all_values.get_alias(f"steps.{step_id}.inputs")
assert step_inputs is not None
invalid = step_inputs.check_invalid()
processing_stage = self._structure.get_processing_stage(step_id)
if not invalid:
status = StepStatus.INPUTS_READY
step_outputs = self._all_values.get_alias(f"steps.{step_id}.outputs")
assert step_outputs is not None
invalid_outputs = step_outputs.check_invalid()
# TODO: also check that all the pedigrees match up with current inputs
if not invalid_outputs:
status = StepStatus.RESULTS_READY
else:
status = StepStatus.INPUTS_INVALID
details = StepDetails.construct(
kiara_id=self._data_registry.kiara_id,
pipeline_id=self.pipeline_id,
step_id=step_id,
status=status,
inputs=step_input_ids,
outputs=step_output_ids,
invalid_details=invalid,
processing_stage=processing_stage,
)
return details
get_steps_by_stage(self)
¶
Return all pipeline steps, ordered by the stage they belong to.
Source code in kiara/models/module/pipeline/pipeline.py
def get_steps_by_stage(
self,
) -> Mapping[int, Mapping[str, PipelineStep]]:
"""Return a all pipeline steps, ordered by stage they belong to."""
if self._steps_by_stage is not None:
return self._steps_by_stage
result: Dict[int, Dict[str, PipelineStep]] = {}
for step_id in self.step_ids:
step = self.get_step(step_id)
stage = self._structure.get_processing_stage(step.step_id)
assert stage is not None
result.setdefault(stage, {})[step_id] = step
self._steps_by_stage = result
return self._steps_by_stage
set_multiple_step_outputs(self, changed_outputs, notify_listeners=True)
¶
Source code in kiara/models/module/pipeline/pipeline.py
def set_multiple_step_outputs(
self,
changed_outputs: Mapping[str, Mapping[str, Optional[uuid.UUID]]],
notify_listeners: bool = True,
) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:
results: Dict[str, Dict[str, Dict[str, ChangedValue]]] = {}
for step_id, outputs in changed_outputs.items():
step_results = self.set_step_outputs(
step_id=step_id, outputs=outputs, notify_listeners=False
)
dpath.util.merge(results, step_results) # type: ignore
if notify_listeners:
event = PipelineEvent.create_event(pipeline=self, changed=results)
self._notify_pipeline_listeners(event)
return results
set_pipeline_inputs(self, inputs, sync_to_step_inputs=True, notify_listeners=True)
¶
Source code in kiara/models/module/pipeline/pipeline.py
def set_pipeline_inputs(
self,
inputs: Mapping[str, Any],
sync_to_step_inputs: bool = True,
notify_listeners: bool = True,
) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:
values_to_set: Dict[str, uuid.UUID] = {}
for k, v in inputs.items():
if v is None:
values_to_set[k] = NONE_VALUE_ID
else:
alias_map = self._all_values.get_alias("pipeline.inputs")
assert alias_map is not None
# dbg(alias_map.__dict__)
schema = alias_map.values_schema[k]
value = self._data_registry.register_data(
data=v, schema=schema, pedigree=ORPHAN, reuse_existing=True
)
values_to_set[k] = value.value_id
changed_pipeline_inputs = self._set_values("pipeline.inputs", **values_to_set)
changed_results = {"__pipeline__": {"inputs": changed_pipeline_inputs}}
if sync_to_step_inputs:
changed = self.sync_pipeline_inputs(notify_listeners=False)
dpath.util.merge(changed_results, changed) # type: ignore
if notify_listeners:
event = PipelineEvent.create_event(pipeline=self, changed=changed_results)
self._notify_pipeline_listeners(event)
return changed_results
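A hedged sketch of the change-set this method returns; the nested mapping goes step id (or `"__pipeline__"`) -> `"inputs"`/`"outputs"` -> field name -> `ChangedValue`.

``` python
# Minimal sketch: set pipeline inputs and inspect what changed.
changed = pipeline.set_pipeline_inputs(inputs={"a": True, "b": False})
print(changed["__pipeline__"]["inputs"])  # e.g. {"a": ChangedValue(old=None, new=...), ...}
```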
set_step_outputs(self, step_id, outputs, notify_listeners=True)
¶
Source code in kiara/models/module/pipeline/pipeline.py
def set_step_outputs(
self,
step_id: str,
outputs: Mapping[str, Optional[uuid.UUID]],
notify_listeners: bool = True,
) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:
# make sure pedigrees match with respective inputs?
changed_step_outputs = self._set_values(f"steps.{step_id}.outputs", **outputs)
if not changed_step_outputs:
return {}
result: Dict[str, Dict[str, Dict[str, ChangedValue]]] = {
step_id: {"outputs": changed_step_outputs}
}
output_refs = self._structure.get_step_output_refs(step_id=step_id)
pipeline_outputs: Dict[str, Optional[uuid.UUID]] = {}
inputs_to_set: Dict[str, Dict[str, Optional[uuid.UUID]]] = {}
for field_name, ref in output_refs.items():
if ref.pipeline_output:
assert ref.pipeline_output not in pipeline_outputs.keys()
pipeline_outputs[ref.pipeline_output] = outputs[field_name]
for input_ref in ref.connected_inputs:
inputs_to_set.setdefault(input_ref.step_id, {})[
input_ref.value_name
] = outputs[field_name]
for step_id, step_inputs in inputs_to_set.items():
changed_step_fields = self._set_step_inputs(
step_id=step_id, inputs=step_inputs
)
dpath.util.merge(result, changed_step_fields) # type: ignore
if pipeline_outputs:
changed_pipeline_outputs = self._set_pipeline_outputs(**pipeline_outputs)
dpath.util.merge( # type: ignore
result, {"__pipeline__": {"outputs": changed_pipeline_outputs}}
)
if notify_listeners:
event = PipelineEvent.create_event(pipeline=self, changed=result)
self._notify_pipeline_listeners(event)
return result
sync_pipeline_inputs(self, notify_listeners=True)
¶
Source code in kiara/models/module/pipeline/pipeline.py
def sync_pipeline_inputs(
self, notify_listeners: bool = True
) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:
pipeline_inputs = self.get_current_pipeline_inputs()
values_to_sync: Dict[str, Dict[str, Optional[uuid.UUID]]] = {}
for field_name, ref in self._structure.pipeline_input_refs.items():
for step_input in ref.connected_inputs:
step_inputs = self.get_current_step_inputs(step_input.step_id)
if step_inputs[step_input.value_name] != pipeline_inputs[field_name]:
values_to_sync.setdefault(step_input.step_id, {})[
step_input.value_name
] = pipeline_inputs[field_name]
results: Dict[str, Mapping[str, Mapping[str, ChangedValue]]] = {}
for step_id in values_to_sync.keys():
values = values_to_sync[step_id]
step_changed = self._set_step_inputs(step_id=step_id, inputs=values)
dpath.util.merge(results, step_changed) # type: ignore
if notify_listeners:
event = PipelineEvent.create_event(pipeline=self, changed=results)
self._notify_pipeline_listeners(event)
return results
PipelineListener (ABC)
¶
Source code in kiara/models/module/pipeline/pipeline.py
class PipelineListener(abc.ABC):
@abc.abstractmethod
def _pipeline_event_occurred(self, event: PipelineEvent):
pass
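Implementing a listener only requires the `_pipeline_event_occurred` hook; a hedged sketch:

``` python
# Minimal sketch: a listener that logs every pipeline event it receives.
class PrintingListener(PipelineListener):
    def _pipeline_event_occurred(self, event: PipelineEvent):
        print(f"pipeline event: {event}")

pipeline.add_listener(PrintingListener())
```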
structure
¶
Classes¶
PipelineStructure (KiaraModel)
pydantic-model
¶
An object that holds one or several steps, and describes the connections between them.
Source code in kiara/models/module/pipeline/structure.py
class PipelineStructure(KiaraModel):
"""An object that holds one or several steps, and describes the connections between them."""
_kiara_model_id = "instance.pipeline_structure"
pipeline_config: PipelineConfig = Field(
description="The underlying pipeline config."
)
steps: List[PipelineStep] = Field(description="The pipeline steps ")
input_aliases: Dict[str, str] = Field(description="The input aliases.")
output_aliases: Dict[str, str] = Field(description="The output aliases.")
@root_validator(pre=True)
def validate_pipeline_config(cls, values):
pipeline_config = values.get("pipeline_config", None)
if not pipeline_config:
raise ValueError("No 'pipeline_config' provided.")
if len(values) != 1:
raise ValueError(
"Only 'pipeline_config' key allowed when creating a pipeline structure object."
)
_config: PipelineConfig = pipeline_config
_steps: List[PipelineStep] = list(_config.steps)
_input_aliases: Dict[str, str] = dict(_config.input_aliases)
_output_aliases: Dict[str, str] = dict(_config.output_aliases)
invalid_input_aliases = [a for a in _input_aliases.values() if "." in a]
if invalid_input_aliases:
raise Exception(
f"Invalid input aliases, aliases can't contain special characters: {', '.join(invalid_input_aliases)}."
)
invalid_output_aliases = [a for a in _output_aliases.values() if "." in a]
if invalid_output_aliases:
raise Exception(
f"Invalid output aliases, aliases can't contain special characters: {', '.join(invalid_output_aliases)}."
)
valid_input_names = set()
for step in _steps:
for input_name in step.module.input_names:
valid_input_names.add(f"{step.step_id}.{input_name}")
invalid_input_aliases = [
a for a in _input_aliases.keys() if a not in valid_input_names
]
if invalid_input_aliases:
raise Exception(
f"Invalid input reference(s): {', '.join(invalid_input_aliases)}. Must be one of: {', '.join(valid_input_names)}"
)
valid_output_names = set()
for step in _steps:
for output_name in step.module.output_names:
valid_output_names.add(f"{step.step_id}.{output_name}")
invalid_output_names = [
a for a in _output_aliases.keys() if a not in valid_output_names
]
if invalid_output_names:
raise Exception(
f"Invalid output reference(s): {', '.join(invalid_output_names)}. Must be one of: {', '.join(valid_output_names)}"
)
values["steps"] = _steps
values["input_aliases"] = _input_aliases
values["output_aliases"] = _output_aliases
return values
# this is hardcoded for now
_add_all_workflow_outputs: bool = PrivateAttr(default=False)
_constants: Dict[str, Any] = PrivateAttr(default=None) # type: ignore
_defaults: Dict[str, Any] = PrivateAttr(None) # type: ignore
_execution_graph: nx.DiGraph = PrivateAttr(None) # type: ignore
_data_flow_graph: nx.DiGraph = PrivateAttr(None) # type: ignore
_data_flow_graph_simple: nx.DiGraph = PrivateAttr(None) # type: ignore
_processing_stages: List[List[str]] = PrivateAttr(None) # type: ignore
# holds details about the (current) processing steps contained in this workflow
_steps_details: Dict[str, Any] = PrivateAttr(None) # type: ignore
def _retrieve_data_to_hash(self) -> Any:
return {
"steps": [step.instance_cid for step in self.steps],
"input_aliases": self.input_aliases,
"output_aliases": self.output_aliases,
}
def _retrieve_id(self) -> str:
return self.pipeline_config.instance_id
@property
def steps_details(self) -> Mapping[str, Any]:
if self._steps_details is None:
self._process_steps()
return self._steps_details # type: ignore
@property
def step_ids(self) -> Iterable[str]:
if self._steps_details is None:
self._process_steps()
return self._steps_details.keys() # type: ignore
@property
def constants(self) -> Mapping[str, Any]:
if self._constants is None:
self._process_steps()
return self._constants # type: ignore
@property
def defaults(self) -> Mapping[str, Any]:
if self._defaults is None:
self._process_steps()
return self._defaults # type: ignore
def get_step(self, step_id: str) -> PipelineStep:
d = self.steps_details.get(step_id, None)
if d is None:
raise Exception(f"No module with id: {step_id}")
return d["step"]
def get_step_input_refs(self, step_id: str) -> Mapping[str, StepInputRef]:
d = self.steps_details.get(step_id, None)
if d is None:
raise Exception(f"No module with id: {step_id}")
return d["inputs"]
def get_step_output_refs(self, step_id: str) -> Mapping[str, StepOutputRef]:
d = self.steps_details.get(step_id, None)
if d is None:
raise Exception(f"No module with id: {step_id}")
return d["outputs"]
def get_step_details(self, step_id: str) -> Mapping[str, Any]:
d = self.steps_details.get(step_id, None)
if d is None:
raise Exception(f"No module with id: {step_id}")
return d
@property
def execution_graph(self) -> nx.DiGraph:
if self._execution_graph is None:
self._process_steps()
return self._execution_graph
@property
def data_flow_graph(self) -> nx.DiGraph:
if self._data_flow_graph is None:
self._process_steps()
return self._data_flow_graph
@property
def data_flow_graph_simple(self) -> nx.DiGraph:
if self._data_flow_graph_simple is None:
self._process_steps()
return self._data_flow_graph_simple
@property
def processing_stages(self) -> List[List[str]]:
if self._steps_details is None:
self._process_steps()
return self._processing_stages
@lru_cache()
def _get_node_of_type(self, node_type: str):
if self._steps_details is None:
self._process_steps()
return [
node
for node, attr in self._data_flow_graph.nodes(data=True)
if attr["type"] == node_type
]
@property
def steps_input_refs(self) -> Dict[str, StepInputRef]:
return {
node.alias: node
for node in self._get_node_of_type(node_type=StepInputRef.__name__)
}
@property
def steps_output_refs(self) -> Dict[str, StepOutputRef]:
return {
node.alias: node
for node in self._get_node_of_type(node_type=StepOutputRef.__name__)
}
@property
def pipeline_input_refs(self) -> Dict[str, PipelineInputRef]:
return {
node.value_name: node
for node in self._get_node_of_type(node_type=PipelineInputRef.__name__)
}
@property
def pipeline_output_refs(self) -> Dict[str, PipelineOutputRef]:
return {
node.value_name: node
for node in self._get_node_of_type(node_type=PipelineOutputRef.__name__)
}
@property
def pipeline_inputs_schema(self) -> Mapping[str, ValueSchema]:
schemas = {
input_name: w_in.value_schema
for input_name, w_in in self.pipeline_input_refs.items()
}
return schemas
@property
def pipeline_outputs_schema(self) -> Mapping[str, ValueSchema]:
return {
output_name: w_out.value_schema
for output_name, w_out in self.pipeline_output_refs.items()
}
def get_processing_stage(self, step_id: str) -> int:
"""Return the processing stage for the specified step_id.
Returns the stage number (starting at 1).
"""
for index, stage in enumerate(self.processing_stages, start=1):
if step_id in stage:
return index
raise Exception(f"Invalid step id '{step_id}'.")
def step_is_required(self, step_id: str) -> bool:
"""Check if the specified step is required, or can be omitted."""
return self.get_step_details(step_id=step_id)["required"]
def _process_steps(self):
"""The core method of this class, it connects all the processing modules, their inputs and outputs."""
steps_details: Dict[str, Any] = {}
execution_graph = nx.DiGraph()
execution_graph.add_node("__root__")
data_flow_graph = nx.DiGraph()
data_flow_graph_simple = nx.DiGraph()
processing_stages = []
constants = {}
structure_defaults = {}
# temp variable, to hold all outputs
outputs: Dict[str, StepOutputRef] = {}
# process all pipeline and step outputs first
_temp_steps_map: Dict[str, PipelineStep] = {}
pipeline_outputs: Dict[str, PipelineOutputRef] = {}
for step in self.steps:
_temp_steps_map[step.step_id] = step
if step.step_id in steps_details.keys():
raise Exception(
f"Can't process steps: duplicate step_id '{step.step_id}'"
)
steps_details[step.step_id] = {
"step": step,
"outputs": {},
"inputs": {},
"required": True,
}
data_flow_graph.add_node(step, type="step")
# go through all the module outputs, create points for them and connect them to pipeline outputs
for output_name, schema in step.module.outputs_schema.items():
step_output = StepOutputRef(
value_name=output_name,
value_schema=schema,
step_id=step.step_id,
)
steps_details[step.step_id]["outputs"][output_name] = step_output
step_alias = generate_step_alias(step.step_id, output_name)
outputs[step_alias] = step_output
# step_output_name = generate_pipeline_endpoint_name(
# step_id=step.step_id, value_name=output_name
# )
step_output_name = f"{step.step_id}.{output_name}"
if not self.output_aliases:
raise NotImplementedError()
if step_output_name in self.output_aliases.keys():
step_output_name = self.output_aliases[step_output_name]
else:
if not self._add_all_workflow_outputs:
# this output is not interesting for the workflow
step_output_name = None
if step_output_name:
step_output_address = StepValueAddress(
step_id=step.step_id, value_name=output_name
)
pipeline_output = PipelineOutputRef(
value_name=step_output_name,
connected_output=step_output_address,
value_schema=schema,
)
pipeline_outputs[step_output_name] = pipeline_output
step_output.pipeline_output = pipeline_output.value_name
data_flow_graph.add_node(
pipeline_output, type=PipelineOutputRef.__name__
)
data_flow_graph.add_edge(step_output, pipeline_output)
data_flow_graph_simple.add_node(
pipeline_output, type=PipelineOutputRef.__name__
)
data_flow_graph_simple.add_edge(step, pipeline_output)
data_flow_graph.add_node(step_output, type=StepOutputRef.__name__)
data_flow_graph.add_edge(step, step_output)
# now process inputs, and connect them to the appropriate output/pipeline-input points
existing_pipeline_input_points: Dict[str, PipelineInputRef] = {}
for step in self.steps:
other_step_dependency: Set = set()
# go through all the inputs of a module, create input points and connect them to either
# other module outputs, or pipeline inputs (which need to be created)
module_constants: Mapping[str, Any] = step.module.get_config_value(
"constants"
)
for input_name, schema in step.module.inputs_schema.items():
matching_input_links: List[StepValueAddress] = []
is_constant = input_name in module_constants.keys()
for value_name, input_links in step.input_links.items():
if value_name == input_name:
for input_link in input_links:
if input_link in matching_input_links:
raise Exception(f"Duplicate input link: {input_link}")
matching_input_links.append(input_link)
if matching_input_links:
# this means we connect to other steps output
connected_output_points: List[StepOutputRef] = []
connected_outputs: List[StepValueAddress] = []
for input_link in matching_input_links:
output_id = generate_step_alias(
input_link.step_id, input_link.value_name
)
if output_id not in outputs.keys():
raise Exception(
f"Can't connect input '{input_name}' for step '{step.step_id}': no output '{output_id}' available. Available output names: {', '.join(outputs.keys())}"
)
connected_output_points.append(outputs[output_id])
connected_outputs.append(input_link)
other_step_dependency.add(input_link.step_id)
step_input_point = StepInputRef(
step_id=step.step_id,
value_name=input_name,
value_schema=schema,
is_constant=is_constant,
connected_pipeline_input=None,
connected_outputs=connected_outputs,
)
for op in connected_output_points:
op.connected_inputs.append(step_input_point.address)
data_flow_graph.add_edge(op, step_input_point)
data_flow_graph_simple.add_edge(
_temp_steps_map[op.step_id], step_input_point
) # TODO: name edge
data_flow_graph_simple.add_edge(
step_input_point, step
) # TODO: name edge
else:
# this means we connect to pipeline input
# pipeline_input_name = generate_pipeline_endpoint_name(
# step_id=step.step_id, value_name=input_name
# )
pipeline_input_name = f"{step.step_id}.{input_name}"
# check whether this input has an alias associated with it
if not self.input_aliases:
raise NotImplementedError()
if pipeline_input_name in self.input_aliases.keys():
# this means we use the pipeline alias
pipeline_input_name = self.input_aliases[pipeline_input_name]
if pipeline_input_name in existing_pipeline_input_points.keys():
# we already created a pipeline input with this name
# TODO: check whether schema fits
connected_pipeline_input = existing_pipeline_input_points[
pipeline_input_name
]
assert connected_pipeline_input.is_constant == is_constant
else:
# we need to create the pipeline input
connected_pipeline_input = PipelineInputRef(
value_name=pipeline_input_name,
value_schema=schema,
is_constant=is_constant,
)
existing_pipeline_input_points[
pipeline_input_name
] = connected_pipeline_input
data_flow_graph.add_node(
connected_pipeline_input, type=PipelineInputRef.__name__
)
data_flow_graph_simple.add_node(
connected_pipeline_input, type=PipelineInputRef.__name__
)
if is_constant:
constants[
pipeline_input_name
] = step.module.get_config_value("constants")[input_name]
default_val = step.module.get_config_value("defaults").get(
input_name, None
)
if is_constant and default_val is not None:
raise Exception(
f"Module config invalid for step '{step.step_id}': both default value and constant provided for input '{input_name}'."
)
elif default_val is not None:
structure_defaults[pipeline_input_name] = default_val
step_input_point = StepInputRef(
step_id=step.step_id,
value_name=input_name,
value_schema=schema,
connected_pipeline_input=connected_pipeline_input.value_name,
connected_outputs=None,
)
connected_pipeline_input.connected_inputs.append(
step_input_point.address
)
data_flow_graph.add_edge(connected_pipeline_input, step_input_point)
data_flow_graph_simple.add_edge(connected_pipeline_input, step)
data_flow_graph.add_node(step_input_point, type=StepInputRef.__name__)
steps_details[step.step_id]["inputs"][input_name] = step_input_point
data_flow_graph.add_edge(step_input_point, step)
if other_step_dependency:
for module_id in other_step_dependency:
execution_graph.add_edge(module_id, step.step_id)
else:
execution_graph.add_edge("__root__", step.step_id)
# calculate execution order
path_lengths: Dict[str, int] = {}
for step in self.steps:
step_id = step.step_id
paths = list(nx.all_simple_paths(execution_graph, "__root__", step_id))
max_steps = max(paths, key=lambda x: len(x))
path_lengths[step_id] = len(max_steps) - 1
max_length = max(path_lengths.values())
for i in range(1, max_length + 1):
stage: List[str] = [m for m, length in path_lengths.items() if length == i]
processing_stages.append(stage)
for _step_id in stage:
steps_details[_step_id]["processing_stage"] = i
# steps_details[_step_id]["step"].processing_stage = i
self._constants = constants
self._defaults = structure_defaults
self._steps_details = steps_details
self._execution_graph = execution_graph
self._data_flow_graph = data_flow_graph
self._data_flow_graph_simple = data_flow_graph_simple
self._processing_stages = processing_stages
self._get_node_of_type.cache_clear()
# calculate which steps are always required to compute one of the required pipeline outputs.
# this is done because in some cases steps can be skipped if they don't have a valid
# input set, as the downstream inputs they connect to are 'non-required'
# optional_steps = []
# last_stage = self._processing_stages[-1]
#
# step_nodes: List[PipelineStep] = [
# node
# for node in self._data_flow_graph_simple.nodes
# if isinstance(node, PipelineStep)
# ]
#
# all_required_inputs = []
# for step_id in last_stage:
#
# step = self.get_step(step_id)
# step_nodes.remove(step)
#
# for k, s_inp in self.get_step_input_refs(step_id).items():
# if not s_inp.value_schema.is_required():
# continue
# all_required_inputs.append(s_inp)
#
# for pipeline_input in self.pipeline_input_refs.values():
#
# for last_step_input in all_required_inputs:
# try:
# path = nx.shortest_path(
# self._data_flow_graph_simple, pipeline_input, last_step_input
# )
# for p in path:
# if p in step_nodes:
# step_nodes.remove(p)
# except (NetworkXNoPath, NodeNotFound):
# pass
# # print("NO PATH")
# # print(f"{pipeline_input} -> {last_step_input}")
#
# for s in step_nodes:
# self._steps_details[s.step_id]["required"] = False
# # s.required = False
#
# for input_name, inp in self.pipeline_input_refs.items():
# steps = set()
# for ci in inp.connected_inputs:
# steps.add(ci.step_id)
#
# optional = True
# for step_id in steps:
# step = self.get_step(step_id)
# if self._steps_details[step_id]["required"]:
# optional = False
# break
# if optional:
# inp.value_schema.optional = True
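Putting this together: the root validator in the source above derives everything from the `pipeline_config` value, so a structure is built from a config alone, and graphs, stages, and schemas are computed lazily on first property access. A hedged sketch; building a real `PipelineConfig` requires a working kiara environment, and `nand_config` below is an assumed, pre-built config such as the `nand` example referenced earlier in this document:

``` python
from kiara.models.module.pipeline import PipelineConfig
from kiara.models.module.pipeline.structure import PipelineStructure

nand_config: PipelineConfig = ...  # assumed to exist; not runnable as-is

structure = PipelineStructure(pipeline_config=nand_config)

print(list(structure.pipeline_inputs_schema))  # e.g. ['a', 'b']
print(structure.processing_stages)             # e.g. [['and'], ['not']]
print(structure.get_processing_stage("not"))   # e.g. 2
```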
Attributes¶
constants: Mapping[str, Any]
property
readonly
¶
data_flow_graph: DiGraph
property
readonly
¶
data_flow_graph_simple: DiGraph
property
readonly
¶
defaults: Mapping[str, Any]
property
readonly
¶
execution_graph: DiGraph
property
readonly
¶
input_aliases: Dict[str, str]
pydantic-field
required
¶
The input aliases.
output_aliases: Dict[str, str]
pydantic-field
required
¶
The output aliases.
pipeline_config: PipelineConfig
pydantic-field
required
¶
The underlying pipeline config.
pipeline_input_refs: Dict[str, kiara.models.module.pipeline.value_refs.PipelineInputRef]
property
readonly
¶
pipeline_inputs_schema: Mapping[str, kiara.models.values.value_schema.ValueSchema]
property
readonly
¶
pipeline_output_refs: Dict[str, kiara.models.module.pipeline.value_refs.PipelineOutputRef]
property
readonly
¶
pipeline_outputs_schema: Mapping[str, kiara.models.values.value_schema.ValueSchema]
property
readonly
¶
processing_stages: List[List[str]]
property
readonly
¶
step_ids: Iterable[str]
property
readonly
¶
steps: List[kiara.models.module.pipeline.PipelineStep]
pydantic-field
required
¶
The pipeline steps.
steps_details: Mapping[str, Any]
property
readonly
¶
steps_input_refs: Dict[str, kiara.models.module.pipeline.value_refs.StepInputRef]
property
readonly
¶
steps_output_refs: Dict[str, kiara.models.module.pipeline.value_refs.StepOutputRef]
property
readonly
¶
Methods¶
get_processing_stage(self, step_id)
¶
Return the processing stage for the specified step_id.
Returns the stage number (starting at 1).
Source code in kiara/models/module/pipeline/structure.py
def get_processing_stage(self, step_id: str) -> int:
"""Return the processing stage for the specified step_id.
Returns the stage number (starting at 1).
"""
for index, stage in enumerate(self.processing_stages, start=1):
if step_id in stage:
return index
raise Exception(f"Invalid step id '{step_id}'.")
get_step(self, step_id)
¶
Source code in kiara/models/module/pipeline/structure.py
def get_step(self, step_id: str) -> PipelineStep:
d = self.steps_details.get(step_id, None)
if d is None:
raise Exception(f"No module with id: {step_id}")
return d["step"]
get_step_details(self, step_id)
¶
Source code in kiara/models/module/pipeline/structure.py
def get_step_details(self, step_id: str) -> Mapping[str, Any]:
d = self.steps_details.get(step_id, None)
if d is None:
raise Exception(f"No module with id: {step_id}")
return d
get_step_input_refs(self, step_id)
¶
Source code in kiara/models/module/pipeline/structure.py
def get_step_input_refs(self, step_id: str) -> Mapping[str, StepInputRef]:
d = self.steps_details.get(step_id, None)
if d is None:
raise Exception(f"No module with id: {step_id}")
return d["inputs"]
get_step_output_refs(self, step_id)
¶
Source code in kiara/models/module/pipeline/structure.py
def get_step_output_refs(self, step_id: str) -> Mapping[str, StepOutputRef]:
d = self.steps_details.get(step_id, None)
if d is None:
raise Exception(f"No module with id: {step_id}")
return d["outputs"]
step_is_required(self, step_id)
¶
Check if the specified step is required, or can be omitted.
Source code in kiara/models/module/pipeline/structure.py
def step_is_required(self, step_id: str) -> bool:
"""Check if the specified step is required, or can be omitted."""
return self.get_step_details(step_id=step_id)["required"]
validate_pipeline_config(values)
classmethod
¶
Source code in kiara/models/module/pipeline/structure.py
@root_validator(pre=True)
def validate_pipeline_config(cls, values):
pipeline_config = values.get("pipeline_config", None)
if not pipeline_config:
raise ValueError("No 'pipeline_config' provided.")
if len(values) != 1:
raise ValueError(
"Only 'pipeline_config' key allowed when creating a pipeline structure object."
)
_config: PipelineConfig = pipeline_config
_steps: List[PipelineStep] = list(_config.steps)
_input_aliases: Dict[str, str] = dict(_config.input_aliases)
_output_aliases: Dict[str, str] = dict(_config.output_aliases)
invalid_input_aliases = [a for a in _input_aliases.values() if "." in a]
if invalid_input_aliases:
raise Exception(
f"Invalid input aliases, aliases can't contain special characters: {', '.join(invalid_input_aliases)}."
)
invalid_output_aliases = [a for a in _output_aliases.values() if "." in a]
if invalid_output_aliases:
raise Exception(
f"Invalid output aliases, aliases can't contain special characters: {', '.join(invalid_output_aliases)}."
)
valid_input_names = set()
for step in _steps:
for input_name in step.module.input_names:
valid_input_names.add(f"{step.step_id}.{input_name}")
invalid_input_aliases = [
a for a in _input_aliases.keys() if a not in valid_input_names
]
if invalid_input_aliases:
raise Exception(
f"Invalid input reference(s): {', '.join(invalid_input_aliases)}. Must be one of: {', '.join(valid_input_names)}"
)
valid_output_names = set()
for step in _steps:
for output_name in step.module.output_names:
valid_output_names.add(f"{step.step_id}.{output_name}")
invalid_output_names = [
a for a in _output_aliases.keys() if a not in valid_output_names
]
if invalid_output_names:
raise Exception(
f"Invalid output reference(s): {', '.join(invalid_output_names)}. Must be one of: {', '.join(valid_output_names)}"
)
values["steps"] = _steps
values["input_aliases"] = _input_aliases
values["output_aliases"] = _output_aliases
return values
generate_pipeline_endpoint_name(step_id, value_name)
¶
Source code in kiara/models/module/pipeline/structure.py
def generate_pipeline_endpoint_name(step_id: str, value_name: str):
return f"{step_id}__{value_name}"
value_refs
¶
Classes¶
PipelineInputRef (ValueRef)
pydantic-model
¶
An input to a pipeline.
Source code in kiara/models/module/pipeline/value_refs.py
class PipelineInputRef(ValueRef):
"""An input to a pipeline."""
connected_inputs: List[StepValueAddress] = Field(
description="The step inputs that are connected to this pipeline input",
default_factory=list,
)
is_constant: bool = Field(
description="Whether this input is a constant and can't be changed by the user."
)
@property
def alias(self) -> str:
return generate_step_alias(PIPELINE_PARENT_MARKER, self.value_name)
PipelineOutputRef (ValueRef)
pydantic-model
¶
An output of a pipeline.
Source code in kiara/models/module/pipeline/value_refs.py
class PipelineOutputRef(ValueRef):
"""An output of a pipeline."""
connected_output: StepValueAddress = Field(description="Connected step outputs.")
@property
def alias(self) -> str:
return generate_step_alias(PIPELINE_PARENT_MARKER, self.value_name)
StepInputRef (ValueRef)
pydantic-model
¶
An input to a step.
This object can have either 'connected_outputs' or a 'connected_pipeline_input' set, not both.
Source code in kiara/models/module/pipeline/value_refs.py
class StepInputRef(ValueRef):
"""An input to a step.
This object can have either 'connected_outputs' or a 'connected_pipeline_input' set, not both.
"""
step_id: str = Field(description="The step id.")
connected_outputs: Optional[List[StepValueAddress]] = Field(
default=None,
description="A potential connected list of one or several module outputs.",
)
connected_pipeline_input: Optional[str] = Field(
default=None, description="A potential pipeline input."
)
is_constant: bool = Field(
default=False,
description="Whether this input is a constant and can't be changed by the user.",
)
@root_validator(pre=True)
def ensure_single_connected_item(cls, values):
if values.get("connected_outputs", None) and values.get(
"connected_pipeline_input"
):
raise ValueError("Multiple connected items, only one allowed.")
return values
@property
def alias(self) -> str:
return generate_step_alias(self.step_id, self.value_name)
@property
def address(self) -> StepValueAddress:
return StepValueAddress(step_id=self.step_id, value_name=self.value_name)
def __str__(self):
name = camel_case_to_snake_case(self.__class__.__name__[0:-5], repl=" ")
return f"{name}: {self.step_id}.{self.value_name} ({self.value_schema.type})"
Attributes¶
address: StepValueAddress
property
readonly
¶
alias: str
property
readonly
¶
connected_outputs: List[kiara.models.module.pipeline.value_refs.StepValueAddress]
pydantic-field
¶
A potential connected list of one or several module outputs.
connected_pipeline_input: str
pydantic-field
¶
A potential pipeline input.
is_constant: bool
pydantic-field
¶
Whether this input is a constant and can't be changed by the user.
step_id: str
pydantic-field
required
¶
The step id.
ensure_single_connected_item(values)
classmethod
¶
Source code in kiara/models/module/pipeline/value_refs.py
@root_validator(pre=True)
def ensure_single_connected_item(cls, values):
if values.get("connected_outputs", None) and values.get(
"connected_pipeline_input"
):
raise ValueError("Multiple connected items, only one allowed.")
return values
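In effect, the two connection kinds are mutually exclusive: a step input is wired either to other steps' outputs or to a pipeline input, never both. A sketch of the failure case; since the validator runs as a pre-validator, it fires before any field validation:

``` python
from kiara.models.module.pipeline.value_refs import StepInputRef, StepValueAddress

# Providing both connection kinds raises a validation error
# ("Multiple connected items, only one allowed.") immediately:
StepInputRef(
    step_id="not",
    value_name="a",
    connected_outputs=[StepValueAddress(step_id="and", value_name="y")],
    connected_pipeline_input="a",
)
```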
StepOutputRef (ValueRef)
pydantic-model
¶
An output of a step.
Source code in kiara/models/module/pipeline/value_refs.py
class StepOutputRef(ValueRef):
"""An output of a step."""
class Config:
allow_mutation = True
step_id: str = Field(description="The step id.")
pipeline_output: Optional[str] = Field(description="The connected pipeline output.")
connected_inputs: List[StepValueAddress] = Field(
description="The step inputs that are connected to this step output",
default_factory=list,
)
@property
def alias(self) -> str:
return generate_step_alias(self.step_id, self.value_name)
@property
def address(self) -> StepValueAddress:
return StepValueAddress(step_id=self.step_id, value_name=self.value_name)
def __str__(self):
name = camel_case_to_snake_case(self.__class__.__name__[0:-5], repl=" ")
return f"{name}: {self.step_id}.{self.value_name} ({self.value_schema.type})"
Attributes¶
address: StepValueAddress
property
readonly
¶
alias: str
property
readonly
¶
connected_inputs: List[kiara.models.module.pipeline.value_refs.StepValueAddress]
pydantic-field
¶
The step inputs that are connected to this step output
pipeline_output: str
pydantic-field
¶
The connected pipeline output.
step_id: str
pydantic-field
required
¶
The step id.
Config
¶
Source code in kiara/models/module/pipeline/value_refs.py
class Config:
allow_mutation = True
StepValueAddress (BaseModel)
pydantic-model
¶
Small model to describe the address of a value of a step, within a Pipeline/PipelineStructure.
Source code in kiara/models/module/pipeline/value_refs.py
class StepValueAddress(BaseModel):
"""Small model to describe the address of a value of a step, within a Pipeline/PipelineStructure."""
class Config:
extra = Extra.forbid
step_id: str = Field(description="The id of a step within a pipeline.")
value_name: str = Field(
description="The name of the value (output name or pipeline input name)."
)
sub_value: Optional[Dict[str, Any]] = Field(
default=None,
description="A reference to a subitem of a value (e.g. column, list item)",
)
@property
def alias(self):
"""An alias string for this address (in the form ``[step_id].[value_name]``)."""
return generate_step_alias(self.step_id, self.value_name)
def __eq__(self, other):
if not isinstance(other, StepValueAddress):
return False
return (self.step_id, self.value_name, self.sub_value) == (
other.step_id,
other.value_name,
other.sub_value,
)
def __hash__(self):
return hash((self.step_id, self.value_name, self.sub_value))
def __repr__(self):
if self.sub_value:
sub_value = f" sub_value={self.sub_value}"
else:
sub_value = ""
return f"{self.__class__.__name__}(step_id={self.step_id}, value_name={self.value_name}{sub_value})"
def __str__(self):
return self.__repr__()
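Because equality and hashing are defined over `(step_id, value_name, sub_value)`, addresses behave like value objects and can safely be used in sets or as dict keys:

``` python
from kiara.models.module.pipeline.value_refs import StepValueAddress

a = StepValueAddress(step_id="and", value_name="y")
b = StepValueAddress(step_id="and", value_name="y")

assert a == b and hash(a) == hash(b)
assert a.alias == "and.y"
assert len({a, b}) == 1  # equal addresses collapse in a set
```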
Attributes¶
alias
property
readonly
¶
An alias string for this address (in the form [step_id].[value_name]).
step_id: str
pydantic-field
required
¶
The id of a step within a pipeline.
sub_value: Dict[str, Any]
pydantic-field
¶
A reference to a subitem of a value (e.g. column, list item)
value_name: str
pydantic-field
required
¶
The name of the value (output name or pipeline input name).
Config
¶
Source code in kiara/models/module/pipeline/value_refs.py
class Config:
extra = Extra.forbid
ValueRef (BaseModel)
pydantic-model
¶
An object that holds information about the location of a value within a pipeline (or other structure).
Basically, a ValueRef helps the containing object determine where in its structure the value belongs (for example, so it can update other, dependent values). A ValueRef object does not contain the value itself.
There are four different ValueRef types that are relevant for pipelines:
- [kiara.pipeline.values.StepInputRef][]: an input to a step
- [kiara.pipeline.values.StepOutputRef][]: an output of a step
- [kiara.pipeline.values.PipelineInputRef][]: an input to a pipeline
- [kiara.pipeline.values.PipelineOutputRef][]: an output of a pipeline
Several ValueRef objects can target the same value; for example, a step output and a connected step input would (in most cases) reference the same Value.
Source code in kiara/models/module/pipeline/value_refs.py
class ValueRef(BaseModel):
"""An object that holds information about the location of a value within a pipeline (or other structure).
Basically, a `ValueRef` helps the containing object where in its structure the value belongs (for example so
it can update dependent other values). A `ValueRef` object (obviously) does not contain the value itself.
There are four different ValueRef type that are relevant for pipelines:
- [kiara.pipeline.values.StepInputRef][]: an input to a step
- [kiara.pipeline.values.StepOutputRef][]: an output of a step
- [kiara.pipeline.values.PipelineInputRef][]: an input to a pipeline
- [kiara.pipeline.values.PipelineOutputRef][]: an output for a pipeline
Several `ValueRef` objects can target the same value, for example a step output and a connected step input would
reference the same `Value` (in most cases)..
"""
class Config:
allow_mutation = True
extra = Extra.forbid
_id: uuid.UUID = PrivateAttr(default_factory=uuid.uuid4)
value_name: str
value_schema: ValueSchema
# pipeline_id: str
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
return self._id == other._id
def __hash__(self):
return hash(self._id)
def __repr__(self):
step_id = ""
if hasattr(self, "step_id"):
step_id = f" step_id='{self.step_id}'"
return f"{self.__class__.__name__}(value_name='{self.value_name}' {step_id})"
def __str__(self):
name = camel_case_to_snake_case(self.__class__.__name__[0:-5], repl=" ")
return f"{name}: {self.value_name} ({self.value_schema.type})"
generate_step_alias(step_id, value_name)
¶
Source code in kiara/models/module/pipeline/value_refs.py
def generate_step_alias(step_id: str, value_name: str):
return f"{step_id}.{value_name}"