Skip to content

kiara.pipeline.pipeline

Pipeline

An instance of a PipelineStructure that holds state for all of the inputs/outputs of the steps within.

inputs: SlottedValueSet property readonly

All (pipeline) input values of this pipeline.

outputs: SlottedValueSet property readonly

All (pipeline) output values of this pipeline.

status: StepStatus property readonly

Return the current status of this pipeline.

step_ids: Iterable[str] property readonly

Return all ids of the steps of this pipeline.

add_listener(self, listener)

Add a listener taht gets notified on any internal pipeline input/output events.

Source code in kiara/pipeline/pipeline.py
def add_listener(self, listener: PipelineListener) -> None:
    """Add a listener taht gets notified on any internal pipeline input/output events."""
    self._listeners.append(listener)

get_pipeline_inputs_by_stage(self)

Return a list of pipeline input names, ordered by stage they are first required.

Source code in kiara/pipeline/pipeline.py
def get_pipeline_inputs_by_stage(self) -> typing.Mapping[int, typing.Iterable[str]]:
    """Return a list of pipeline input names, ordered by stage they are first required."""

    if self._inputs_by_stage is not None:
        return self._inputs_by_stage

    result: typing.Dict[int, typing.List[str]] = {}
    for k, v in self.inputs._value_slots.items():  # type: ignore
        refs = self._value_refs[v]
        min_stage = sys.maxsize
        for ref in refs:
            if not isinstance(ref, StepInputRef):
                continue
            step = self.get_step(ref.step_id)
            stage = step.processing_stage
            assert stage is not None
            if stage < min_stage:
                min_stage = stage  # type: ignore
            result.setdefault(min_stage, []).append(k)

    self._inputs_by_stage = result
    return self._inputs_by_stage

get_pipeline_inputs_for_stage(self, stage)

Return a list of pipeline inputs that are required for a stage to be processed.

The result of this method does not include inputs that were required earlier already.

Source code in kiara/pipeline/pipeline.py
def get_pipeline_inputs_for_stage(self, stage: int) -> typing.Iterable[str]:
    """Return a list of pipeline inputs that are required for a stage to be processed.

    The result of this method does not include inputs that were required earlier already.
    """

    return self.get_pipeline_inputs_by_stage().get(stage, [])

get_pipeline_outputs_by_stage(self)

Return a list of pipeline input names, ordered by the stage that needs to be executed before they are available.

Source code in kiara/pipeline/pipeline.py
def get_pipeline_outputs_by_stage(
    self,
) -> typing.Mapping[int, typing.Iterable[str]]:
    """Return a list of pipeline input names, ordered by the stage that needs to be executed before they are available."""

    if self._outputs_by_stage is not None:
        return self._outputs_by_stage

    result: typing.Dict[int, typing.List[str]] = {}
    for k, v in self.outputs._value_slots.items():  # type: ignore
        refs = self._value_refs[v]
        min_stage = sys.maxsize
        for ref in refs:
            if not isinstance(ref, StepOutputRef):
                continue
            step = self.get_step(ref.step_id)
            stage = step.processing_stage
            assert stage is not None
            if stage < min_stage:
                min_stage = stage  # type: ignore
            result.setdefault(min_stage, []).append(k)

    self._outputs_by_stage = result
    return self._outputs_by_stage

get_pipeline_outputs_for_stage(self, stage)

Return a list of pipeline outputs that are first available after the specified stage completed processing.

Source code in kiara/pipeline/pipeline.py
def get_pipeline_outputs_for_stage(self, stage: int) -> typing.Iterable[str]:
    """Return a list of pipeline outputs that are first available after the specified stage completed processing."""

    return self.get_pipeline_outputs_by_stage().get(stage, [])

get_step(self, step_id)

Return the object representing a step in this workflow, identified by the step id.

Source code in kiara/pipeline/pipeline.py
def get_step(self, step_id: str) -> PipelineStep:
    """Return the object representing a step in this workflow, identified by the step id."""
    return self._structure.get_step(step_id)

get_step_inputs(self, step_id)

Return all inputs for a step id (incl. inputs that are not pipeline inputs but connected to other modules output).

Source code in kiara/pipeline/pipeline.py
def get_step_inputs(self, step_id: str) -> ValueSet:
    """Return all inputs for a step id (incl. inputs that are not pipeline inputs but connected to other modules output)."""
    return self._step_inputs[step_id]

get_step_outputs(self, step_id)

Return all outputs for a step id (incl. outputs that are not pipeline outputs).

Source code in kiara/pipeline/pipeline.py
def get_step_outputs(self, step_id: str) -> ValueSet:
    """Return all outputs for a step id (incl. outputs that are not pipeline outputs)."""
    return self._step_outputs[step_id]

get_steps_by_stage(self)

Return a all pipeline steps, ordered by stage they belong to.

Source code in kiara/pipeline/pipeline.py
def get_steps_by_stage(
    self,
) -> typing.Mapping[int, typing.Mapping[str, PipelineStep]]:
    """Return a all pipeline steps, ordered by stage they belong to."""

    if self._steps_by_stage is not None:
        return self._steps_by_stage

    result: typing.Dict[int, typing.Dict[str, PipelineStep]] = {}
    for step_id in self.step_ids:
        step = self.get_step(step_id)
        stage = step.processing_stage
        assert stage is not None
        result.setdefault(stage, {})[step_id] = step

    self._steps_by_stage = result
    return self._steps_by_stage