pipeline

Classes

PipelineConfig (KiaraModuleConfig) pydantic-model

A class to hold the configuration for a [PipelineModule][kiara.pipeline.module.PipelineModule].

If you want to control the pipeline input and output names, you need to provide a map that uses the autogenerated field name ([step_id]__[alias] -- 2 underscores!) as key, and the desired field name as value. This schema for autogenerated field names exists because it is hard to guarantee the uniqueness of each field otherwise: different steps can share the same input field names while needing different input values, and in other cases inputs of different steps need the same input. So, to make sure the right values are always used, the default approach is deliberately conservative, accepting that in some cases the user will be prompted for duplicate inputs for the same value.

To remedy that, the pipeline creator has the option to manually specify a mapping to rename some or all of the input/output fields.

Further, because in a lot of cases there won't be any overlapping fields, the creator can specify auto, in which case Kiara will automatically create a mapping that tries to map autogenerated field names to the shortest possible names for each case.

Examples:

Configuration for a pipeline module that functions as a nand logic gate (in Python):

and_step = PipelineStepConfig(module_type="and", step_id="and")
not_step = PipelineStepConfig(module_type="not", step_id="not", input_links={"a": ["and.y"]})
nand_p_conf = PipelineConfig(doc="Returns 'False' if both inputs are 'True'.",
                    steps=[and_step, not_step],
                    input_aliases={
                        "and__a": "a",
                        "and__b": "b"
                    },
                    output_aliases={
                        "not__y": "y"
                    })

Or, the same thing in JSON:

{
  "module_type_name": "nand",
  "doc": "Returns 'False' if both inputs are 'True'.",
  "steps": [
    {
      "module_type": "and",
      "step_id": "and"
    },
    {
      "module_type": "not",
      "step_id": "not",
      "input_links": {
        "a": "and.y"
      }
    }
  ],
  "input_aliases": {
    "and__a": "a",
    "and__b": "b"
  },
  "output_aliases": {
    "not__y": "y"
  }
}
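
For illustration, the JSON document above could be loaded into a config programmatically -- a minimal sketch, assuming a default kiara context is available and that nand_data holds the parsed JSON as a Python dict:

# sketch only: 'nand_data' is assumed to hold the parsed JSON document above
nand_config = PipelineConfig.from_config(pipeline_name="nand", data=nand_data)
print(nand_config.structure.pipeline_inputs_schema)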
Source code in kiara/models/module/pipeline/__init__.py
class PipelineConfig(KiaraModuleConfig):
    """A class to hold the configuration for a [PipelineModule][kiara.pipeline.module.PipelineModule].

    If you want to control the pipeline input and output names, you need to provide a map that uses the
    autogenerated field name ([step_id]__[alias] -- 2 underscores!) as key, and the desired field name
    as value. This schema for autogenerated field names exists because it is hard to guarantee
    the uniqueness of each field otherwise: different steps can share the same input field names while
    needing different input values, and in other cases inputs of different steps need the same input.
    So, to make sure the right values are always used, the default approach is deliberately conservative,
    accepting that in some cases the user will be prompted for duplicate inputs for the same value.

    To remedy that, the pipeline creator has the option to manually specify a mapping to rename some or all of
    the input/output fields.

    Further, because in a lot of cases there won't be any overlapping fields, the creator can specify ``auto``,
    in which case *Kiara* will automatically create a mapping that tries to map autogenerated field names
    to the shortest possible names for each case.

    Examples:

        Configuration for a pipeline module that functions as a ``nand`` logic gate (in Python):

        ``` python
        and_step = PipelineStepConfig(module_type="and", step_id="and")
        not_step = PipelineStepConfig(module_type="not", step_id="not", input_links={"a": ["and.y"]})
        nand_p_conf = PipelineConfig(doc="Returns 'False' if both inputs are 'True'.",
                            steps=[and_step, not_step],
                            input_aliases={
                                "and__a": "a",
                                "and__b": "b"
                            },
                            output_aliases={
                                "not__y": "y"
                            })
        ```

        Or, the same thing in json:

        ``` json
        {
          "module_type_name": "nand",
          "doc": "Returns 'False' if both inputs are 'True'.",
          "steps": [
            {
              "module_type": "and",
              "step_id": "and"
            },
            {
              "module_type": "not",
              "step_id": "not",
              "input_links": {
                "a": "and.y"
              }
            }
          ],
          "input_aliases": {
            "and__a": "a",
            "and__b": "b"
          },
          "output_aliases": {
            "not__y": "y"
          }
        }
        ```
    """

    _kiara_model_id = "instance.module_config.pipeline"

    @classmethod
    def from_file(
        cls,
        path: str,
        kiara: Optional["Kiara"] = None,
        # module_map: Optional[Mapping[str, Any]] = None,
    ):

        data = get_data_from_file(path)
        pipeline_name = data.pop("pipeline_name", None)
        if pipeline_name is None:
            pipeline_name = os.path.basename(path)

        return cls.from_config(pipeline_name=pipeline_name, data=data, kiara=kiara)

    @classmethod
    def from_config(
        cls,
        pipeline_name: str,
        data: Mapping[str, Any],
        kiara: Optional["Kiara"] = None,
        # module_map: Optional[Mapping[str, Any]] = None,
    ):

        if kiara is None:
            from kiara.context import Kiara

            kiara = Kiara.instance()

        if not kiara.operation_registry.is_initialized:
            kiara.operation_registry.operations  # noqa

        config = cls._from_config(pipeline_name=pipeline_name, data=data, kiara=kiara)
        return config

    @classmethod
    def _from_config(
        cls,
        pipeline_name: str,
        data: Mapping[str, Any],
        kiara: "Kiara",
        module_map: Optional[Mapping[str, Any]] = None,
    ):

        data = dict(data)
        steps = data.pop("steps")
        steps = PipelineStep.create_steps(*steps, kiara=kiara, module_map=module_map)
        data["steps"] = steps
        if not data.get("input_aliases"):
            data["input_aliases"] = create_input_alias_map(steps)
        if not data.get("output_aliases"):
            data["output_aliases"] = create_output_alias_map(steps)

        result = cls(pipeline_name=pipeline_name, **data)

        return result

    class Config:
        extra = Extra.ignore
        validate_assignment = True

    pipeline_name: str = Field(description="The name of this pipeline.")
    steps: List[PipelineStep] = Field(
        description="A list of steps/modules of this pipeline, and their connections.",
    )
    input_aliases: Dict[str, str] = Field(
        description="A map of input aliases, with the calculated (<step_id>__<input_name> -- double underscore!) name as key, and a string (the resulting workflow input alias) as value. Check the documentation for the config class for which marker strings can be used to automatically create this map if possible.",
    )
    output_aliases: Dict[str, str] = Field(
        description="A map of output aliases, with the calculated (<step_id>__<output_name> -- double underscore!) name as key, and a string (the resulting workflow output alias) as value.  Check the documentation for the config class for which marker strings can be used to automatically create this map if possible.",
    )
    doc: str = Field(
        default="-- n/a --", description="Documentation about what the pipeline does."
    )
    context: Dict[str, Any] = Field(
        default_factory=dict, description="Metadata for this workflow."
    )
    _structure: Optional["PipelineStructure"] = PrivateAttr(default=None)

    @validator("steps", pre=True)
    def _validate_steps(cls, v):

        if not v:
            raise ValueError(f"Invalid type for 'steps' value: {type(v)}")

        steps = []
        for step in v:
            if not step:
                raise ValueError("No step data provided.")
            if isinstance(step, PipelineStep):
                steps.append(step)
            elif isinstance(step, Mapping):
                steps.append(PipelineStep(**step))
            else:
                raise TypeError(step)
        return steps

    @property
    def structure(self) -> "PipelineStructure":

        if self._structure is not None:
            return self._structure

        from kiara.models.module.pipeline.structure import PipelineStructure

        self._structure = PipelineStructure(pipeline_config=self)
        return self._structure

    def create_renderable(self, **config: Any) -> RenderableType:

        return create_table_from_model_object(self, exclude_fields={"steps"})

    # def create_input_alias_map(self) -> Dict[str, str]:
    #
    #     aliases: Dict[str, List[str]] = {}
    #     for step in self.steps:
    #         field_names = step.module.input_names
    #         for field_name in field_names:
    #             aliases.setdefault(field_name, []).append(step.step_id)
    #
    #     result: Dict[str, str] = {}
    #     for field_name, step_ids in aliases.items():
    #         for step_id in step_ids:
    #             generated = generate_pipeline_endpoint_name(step_id, field_name)
    #             result[generated] = generated
    #
    #     return result
    #
    # def create_output_alias_map(self) -> Dict[str, str]:
    #
    #     aliases: Dict[str, List[str]] = {}
    #     for step in self.steps:
    #         field_names = step.module.input_names
    #         for field_name in field_names:
    #             aliases.setdefault(field_name, []).append(step.step_id)
    #
    #     result: Dict[str, str] = {}
    #     for field_name, step_ids in aliases.items():
    #         for step_id in step_ids:
    #             generated = generate_pipeline_endpoint_name(step_id, field_name)
    #             result[generated] = generated
    #
    #     return result

Attributes

context: Dict[str, Any] pydantic-field

Metadata for this workflow.

doc: str pydantic-field

Documentation about what the pipeline does.

input_aliases: Dict[str, str] pydantic-field required

A map of input aliases, with the calculated (<step_id>__<input_name> -- double underscore!) name as key, and a string (the resulting workflow input alias) as value. Check the documentation for the config class for which marker strings can be used to automatically create this map if possible.

output_aliases: Dict[str, str] pydantic-field required

A map of output aliases, with the calculated (<step_id>__<output_name> -- double underscore!) name as key, and a string (the resulting workflow output alias) as value. Check the documentation for the config class for which marker strings can be used to automatically create this map if possible.

pipeline_name: str pydantic-field required

The name of this pipeline.

steps: List[kiara.models.module.pipeline.PipelineStep] pydantic-field required

A list of steps/modules of this pipeline, and their connections.

structure: PipelineStructure property readonly
Config
Source code in kiara/models/module/pipeline/__init__.py
class Config:
    extra = Extra.ignore
    validate_assignment = True
extra
validate_assignment
create_renderable(self, **config)
Source code in kiara/models/module/pipeline/__init__.py
def create_renderable(self, **config: Any) -> RenderableType:

    return create_table_from_model_object(self, exclude_fields={"steps"})
from_config(pipeline_name, data, kiara=None) classmethod
Source code in kiara/models/module/pipeline/__init__.py
@classmethod
def from_config(
    cls,
    pipeline_name: str,
    data: Mapping[str, Any],
    kiara: Optional["Kiara"] = None,
    # module_map: Optional[Mapping[str, Any]] = None,
):

    if kiara is None:
        from kiara.context import Kiara

        kiara = Kiara.instance()

    if not kiara.operation_registry.is_initialized:
        kiara.operation_registry.operations  # noqa

    config = cls._from_config(pipeline_name=pipeline_name, data=data, kiara=kiara)
    return config
from_file(path, kiara=None) classmethod
Source code in kiara/models/module/pipeline/__init__.py
@classmethod
def from_file(
    cls,
    path: str,
    kiara: Optional["Kiara"] = None,
    # module_map: Optional[Mapping[str, Any]] = None,
):

    data = get_data_from_file(path)
    pipeline_name = data.pop("pipeline_name", None)
    if pipeline_name is None:
        pipeline_name = os.path.basename(path)

    return cls.from_config(pipeline_name=pipeline_name, data=data, kiara=kiara)
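
A hedged usage sketch for from_file; the path is hypothetical, and any format supported by get_data_from_file (e.g. JSON or YAML) should work. If the file contains no pipeline_name key, the file's basename is used:

# sketch: the file path is hypothetical
config = PipelineConfig.from_file("nand_pipeline.json")
print(config.pipeline_name)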

PipelineStep (Manifest) pydantic-model

A step within a pipeline-structure; includes information about its connection(s) and other metadata.

Source code in kiara/models/module/pipeline/__init__.py
class PipelineStep(Manifest):
    """A step within a pipeline-structure, includes information about it's connection(s) and other metadata."""

    _kiara_model_id = "instance.pipeline_step"

    class Config:
        validate_assignment = True
        extra = Extra.forbid

    @classmethod
    def create_steps(
        cls,
        *steps: Mapping[str, Any],
        kiara: "Kiara",
        module_map: Optional[Mapping[str, Any]] = None,
    ) -> List["PipelineStep"]:

        if module_map is None:
            module_map = {}
        else:
            module_map = dict(module_map)

        if kiara.operation_registry.is_initialized:
            for op_id, op in kiara.operation_registry.operations.items():
                module_map[op_id] = {
                    "module_type": op.module_type,
                    "module_config": op.module_config,
                }

        result: List[PipelineStep] = []

        for step in steps:

            module_type = step.get("module_type", None)
            if not module_type:
                raise ValueError("Can't create step, no 'module_type' specified.")

            module_config = step.get("module_config", {})

            if module_type not in kiara.module_type_names:
                if module_type in module_map.keys():
                    resolved_module_type = module_map[module_type]["module_type"]
                    resolved_module_config = module_map[module_type]["module_config"]
                    manifest = kiara.create_manifest(
                        module_or_operation=resolved_module_type,
                        config=resolved_module_config,
                    )
                else:
                    raise Exception(f"Can't resolve module type: {module_type}")
            else:
                manifest = kiara.create_manifest(
                    module_or_operation=module_type, config=module_config
                )
                resolved_module_type = module_type
                resolved_module_config = module_config

            module = kiara.create_module(manifest=manifest)

            step_id = step.get("step_id", None)
            if not step_id:
                raise ValueError("Can't create step, no 'step_id' specified.")

            input_links = {}
            for input_field, sources in step.get("input_links", {}).items():
                if isinstance(sources, str):
                    sources = [sources]
                # assign outside the 'if' so list-valued sources are kept as well
                input_links[input_field] = sources

            # TODO: do we really need the deepcopy here?
            _s = PipelineStep(
                step_id=step_id,
                module_type=resolved_module_type,
                module_config=dict(resolved_module_config),
                input_links=input_links,  # type: ignore
                module_details=KiaraModuleClass.from_module(module=module),
            )
            _s._module = module
            result.append(_s)

        return result

    @validator("step_id")
    def _validate_step_id(cls, v):

        assert isinstance(v, str)
        if "." in v:
            raise ValueError("Step ids can't contain '.' characters.")

        return v

    step_id: str = Field(
        description="Locally unique id (within a pipeline) of this step."
    )

    module_type: str = Field(description="The module type.")
    module_config: Dict[str, Any] = Field(
        description="The module config.", default_factory=dict
    )
    # required: bool = Field(
    #     description="Whether this step is required within the workflow.\n\nIn some cases, when none of the pipeline outputs have a required input that connects to a step, then it is not necessary for this step to have been executed, even if it is placed before a step in the execution hierarchy. This also means that the pipeline inputs that are connected to this step might not be required.",
    #     default=True,
    # )
    # processing_stage: Optional[int] = Field(
    #     default=None,
    #     description="The stage number this step is executed within the pipeline.",
    # )
    input_links: Mapping[str, List[StepValueAddress]] = Field(
        description="The links that connect to inputs of the module.",
        default_factory=dict,
    )
    module_details: KiaraModuleClass = Field(
        description="The class of the underlying module."
    )
    _module: Optional["KiaraModule"] = PrivateAttr(default=None)

    @root_validator(pre=True)
    def create_step_id(cls, values):

        if "module_type" not in values:
            raise ValueError("No 'module_type' specified.")
        if "step_id" not in values or not values["step_id"]:
            values["step_id"] = slugify(values["module_type"])

        return values

    @validator("step_id")
    def ensure_valid_id(cls, v):

        # TODO: check with regex
        if "." in v or " " in v:
            raise ValueError(
                f"Step id can't contain special characters or whitespaces: {v}"
            )

        return v

    @validator("module_config", pre=True)
    def ensure_dict(cls, v):

        if v is None:
            v = {}
        return v

    @validator("input_links", pre=True)
    def ensure_input_links_valid(cls, v):

        if v is None:
            v = {}

        result = {}
        for input_name, output in v.items():

            input_links = ensure_step_value_addresses(
                default_field_name=input_name, link=output
            )
            result[input_name] = input_links

        return result

    @property
    def module(self) -> "KiaraModule":
        if self._module is None:
            m_cls = self.module_details.get_class()
            self._module = m_cls(module_config=self.module_config)
        return self._module

    def __repr__(self):

        return f"{self.__class__.__name__}(step_id={self.step_id} module_type={self.module_type})"

    def __str__(self):
        return f"step: {self.step_id} (module: {self.module_type})"

Attributes

input_links: Mapping[str, List[StepValueAddress]] pydantic-field

The links that connect to inputs of the module.

module: KiaraModule property readonly
module_config: Dict[str, Any] pydantic-field

The module config.

module_details: KiaraModuleClass pydantic-field required

The class of the underlying module.

module_type: str pydantic-field required

The module type.

step_id: str pydantic-field required

Locally unique id (within a pipeline) of this step.

Config
Source code in kiara/models/module/pipeline/__init__.py
class Config:
    validate_assignment = True
    extra = Extra.forbid
extra
validate_assignment
create_step_id(values) classmethod
Source code in kiara/models/module/pipeline/__init__.py
@root_validator(pre=True)
def create_step_id(cls, values):

    if "module_type" not in values:
        raise ValueError("No 'module_type' specified.")
    if "step_id" not in values or not values["step_id"]:
        values["step_id"] = slugify(values["module_type"])

    return values
create_steps(*steps, kiara, module_map=None) classmethod
Source code in kiara/models/module/pipeline/__init__.py
@classmethod
def create_steps(
    cls,
    *steps: Mapping[str, Any],
    kiara: "Kiara",
    module_map: Optional[Mapping[str, Any]] = None,
) -> List["PipelineStep"]:

    if module_map is None:
        module_map = {}
    else:
        module_map = dict(module_map)

    if kiara.operation_registry.is_initialized:
        for op_id, op in kiara.operation_registry.operations.items():
            module_map[op_id] = {
                "module_type": op.module_type,
                "module_config": op.module_config,
            }

    result: List[PipelineStep] = []

    for step in steps:

        module_type = step.get("module_type", None)
        if not module_type:
            raise ValueError("Can't create step, no 'module_type' specified.")

        module_config = step.get("module_config", {})

        if module_type not in kiara.module_type_names:
            if module_type in module_map.keys():
                resolved_module_type = module_map[module_type]["module_type"]
                resolved_module_config = module_map[module_type]["module_config"]
                manifest = kiara.create_manifest(
                    module_or_operation=resolved_module_type,
                    config=resolved_module_config,
                )
            else:
                raise Exception(f"Can't resolve module type: {module_type}")
        else:
            manifest = kiara.create_manifest(
                module_or_operation=module_type, config=module_config
            )
            resolved_module_type = module_type
            resolved_module_config = module_config

        module = kiara.create_module(manifest=manifest)

        step_id = step.get("step_id", None)
        if not step_id:
            raise ValueError("Can't create step, no 'step_id' specified.")

        input_links = {}
        for input_field, sources in step.get("input_links", {}).items():
            if isinstance(sources, str):
                sources = [sources]
            # assign outside the 'if' so list-valued sources are kept as well
            input_links[input_field] = sources

        # TODO: do we really need the deepcopy here?
        _s = PipelineStep(
            step_id=step_id,
            module_type=resolved_module_type,
            module_config=dict(resolved_module_config),
            input_links=input_links,  # type: ignore
            module_details=KiaraModuleClass.from_module(module=module),
        )
        _s._module = module
        result.append(_s)

    return result
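
As a sketch, create_steps can be driven with plain mappings (this assumes an initialized Kiara context, and re-uses the 'and'/'not' module types from the examples above):

from kiara.context import Kiara

# sketch: assumes a default kiara context can be created
kiara = Kiara.instance()
steps = PipelineStep.create_steps(
    {"module_type": "and", "step_id": "and"},
    {"module_type": "not", "step_id": "not", "input_links": {"a": "and.y"}},
    kiara=kiara,
)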
ensure_dict(v) classmethod
Source code in kiara/models/module/pipeline/__init__.py
@validator("module_config", pre=True)
def ensure_dict(cls, v):

    if v is None:
        v = {}
    return v
ensure_input_links_valid(v) classmethod
Source code in kiara/models/module/pipeline/__init__.py
@validator("input_links", pre=True)
def ensure_input_links_valid(cls, v):

    if v is None:
        v = {}

    result = {}
    for input_name, output in v.items():

        input_links = ensure_step_value_addresses(
            default_field_name=input_name, link=output
        )
        result[input_name] = input_links

    return result
ensure_valid_id(v) classmethod
Source code in kiara/models/module/pipeline/__init__.py
@validator("step_id")
def ensure_valid_id(cls, v):

    # TODO: check with regex
    if "." in v or " " in v:
        raise ValueError(
            f"Step id can't contain special characters or whitespaces: {v}"
        )

    return v

StepStatus (Enum)

Enum to describe the state of a workflow.

Source code in kiara/models/module/pipeline/__init__.py
class StepStatus(Enum):
    """Enum to describe the state of a workflow."""

    INPUTS_INVALID = "inputs_invalid"
    INPUTS_READY = "inputs_ready"
    RESULTS_READY = "results_ready"
INPUTS_INVALID
INPUTS_READY
RESULTS_READY
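
A small sketch of how the status is typically consumed; 'details' is assumed to be a StepDetails instance, e.g. as returned by Pipeline.get_step_details:

# sketch: 'details' is assumed to come from Pipeline.get_step_details(step_id)
if details.status == StepStatus.RESULTS_READY:
    print("all outputs computed")
elif details.status == StepStatus.INPUTS_INVALID:
    print("missing/invalid inputs:", details.invalid_details)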

create_input_alias_map(steps)

Source code in kiara/models/module/pipeline/__init__.py
def create_input_alias_map(steps: Iterable[PipelineStep]) -> Dict[str, str]:

    aliases: Dict[str, str] = {}
    for step in steps:
        field_names = step.module.input_names
        for field_name in field_names:
            alias = generate_pipeline_endpoint_name(
                step_id=step.step_id, value_name=field_name
            )
            assert alias not in aliases.keys()
            aliases[f"{step.step_id}.{field_name}"] = alias

    return aliases

create_output_alias_map(steps)

Source code in kiara/models/module/pipeline/__init__.py
def create_output_alias_map(steps: Iterable[PipelineStep]) -> Dict[str, str]:

    aliases: Dict[str, str] = {}
    for step in steps:
        field_names = step.module.output_names
        for field_name in field_names:
            alias = generate_pipeline_endpoint_name(
                step_id=step.step_id, value_name=field_name
            )
            assert alias not in aliases.keys()
            aliases[f"{step.step_id}.{field_name}"] = alias

    return aliases

generate_pipeline_endpoint_name(step_id, value_name)

Source code in kiara/models/module/pipeline/__init__.py
def generate_pipeline_endpoint_name(step_id: str, value_name: str):

    return f"{step_id}__{value_name}"

Modules

controller

logger

Classes

PipelineController (PipelineListener)
Source code in kiara/models/module/pipeline/controller.py
class PipelineController(PipelineListener):

    pass
SinglePipelineBatchController (SinglePipelineController)

A [PipelineController][kiara.models.module.pipeline.controller.PipelineController] that executes all pipeline steps non-interactively.

This is the default implementation of a PipelineController, and probably the simplest one. It waits until all inputs are set, after which it executes all pipeline steps in the required order.

Parameters:

pipeline (Pipeline, required): the pipeline to control
auto_process (bool, default True): whether to automatically start processing the pipeline as soon as the input set is valid
Source code in kiara/models/module/pipeline/controller.py
class SinglePipelineBatchController(SinglePipelineController):
    """A [PipelineController][kiara.models.modules.pipeline.controller.PipelineController] that executes all pipeline steps non-interactively.

    This is the default implementation of a ``PipelineController``, and probably the most simple implementation of one.
    It waits until all inputs are set, after which it executes all pipeline steps in the required order.

    Arguments:
        pipeline: the pipeline to control
        auto_process: whether to automatically start processing the pipeline as soon as the input set is valid
    """

    def __init__(
        self,
        pipeline: Pipeline,
        job_registry: JobRegistry,
        auto_process: bool = True,
    ):

        self._auto_process: bool = auto_process
        self._is_running: bool = False
        super().__init__(pipeline=pipeline, job_registry=job_registry)

    @property
    def auto_process(self) -> bool:
        return self._auto_process

    @auto_process.setter
    def auto_process(self, auto_process: bool):
        self._auto_process = auto_process

    def process_pipeline(self):

        log = logger.bind(pipeline_id=self.pipeline.pipeline_id)
        if self._is_running:
            log.debug(
                "ignore.pipeline_process",
                reason="Pipeline already running.",
            )
            raise Exception("Pipeline already running.")

        log.debug("execute.pipeline")
        self._is_running = True
        try:
            for idx, stage in enumerate(
                self.pipeline.structure.processing_stages, start=1
            ):

                log.debug(
                    "execute.pipeline.stage",
                    stage=idx,
                )

                job_ids = {}
                for step_id in stage:

                    log.debug(
                        "execute.pipeline.step",
                        step_id=step_id,
                    )

                    try:
                        job_id = self.process_step(step_id)
                        job_ids[step_id] = job_id
                    except Exception as e:
                        # TODO: cancel running jobs?
                        if is_debug():
                            import traceback

                            traceback.print_exc()
                        log.error(
                            "error.processing.pipeline",
                            step_id=step_id,
                            error=e,
                        )
                        return False

                self.set_processing_results(job_ids=job_ids)
                log.debug(
                    "execute_finished.pipeline.stage",
                    stage=idx,
                )

        finally:
            self._is_running = False

        log.debug("execute_finished.pipeline")
auto_process: bool property writable
process_pipeline(self)
Source code in kiara/models/module/pipeline/controller.py
def process_pipeline(self):

    log = logger.bind(pipeline_id=self.pipeline.pipeline_id)
    if self._is_running:
        log.debug(
            "ignore.pipeline_process",
            reason="Pipeline already running.",
        )
        raise Exception("Pipeline already running.")

    log.debug("execute.pipeline")
    self._is_running = True
    try:
        for idx, stage in enumerate(
            self.pipeline.structure.processing_stages, start=1
        ):

            log.debug(
                "execute.pipeline.stage",
                stage=idx,
            )

            job_ids = {}
            for step_id in stage:

                log.debug(
                    "execute.pipeline.step",
                    step_id=step_id,
                )

                try:
                    job_id = self.process_step(step_id)
                    job_ids[step_id] = job_id
                except Exception as e:
                    # TODO: cancel running jobs?
                    if is_debug():
                        import traceback

                        traceback.print_exc()
                    log.error(
                        "error.processing.pipeline",
                        step_id=step_id,
                        error=e,
                    )
                    return False

            self.set_processing_results(job_ids=job_ids)
            log.debug(
                "execute_finished.pipeline.stage",
                stage=idx,
            )

    finally:
        self._is_running = False

    log.debug("execute_finished.pipeline")
SinglePipelineController (PipelineController)
Source code in kiara/models/module/pipeline/controller.py
class SinglePipelineController(PipelineController):
    def __init__(self, pipeline: Pipeline, job_registry: JobRegistry):

        self._pipeline: Pipeline = pipeline
        self._job_registry: JobRegistry = job_registry
        self._pipeline.add_listener(self)
        self._pipeline_details: Optional[PipelineDetails] = None

    @property
    def pipeline(self) -> Pipeline:
        return self._pipeline

    def current_pipeline_state(self) -> PipelineDetails:

        if self._pipeline_details is None:
            self._pipeline_details = self.pipeline.get_pipeline_details()
        return self._pipeline_details

    def can_be_processed(self, step_id: str) -> bool:
        """Check whether the step with the provided id is ready to be processed."""

        pipeline_state = self.current_pipeline_state()
        step_state = pipeline_state.step_states[step_id]

        return not step_state.invalid_details

    def can_be_skipped(self, step_id: str) -> bool:
        """Check whether the processing of a step can be skipped."""

        required = self.pipeline.structure.step_is_required(step_id=step_id)
        if required:
            required = self.can_be_processed(step_id)
        return required

    def _pipeline_event_occurred(self, event: PipelineEvent):

        self._pipeline_details = None

    def set_processing_results(self, job_ids: Mapping[str, uuid.UUID]):

        self._job_registry.wait_for(*job_ids.values())

        combined_outputs = {}
        for step_id, job_id in job_ids.items():
            record = self._job_registry.get_job_record_in_session(job_id=job_id)
            combined_outputs[step_id] = record.outputs

        self.pipeline.set_multiple_step_outputs(
            changed_outputs=combined_outputs, notify_listeners=True
        )

    def pipeline_is_ready(self) -> bool:
        """Return whether the pipeline is ready to be processed.

        A ``True`` result means that all pipeline inputs are set with valid values, and therefore every step within the
        pipeline can be processed.

        Returns:
            whether the pipeline can be processed as a whole (``True``) or not (``False``)
        """

        pipeline_inputs = self.pipeline._all_values.get_alias("pipeline.inputs")
        assert pipeline_inputs is not None
        return pipeline_inputs.all_items_valid

    def process_step(self, step_id: str, wait: bool = False) -> uuid.UUID:
        """Kick off processing for the step with the provided id.

        Arguments:
            step_id: the id of the step that should be started
        """

        job_config = self.pipeline.create_job_config_for_step(step_id)

        job_id = self._job_registry.execute_job(job_config=job_config)
        # job_id = self._processor.create_job(job_config=job_config)
        # self._processor.queue_job(job_id=job_id)

        if wait:
            self._job_registry.wait_for(job_id)

        return job_id
pipeline: Pipeline property readonly
Methods
can_be_processed(self, step_id)

Check whether the step with the provided id is ready to be processed.

Source code in kiara/models/module/pipeline/controller.py
def can_be_processed(self, step_id: str) -> bool:
    """Check whether the step with the provided id is ready to be processed."""

    pipeline_state = self.current_pipeline_state()
    step_state = pipeline_state.step_states[step_id]

    return not step_state.invalid_details
can_be_skipped(self, step_id)

Check whether the processing of a step can be skipped.

Source code in kiara/models/module/pipeline/controller.py
def can_be_skipped(self, step_id: str) -> bool:
    """Check whether the processing of a step can be skipped."""

    required = self.pipeline.structure.step_is_required(step_id=step_id)
    if required:
        required = self.can_be_processed(step_id)
    return required
current_pipeline_state(self)
Source code in kiara/models/module/pipeline/controller.py
def current_pipeline_state(self) -> PipelineDetails:

    if self._pipeline_details is None:
        self._pipeline_details = self.pipeline.get_pipeline_details()
    return self._pipeline_details
pipeline_is_ready(self)

Return whether the pipeline is ready to be processed.

A True result means that all pipeline inputs are set with valid values, and therefore every step within the pipeline can be processed.

Returns:

bool: whether the pipeline can be processed as a whole (True) or not (False)

Source code in kiara/models/module/pipeline/controller.py
def pipeline_is_ready(self) -> bool:
    """Return whether the pipeline is ready to be processed.

    A ``True`` result means that all pipeline inputs are set with valid values, and therefore every step within the
    pipeline can be processed.

    Returns:
        whether the pipeline can be processed as a whole (``True``) or not (``False``)
    """

    pipeline_inputs = self.pipeline._all_values.get_alias("pipeline.inputs")
    assert pipeline_inputs is not None
    return pipeline_inputs.all_items_valid
process_step(self, step_id, wait=False)

Kick off processing for the step with the provided id.

Parameters:

step_id (str, required): the id of the step that should be started
Source code in kiara/models/module/pipeline/controller.py
def process_step(self, step_id: str, wait: bool = False) -> uuid.UUID:
    """Kick off processing for the step with the provided id.

    Arguments:
        step_id: the id of the step that should be started
    """

    job_config = self.pipeline.create_job_config_for_step(step_id)

    job_id = self._job_registry.execute_job(job_config=job_config)
    # job_id = self._processor.create_job(job_config=job_config)
    # self._processor.queue_job(job_id=job_id)

    if wait:
        self._job_registry.wait_for(job_id)

    return job_id
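
A single step can also be processed and waited on directly (sketch; "and" is the step id from the earlier examples, and 'controller' a SinglePipelineController instance):

# sketch: 'controller' as in the batch example above
job_id = controller.process_step("and", wait=True)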
set_processing_results(self, job_ids)
Source code in kiara/models/module/pipeline/controller.py
def set_processing_results(self, job_ids: Mapping[str, uuid.UUID]):

    self._job_registry.wait_for(*job_ids.values())

    combined_outputs = {}
    for step_id, job_id in job_ids.items():
        record = self._job_registry.get_job_record_in_session(job_id=job_id)
        combined_outputs[step_id] = record.outputs

    self.pipeline.set_multiple_step_outputs(
        changed_outputs=combined_outputs, notify_listeners=True
    )

pipeline

Classes

Pipeline

An instance of a [PipelineStructure][kiara.pipeline.structure.PipelineStructure] that holds state for all of the inputs/outputs of the steps within.

Source code in kiara/models/module/pipeline/pipeline.py
class Pipeline(object):
    """An instance of a [PipelineStructure][kiara.pipeline.structure.PipelineStructure] that holds state for all of the inputs/outputs of the steps within."""

    def __init__(self, structure: PipelineStructure, data_registry: DataRegistry):

        self._id: uuid.UUID = uuid.uuid4()

        self._structure: PipelineStructure = structure

        self._value_refs: Mapping[AliasValueMap, Iterable[ValueRef]] = None  # type: ignore
        # self._status: StepStatus = StepStatus.STALE

        self._steps_by_stage: Dict[int, Dict[str, PipelineStep]] = None  # type: ignore
        self._inputs_by_stage: Dict[int, List[str]] = None  # type: ignore
        self._outputs_by_stage: Dict[int, List[str]] = None  # type: ignore

        self._data_registry: DataRegistry = data_registry

        self._all_values: AliasValueMap = None  # type: ignore

        self._init_values()

        self._listeners: List[PipelineListener] = []

        # self._update_status()

    @property
    def pipeline_id(self) -> uuid.UUID:
        return self._id

    @property
    def kiara_id(self) -> uuid.UUID:
        return self._data_registry.kiara_id

    def _init_values(self):
        """Initialize this object. This should only be called once.

        Basically, this goes through all the inputs and outputs of all steps, and 'allocates' a PipelineValueInfo object
        for each of them. In cases where output/input or pipeline-input/input points are connected, only one
        value item is allocated, since they refer to the same value.
        """

        values = AliasValueMap(
            alias=str(self.id), version=0, assoc_value=None, values_schema={}
        )
        values._data_registry = self._data_registry
        for field_name, schema in self._structure.pipeline_inputs_schema.items():
            values.set_alias_schema(f"pipeline.inputs.{field_name}", schema=schema)
        for field_name, schema in self._structure.pipeline_outputs_schema.items():
            values.set_alias_schema(f"pipeline.outputs.{field_name}", schema=schema)
        for step_id in self.step_ids:
            step = self.get_step(step_id)
            for field_name, value_schema in step.module.inputs_schema.items():
                values.set_alias_schema(
                    f"steps.{step_id}.inputs.{field_name}", schema=value_schema
                )
            for field_name, value_schema in step.module.outputs_schema.items():
                values.set_alias_schema(
                    f"steps.{step_id}.outputs.{field_name}", schema=value_schema
                )

        self._all_values = values

    def __eq__(self, other):

        if not isinstance(other, Pipeline):
            return False

        return self._id == other._id

    def __hash__(self):

        return hash(self._id)

    def add_listener(self, listener: PipelineListener):

        self._listeners.append(listener)

    @property
    def id(self) -> uuid.UUID:
        return self._id

    @property
    def structure(self) -> PipelineStructure:
        return self._structure

    def get_current_pipeline_inputs(self) -> Dict[str, uuid.UUID]:
        """All (pipeline) input values of this pipeline."""

        alias_map = self._all_values.get_alias("pipeline.inputs")
        return alias_map.get_all_value_ids()  # type: ignore

    def get_current_pipeline_outputs(self) -> Dict[str, uuid.UUID]:
        """All (pipeline) output values of this pipeline."""

        alias_map = self._all_values.get_alias("pipeline.outputs")
        return alias_map.get_all_value_ids()  # type: ignore

    def get_current_step_inputs(self, step_id) -> Dict[str, uuid.UUID]:

        alias_map = self._all_values.get_alias(f"steps.{step_id}.inputs")
        return alias_map.get_all_value_ids()  # type: ignore

    def get_current_step_outputs(self, step_id) -> Dict[str, uuid.UUID]:

        alias_map = self._all_values.get_alias(f"steps.{step_id}.outputs")
        return alias_map.get_all_value_ids()  # type: ignore

    def get_inputs_for_steps(self, *step_ids: str) -> Dict[str, Dict[str, uuid.UUID]]:
        """Retrieve value ids for the inputs of the specified steps (or all steps, if no argument provided."""

        result = {}
        for step_id in self._structure.step_ids:
            if step_ids and step_id not in step_ids:
                continue
            ids = self.get_current_step_inputs(step_id=step_id)
            result[step_id] = ids
        return result

    def get_outputs_for_steps(self, *step_ids: str) -> Dict[str, Dict[str, uuid.UUID]]:
        """Retrieve value ids for the outputs of the specified steps (or all steps, if no argument provided."""

        result = {}
        for step_id in self._structure.step_ids:
            if step_ids and step_id not in step_ids:
                continue
            ids = self.get_current_step_outputs(step_id=step_id)
            result[step_id] = ids
        return result

    def _notify_pipeline_listeners(self, event: PipelineEvent):

        for listener in self._listeners:
            listener._pipeline_event_occurred(event=event)

    def get_pipeline_details(self) -> PipelineDetails:

        pipeline_inputs = self._all_values.get_alias("pipeline.inputs")
        pipeline_outputs = self._all_values.get_alias("pipeline.outputs")
        assert pipeline_inputs is not None
        assert pipeline_outputs is not None

        invalid = pipeline_inputs.check_invalid()
        if not invalid:
            status = StepStatus.INPUTS_READY
            step_outputs = self._all_values.get_alias("pipeline.outputs")
            assert step_outputs is not None
            invalid_outputs = step_outputs.check_invalid()
            # TODO: also check that all the pedigrees match up with current inputs
            if not invalid_outputs:
                status = StepStatus.RESULTS_READY
        else:
            status = StepStatus.INPUTS_INVALID

        step_states = {}
        for step_id in self._structure.step_ids:
            d = self.get_step_details(step_id)
            step_states[step_id] = d

        details = PipelineDetails.construct(
            kiara_id=self._data_registry.kiara_id,
            pipeline_id=self.pipeline_id,
            pipeline_status=status,
            pipeline_inputs=pipeline_inputs.get_all_value_ids(),
            pipeline_outputs=pipeline_outputs.get_all_value_ids(),
            invalid_details=invalid,
            step_states=step_states,
        )

        return details

    def get_step_details(self, step_id: str) -> StepDetails:

        step_input_ids = self.get_current_step_inputs(step_id=step_id)
        step_output_ids = self.get_current_step_outputs(step_id=step_id)
        step_inputs = self._all_values.get_alias(f"steps.{step_id}.inputs")

        assert step_inputs is not None
        invalid = step_inputs.check_invalid()

        processing_stage = self._structure.get_processing_stage(step_id)

        if not invalid:
            status = StepStatus.INPUTS_READY
            step_outputs = self._all_values.get_alias(f"steps.{step_id}.outputs")
            assert step_outputs is not None
            invalid_outputs = step_outputs.check_invalid()
            # TODO: also check that all the pedigrees match up with current inputs
            if not invalid_outputs:
                status = StepStatus.RESULTS_READY
        else:
            status = StepStatus.INPUTS_INVALID

        details = StepDetails.construct(
            kiara_id=self._data_registry.kiara_id,
            pipeline_id=self.pipeline_id,
            step_id=step_id,
            status=status,
            inputs=step_input_ids,
            outputs=step_output_ids,
            invalid_details=invalid,
            processing_stage=processing_stage,
        )
        return details

    def set_pipeline_inputs(
        self,
        inputs: Mapping[str, Any],
        sync_to_step_inputs: bool = True,
        notify_listeners: bool = True,
    ) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:

        values_to_set: Dict[str, uuid.UUID] = {}

        for k, v in inputs.items():
            if v is None:
                values_to_set[k] = NONE_VALUE_ID
            else:
                alias_map = self._all_values.get_alias("pipeline.inputs")
                assert alias_map is not None
                # dbg(alias_map.__dict__)
                schema = alias_map.values_schema[k]
                value = self._data_registry.register_data(
                    data=v, schema=schema, pedigree=ORPHAN, reuse_existing=True
                )
                values_to_set[k] = value.value_id

        changed_pipeline_inputs = self._set_values("pipeline.inputs", **values_to_set)

        changed_results = {"__pipeline__": {"inputs": changed_pipeline_inputs}}

        if sync_to_step_inputs:
            changed = self.sync_pipeline_inputs(notify_listeners=False)
            dpath.util.merge(changed_results, changed)  # type: ignore

        if notify_listeners:
            event = PipelineEvent.create_event(pipeline=self, changed=changed_results)
            self._notify_pipeline_listeners(event)

        return changed_results

    def sync_pipeline_inputs(
        self, notify_listeners: bool = True
    ) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:

        pipeline_inputs = self.get_current_pipeline_inputs()

        values_to_sync: Dict[str, Dict[str, Optional[uuid.UUID]]] = {}

        for field_name, ref in self._structure.pipeline_input_refs.items():
            for step_input in ref.connected_inputs:
                step_inputs = self.get_current_step_inputs(step_input.step_id)

                if step_inputs[step_input.value_name] != pipeline_inputs[field_name]:
                    values_to_sync.setdefault(step_input.step_id, {})[
                        step_input.value_name
                    ] = pipeline_inputs[field_name]

        results: Dict[str, Mapping[str, Mapping[str, ChangedValue]]] = {}
        for step_id in values_to_sync.keys():
            values = values_to_sync[step_id]
            step_changed = self._set_step_inputs(step_id=step_id, inputs=values)
            dpath.util.merge(results, step_changed)  # type: ignore

        if notify_listeners:
            event = PipelineEvent.create_event(pipeline=self, changed=results)
            self._notify_pipeline_listeners(event)

        return results

    def _set_step_inputs(
        self, step_id: str, inputs: Mapping[str, Optional[uuid.UUID]]
    ) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:

        changed_step_inputs = self._set_values(f"steps.{step_id}.inputs", **inputs)
        if not changed_step_inputs:
            return {}

        result: Dict[str, Dict[str, Dict[str, ChangedValue]]] = {
            step_id: {"inputs": changed_step_inputs}
        }

        step_outputs = self._structure.get_step_output_refs(step_id=step_id)
        null_outputs = {k: None for k in step_outputs.keys()}

        changed_outputs = self.set_step_outputs(
            step_id=step_id, outputs=null_outputs, notify_listeners=False
        )
        assert step_id not in changed_outputs.keys()

        result.update(changed_outputs)  # type: ignore

        return result

    def set_multiple_step_outputs(
        self,
        changed_outputs: Mapping[str, Mapping[str, Optional[uuid.UUID]]],
        notify_listeners: bool = True,
    ) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:

        results: Dict[str, Dict[str, Dict[str, ChangedValue]]] = {}
        for step_id, outputs in changed_outputs.items():
            step_results = self.set_step_outputs(
                step_id=step_id, outputs=outputs, notify_listeners=False
            )
            dpath.util.merge(results, step_results)  # type: ignore

        if notify_listeners:
            event = PipelineEvent.create_event(pipeline=self, changed=results)
            self._notify_pipeline_listeners(event)

        return results

    def set_step_outputs(
        self,
        step_id: str,
        outputs: Mapping[str, Optional[uuid.UUID]],
        notify_listeners: bool = True,
    ) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:

        # make sure pedigrees match with respective inputs?

        changed_step_outputs = self._set_values(f"steps.{step_id}.outputs", **outputs)
        if not changed_step_outputs:
            return {}

        result: Dict[str, Dict[str, Dict[str, ChangedValue]]] = {
            step_id: {"outputs": changed_step_outputs}
        }

        output_refs = self._structure.get_step_output_refs(step_id=step_id)

        pipeline_outputs: Dict[str, Optional[uuid.UUID]] = {}

        inputs_to_set: Dict[str, Dict[str, Optional[uuid.UUID]]] = {}

        for field_name, ref in output_refs.items():
            if ref.pipeline_output:
                assert ref.pipeline_output not in pipeline_outputs.keys()
                pipeline_outputs[ref.pipeline_output] = outputs[field_name]
            for input_ref in ref.connected_inputs:
                inputs_to_set.setdefault(input_ref.step_id, {})[
                    input_ref.value_name
                ] = outputs[field_name]

        for step_id, step_inputs in inputs_to_set.items():
            changed_step_fields = self._set_step_inputs(
                step_id=step_id, inputs=step_inputs
            )
            dpath.util.merge(result, changed_step_fields)  # type: ignore

        if pipeline_outputs:
            changed_pipeline_outputs = self._set_pipeline_outputs(**pipeline_outputs)
            dpath.util.merge(  # type: ignore
                result, {"__pipeline__": {"outputs": changed_pipeline_outputs}}
            )

        if notify_listeners:
            event = PipelineEvent.create_event(pipeline=self, changed=result)
            self._notify_pipeline_listeners(event)

        return result

    def _set_pipeline_outputs(
        self, **outputs: Optional[uuid.UUID]
    ) -> Mapping[str, ChangedValue]:

        changed_pipeline_outputs = self._set_values("pipeline.outputs", **outputs)
        return changed_pipeline_outputs

    def _set_values(
        self, alias: str, **values: Optional[uuid.UUID]
    ) -> Dict[str, ChangedValue]:
        """Set values (value-ids) for the sub-alias-map with the specified alias path."""

        invalid = {}
        for k in values.keys():
            _alias = self._all_values.get_alias(alias)
            assert _alias is not None
            if k not in _alias.values_schema.keys():
                invalid[
                    k
                ] = f"Invalid field '{k}'. Available fields: {', '.join(self.get_current_pipeline_inputs().keys())}"

        if invalid:
            raise InvalidValuesException(invalid_values=invalid)

        alias_map: Optional[AliasValueMap] = self._all_values.get_alias(alias)
        assert alias_map is not None

        values_to_set: Dict[str, Optional[uuid.UUID]] = {}
        current: Dict[str, Optional[uuid.UUID]] = {}
        changed: Dict[str, ChangedValue] = {}

        for field_name, new_value in values.items():

            current_value = self._all_values.get_alias(f"{alias}.{field_name}")
            if current_value is not None:
                current_value_id = current_value.assoc_value
            else:
                current_value_id = None
            current[field_name] = current_value_id

            if current_value_id != new_value:
                values_to_set[field_name] = new_value
                changed[field_name] = ChangedValue(old=current_value_id, new=new_value)

        _alias = self._all_values.get_alias(alias)
        assert _alias is not None
        _alias.set_aliases(**values_to_set)

        return changed

    @property
    def step_ids(self) -> Iterable[str]:
        """Return all ids of the steps of this pipeline."""
        return self._structure.step_ids

    def get_step(self, step_id: str) -> PipelineStep:
        """Return the object representing a step in this workflow, identified by the step id."""
        return self._structure.get_step(step_id)

    def get_steps_by_stage(
        self,
    ) -> Mapping[int, Mapping[str, PipelineStep]]:
        """Return a all pipeline steps, ordered by stage they belong to."""

        if self._steps_by_stage is not None:
            return self._steps_by_stage

        result: Dict[int, Dict[str, PipelineStep]] = {}
        for step_id in self.step_ids:
            step = self.get_step(step_id)
            stage = self._structure.get_processing_stage(step.step_id)
            assert stage is not None
            result.setdefault(stage, {})[step_id] = step

        self._steps_by_stage = result
        return self._steps_by_stage

    def create_job_config_for_step(self, step_id: str) -> JobConfig:

        step_inputs: Mapping[str, Optional[uuid.UUID]] = self.get_current_step_inputs(
            step_id
        )
        step_details: StepDetails = self.get_step_details(step_id=step_id)
        step: PipelineStep = self.get_step(step_id=step_id)

        # if the inputs are not valid, ignore this step
        if step_details.status == StepStatus.INPUTS_INVALID:
            invalid_details = step_details.invalid_details
            assert invalid_details is not None
            msg = f"Can't execute step '{step_id}', invalid inputs: {', '.join(invalid_details.keys())}"
            raise InvalidValuesException(msg=msg, invalid_values=invalid_details)

        job_config = JobConfig.create_from_module(
            data_registry=self._data_registry, module=step.module, inputs=step_inputs
        )
        return job_config
Attributes
id: UUID property readonly
kiara_id: UUID property readonly
pipeline_id: UUID property readonly
step_ids: Iterable[str] property readonly

Return all ids of the steps of this pipeline.

structure: PipelineStructure property readonly
Methods
add_listener(self, listener)
Source code in kiara/models/module/pipeline/pipeline.py
def add_listener(self, listener: PipelineListener):

    self._listeners.append(listener)
create_job_config_for_step(self, step_id)
Source code in kiara/models/module/pipeline/pipeline.py
def create_job_config_for_step(self, step_id: str) -> JobConfig:

    step_inputs: Mapping[str, Optional[uuid.UUID]] = self.get_current_step_inputs(
        step_id
    )
    step_details: StepDetails = self.get_step_details(step_id=step_id)
    step: PipelineStep = self.get_step(step_id=step_id)

    # if the inputs are not valid, ignore this step
    if step_details.status == StepStatus.INPUTS_INVALID:
        invalid_details = step_details.invalid_details
        assert invalid_details is not None
        msg = f"Can't execute step '{step_id}', invalid inputs: {', '.join(invalid_details.keys())}"
        raise InvalidValuesException(msg=msg, invalid_values=invalid_details)

    job_config = JobConfig.create_from_module(
        data_registry=self._data_registry, module=step.module, inputs=step_inputs
    )
    return job_config
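
A sketch of how a controller typically uses this method; 'job_registry' is assumed to be the JobRegistry the pipeline's controller was constructed with:

# sketch: 'job_registry' is assumed to be the registry the controller uses
job_config = pipeline.create_job_config_for_step("and")
job_id = job_registry.execute_job(job_config=job_config)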
get_current_pipeline_inputs(self)

All (pipeline) input values of this pipeline.

Source code in kiara/models/module/pipeline/pipeline.py
def get_current_pipeline_inputs(self) -> Dict[str, uuid.UUID]:
    """All (pipeline) input values of this pipeline."""

    alias_map = self._all_values.get_alias("pipeline.inputs")
    return alias_map.get_all_value_ids()  # type: ignore
get_current_pipeline_outputs(self)

All (pipeline) output values of this pipeline.

Source code in kiara/models/module/pipeline/pipeline.py
def get_current_pipeline_outputs(self) -> Dict[str, uuid.UUID]:
    """All (pipeline) output values of this pipeline."""

    alias_map = self._all_values.get_alias("pipeline.outputs")
    return alias_map.get_all_value_ids()  # type: ignore
get_current_step_inputs(self, step_id)
Source code in kiara/models/module/pipeline/pipeline.py
def get_current_step_inputs(self, step_id) -> Dict[str, uuid.UUID]:

    alias_map = self._all_values.get_alias(f"steps.{step_id}.inputs")
    return alias_map.get_all_value_ids()  # type: ignore
get_current_step_outputs(self, step_id)
Source code in kiara/models/module/pipeline/pipeline.py
def get_current_step_outputs(self, step_id) -> Dict[str, uuid.UUID]:

    alias_map = self._all_values.get_alias(f"steps.{step_id}.outputs")
    return alias_map.get_all_value_ids()  # type: ignore
get_inputs_for_steps(self, *step_ids)

Retrieve value ids for the inputs of the specified steps (or all steps, if no arguments are provided).

Source code in kiara/models/module/pipeline/pipeline.py
def get_inputs_for_steps(self, *step_ids: str) -> Dict[str, Dict[str, uuid.UUID]]:
    """Retrieve value ids for the inputs of the specified steps (or all steps, if no argument provided."""

    result = {}
    for step_id in self._structure.step_ids:
        if step_ids and step_id not in step_ids:
            continue
        ids = self.get_current_step_inputs(step_id=step_id)
        result[step_id] = ids
    return result
get_outputs_for_steps(self, *step_ids)

Retrieve value ids for the outputs of the specified steps (or all steps, if no arguments are provided).

Source code in kiara/models/module/pipeline/pipeline.py
def get_outputs_for_steps(self, *step_ids: str) -> Dict[str, Dict[str, uuid.UUID]]:
    """Retrieve value ids for the outputs of the specified steps (or all steps, if no argument provided."""

    result = {}
    for step_id in self._structure.step_ids:
        if step_ids and step_id not in step_ids:
            continue
        ids = self.get_current_step_outputs(step_id=step_id)
        result[step_id] = ids
    return result
get_pipeline_details(self)
Source code in kiara/models/module/pipeline/pipeline.py
def get_pipeline_details(self) -> PipelineDetails:

    pipeline_inputs = self._all_values.get_alias("pipeline.inputs")
    pipeline_outputs = self._all_values.get_alias("pipeline.outputs")
    assert pipeline_inputs is not None
    assert pipeline_outputs is not None

    invalid = pipeline_inputs.check_invalid()
    if not invalid:
        status = StepStatus.INPUTS_READY
        step_outputs = self._all_values.get_alias("pipeline.outputs")
        assert step_outputs is not None
        invalid_outputs = step_outputs.check_invalid()
        # TODO: also check that all the pedigrees match up with current inputs
        if not invalid_outputs:
            status = StepStatus.RESULTS_READY
    else:
        status = StepStatus.INPUTS_INVALID

    step_states = {}
    for step_id in self._structure.step_ids:
        d = self.get_step_details(step_id)
        step_states[step_id] = d

    details = PipelineDetails.construct(
        kiara_id=self._data_registry.kiara_id,
        pipeline_id=self.pipeline_id,
        pipeline_status=status,
        pipeline_inputs=pipeline_inputs.get_all_value_ids(),
        pipeline_outputs=pipeline_outputs.get_all_value_ids(),
        invalid_details=invalid,
        step_states=step_states,
    )

    return details
get_step(self, step_id)

Return the object representing a step in this workflow, identified by the step id.

Source code in kiara/models/module/pipeline/pipeline.py
def get_step(self, step_id: str) -> PipelineStep:
    """Return the object representing a step in this workflow, identified by the step id."""
    return self._structure.get_step(step_id)
get_step_details(self, step_id)
Source code in kiara/models/module/pipeline/pipeline.py
def get_step_details(self, step_id: str) -> StepDetails:

    step_input_ids = self.get_current_step_inputs(step_id=step_id)
    step_output_ids = self.get_current_step_outputs(step_id=step_id)
    step_inputs = self._all_values.get_alias(f"steps.{step_id}.inputs")

    assert step_inputs is not None
    invalid = step_inputs.check_invalid()

    processing_stage = self._structure.get_processing_stage(step_id)

    if not invalid:
        status = StepStatus.INPUTS_READY
        step_outputs = self._all_values.get_alias(f"steps.{step_id}.outputs")
        assert step_outputs is not None
        invalid_outputs = step_outputs.check_invalid()
        # TODO: also check that all the pedigrees match up with current inputs
        if not invalid_outputs:
            status = StepStatus.RESULTS_READY
    else:
        status = StepStatus.INPUTS_INVALID

    details = StepDetails.construct(
        kiara_id=self._data_registry.kiara_id,
        pipeline_id=self.pipeline_id,
        step_id=step_id,
        status=status,
        inputs=step_input_ids,
        outputs=step_output_ids,
        invalid_details=invalid,
        processing_stage=processing_stage,
    )
    return details
get_steps_by_stage(self)

Return all pipeline steps, ordered by the stage they belong to.

Source code in kiara/models/module/pipeline/pipeline.py
def get_steps_by_stage(
    self,
) -> Mapping[int, Mapping[str, PipelineStep]]:
    """Return a all pipeline steps, ordered by stage they belong to."""

    if self._steps_by_stage is not None:
        return self._steps_by_stage

    result: Dict[int, Dict[str, PipelineStep]] = {}
    for step_id in self.step_ids:
        step = self.get_step(step_id)
        stage = self._structure.get_processing_stage(step.step_id)
        assert stage is not None
        result.setdefault(stage, {})[step_id] = step

    self._steps_by_stage = result
    return self._steps_by_stage
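The returned mapping is keyed by the 1-based processing stage. For a hypothetical two-step pipeline where step 'b' consumes an output of step 'a', the result would look roughly like:

pipeline.get_steps_by_stage()
# -> {1: {'a': <PipelineStep 'a'>}, 2: {'b': <PipelineStep 'b'>}}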
set_multiple_step_outputs(self, changed_outputs, notify_listeners=True)
Source code in kiara/models/module/pipeline/pipeline.py
def set_multiple_step_outputs(
    self,
    changed_outputs: Mapping[str, Mapping[str, Optional[uuid.UUID]]],
    notify_listeners: bool = True,
) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:

    results: Dict[str, Dict[str, Dict[str, ChangedValue]]] = {}
    for step_id, outputs in changed_outputs.items():
        step_results = self.set_step_outputs(
            step_id=step_id, outputs=outputs, notify_listeners=False
        )
        dpath.util.merge(results, step_results)  # type: ignore

    if notify_listeners:
        event = PipelineEvent.create_event(pipeline=self, changed=results)
        self._notify_pipeline_listeners(event)

    return results
set_pipeline_inputs(self, inputs, sync_to_step_inputs=True, notify_listeners=True)
Source code in kiara/models/module/pipeline/pipeline.py
def set_pipeline_inputs(
    self,
    inputs: Mapping[str, Any],
    sync_to_step_inputs: bool = True,
    notify_listeners: bool = True,
) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:

    values_to_set: Dict[str, uuid.UUID] = {}

    for k, v in inputs.items():
        if v is None:
            values_to_set[k] = NONE_VALUE_ID
        else:
            alias_map = self._all_values.get_alias("pipeline.inputs")
            assert alias_map is not None
            # dbg(alias_map.__dict__)
            schema = alias_map.values_schema[k]
            value = self._data_registry.register_data(
                data=v, schema=schema, pedigree=ORPHAN, reuse_existing=True
            )
            values_to_set[k] = value.value_id

    changed_pipeline_inputs = self._set_values("pipeline.inputs", **values_to_set)

    changed_results = {"__pipeline__": {"inputs": changed_pipeline_inputs}}

    if sync_to_step_inputs:
        changed = self.sync_pipeline_inputs(notify_listeners=False)
        dpath.util.merge(changed_results, changed)  # type: ignore

    if notify_listeners:
        event = PipelineEvent.create_event(pipeline=self, changed=changed_results)
        self._notify_pipeline_listeners(event)

    return changed_results
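A minimal usage sketch (assuming `pipeline` is a Pipeline instance with two boolean inputs named 'a' and 'b'):

changed = pipeline.set_pipeline_inputs(inputs={"a": True, "b": False})
# the result groups changes under the special '__pipeline__' key, plus one key per
# affected step (since sync_to_step_inputs defaults to True), e.g.:
# {'__pipeline__': {'inputs': {'a': ChangedValue(...), 'b': ChangedValue(...)}}, ...}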
set_step_outputs(self, step_id, outputs, notify_listeners=True)
Source code in kiara/models/module/pipeline/pipeline.py
def set_step_outputs(
    self,
    step_id: str,
    outputs: Mapping[str, Optional[uuid.UUID]],
    notify_listeners: bool = True,
) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:

    # make sure pedigrees match with respective inputs?

    changed_step_outputs = self._set_values(f"steps.{step_id}.outputs", **outputs)
    if not changed_step_outputs:
        return {}

    result: Dict[str, Dict[str, Dict[str, ChangedValue]]] = {
        step_id: {"outputs": changed_step_outputs}
    }

    output_refs = self._structure.get_step_output_refs(step_id=step_id)

    pipeline_outputs: Dict[str, Optional[uuid.UUID]] = {}

    inputs_to_set: Dict[str, Dict[str, Optional[uuid.UUID]]] = {}

    for field_name, ref in output_refs.items():
        if ref.pipeline_output:
            assert ref.pipeline_output not in pipeline_outputs.keys()
            pipeline_outputs[ref.pipeline_output] = outputs[field_name]
        for input_ref in ref.connected_inputs:
            inputs_to_set.setdefault(input_ref.step_id, {})[
                input_ref.value_name
            ] = outputs[field_name]

    for step_id, step_inputs in inputs_to_set.items():
        changed_step_fields = self._set_step_inputs(
            step_id=step_id, inputs=step_inputs
        )
        dpath.util.merge(result, changed_step_fields)  # type: ignore

    if pipeline_outputs:
        changed_pipeline_outputs = self._set_pipeline_outputs(**pipeline_outputs)
        dpath.util.merge(  # type: ignore
            result, {"__pipeline__": {"outputs": changed_pipeline_outputs}}
        )

    if notify_listeners:
        event = PipelineEvent.create_event(pipeline=self, changed=result)
        self._notify_pipeline_listeners(event)

    return result
sync_pipeline_inputs(self, notify_listeners=True)
Source code in kiara/models/module/pipeline/pipeline.py
def sync_pipeline_inputs(
    self, notify_listeners: bool = True
) -> Mapping[str, Mapping[str, Mapping[str, ChangedValue]]]:

    pipeline_inputs = self.get_current_pipeline_inputs()

    values_to_sync: Dict[str, Dict[str, Optional[uuid.UUID]]] = {}

    for field_name, ref in self._structure.pipeline_input_refs.items():
        for step_input in ref.connected_inputs:
            step_inputs = self.get_current_step_inputs(step_input.step_id)

            if step_inputs[step_input.value_name] != pipeline_inputs[field_name]:
                values_to_sync.setdefault(step_input.step_id, {})[
                    step_input.value_name
                ] = pipeline_inputs[field_name]

    results: Dict[str, Mapping[str, Mapping[str, ChangedValue]]] = {}
    for step_id in values_to_sync.keys():
        values = values_to_sync[step_id]
        step_changed = self._set_step_inputs(step_id=step_id, inputs=values)
        dpath.util.merge(results, step_changed)  # type: ignore

    if notify_listeners:
        event = PipelineEvent.create_event(pipeline=self, changed=results)
        self._notify_pipeline_listeners(event)

    return results
PipelineListener (ABC)
Source code in kiara/models/module/pipeline/pipeline.py
class PipelineListener(abc.ABC):
    @abc.abstractmethod
    def _pipeline_event_occurred(self, event: PipelineEvent):
        pass

structure

Classes

PipelineStructure (KiaraModel) pydantic-model

An object that holds one or several steps, and describes the connections between them.

Source code in kiara/models/module/pipeline/structure.py
class PipelineStructure(KiaraModel):
    """An object that holds one or several steps, and describes the connections between them."""

    _kiara_model_id = "instance.pipeline_structure"

    pipeline_config: PipelineConfig = Field(
        description="The underlying pipeline config."
    )
    steps: List[PipelineStep] = Field(description="The pipeline steps")
    input_aliases: Dict[str, str] = Field(description="The input aliases.")
    output_aliases: Dict[str, str] = Field(description="The output aliases.")

    @root_validator(pre=True)
    def validate_pipeline_config(cls, values):

        pipeline_config = values.get("pipeline_config", None)
        if not pipeline_config:
            raise ValueError("No 'pipeline_config' provided.")

        if len(values) != 1:
            raise ValueError(
                "Only 'pipeline_config' key allowed when creating a pipeline structure object."
            )

        _config: PipelineConfig = pipeline_config
        _steps: List[PipelineStep] = list(_config.steps)

        _input_aliases: Dict[str, str] = dict(_config.input_aliases)
        _output_aliases: Dict[str, str] = dict(_config.output_aliases)

        invalid_input_aliases = [a for a in _input_aliases.values() if "." in a]
        if invalid_input_aliases:
            raise Exception(
                f"Invalid input aliases, aliases can't contain special characters: {', '.join(invalid_input_aliases)}."
            )
        invalid_output_aliases = [a for a in _output_aliases.values() if "." in a]
        if invalid_output_aliases:
            raise Exception(
                f"Invalid output aliases, aliases can't contain special characters: {', '.join(invalid_output_aliases)}."
            )

        valid_input_names = set()
        for step in _steps:
            for input_name in step.module.input_names:
                valid_input_names.add(f"{step.step_id}.{input_name}")
        invalid_input_aliases = [
            a for a in _input_aliases.keys() if a not in valid_input_names
        ]
        if invalid_input_aliases:
            raise Exception(
                f"Invalid input reference(s): {', '.join(invalid_input_aliases)}. Must be one of: {', '.join(valid_input_names)}"
            )
        valid_output_names = set()
        for step in _steps:
            for output_name in step.module.output_names:
                valid_output_names.add(f"{step.step_id}.{output_name}")
        invalid_output_names = [
            a for a in _output_aliases.keys() if a not in valid_output_names
        ]
        if invalid_output_names:
            raise Exception(
                f"Invalid output reference(s): {', '.join(invalid_output_names)}. Must be one of: {', '.join(valid_output_names)}"
            )

        values["steps"] = _steps
        values["input_aliases"] = _input_aliases
        values["output_aliases"] = _output_aliases
        return values

    # this is hardcoded for now
    _add_all_workflow_outputs: bool = PrivateAttr(default=False)
    _constants: Dict[str, Any] = PrivateAttr(default=None)  # type: ignore
    _defaults: Dict[str, Any] = PrivateAttr(None)  # type: ignore

    _execution_graph: nx.DiGraph = PrivateAttr(None)  # type: ignore
    _data_flow_graph: nx.DiGraph = PrivateAttr(None)  # type: ignore
    _data_flow_graph_simple: nx.DiGraph = PrivateAttr(None)  # type: ignore

    _processing_stages: List[List[str]] = PrivateAttr(None)  # type: ignore

    # holds details about the (current) processing steps contained in this workflow
    _steps_details: Dict[str, Any] = PrivateAttr(None)  # type: ignore

    def _retrieve_data_to_hash(self) -> Any:
        return {
            "steps": [step.instance_cid for step in self.steps],
            "input_aliases": self.input_aliases,
            "output_aliases": self.output_aliases,
        }

    def _retrieve_id(self) -> str:
        return self.pipeline_config.instance_id

    @property
    def steps_details(self) -> Mapping[str, Any]:

        if self._steps_details is None:
            self._process_steps()
        return self._steps_details  # type: ignore

    @property
    def step_ids(self) -> Iterable[str]:
        if self._steps_details is None:
            self._process_steps()
        return self._steps_details.keys()  # type: ignore

    @property
    def constants(self) -> Mapping[str, Any]:

        if self._constants is None:
            self._process_steps()
        return self._constants  # type: ignore

    @property
    def defaults(self) -> Mapping[str, Any]:

        if self._defaults is None:
            self._process_steps()
        return self._defaults  # type: ignore

    def get_step(self, step_id: str) -> PipelineStep:

        d = self.steps_details.get(step_id, None)
        if d is None:
            raise Exception(f"No module with id: {step_id}")

        return d["step"]

    def get_step_input_refs(self, step_id: str) -> Mapping[str, StepInputRef]:

        d = self.steps_details.get(step_id, None)
        if d is None:
            raise Exception(f"No module with id: {step_id}")

        return d["inputs"]

    def get_step_output_refs(self, step_id: str) -> Mapping[str, StepOutputRef]:

        d = self.steps_details.get(step_id, None)
        if d is None:
            raise Exception(f"No module with id: {step_id}")

        return d["outputs"]

    def get_step_details(self, step_id: str) -> Mapping[str, Any]:

        d = self.steps_details.get(step_id, None)
        if d is None:
            raise Exception(f"No module with id: {step_id}")

        return d

    @property
    def execution_graph(self) -> nx.DiGraph:
        if self._execution_graph is None:
            self._process_steps()
        return self._execution_graph

    @property
    def data_flow_graph(self) -> nx.DiGraph:
        if self._data_flow_graph is None:
            self._process_steps()
        return self._data_flow_graph

    @property
    def data_flow_graph_simple(self) -> nx.DiGraph:
        if self._data_flow_graph_simple is None:
            self._process_steps()
        return self._data_flow_graph_simple

    @property
    def processing_stages(self) -> List[List[str]]:
        if self._steps_details is None:
            self._process_steps()
        return self._processing_stages

    @lru_cache()
    def _get_node_of_type(self, node_type: str):
        if self._steps_details is None:
            self._process_steps()

        return [
            node
            for node, attr in self._data_flow_graph.nodes(data=True)
            if attr["type"] == node_type
        ]

    @property
    def steps_input_refs(self) -> Dict[str, StepInputRef]:
        return {
            node.alias: node
            for node in self._get_node_of_type(node_type=StepInputRef.__name__)
        }

    @property
    def steps_output_refs(self) -> Dict[str, StepOutputRef]:
        return {
            node.alias: node
            for node in self._get_node_of_type(node_type=StepOutputRef.__name__)
        }

    @property
    def pipeline_input_refs(self) -> Dict[str, PipelineInputRef]:
        return {
            node.value_name: node
            for node in self._get_node_of_type(node_type=PipelineInputRef.__name__)
        }

    @property
    def pipeline_output_refs(self) -> Dict[str, PipelineOutputRef]:
        return {
            node.value_name: node
            for node in self._get_node_of_type(node_type=PipelineOutputRef.__name__)
        }

    @property
    def pipeline_inputs_schema(self) -> Mapping[str, ValueSchema]:

        schemas = {
            input_name: w_in.value_schema
            for input_name, w_in in self.pipeline_input_refs.items()
        }
        return schemas

    @property
    def pipeline_outputs_schema(self) -> Mapping[str, ValueSchema]:
        return {
            output_name: w_out.value_schema
            for output_name, w_out in self.pipeline_output_refs.items()
        }

    def get_processing_stage(self, step_id: str) -> int:
        """Return the processing stage for the specified step_id.

        Returns the stage number (starting at 1).
        """

        for index, stage in enumerate(self.processing_stages, start=1):
            if step_id in stage:
                return index

        raise Exception(f"Invalid step id '{step_id}'.")

    def step_is_required(self, step_id: str) -> bool:
        """Check if the specified step is required, or can be omitted."""

        return self.get_step_details(step_id=step_id)["required"]

    def _process_steps(self):
        """The core method of this class, it connects all the processing modules, their inputs and outputs."""

        steps_details: Dict[str, Any] = {}
        execution_graph = nx.DiGraph()
        execution_graph.add_node("__root__")
        data_flow_graph = nx.DiGraph()
        data_flow_graph_simple = nx.DiGraph()
        processing_stages = []
        constants = {}
        structure_defaults = {}

        # temp variable, to hold all outputs
        outputs: Dict[str, StepOutputRef] = {}

        # process all pipeline and step outputs first
        _temp_steps_map: Dict[str, PipelineStep] = {}
        pipeline_outputs: Dict[str, PipelineOutputRef] = {}
        for step in self.steps:

            _temp_steps_map[step.step_id] = step

            if step.step_id in steps_details.keys():
                raise Exception(
                    f"Can't process steps: duplicate step_id '{step.step_id}'"
                )

            steps_details[step.step_id] = {
                "step": step,
                "outputs": {},
                "inputs": {},
                "required": True,
            }

            data_flow_graph.add_node(step, type="step")

            # go through all the module outputs, create points for them and connect them to pipeline outputs
            for output_name, schema in step.module.outputs_schema.items():

                step_output = StepOutputRef(
                    value_name=output_name,
                    value_schema=schema,
                    step_id=step.step_id,
                )

                steps_details[step.step_id]["outputs"][output_name] = step_output
                step_alias = generate_step_alias(step.step_id, output_name)
                outputs[step_alias] = step_output

                # step_output_name = generate_pipeline_endpoint_name(
                #     step_id=step.step_id, value_name=output_name
                # )
                step_output_name = f"{step.step_id}.{output_name}"
                if not self.output_aliases:
                    raise NotImplementedError()
                if step_output_name in self.output_aliases.keys():
                    step_output_name = self.output_aliases[step_output_name]
                else:
                    if not self._add_all_workflow_outputs:
                        # this output is not interesting for the workflow
                        step_output_name = None

                if step_output_name:
                    step_output_address = StepValueAddress(
                        step_id=step.step_id, value_name=output_name
                    )
                    pipeline_output = PipelineOutputRef(
                        value_name=step_output_name,
                        connected_output=step_output_address,
                        value_schema=schema,
                    )
                    pipeline_outputs[step_output_name] = pipeline_output
                    step_output.pipeline_output = pipeline_output.value_name

                    data_flow_graph.add_node(
                        pipeline_output, type=PipelineOutputRef.__name__
                    )
                    data_flow_graph.add_edge(step_output, pipeline_output)

                    data_flow_graph_simple.add_node(
                        pipeline_output, type=PipelineOutputRef.__name__
                    )
                    data_flow_graph_simple.add_edge(step, pipeline_output)

                data_flow_graph.add_node(step_output, type=StepOutputRef.__name__)
                data_flow_graph.add_edge(step, step_output)

        # now process inputs, and connect them to the appropriate output/pipeline-input points
        existing_pipeline_input_points: Dict[str, PipelineInputRef] = {}
        for step in self.steps:

            other_step_dependency: Set = set()
            # go through all the inputs of a module, create input points and connect them to either
            # other module outputs, or pipeline inputs (which need to be created)

            module_constants: Mapping[str, Any] = step.module.get_config_value(
                "constants"
            )

            for input_name, schema in step.module.inputs_schema.items():

                matching_input_links: List[StepValueAddress] = []
                is_constant = input_name in module_constants.keys()

                for value_name, input_links in step.input_links.items():
                    if value_name == input_name:
                        for input_link in input_links:
                            if input_link in matching_input_links:
                                raise Exception(f"Duplicate input link: {input_link}")
                            matching_input_links.append(input_link)

                if matching_input_links:
                    # this means we connect to other steps output

                    connected_output_points: List[StepOutputRef] = []
                    connected_outputs: List[StepValueAddress] = []

                    for input_link in matching_input_links:
                        output_id = generate_step_alias(
                            input_link.step_id, input_link.value_name
                        )

                        if output_id not in outputs.keys():
                            raise Exception(
                                f"Can't connect input '{input_name}' for step '{step.step_id}': no output '{output_id}' available. Available output names: {', '.join(outputs.keys())}"
                            )
                        connected_output_points.append(outputs[output_id])
                        connected_outputs.append(input_link)

                        other_step_dependency.add(input_link.step_id)

                    step_input_point = StepInputRef(
                        step_id=step.step_id,
                        value_name=input_name,
                        value_schema=schema,
                        is_constant=is_constant,
                        connected_pipeline_input=None,
                        connected_outputs=connected_outputs,
                    )

                    for op in connected_output_points:
                        op.connected_inputs.append(step_input_point.address)
                        data_flow_graph.add_edge(op, step_input_point)
                        data_flow_graph_simple.add_edge(
                            _temp_steps_map[op.step_id], step_input_point
                        )  # TODO: name edge
                        data_flow_graph_simple.add_edge(
                            step_input_point, step
                        )  # TODO: name edge

                else:
                    # this means we connect to pipeline input
                    # pipeline_input_name = generate_pipeline_endpoint_name(
                    #     step_id=step.step_id, value_name=input_name
                    # )
                    pipeline_input_name = f"{step.step_id}.{input_name}"
                    # check whether this input has an alias associated with it
                    if not self.input_aliases:
                        raise NotImplementedError()

                    if pipeline_input_name in self.input_aliases.keys():
                        # this means we use the pipeline alias
                        pipeline_input_name = self.input_aliases[pipeline_input_name]

                    if pipeline_input_name in existing_pipeline_input_points.keys():
                        # we already created a pipeline input with this name
                        # TODO: check whether schema fits
                        connected_pipeline_input = existing_pipeline_input_points[
                            pipeline_input_name
                        ]
                        assert connected_pipeline_input.is_constant == is_constant
                    else:
                        # we need to create the pipeline input
                        connected_pipeline_input = PipelineInputRef(
                            value_name=pipeline_input_name,
                            value_schema=schema,
                            is_constant=is_constant,
                        )

                        existing_pipeline_input_points[
                            pipeline_input_name
                        ] = connected_pipeline_input

                        data_flow_graph.add_node(
                            connected_pipeline_input, type=PipelineInputRef.__name__
                        )
                        data_flow_graph_simple.add_node(
                            connected_pipeline_input, type=PipelineInputRef.__name__
                        )
                        if is_constant:
                            constants[
                                pipeline_input_name
                            ] = step.module.get_config_value("constants")[input_name]

                        default_val = step.module.get_config_value("defaults").get(
                            input_name, None
                        )
                        if is_constant and default_val is not None:
                            raise Exception(
                                f"Module config invalid for step '{step.step_id}': both default value and constant provided for input '{input_name}'."
                            )
                        elif default_val is not None:
                            structure_defaults[pipeline_input_name] = default_val

                    step_input_point = StepInputRef(
                        step_id=step.step_id,
                        value_name=input_name,
                        value_schema=schema,
                        connected_pipeline_input=connected_pipeline_input.value_name,
                        connected_outputs=None,
                    )
                    connected_pipeline_input.connected_inputs.append(
                        step_input_point.address
                    )
                    data_flow_graph.add_edge(connected_pipeline_input, step_input_point)
                    data_flow_graph_simple.add_edge(connected_pipeline_input, step)

                data_flow_graph.add_node(step_input_point, type=StepInputRef.__name__)

                steps_details[step.step_id]["inputs"][input_name] = step_input_point

                data_flow_graph.add_edge(step_input_point, step)

            if other_step_dependency:
                for module_id in other_step_dependency:
                    execution_graph.add_edge(module_id, step.step_id)
            else:
                execution_graph.add_edge("__root__", step.step_id)

        # calculate execution order
        path_lengths: Dict[str, int] = {}

        for step in self.steps:

            step_id = step.step_id

            paths = list(nx.all_simple_paths(execution_graph, "__root__", step_id))
            max_steps = max(paths, key=lambda x: len(x))
            path_lengths[step_id] = len(max_steps) - 1

        max_length = max(path_lengths.values())

        for i in range(1, max_length + 1):
            stage: List[str] = [m for m, length in path_lengths.items() if length == i]
            processing_stages.append(stage)
            for _step_id in stage:
                steps_details[_step_id]["processing_stage"] = i
                # steps_details[_step_id]["step"].processing_stage = i

        self._constants = constants
        self._defaults = structure_defaults
        self._steps_details = steps_details
        self._execution_graph = execution_graph
        self._data_flow_graph = data_flow_graph
        self._data_flow_graph_simple = data_flow_graph_simple
        self._processing_stages = processing_stages

        self._get_node_of_type.cache_clear()

        # calculate which steps are always required to compute one of the required pipeline outputs.
        # in some cases a step can be skipped if it doesn't have a valid input set, because the
        # downstream inputs it connects to are 'non-required' (optional)
        # optional_steps = []

        # last_stage = self._processing_stages[-1]
        #
        # step_nodes: List[PipelineStep] = [
        #     node
        #     for node in self._data_flow_graph_simple.nodes
        #     if isinstance(node, PipelineStep)
        # ]
        #
        # all_required_inputs = []
        # for step_id in last_stage:
        #
        #     step = self.get_step(step_id)
        #     step_nodes.remove(step)
        #
        #     for k, s_inp in self.get_step_input_refs(step_id).items():
        #         if not s_inp.value_schema.is_required():
        #             continue
        #         all_required_inputs.append(s_inp)
        #
        # for pipeline_input in self.pipeline_input_refs.values():
        #
        #     for last_step_input in all_required_inputs:
        #         try:
        #             path = nx.shortest_path(
        #                 self._data_flow_graph_simple, pipeline_input, last_step_input
        #             )
        #             for p in path:
        #                 if p in step_nodes:
        #                     step_nodes.remove(p)
        #         except (NetworkXNoPath, NodeNotFound):
        #             pass
        #             # print("NO PATH")
        #             # print(f"{pipeline_input} -> {last_step_input}")
        #
        # for s in step_nodes:
        #     self._steps_details[s.step_id]["required"] = False
        #     # s.required = False
        #
        # for input_name, inp in self.pipeline_input_refs.items():
        #     steps = set()
        #     for ci in inp.connected_inputs:
        #         steps.add(ci.step_id)
        #
        #     optional = True
        #     for step_id in steps:
        #         step = self.get_step(step_id)
        #         if self._steps_details[step_id]["required"]:
        #             optional = False
        #             break
        #     if optional:
        #         inp.value_schema.optional = True
Attributes
constants: Mapping[str, Any] property readonly
data_flow_graph: DiGraph property readonly
data_flow_graph_simple: DiGraph property readonly
defaults: Mapping[str, Any] property readonly
execution_graph: DiGraph property readonly
input_aliases: Dict[str, str] pydantic-field required

The input aliases.

output_aliases: Dict[str, str] pydantic-field required

The output aliases.

pipeline_config: PipelineConfig pydantic-field required

The underlying pipeline config.

pipeline_input_refs: Dict[str, kiara.models.module.pipeline.value_refs.PipelineInputRef] property readonly
pipeline_inputs_schema: Mapping[str, kiara.models.values.value_schema.ValueSchema] property readonly
pipeline_output_refs: Dict[str, kiara.models.module.pipeline.value_refs.PipelineOutputRef] property readonly
pipeline_outputs_schema: Mapping[str, kiara.models.values.value_schema.ValueSchema] property readonly
processing_stages: List[List[str]] property readonly
step_ids: Iterable[str] property readonly
steps: List[kiara.models.module.pipeline.PipelineStep] pydantic-field required

The pipeline steps

steps_details: Mapping[str, Any] property readonly
steps_input_refs: Dict[str, kiara.models.module.pipeline.value_refs.StepInputRef] property readonly
steps_output_refs: Dict[str, kiara.models.module.pipeline.value_refs.StepOutputRef] property readonly
Methods
get_processing_stage(self, step_id)

Return the processing stage for the specified step_id.

Returns the stage number (starting at 1).

Source code in kiara/models/module/pipeline/structure.py
def get_processing_stage(self, step_id: str) -> int:
    """Return the processing stage for the specified step_id.

    Returns the stage number (starting at 1).
    """

    for index, stage in enumerate(self.processing_stages, start=1):
        if step_id in stage:
            return index

    raise Exception(f"Invalid step id '{step_id}'.")
get_step(self, step_id)
Source code in kiara/models/module/pipeline/structure.py
def get_step(self, step_id: str) -> PipelineStep:

    d = self.steps_details.get(step_id, None)
    if d is None:
        raise Exception(f"No module with id: {step_id}")

    return d["step"]
get_step_details(self, step_id)
Source code in kiara/models/module/pipeline/structure.py
def get_step_details(self, step_id: str) -> Mapping[str, Any]:

    d = self.steps_details.get(step_id, None)
    if d is None:
        raise Exception(f"No module with id: {step_id}")

    return d
get_step_input_refs(self, step_id)
Source code in kiara/models/module/pipeline/structure.py
def get_step_input_refs(self, step_id: str) -> Mapping[str, StepInputRef]:

    d = self.steps_details.get(step_id, None)
    if d is None:
        raise Exception(f"No module with id: {step_id}")

    return d["inputs"]
get_step_output_refs(self, step_id)
Source code in kiara/models/module/pipeline/structure.py
def get_step_output_refs(self, step_id: str) -> Mapping[str, StepOutputRef]:

    d = self.steps_details.get(step_id, None)
    if d is None:
        raise Exception(f"No module with id: {step_id}")

    return d["outputs"]
step_is_required(self, step_id)

Check if the specified step is required, or can be omitted.

Source code in kiara/models/module/pipeline/structure.py
def step_is_required(self, step_id: str) -> bool:
    """Check if the specified step is required, or can be omitted."""

    return self.get_step_details(step_id=step_id)["required"]
validate_pipeline_config(values) classmethod
Source code in kiara/models/module/pipeline/structure.py
@root_validator(pre=True)
def validate_pipeline_config(cls, values):

    pipeline_config = values.get("pipeline_config", None)
    if not pipeline_config:
        raise ValueError("No 'pipeline_config' provided.")

    if len(values) != 1:
        raise ValueError(
            "Only 'pipeline_config' key allowed when creating a pipeline structure object."
        )

    _config: PipelineConfig = pipeline_config
    _steps: List[PipelineStep] = list(_config.steps)

    _input_aliases: Dict[str, str] = dict(_config.input_aliases)
    _output_aliases: Dict[str, str] = dict(_config.output_aliases)

    invalid_input_aliases = [a for a in _input_aliases.values() if "." in a]
    if invalid_input_aliases:
        raise Exception(
            f"Invalid input aliases, aliases can't contain special characters: {', '.join(invalid_input_aliases)}."
        )
    invalid_output_aliases = [a for a in _output_aliases.values() if "." in a]
    if invalid_output_aliases:
        raise Exception(
            f"Invalid output aliases, aliases can't contain special characters: {', '.join(invalid_output_aliases)}."
        )

    valid_input_names = set()
    for step in _steps:
        for input_name in step.module.input_names:
            valid_input_names.add(f"{step.step_id}.{input_name}")
    invalid_input_aliases = [
        a for a in _input_aliases.keys() if a not in valid_input_names
    ]
    if invalid_input_aliases:
        raise Exception(
            f"Invalid input reference(s): {', '.join(invalid_input_aliases)}. Must be one of: {', '.join(valid_input_names)}"
        )
    valid_output_names = set()
    for step in _steps:
        for output_name in step.module.output_names:
            valid_output_names.add(f"{step.step_id}.{output_name}")
    invalid_output_names = [
        a for a in _output_aliases.keys() if a not in valid_output_names
    ]
    if invalid_output_names:
        raise Exception(
            f"Invalid output reference(s): {', '.join(invalid_output_names)}. Must be one of: {', '.join(valid_output_names)}"
        )

    values["steps"] = _steps
    values["input_aliases"] = _input_aliases
    values["output_aliases"] = _output_aliases
    return values
generate_pipeline_endpoint_name(step_id, value_name)
Source code in kiara/models/module/pipeline/structure.py
def generate_pipeline_endpoint_name(step_id: str, value_name: str):

    return f"{step_id}__{value_name}"

value_refs

Classes

PipelineInputRef (ValueRef) pydantic-model

An input to a pipeline.

Source code in kiara/models/module/pipeline/value_refs.py
class PipelineInputRef(ValueRef):
    """An input to a pipeline."""

    connected_inputs: List[StepValueAddress] = Field(
        description="The step inputs that are connected to this pipeline input",
        default_factory=list,
    )
    is_constant: bool = Field(
        description="Whether this input is a constant and can't be changed by the user."
    )

    @property
    def alias(self) -> str:
        return generate_step_alias(PIPELINE_PARENT_MARKER, self.value_name)
Attributes
alias: str property readonly
connected_inputs: List[kiara.models.module.pipeline.value_refs.StepValueAddress] pydantic-field

The step inputs that are connected to this pipeline input

is_constant: bool pydantic-field

Whether this input is a constant and can't be changed by the user.
PipelineOutputRef (ValueRef) pydantic-model

An output of a pipeline.

Source code in kiara/models/module/pipeline/value_refs.py
class PipelineOutputRef(ValueRef):
    """An output to a pipeline."""

    connected_output: StepValueAddress = Field(description="Connected step outputs.")

    @property
    def alias(self) -> str:
        return generate_step_alias(PIPELINE_PARENT_MARKER, self.value_name)
Attributes
alias: str property readonly
connected_output: StepValueAddress pydantic-field required

Connected step outputs.

StepInputRef (ValueRef) pydantic-model

An input to a step.

This object can have either a 'connected_outputs' set or a 'connected_pipeline_input', but not both.

Source code in kiara/models/module/pipeline/value_refs.py
class StepInputRef(ValueRef):
    """An input to a step.

    This object can have either a 'connected_outputs' set or a 'connected_pipeline_input', but not both.
    """

    step_id: str = Field(description="The step id.")
    connected_outputs: Optional[List[StepValueAddress]] = Field(
        default=None,
        description="A potential connected list of one or several module outputs.",
    )
    connected_pipeline_input: Optional[str] = Field(
        default=None, description="A potential pipeline input."
    )
    is_constant: bool = Field(
        description="Whether this input is a constant and can't be changed by the user."
    )

    @root_validator(pre=True)
    def ensure_single_connected_item(cls, values):

        if values.get("connected_outputs", None) and values.get(
            "connected_pipeline_input"
        ):
            raise ValueError("Multiple connected items, only one allowed.")

        return values

    @property
    def alias(self) -> str:
        return generate_step_alias(self.step_id, self.value_name)

    @property
    def address(self) -> StepValueAddress:
        return StepValueAddress(step_id=self.step_id, value_name=self.value_name)

    def __str__(self):
        name = camel_case_to_snake_case(self.__class__.__name__[0:-5], repl=" ")
        return f"{name}: {self.step_id}.{self.value_name} ({self.value_schema.type})"
Attributes
address: StepValueAddress property readonly
alias: str property readonly
connected_outputs: List[kiara.models.module.pipeline.value_refs.StepValueAddress] pydantic-field

A potential connected list of one or several module outputs.

connected_pipeline_input: str pydantic-field

A potential pipeline input.

is_constant: bool pydantic-field

Whether this input is a constant and can't be changed by the user.
step_id: str pydantic-field required

The step id.

ensure_single_connected_item(values) classmethod
Source code in kiara/models/module/pipeline/value_refs.py
@root_validator(pre=True)
def ensure_single_connected_item(cls, values):

    if values.get("connected_outputs", None) and values.get(
        "connected_pipeline_input"
    ):
        raise ValueError("Multiple connected items, only one allowed.")

    return values
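A sketch of the validator in action (`schema` stands in for a real ValueSchema instance; all names are hypothetical):

StepInputRef(
    step_id="b",
    value_name="x",
    value_schema=schema,
    connected_outputs=[StepValueAddress(step_id="a", value_name="y")],
    connected_pipeline_input="x",
)
# -> pydantic validation error: "Multiple connected items, only one allowed."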
StepOutputRef (ValueRef) pydantic-model

An output of a step.

Source code in kiara/models/module/pipeline/value_refs.py
class StepOutputRef(ValueRef):
    """An output to a step."""

    class Config:
        allow_mutation = True

    step_id: str = Field(description="The step id.")
    pipeline_output: Optional[str] = Field(description="The connected pipeline output.")
    connected_inputs: List[StepValueAddress] = Field(
        description="The step inputs that are connected to this step output",
        default_factory=list,
    )

    @property
    def alias(self) -> str:
        return generate_step_alias(self.step_id, self.value_name)

    @property
    def address(self) -> StepValueAddress:
        return StepValueAddress(step_id=self.step_id, value_name=self.value_name)

    def __str__(self):
        name = camel_case_to_snake_case(self.__class__.__name__[0:-5], repl=" ")
        return f"{name}: {self.step_id}.{self.value_name} ({self.value_schema.type})"
Attributes
address: StepValueAddress property readonly
alias: str property readonly
connected_inputs: List[kiara.models.module.pipeline.value_refs.StepValueAddress] pydantic-field

The step inputs that are connected to this step output

pipeline_output: str pydantic-field

The connected pipeline output.

step_id: str pydantic-field required

The step id.

Config
Source code in kiara/models/module/pipeline/value_refs.py
class Config:
    allow_mutation = True
StepValueAddress (BaseModel) pydantic-model

Small model to describe the address of a value of a step, within a Pipeline/PipelineStructure.

Source code in kiara/models/module/pipeline/value_refs.py
class StepValueAddress(BaseModel):
    """Small model to describe the address of a value of a step, within a Pipeline/PipelineStructure."""

    class Config:
        extra = Extra.forbid

    step_id: str = Field(description="The id of a step within a pipeline.")
    value_name: str = Field(
        description="The name of the value (output name or pipeline input name)."
    )
    sub_value: Optional[Dict[str, Any]] = Field(
        default=None,
        description="A reference to a subitem of a value (e.g. column, list item)",
    )

    @property
    def alias(self):
        """An alias string for this address (in the form ``[step_id].[value_name]``)."""
        return generate_step_alias(self.step_id, self.value_name)

    def __eq__(self, other):

        if not isinstance(other, StepValueAddress):
            return False

        return (self.step_id, self.value_name, self.sub_value) == (
            other.step_id,
            other.value_name,
            other.sub_value,
        )

    def __hash__(self):

        return hash((self.step_id, self.value_name, self.sub_value))

    def __repr__(self):

        if self.sub_value:
            sub_value = f" sub_value={self.sub_value}"
        else:
            sub_value = ""
        return f"{self.__class__.__name__}(step_id={self.step_id}, value_name={self.value_name}{sub_value})"

    def __str__(self):
        return self.__repr__()
Attributes
alias property readonly

An alias string for this address (in the form [step_id].[value_name]).

step_id: str pydantic-field required

The id of a step within a pipeline.

sub_value: Dict[str, Any] pydantic-field

A reference to a subitem of a value (e.g. column, list item)

value_name: str pydantic-field required

The name of the value (output name or pipeline input name).

Config
Source code in kiara/models/module/pipeline/value_refs.py
class Config:
    extra = Extra.forbid
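A short sketch of how an address relates to its alias (names are hypothetical):

addr = StepValueAddress(step_id="my_step", value_name="y")
addr.alias  # -> 'my_step.y'
str(addr)   # -> 'StepValueAddress(step_id=my_step, value_name=y)'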
ValueRef (BaseModel) pydantic-model

An object that holds information about the location of a value within a pipeline (or other structure).

Basically, a ValueRef tells the containing object where in its structure the value belongs (for example, so it can update other, dependent values). A ValueRef object (obviously) does not contain the value itself.

There are four different ValueRef types that are relevant for pipelines:

  • [kiara.pipeline.values.StepInputRef][]: an input to a step
  • [kiara.pipeline.values.StepOutputRef][]: an output of a step
  • [kiara.pipeline.values.PipelineInputRef][]: an input to a pipeline
  • [kiara.pipeline.values.PipelineOutputRef][]: an output for a pipeline

Several ValueRef objects can target the same value; for example, a step output and a connected step input would reference the same Value (in most cases).

Source code in kiara/models/module/pipeline/value_refs.py
class ValueRef(BaseModel):
    """An object that holds information about the location of a value within a pipeline (or other structure).

    Basically, a `ValueRef` tells the containing object where in its structure the value belongs (for example, so
    it can update other, dependent values). A `ValueRef` object (obviously) does not contain the value itself.

    There are four different ValueRef types that are relevant for pipelines:

    - [kiara.pipeline.values.StepInputRef][]: an input to a step
    - [kiara.pipeline.values.StepOutputRef][]: an output of a step
    - [kiara.pipeline.values.PipelineInputRef][]: an input to a pipeline
    - [kiara.pipeline.values.PipelineOutputRef][]: an output for a pipeline

    Several `ValueRef` objects can target the same value, for example a step output and a connected step input would
    reference the same `Value` (in most cases).
    """

    class Config:
        allow_mutation = True
        extra = Extra.forbid

    _id: uuid.UUID = PrivateAttr(default_factory=uuid.uuid4)
    value_name: str
    value_schema: ValueSchema
    # pipeline_id: str

    def __eq__(self, other):

        if not isinstance(other, self.__class__):
            return False

        return self._id == other._id

    def __hash__(self):
        return hash(self._id)

    def __repr__(self):
        step_id = ""
        if hasattr(self, "step_id"):
            step_id = f" step_id='{self.step_id}'"
        return f"{self.__class__.__name__}(value_name='{self.value_name}' {step_id})"

    def __str__(self):
        name = camel_case_to_snake_case(self.__class__.__name__[0:-5], repl=" ")
        return f"{name}: {self.value_name} ({self.value_schema.type})"
value_name: str pydantic-field required
value_schema: ValueSchema pydantic-field required
Config
Source code in kiara/models/module/pipeline/value_refs.py
class Config:
    allow_mutation = True
    extra = Extra.forbid
allow_mutation
extra
generate_step_alias(step_id, value_name)
Source code in kiara/models/module/pipeline/value_refs.py
def generate_step_alias(step_id: str, value_name: str):
    return f"{step_id}.{value_name}"