kiara.pipeline.pipeline¶
Pipeline
¶
An instance of a PipelineStructure that holds state for all of the inputs/outputs of the steps within.
inputs: SlottedValueSet
property
readonly
¶
All (pipeline) input values of this pipeline.
outputs: SlottedValueSet
property
readonly
¶
All (pipeline) output values of this pipeline.
status: StepStatus
property
readonly
¶
Return the current status of this pipeline.
step_ids: Iterable[str]
property
readonly
¶
Return all ids of the steps of this pipeline.
add_listener(self, listener)
¶
Add a listener that gets notified on any internal pipeline input/output events.
Source code in kiara/pipeline/pipeline.py
def add_listener(self, listener: PipelineListener) -> None:
    """Register a listener that gets notified on any internal pipeline input/output events.

    The listener is appended to the end of this pipeline's listener collection.
    """
    registered = self._listeners
    registered.append(listener)
get_pipeline_inputs_by_stage(self)
¶
Return a mapping of pipeline input names, keyed by the stage in which they are first required.
Source code in kiara/pipeline/pipeline.py
def get_pipeline_inputs_by_stage(self) -> typing.Mapping[int, typing.Iterable[str]]:
    """Return the pipeline input names, grouped by the stage in which they are first required.

    The result maps a stage number to the names of the pipeline inputs whose
    earliest consuming step belongs to that stage. An input that is not
    referenced by any step input ends up under the key ``sys.maxsize``.

    The mapping is computed once and cached on the instance.
    """
    cached = self._inputs_by_stage
    if cached is not None:
        return cached

    grouped: typing.Dict[int, typing.List[str]] = {}
    for input_name, slot in self.inputs._value_slots.items():  # type: ignore
        # Seed with the sentinel so inputs without any step-input reference
        # still produce a (sentinel) stage.
        candidate_stages = [sys.maxsize]
        for ref in self._value_refs[slot]:
            if isinstance(ref, StepInputRef):
                stage = self.get_step(ref.step_id).processing_stage
                assert stage is not None
                candidate_stages.append(stage)  # type: ignore

        first_stage = min(candidate_stages)
        if first_stage not in grouped:
            grouped[first_stage] = []
        grouped[first_stage].append(input_name)

    self._inputs_by_stage = grouped
    return self._inputs_by_stage
get_pipeline_inputs_for_stage(self, stage)
¶
Return a list of pipeline inputs that are required for a stage to be processed.
The result of this method does not include inputs that were required earlier already.
Source code in kiara/pipeline/pipeline.py
def get_pipeline_inputs_for_stage(self, stage: int) -> typing.Iterable[str]:
    """Return the pipeline inputs that are first required by the given stage.

    Inputs that were already required by an earlier stage are not included.
    An empty list is returned if no input is first required by this stage.
    """
    inputs_by_stage = self.get_pipeline_inputs_by_stage()
    return inputs_by_stage.get(stage, [])
get_pipeline_outputs_by_stage(self)
¶
Return a mapping of pipeline output names, keyed by the stage that needs to be executed before they are available.
Source code in kiara/pipeline/pipeline.py
def get_pipeline_outputs_by_stage(
    self,
) -> typing.Mapping[int, typing.Iterable[str]]:
    """Return the pipeline output names, grouped by the stage that must run before they are available.

    The result maps a stage number to the names of the pipeline outputs whose
    earliest producing step output belongs to that stage. An output that is
    not referenced by any step output ends up under the key ``sys.maxsize``.

    The mapping is computed once and cached on the instance.
    """
    cached = self._outputs_by_stage
    if cached is not None:
        return cached

    grouped: typing.Dict[int, typing.List[str]] = {}
    for output_name, slot in self.outputs._value_slots.items():  # type: ignore
        # Seed with the sentinel so outputs without any step-output reference
        # still produce a (sentinel) stage.
        candidate_stages = [sys.maxsize]
        for ref in self._value_refs[slot]:
            if isinstance(ref, StepOutputRef):
                stage = self.get_step(ref.step_id).processing_stage
                assert stage is not None
                candidate_stages.append(stage)  # type: ignore

        first_stage = min(candidate_stages)
        if first_stage not in grouped:
            grouped[first_stage] = []
        grouped[first_stage].append(output_name)

    self._outputs_by_stage = grouped
    return self._outputs_by_stage
get_pipeline_outputs_for_stage(self, stage)
¶
Return a list of pipeline outputs that are first available after the specified stage completed processing.
Source code in kiara/pipeline/pipeline.py
def get_pipeline_outputs_for_stage(self, stage: int) -> typing.Iterable[str]:
    """Return the pipeline outputs that first become available once the given stage has completed.

    An empty list is returned if no output first becomes available after this stage.
    """
    outputs_by_stage = self.get_pipeline_outputs_by_stage()
    return outputs_by_stage.get(stage, [])
get_step(self, step_id)
¶
Return the object representing a step in this workflow, identified by the step id.
Source code in kiara/pipeline/pipeline.py
def get_step(self, step_id: str) -> PipelineStep:
    """Look up a step of this workflow by its id.

    The lookup is delegated to the underlying pipeline structure.
    """
    structure = self._structure
    return structure.get_step(step_id)
get_step_inputs(self, step_id)
¶
Return all inputs for a step id (incl. inputs that are not pipeline inputs but are connected to other modules' outputs).
Source code in kiara/pipeline/pipeline.py
def get_step_inputs(self, step_id: str) -> ValueSet:
    """Return the full input value set of the step with the given id.

    This includes inputs that are not pipeline inputs but are connected to
    other modules' outputs. An unknown step id fails the lookup in the
    per-step input collection.
    """
    inputs_per_step = self._step_inputs
    return inputs_per_step[step_id]
get_step_outputs(self, step_id)
¶
Return all outputs for a step id (incl. outputs that are not pipeline outputs).
Source code in kiara/pipeline/pipeline.py
def get_step_outputs(self, step_id: str) -> ValueSet:
    """Return the full output value set of the step with the given id.

    This includes outputs that are not pipeline outputs. An unknown step id
    fails the lookup in the per-step output collection.
    """
    outputs_per_step = self._step_outputs
    return outputs_per_step[step_id]
get_steps_by_stage(self)
¶
Return all pipeline steps, grouped by the stage they belong to.
Source code in kiara/pipeline/pipeline.py
def get_steps_by_stage(
    self,
) -> typing.Mapping[int, typing.Mapping[str, PipelineStep]]:
    """Return all pipeline steps, grouped by the stage they belong to.

    The result maps a stage number to a mapping of step id to step object.
    It is computed once and cached on the instance.
    """
    cached = self._steps_by_stage
    if cached is not None:
        return cached

    grouped: typing.Dict[int, typing.Dict[str, PipelineStep]] = {}
    for current_id in self.step_ids:
        current_step = self.get_step(current_id)
        step_stage = current_step.processing_stage
        assert step_stage is not None
        if step_stage not in grouped:
            grouped[step_stage] = {}
        grouped[step_stage][current_id] = current_step

    self._steps_by_stage = grouped
    return self._steps_by_stage