Skip to content

kiara.interfaces.python_api.controller

ApiController

A pipeline controller for pipelines generated by the kiara Python API.

ensure_step(self, step_id)

Ensure a step has valid outputs.

Returns:

Type Description
bool

'True' if the step now has valid outputs, 'False` if that's not possible because of invalid/missing inputs

Source code in kiara/interfaces/python_api/controller.py
def ensure_step(self, step_id: str) -> bool:
    """Ensure a step has valid outputs.

    Returns:
        'True' if the step now has valid outputs, 'False` if that's not possible because of invalid/missing inputs
    """

    if step_id not in self.pipeline.step_ids:
        raise Exception(f"No step with id: {step_id}")

    outputs = self.get_step_outputs(step_id=step_id)
    if outputs.items_are_valid():
        return True

    if self._is_running:
        log.debug("Pipeline running, doing nothing.")
        raise Exception("Pipeline already running.")

    self._is_running = True
    try:
        for stage in self.processing_stages:
            job_ids: typing.List[str] = []
            finished = False
            if step_id in stage:
                finished = True
                job_id = self._process(step_id=step_id)
                if job_id is None:
                    return True
                if not job_id:
                    return False
                job_ids.append(job_id)  # type: ignore
            else:
                for s_id in stage:
                    job_id = self._process(step_id=s_id)
                    if job_id is None:
                        continue
                    elif job_id is False:
                        return False

                    job_ids.append(job_id)  # type: ignore

            self._processor.wait_for(*job_ids)
            if finished:
                break

    finally:
        self._is_running = False

    return True

pipeline_inputs_changed(self, event)

Method to override if the implementing controller needs to react to events where one or several pipeline inputs have changed.

Note

Whenever pipeline inputs change, the connected step inputs also change and an (extra) event will be fired for those. Which means you can choose to only implement the step_inputs_changed method if you want to. This behaviour might change in the future.

Parameters:

Name Type Description Default
event PipelineInputEvent

the pipeline input event

required
Source code in kiara/interfaces/python_api/controller.py
def pipeline_inputs_changed(self, event: PipelineInputEvent):

    if self._is_running:
        raise NotImplementedError()

pipeline_outputs_changed(self, event)

Method to override if the implementing controller needs to react to events where one or several pipeline outputs have changed.

Parameters:

Name Type Description Default
event PipelineOutputEvent

the pipeline output event

required
Source code in kiara/interfaces/python_api/controller.py
def pipeline_outputs_changed(self, event: "PipelineOutputEvent"):

    if self.pipeline_is_finished():
        # TODO: check if something is running
        self._is_running = False

process_pipeline(self)

Execute the connected pipeline end-to-end.

Controllers can elect to overwrite this method, but this is optional.

Source code in kiara/interfaces/python_api/controller.py
def process_pipeline(self):

    if self._is_running:
        log.debug("Pipeline running, doing nothing.")
        raise Exception("Pipeline already running.")

    self._is_running = True
    try:
        for stage in self.processing_stages:
            job_ids = []
            for step_id in stage:
                if not self.can_be_processed(step_id):
                    if self.can_be_skipped(step_id):
                        continue
                    else:
                        # from kiara.utils.output import rich_print
                        # rich_print(self.pipeline.get_current_state())
                        raise Exception(
                            f"Required pipeline step '{step_id}' can't be processed, inputs not ready yet: {', '.join(self.invalid_inputs(step_id))}"
                        )
                try:
                    if not self.get_step_outputs(step_id).items_are_valid():

                        job_id = self.process_step(step_id)
                        job_ids.append(job_id)
                except Exception as e:
                    # TODO: cancel running jobs?
                    log.error(
                        f"Processing of step '{step_id}' from pipeline '{self.pipeline.id}' failed: {e}"
                    )
                    return False
            self._processor.wait_for(*job_ids)
    finally:
        self._is_running = False

step_inputs_changed(self, event)

Method to override if the implementing controller needs to react to events where one or several step inputs have changed.

Parameters:

Name Type Description Default
event StepInputEvent

the step input event

required
Source code in kiara/interfaces/python_api/controller.py
def step_inputs_changed(self, event: StepInputEvent):

    for step_id in event.updated_step_inputs.keys():
        # outputs = self.get_step_outputs(step_id)
        raise NotImplementedError()
        # outputs.invalidate()