Skip to content

kiara.pipeline.controller.batch

BatchController

A [PipelineController][kiara.pipeline.controller.PipelineController] that executes all pipeline steps non-interactively.

This is the default implementation of a PipelineController, and probably the most simple implementation of one. It waits until all inputs are set, after which it executes all pipeline steps in the required order.

Parameters:

Name Type Description Default
pipeline

the pipeline to control

required
auto_process

whether to automatically start processing the pipeline as soon as the input set is valid

required

pipeline_outputs_changed(self, event)

Method to override if the implementing controller needs to react to events where one or several pipeline outputs have changed.

Parameters:

Name Type Description Default
event PipelineOutputEvent

the pipeline output event

required
Source code in kiara/pipeline/controller/batch.py
def pipeline_outputs_changed(self, event: "PipelineOutputEvent"):
    """Clear the running flag once every pipeline output is available.

    Arguments:
        event: the pipeline output event (used only as a trigger to re-check
            whether the pipeline is finished)
    """

    if not self.pipeline_is_finished():
        return

    # TODO: check if something is running
    self._is_running = False

process_pipeline(self)

Execute the connected pipeline end-to-end.

Controllers may override this method, but doing so is optional.

Source code in kiara/pipeline/controller/batch.py
def process_pipeline(self):
    """Process all pipeline steps, stage by stage.

    All steps of a stage are submitted to the processor, then awaited
    together before the next stage starts. Steps whose inputs are not ready
    are skipped when allowed; otherwise an exception is raised.

    Returns:
        ``False`` if submitting a step failed, ``None`` otherwise.

    Raises:
        Exception: if the pipeline is already running, or a required step
            can't be processed because its inputs are not ready.
    """

    if self._is_running:
        # message previously claimed "doing nothing", which contradicted the
        # raise below
        log.debug("Pipeline already running, won't start again.")
        raise Exception("Pipeline already running.")

    self._is_running = True
    try:
        for stage in self.processing_stages:
            job_ids = []
            for step_id in stage:

                if not self.can_be_processed(step_id):
                    if self.can_be_skipped(step_id):
                        # optional step with missing inputs: leave it out
                        continue
                    else:
                        raise Exception(
                            f"Required pipeline step '{step_id}' can't be processed, inputs not ready yet: {', '.join(self.invalid_inputs(step_id))}"
                        )
                try:
                    job_id = self.process_step(step_id)
                    job_ids.append(job_id)
                except Exception as e:
                    # TODO: cancel running jobs?
                    if is_debug():
                        import traceback

                        traceback.print_exc()
                    log.error(
                        f"Processing of step '{step_id}' from pipeline '{self.pipeline.id}' failed: {e}"
                    )
                    return False

            # every job of this stage must finish before the next stage starts
            self._processor.wait_for(*job_ids)
    finally:
        self._is_running = False

step_inputs_changed(self, event)

Method to override if the implementing controller needs to react to events where one or several step inputs have changed.

Parameters:

Name Type Description Default
event StepInputEvent

the step input event

required
Source code in kiara/pipeline/controller/batch.py
def step_inputs_changed(self, event: "StepInputEvent"):
    """Kick off processing when a step-input change leaves the pipeline ready.

    Does nothing while a run is already in progress, while the pipeline still
    has missing inputs, or when auto-processing is disabled.

    Arguments:
        event: the step input event
    """

    if self._is_running:
        log.debug("Pipeline running, doing nothing.")
        return

    if self.pipeline_is_ready():
        if self._auto_process:
            self.process_pipeline()
    else:
        log.debug(f"Pipeline not ready after input event: {event}")

BatchControllerManual

A [PipelineController][kiara.pipeline.controller.PipelineController] that executes all pipeline steps non-interactively.

This is the default implementation of a PipelineController, and probably the most simple implementation of one. It waits until all inputs are set, after which it executes all pipeline steps in the required order.

Parameters:

Name Type Description Default
pipeline

the pipeline to control

required
auto_process

whether to automatically start processing the pipeline as soon as the input set is valid

required

pipeline_inputs_changed(self, event)

Method to override if the implementing controller needs to react to events where one or several pipeline inputs have changed.

Note

Whenever pipeline inputs change, the connected step inputs also change and an (extra) event will be fired for those. Which means you can choose to only implement the step_inputs_changed method if you want to. This behaviour might change in the future.

Parameters:

Name Type Description Default
event PipelineInputEvent

the pipeline input event

required
Source code in kiara/pipeline/controller/batch.py
def pipeline_inputs_changed(self, event: "PipelineInputEvent"):
    """Invalidate the recorded processing progress after pipeline inputs changed.

    Resetting ``_finished_until`` forces the next manual processing run to
    start from the first stage again, since previously computed results may
    now be stale.

    Arguments:
        event: the pipeline input event (currently not inspected; any input
            change invalidates all progress)
    """

    self._finished_until = None

pipeline_outputs_changed(self, event)

Method to override if the implementing controller needs to react to events where one or several pipeline outputs have changed.

Parameters:

Name Type Description Default
event PipelineOutputEvent

the pipeline output event

required
Source code in kiara/pipeline/controller/batch.py
def pipeline_outputs_changed(self, event: "PipelineOutputEvent"):
    """Mark the controller as no longer running once the pipeline is finished.

    Arguments:
        event: the pipeline output event (used only as a trigger to re-check
            whether the pipeline is finished)
    """

    if self.pipeline_is_finished():
        # TODO: check if something is running
        self._is_running = False

process_pipeline(self)

Execute the connected pipeline end-to-end.

Controllers may override this method, but doing so is optional.

Source code in kiara/pipeline/controller/batch.py
def process_pipeline(self):
    """Process all pipeline steps, stage by stage.

    All steps of a stage are submitted to the processor, then awaited
    together before the next stage starts. Steps whose inputs are not ready
    are skipped when allowed; otherwise an exception is raised.

    Returns:
        ``False`` if submitting a step failed, ``None`` otherwise.

    Raises:
        Exception: if the pipeline is already running, or a required step
            can't be processed because its inputs are not ready.
    """

    if self._is_running:
        # message previously claimed "doing nothing", which contradicted the
        # raise below
        log.debug("Pipeline already running, won't start again.")
        raise Exception("Pipeline already running.")

    self._is_running = True
    try:
        for stage in self.processing_stages:
            job_ids = []
            for step_id in stage:
                if not self.can_be_processed(step_id):
                    if self.can_be_skipped(step_id):
                        # optional step with missing inputs: leave it out
                        continue
                    else:
                        raise Exception(
                            f"Required pipeline step '{step_id}' can't be processed, inputs not ready yet: {', '.join(self.invalid_inputs(step_id))}"
                        )
                try:
                    job_id = self.process_step(step_id)
                    job_ids.append(job_id)
                except Exception as e:
                    # TODO: cancel running jobs?
                    if is_debug():
                        import traceback

                        # bug fix: print the caught exception's traceback;
                        # print_stack() printed the current call stack and hid
                        # the actual error
                        traceback.print_exc()
                    log.error(
                        f"Processing of step '{step_id}' from pipeline '{self.pipeline.title}' failed: {e}"
                    )
                    return False
            # every job of this stage must finish before the next stage starts
            self._processor.wait_for(*job_ids)
    finally:
        self._is_running = False

step_inputs_changed(self, event)

Method to override if the implementing controller needs to react to events where one or several step inputs have changed.

Parameters:

Name Type Description Default
event StepInputEvent

the step input event

required
Source code in kiara/pipeline/controller/batch.py
def step_inputs_changed(self, event: "StepInputEvent"):

    if self._is_running:
        log.debug("Pipeline running, doing nothing.")
        return

    if not self.pipeline_is_ready():
        log.debug(f"Pipeline not ready after input event: {event}")
        return