
kiara.pipeline.controller

PipelineController

An object that controls how a Pipeline should react to events related to its inputs/outputs.

This is the base for the central controller class that needs to be implemented by a Kiara frontend. The default implementation, used if no PipelineController is provided in a Pipeline constructor, is the [BatchController][kiara.pipeline.controller.BatchController], which waits until all required inputs are set and then processes all pipeline steps in one go, in the correct order.

The pipeline object to control can be set either in the constructor or via the set_pipeline method, but only once: every subsequent attempt to set a pipeline raises an Exception.

If you want to implement your own controller, you have to override at least one of the (empty) event hook methods:

  • [pipeline_inputs_changed][kiara.pipeline.controller.PipelineController.pipeline_inputs_changed]
  • [pipeline_outputs_changed][kiara.pipeline.controller.PipelineController.pipeline_outputs_changed]
  • [step_inputs_changed][kiara.pipeline.controller.PipelineController.step_inputs_changed]
  • [step_outputs_changed][kiara.pipeline.controller.PipelineController.step_outputs_changed]
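The following sketch illustrates the "override at least one hook" pattern. It does not import kiara: `ControllerBase` is a hypothetical stand-in for PipelineController whose four hooks are empty no-ops, and the single `event` argument each hook receives is an assumption, since this section does not show the hook signatures.

```python
from typing import Any, List


class ControllerBase:
    """Hypothetical stand-in for PipelineController: all event hooks are empty no-ops."""

    def pipeline_inputs_changed(self, event: Any) -> None:
        pass

    def pipeline_outputs_changed(self, event: Any) -> None:
        pass

    def step_inputs_changed(self, event: Any) -> None:
        pass

    def step_outputs_changed(self, event: Any) -> None:
        pass


class LoggingController(ControllerBase):
    """Overrides a single hook; the other three keep their inherited no-op behaviour."""

    def __init__(self) -> None:
        self.received: List[Any] = []

    def pipeline_inputs_changed(self, event: Any) -> None:
        # A real controller might start processing here; this sketch only records the event.
        self.received.append(event)
```

Overriding only the hooks a frontend cares about keeps the subclass small; events routed to the other hooks are silently ignored.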

Parameters:

Name      Type      Description                     Default
pipeline  Pipeline  the pipeline object to control  required

pipeline: Pipeline property readonly

Return the pipeline this controller, well, ...controls...

pipeline_inputs: ValueSet property writable

Return the inputs object for this pipeline.

pipeline_outputs: ValueSet property readonly

Return the (current) pipeline outputs object for this pipeline.

processing_stages: List[List[str]] property readonly

Return the processing stage order of the pipeline.

Returns:

Type             Description
List[List[str]]  a list of stages in processing order, each stage being a list of step ids
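To make the list-of-lists shape concrete, here is one way such stages could be derived from step dependencies: each step lands in the first stage that comes after all steps it depends on. This is an illustrative sketch only; the `deps` mapping and `compute_stages` function are hypothetical, and kiara's own stage computation is not shown in this section.

```python
from typing import Dict, List, Set


def compute_stages(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group step ids into stages; ``deps`` maps each step id to the step ids it consumes outputs from.

    Every dependency must itself be a key of ``deps``.
    """
    remaining = {step: set(d) for step, d in deps.items()}
    done: Set[str] = set()
    stages: List[List[str]] = []
    while remaining:
        # every step whose dependencies have all been scheduled can run in this stage
        stage = sorted(s for s, d in remaining.items() if d <= done)
        if not stage:
            raise ValueError(
                "unsatisfiable dependencies (cycle or unknown step) for: "
                + ", ".join(sorted(remaining))
            )
        stages.append(stage)
        done.update(stage)
        for s in stage:
            del remaining[s]
    return stages


steps = {"load": set(), "clean": {"load"}, "stats": {"load"}, "report": {"clean", "stats"}}
print(compute_stages(steps))  # [['load'], ['clean', 'stats'], ['report']]
```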

can_be_processed(self, step_id)

Check whether the step with the provided id is ready to be processed.

Source code in kiara/pipeline/controller/__init__.py
def can_be_processed(self, step_id: str) -> bool:
    """Check whether the step with the provided id is ready to be processed."""

    result = True
    step_inputs = self.get_step_inputs(step_id=step_id)
    for input_name in step_inputs.get_all_field_names():

        value = step_inputs.get_value_obj(input_name)

        if not value.item_is_valid():
            result = False
            break

    return result

can_be_skipped(self, step_id)

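Check whether the processing of a step can be skipped. Steps that are not marked as required can always be skipped; for a required step, the result is the same as can_be_processed.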

Source code in kiara/pipeline/controller/__init__.py
def can_be_skipped(self, step_id: str) -> bool:
    """Check whether the processing of a step can be skipped."""

    result = True
    step = self.get_step(step_id)
    if step.required:
        result = self.can_be_processed(step_id)

    return result
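The decision logic of can_be_processed and can_be_skipped can be reproduced without a pipeline. In this sketch `StepStub` is a hypothetical stand-in that reduces a step to per-input validity flags plus a `required` marker; the real methods read these from the step's ValueSet and PipelineStep objects.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class StepStub:
    """Hypothetical stand-in for a pipeline step: input validity flags and a 'required' marker."""

    inputs_valid: Dict[str, bool] = field(default_factory=dict)
    required: bool = True


def can_be_processed(step: StepStub) -> bool:
    # equivalent to the loop above: every input value must be valid
    return all(step.inputs_valid.values())


def can_be_skipped(step: StepStub) -> bool:
    # non-required steps can always be skipped; required ones only if they can be processed
    if step.required:
        return can_be_processed(step)
    return True
```

Note that `all()` over an empty collection is `True`, which matches the original loop: a step without any inputs counts as processable.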

check_inputs_status(self)

Check the status of all stages/steps, and whether they are ready to be processed.

The result dictionary uses stage numbers (starting at 1) as keys. Each value is a secondary dictionary keyed by step_id, whose values are dicts with details (keys: 'ready', 'invalid_inputs', 'required').

Source code in kiara/pipeline/controller/__init__.py
def check_inputs_status(
    self,
) -> typing.Mapping[int, typing.Mapping[str, typing.Mapping[str, typing.Any]]]:
    """Check the status of all stages/steps, and whether they are ready to be processed.

    The result dictionary uses stage numbers as keys, and a secondary dictionary as values which in turn uses
    the step_id as key, and a dict with details (keys: 'ready', 'invalid_inputs', 'required') as values.
    """

    result: typing.Dict[int, typing.Dict[str, typing.Dict[str, typing.Any]]] = {}
    for idx, stage in enumerate(self.processing_stages, start=1):
        for step_id in stage:
            if not self.can_be_processed(step_id):
                if self.can_be_skipped(step_id):
                    result.setdefault(idx, {})[step_id] = {
                        "ready": True,
                        "required": False,
                        "invalid_inputs": [
                            f"{step_id}.{ii}" for ii in self.invalid_inputs(step_id)
                        ],
                    }
                else:
                    result.setdefault(idx, {})[step_id] = {
                        "ready": False,
                        "invalid_inputs": [
                            f"{step_id}.{ii}" for ii in self.invalid_inputs(step_id)
                        ],
                        "required": True,
                    }
            else:
                if self.can_be_skipped(step_id):
                    result.setdefault(idx, {})[step_id] = {
                        "ready": True,
                        "required": True,
                        "invalid_inputs": [],
                    }

    return result
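A frontend will typically walk the nested mapping returned by check_inputs_status, for example to tell the user which inputs still block processing. The sketch below does that on a hand-written result; the step ids and field names in `example_status` are hypothetical, and only the dictionary shape follows the method above.

```python
from typing import Any, List, Mapping


def blocking_inputs(status: Mapping[int, Mapping[str, Mapping[str, Any]]]) -> List[str]:
    """Collect the invalid inputs of every step that is not ready, in stage order."""
    result: List[str] = []
    for stage in sorted(status):
        for details in status[stage].values():
            if not details["ready"]:
                result.extend(details["invalid_inputs"])
    return result


# hypothetical result, shaped like the dictionary built by check_inputs_status
example_status = {
    1: {"load": {"ready": True, "required": True, "invalid_inputs": []}},
    2: {
        "clean": {"ready": False, "required": True, "invalid_inputs": ["clean.table"]},
        "plot": {"ready": True, "required": False, "invalid_inputs": ["plot.style"]},
    },
}
print(blocking_inputs(example_status))  # ['clean.table']
```

The optional `plot` step is reported as ready even though one of its inputs is invalid, so it does not block anything.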

get_current_pipeline_state(self)

Return a description of the current pipeline state.

This method creates a new [PipelineState][kiara.pipeline.pipeline.PipelineState] object when called, containing the pipeline structure as well as metadata about both pipeline and step inputs and outputs.

Returns:

Type           Description
PipelineState  an object outlining the current pipeline state

Source code in kiara/pipeline/controller/__init__.py
def get_current_pipeline_state(self) -> "PipelineState":
    """Return a description of the current pipeline state.

    This method creates a new [PipelineState][kiara.pipeline.pipeline.PipelineState] object when called, containing
    the pipeline structure as well as metadata about both pipeline and step inputs and outputs.

    Returns:
        an object outlining the current pipeline state
    """

    return self.pipeline.get_current_state()

get_job_details(self, step_or_job_id)

Return job details for a job id or, if a step id was provided, for the most recent execution of that step.

Source code in kiara/pipeline/controller/__init__.py
def get_job_details(self, step_or_job_id: str) -> typing.Optional[Job]:
    """Returns job details for a job id, or in case a step_id was provided, the last execution of this step."""

    if self._job_ids.get(step_or_job_id, None) is None:
        return self._processor.get_job_details(step_or_job_id)
    else:
        return self._processor.get_job_details(self._job_ids[step_or_job_id])
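The id resolution in get_job_details boils down to a dictionary lookup with a fallback: a known step id is replaced by the id of its last job, and anything else is passed through as a job id. A minimal sketch, where `resolve_job_id` is a hypothetical helper and the plain dict plays the role of `_job_ids`:

```python
from typing import Dict, Optional


def resolve_job_id(job_ids: Dict[str, str], step_or_job_id: str) -> str:
    """Map a step id to its most recent job id; any other value is assumed to be a job id."""
    last_job: Optional[str] = job_ids.get(step_or_job_id)
    return step_or_job_id if last_job is None else last_job
```

Because step ids and job ids share one lookup, this relies on the two never colliding.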

get_step(self, step_id)

Return the step object for the provided id.

Parameters:

Name     Type  Description  Default
step_id  str   the step id  required

Returns:

Type          Description
PipelineStep  the step object

Source code in kiara/pipeline/controller/__init__.py
def get_step(self, step_id: str) -> PipelineStep:
    """Return the step object for the provided id.

    Arguments:
        step_id: the step id
    Returns:
        the step object
    """

    return self.pipeline.get_step(step_id)

get_step_input(self, step_id, input_name)

Get the (current) input value for a specified step and input field name.

Source code in kiara/pipeline/controller/__init__.py
def get_step_input(self, step_id: str, input_name: str) -> Value:
    """Get the (current) input value for a specified step and input field name."""

    item = self.get_step_inputs(step_id).get_value_obj(input_name)
    assert item is not None
    return item

get_step_inputs(self, step_id)

Return the inputs object for the step with the provided id.

Source code in kiara/pipeline/controller/__init__.py
def get_step_inputs(self, step_id: str) -> ValueSet:
    """Return the inputs object for the step with the provided id."""

    return self.pipeline.get_step_inputs(step_id)

get_step_output(self, step_id, output_name)

Get the (current) output value for a specified step and output field name.

Source code in kiara/pipeline/controller/__init__.py
def get_step_output(self, step_id: str, output_name: str) -> Value:
    """Get the (current) output value for a specified step and output field name."""

    item = self.get_step_outputs(step_id).get_value_obj(output_name)
    assert item is not None
    return item

get_step_outputs(self, step_id)

Return the outputs object for the step with the provided id.

Source code in kiara/pipeline/controller/__init__.py
def get_step_outputs(self, step_id: str) -> ValueSet:
    """Return the outputs object for the step with the provided id."""

    return self.pipeline.get_step_outputs(step_id)

pipeline_is_finished(self)

Return whether the pipeline has been processed successfully.

A True result means that every step of the pipeline has been processed successfully, and no pipeline input has changed since that happened.

Returns:

Type  Description
bool  whether the pipeline was processed successfully (True) or not (False)

Source code in kiara/pipeline/controller/__init__.py
def pipeline_is_finished(self) -> bool:
    """Return whether the pipeline has been processed successfully.

    A ``True`` result means that every step of the pipeline has been processed successfully, and no pipeline input
    has changed since that happened.

    Returns:
        whether the pipeline was processed successfully (``True``) or not (``False``)
    """
    return self.pipeline.outputs.items_are_valid()

pipeline_is_ready(self)

Return whether the pipeline is ready to be processed.

A True result means that all pipeline inputs are set with valid values, and therefore every step within the pipeline can be processed.

Returns:

Type  Description
bool  whether the pipeline can be processed as a whole (True) or not (False)

Source code in kiara/pipeline/controller/__init__.py
def pipeline_is_ready(self) -> bool:
    """Return whether the pipeline is ready to be processed.

    A ``True`` result means that all pipeline inputs are set with valid values, and therefore every step within the
    pipeline can be processed.

    Returns:
        whether the pipeline can be processed as a whole (``True``) or not (``False``)
    """

    return self.pipeline.inputs.items_are_valid()

process_pipeline(self)

Execute the connected pipeline end-to-end.

Controllers can optionally override this method; the base implementation raises NotImplementedError.

Source code in kiara/pipeline/controller/__init__.py
def process_pipeline(self):
    """Execute the connected pipeline end-to-end.

    Controllers can optionally override this method; the base implementation raises
    ``NotImplementedError``.
    """
    raise NotImplementedError()
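A controller that overrides process_pipeline could follow the batch behaviour described for BatchController: check that the pipeline is ready, then process the stages in order. `BatchSketch` below is a hypothetical, self-contained class; its `process_step` only records the step id instead of starting a real job, and it assumes steps in one stage depend only on earlier stages.

```python
from typing import Dict, List


class BatchSketch:
    """Hypothetical minimal controller: processes every stage in order once all inputs are valid."""

    def __init__(self, stages: List[List[str]], inputs_valid: bool) -> None:
        self.processing_stages = stages
        self._inputs_valid = inputs_valid
        self.processed: List[str] = []

    def pipeline_is_ready(self) -> bool:
        return self._inputs_valid

    def process_step(self, step_id: str) -> str:
        # stand-in for starting a job; a real controller would hand the step to a processor
        self.processed.append(step_id)
        return f"job-{step_id}"

    def process_pipeline(self) -> Dict[str, str]:
        if not self.pipeline_is_ready():
            raise RuntimeError("Pipeline inputs are not valid, can't process.")
        job_ids: Dict[str, str] = {}
        for stage in self.processing_stages:
            # a real implementation must let a stage's jobs finish before starting the next
            # stage, e.g. via process_step(..., wait=True); this stand-in finishes instantly
            for step_id in stage:
                job_ids[step_id] = self.process_step(step_id)
        return job_ids
```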

process_step(self, step_id, raise_exception=False, wait=False)

Kick off processing for the step with the provided id.

Parameters:

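Name             Type  Description                                                        Default
step_id          str   the id of the step that should be started                          required
raise_exception  bool  not used by the implementation shown below                         False
wait             bool  whether to wait for the job to finish (and sync its outputs)       False

Returns:

Type  Description
str   the id of the started job
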
Source code in kiara/pipeline/controller/__init__.py
def process_step(
    self, step_id: str, raise_exception: bool = False, wait: bool = False
) -> str:
    """Kick off processing for the step with the provided id.

    Arguments:
        step_id: the id of the step that should be started
    """

    step_inputs = self.get_step_inputs(step_id)

    # if the inputs are not valid, refuse to process this step and raise an error
    if not step_inputs.items_are_valid():
        status = step_inputs.check_invalid()
        assert status is not None
        raise Exception(
            f"Can't execute step '{step_id}', invalid inputs: {', '.join(status.keys())}"
        )

    # get the output 'holder' objects, which we'll need to pass to the module
    step_outputs = self.get_step_outputs(step_id)
    # get the module object that holds the code that will do the processing
    step = self.get_step(step_id)

    job_id = self._processor.start(
        pipeline_id=self.pipeline.id,
        pipeline_name=self.pipeline.title,
        step_id=step_id,
        module=step.module,
        inputs=step_inputs,
        outputs=step_outputs,
    )
    self._job_ids[step_id] = job_id

    if wait:
        self.wait_for_jobs(job_id, sync_outputs=True)

    return job_id
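The control flow of process_step (validate, start a job, remember its id per step, optionally wait) can be isolated from the processor. In this sketch `start_step` is a hypothetical function; `invalid` is assumed to map invalid field names to a reason, and `start` and `wait` are plain callables standing in for the processor and wait_for_jobs.

```python
from typing import Callable, Dict, Optional


def start_step(
    step_id: str,
    invalid: Dict[str, str],
    start: Callable[[str], str],
    job_ids: Dict[str, str],
    wait: Optional[Callable[[str], None]] = None,
) -> str:
    """Hypothetical reduction of process_step: validate inputs, start a job, remember its id."""
    if invalid:
        # same message format as above: list the names of the invalid input fields
        raise Exception(f"Can't execute step '{step_id}', invalid inputs: {', '.join(invalid.keys())}")
    job_id = start(step_id)
    # later lookups by step id resolve to this, the most recent job
    job_ids[step_id] = job_id
    if wait is not None:
        wait(job_id)
    return job_id
```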

set_pipeline(self, pipeline)

Set the pipeline object for this controller.

A pipeline can only be set once; any subsequent attempt raises an Exception.

Parameters:

Name      Type      Description          Default
pipeline  Pipeline  the pipeline object  required

Source code in kiara/pipeline/controller/__init__.py
def set_pipeline(self, pipeline: "Pipeline"):
    """Set the pipeline object for this controller.

    A pipeline can only be set once.

    Arguments:
        pipeline: the pipeline object
    """
    if self._pipeline is not None:
        raise Exception("Pipeline already set.")
    self._pipeline = pipeline
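The set-once guard can be exercised in isolation. `SetOnce` is a hypothetical class; routing a constructor argument through set_pipeline is an assumption made here so that both ways of setting the pipeline share the same check.

```python
from typing import Any, Optional


class SetOnce:
    """Hypothetical minimal class reproducing the set-once guard of set_pipeline."""

    def __init__(self, pipeline: Optional[Any] = None) -> None:
        self._pipeline: Optional[Any] = None
        # assumption: a pipeline passed to the constructor goes through the same guard
        if pipeline is not None:
            self.set_pipeline(pipeline)

    def set_pipeline(self, pipeline: Any) -> None:
        if self._pipeline is not None:
            raise Exception("Pipeline already set.")
        self._pipeline = pipeline
```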

set_pipeline_inputs(self, **inputs)

Set one, several or all inputs for this pipeline.

Parameters:

Name      Type  Description              Default
**inputs  Any   the input values to set  {}

Source code in kiara/pipeline/controller/__init__.py
def set_pipeline_inputs(self, **inputs: typing.Any):
    """Set one, several or all inputs for this pipeline.

    Arguments:
        **inputs: the input values to set
    """

    _inputs = self._pipeline_input_hook(**inputs)
    self.pipeline_inputs.set_values(**_inputs)
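set_pipeline_inputs first passes the keyword inputs through `_pipeline_input_hook` and forwards whatever mapping it returns to the inputs object. The default behaviour of that hook is not shown in this section; the sketch below assumes a subclass could override it for preprocessing, and the "drop None values" rule as well as `InputsStub` and `ControllerSketch` are purely hypothetical.

```python
from typing import Any, Dict


class InputsStub:
    """Hypothetical stand-in for the pipeline's ValueSet; it just stores values in a dict."""

    def __init__(self) -> None:
        self.values: Dict[str, Any] = {}

    def set_values(self, **values: Any) -> None:
        self.values.update(values)


class ControllerSketch:
    def __init__(self) -> None:
        self.pipeline_inputs = InputsStub()

    def _pipeline_input_hook(self, **inputs: Any) -> Dict[str, Any]:
        # hypothetical preprocessing: drop inputs that were explicitly passed as None
        return {k: v for k, v in inputs.items() if v is not None}

    def set_pipeline_inputs(self, **inputs: Any) -> None:
        # same two-step flow as above: preprocess via the hook, then forward to the inputs object
        _inputs = self._pipeline_input_hook(**inputs)
        self.pipeline_inputs.set_values(**_inputs)
```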

step_is_finished(self, step_id)

Return whether the step with the provided id has been processed successfully.

A True result means that all output fields are currently set with valid values, and the inputs haven't changed since the last time processing was done.

Parameters:

Name     Type  Description                  Default
step_id  str   the id of the step to check  required

Returns:

Type  Description
bool  whether the step result is valid (True) or not (False)

Source code in kiara/pipeline/controller/__init__.py
def step_is_finished(self, step_id: str) -> bool:
    """Return whether the step with the provided id has been processed successfully.

    A ``True`` result means that all output fields are currently set with valid values, and the inputs haven't changed
    since the last time processing was done.

    Arguments:
        step_id: the id of the step to check

    Returns:
        whether the step result is valid (``True``) or not (``False``)
    """

    return self.get_step_outputs(step_id).items_are_valid()

step_is_ready(self, step_id)

Return whether the step with the provided id is ready to be processed.

A True result means that all input fields are currently set with valid values.

Parameters:

Name     Type  Description                  Default
step_id  str   the id of the step to check  required

Returns:

Type  Description
bool  whether the step is ready (True) or not (False)

Source code in kiara/pipeline/controller/__init__.py
def step_is_ready(self, step_id: str) -> bool:
    """Return whether the step with the provided id is ready to be processed.

    A ``True`` result means that all input fields are currently set with valid values.

    Arguments:
        step_id: the id of the step to check

    Returns:
        whether the step is ready (``True``) or not (``False``)
    """
    return self.get_step_inputs(step_id).items_are_valid()
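step_is_ready and step_is_finished are often combined into a single per-step label, for example for a status display. In this sketch `summarize` is a hypothetical helper and `flags` maps each step id to the pair (step_is_ready, step_is_finished); valid outputs take precedence because the step has then already been processed.

```python
from typing import Dict, List, Tuple


def summarize(stages: List[List[str]], flags: Dict[str, Tuple[bool, bool]]) -> Dict[str, str]:
    """Label every step as 'finished', 'ready' or 'waiting', in processing order."""
    labels: Dict[str, str] = {}
    for stage in stages:
        for step_id in stage:
            ready, finished = flags[step_id]
            labels[step_id] = "finished" if finished else ("ready" if ready else "waiting")
    return labels
```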