kiara.pipeline.controller.batch¶

BatchController¶

A [PipelineController][kiara.pipeline.controller.PipelineController] that executes all pipeline steps non-interactively.

This is the default implementation of a PipelineController, and probably the simplest one. It waits until all inputs are set, after which it executes all pipeline steps in the required order.
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline | | the pipeline to control | required
auto_process | | whether to automatically start processing the pipeline as soon as the input set is valid | required
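The control flow described above — wait until every pipeline input is set, then run the stages in order — can be sketched with a tiny stand-in class. The names below are hypothetical and this is not the kiara API, just an illustration of the batch idea:

```python
class MiniBatchController:
    """Toy model of a batch controller: refuses to run until all
    inputs are set, then executes the steps of each stage in order."""

    def __init__(self, stages, inputs):
        # 'stages' is a list of lists of step ids, in execution order;
        # 'inputs' maps input names to values (None means "not set yet").
        self.stages = stages
        self.inputs = inputs
        self.executed = []

    def pipeline_is_ready(self):
        # ready only when every pipeline input has a value
        return all(v is not None for v in self.inputs.values())

    def process_pipeline(self):
        if not self.pipeline_is_ready():
            return False
        for stage in self.stages:
            for step_id in stage:
                self.executed.append(step_id)  # stand-in for process_step()
        return True


ctrl = MiniBatchController([["a", "b"], ["c"]], {"x": None})
assert ctrl.process_pipeline() is False  # inputs not complete yet
ctrl.inputs["x"] = 1
assert ctrl.process_pipeline() is True
assert ctrl.executed == ["a", "b", "c"]  # stage order preserved
```

The real controller reacts to input events instead of being polled, but the readiness gate and the stage-by-stage loop are the same shape.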
pipeline_outputs_changed(self, event)¶

Method to override if the implementing controller needs to react to events where one or several pipeline outputs have changed.

Parameters:

Name | Type | Description | Default
---|---|---|---
event | PipelineOutputEvent | the pipeline output event | required
Source code in kiara/pipeline/controller/batch.py

    def pipeline_outputs_changed(self, event: "PipelineOutputEvent"):

        if self.pipeline_is_finished():
            # TODO: check if something is running
            self._is_running = False
¶
Execute the connected pipeline end-to-end.
Controllers can elect to overwrite this method, but this is optional.
Source code in kiara/pipeline/controller/batch.py

    def process_pipeline(self):

        if self._is_running:
            log.debug("Pipeline running, doing nothing.")
            raise Exception("Pipeline already running.")

        self._is_running = True
        try:
            for stage in self.processing_stages:
                job_ids = []
                for step_id in stage:
                    if not self.can_be_processed(step_id):
                        if self.can_be_skipped(step_id):
                            continue
                        else:
                            raise Exception(
                                f"Required pipeline step '{step_id}' can't be processed, inputs not ready yet: {', '.join(self.invalid_inputs(step_id))}"
                            )
                    try:
                        job_id = self.process_step(step_id)
                        job_ids.append(job_id)
                    except Exception as e:
                        # TODO: cancel running jobs?
                        if is_debug():
                            import traceback
                            traceback.print_exc()
                        log.error(
                            f"Processing of step '{step_id}' from pipeline '{self.pipeline.id}' failed: {e}"
                        )
                        return False
                self._processor.wait_for(*job_ids)
                # for j_id in job_ids:
                #     job = self._processor.get_job_details(j_id)
                #     assert job is not None
                #     if job.error:
                #         print(job.error)
        finally:
            self._is_running = False
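The skip-or-abort branch in process_pipeline above can be re-enacted in isolation. This is a simplified sketch with hypothetical inputs, not the actual can_be_processed / can_be_skipped implementations:

```python
def run_stage(steps):
    """Execute one stage of (step_id, ready, optional) tuples.

    A step whose inputs are not ready is skipped when it is optional
    (mirrors can_be_skipped) and aborts the run when it is required."""
    executed = []
    for step_id, ready, optional in steps:
        if not ready:
            if optional:
                continue  # optional step with missing inputs: skip it
            raise RuntimeError(
                f"Required pipeline step '{step_id}' can't be processed"
            )
        executed.append(step_id)  # stand-in for process_step()
    return executed


# ready required step runs, unready optional step is skipped
assert run_stage([("a", True, False), ("b", False, True)]) == ["a"]

# an unready *required* step aborts the whole stage
try:
    run_stage([("c", False, False)])
    raised = False
except RuntimeError:
    raised = True
assert raised
```

In the real method this check happens per step inside each stage, and a failure during processing additionally logs the error and returns False instead of propagating.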
step_inputs_changed(self, event)¶

Method to override if the implementing controller needs to react to events where one or several step inputs have changed.

Parameters:

Name | Type | Description | Default
---|---|---|---
event | StepInputEvent | the step input event | required
Source code in kiara/pipeline/controller/batch.py

    def step_inputs_changed(self, event: "StepInputEvent"):

        if self._is_running:
            log.debug("Pipeline running, doing nothing.")
            return

        if not self.pipeline_is_ready():
            log.debug(f"Pipeline not ready after input event: {event}")
            return

        if self._auto_process:
            self.process_pipeline()
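The three guards in step_inputs_changed above (already running, not ready, auto_process disabled) can be sketched with a minimal event handler. Names are illustrative, not the kiara API:

```python
class AutoTrigger:
    """Toy event handler: an input-change event kicks off a pipeline run
    only when the controller is idle, ready, and auto-processing."""

    def __init__(self, auto_process=True):
        self._is_running = False
        self._auto_process = auto_process
        self.runs = 0

    def pipeline_is_ready(self):
        # assume inputs are always valid in this sketch
        return True

    def process_pipeline(self):
        self.runs += 1

    def step_inputs_changed(self, event):
        if self._is_running:
            return  # a run is in flight: ignore the event
        if not self.pipeline_is_ready():
            return  # inputs still incomplete: wait for more events
        if self._auto_process:
            self.process_pipeline()


t = AutoTrigger()
t.step_inputs_changed(event=None)
assert t.runs == 1  # idle + ready + auto_process -> processed

t._auto_process = False
t.step_inputs_changed(event=None)
assert t.runs == 1  # auto-processing disabled: nothing happens
```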
BatchControllerManual¶

A [PipelineController][kiara.pipeline.controller.PipelineController] that executes all pipeline steps non-interactively.

It waits until all inputs are set, after which it executes all pipeline steps in the required order.
Parameters:

Name | Type | Description | Default
---|---|---|---
pipeline | | the pipeline to control | required
auto_process | | whether to automatically start processing the pipeline as soon as the input set is valid | required
pipeline_inputs_changed(self, event)¶

Method to override if the implementing controller needs to react to events where one or several pipeline inputs have changed.

Note: Whenever pipeline inputs change, the connected step inputs also change, and an (extra) event will be fired for those. This means you can choose to implement only the step_inputs_changed method if you want to. This behaviour might change in the future.

Parameters:

Name | Type | Description | Default
---|---|---|---
event | PipelineInputEvent | the pipeline input event | required
Source code in kiara/pipeline/controller/batch.py

    def pipeline_inputs_changed(self, event: "PipelineInputEvent"):

        self._finished_until = None
        # print("============================")
        # min_stage_to_clear = sys.maxsize
        # for inp in event.updated_pipeline_inputs:
        #     stage = self.pipeline.get_stage_for_pipeline_input(inp)
        #     if stage < min_stage_to_clear:
        #         min_stage_to_clear = stage
        #
        # for stage, steps in self.pipeline.get_steps_by_stage().items():
        #     if stage < min_stage_to_clear:
        #         continue
        #     for step_id, step in steps.items():
        #         step_inputs = self.get_step_inputs(step_id)
        #         empty = {k: None for k in step_inputs.keys()}
        #         step_inputs.set_values(**empty)
        #         step_outputs = self.get_step_outputs(step_id)
        #         empty = {k: None for k in step_outputs.keys()}
        #         step_outputs.set_values(**empty)
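The one live statement in pipeline_inputs_changed above resets _finished_until, the controller's record of how far processing has progressed. The invalidation idea can be shown in a few lines; the class and method names here are hypothetical, not the kiara API:

```python
class ManualProgress:
    """Toy model of stage-progress tracking: any input change
    invalidates previously completed stages so they are re-processed."""

    def __init__(self):
        self._finished_until = None  # highest stage index completed so far

    def mark_finished(self, stage_idx):
        self._finished_until = stage_idx

    def pipeline_inputs_changed(self, event):
        # new inputs may affect any stage: forget all recorded progress
        self._finished_until = None


p = ManualProgress()
p.mark_finished(2)
p.pipeline_inputs_changed(event=None)
assert p._finished_until is None  # progress invalidated
```

The commented-out block in the source hints at a finer-grained design (clearing only the stages downstream of the changed inputs); the current code takes the conservative route and resets everything.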
pipeline_outputs_changed(self, event)¶

Method to override if the implementing controller needs to react to events where one or several pipeline outputs have changed.

Parameters:

Name | Type | Description | Default
---|---|---|---
event | PipelineOutputEvent | the pipeline output event | required
Source code in kiara/pipeline/controller/batch.py

    def pipeline_outputs_changed(self, event: "PipelineOutputEvent"):

        if self.pipeline_is_finished():
            # TODO: check if something is running
            self._is_running = False
process_pipeline(self)¶

Execute the connected pipeline end-to-end.

Controllers can choose to override this method, but doing so is optional.
Source code in kiara/pipeline/controller/batch.py

    def process_pipeline(self):

        if self._is_running:
            log.debug("Pipeline running, doing nothing.")
            raise Exception("Pipeline already running.")

        self._is_running = True
        try:
            for stage in self.processing_stages:
                job_ids = []
                for step_id in stage:
                    if not self.can_be_processed(step_id):
                        if self.can_be_skipped(step_id):
                            continue
                        else:
                            raise Exception(
                                f"Required pipeline step '{step_id}' can't be processed, inputs not ready yet: {', '.join(self.invalid_inputs(step_id))}"
                            )
                    try:
                        job_id = self.process_step(step_id)
                        job_ids.append(job_id)
                    except Exception as e:
                        # TODO: cancel running jobs?
                        if is_debug():
                            import traceback
                            traceback.print_exc()
                        log.error(
                            f"Processing of step '{step_id}' from pipeline '{self.pipeline.id}' failed: {e}"
                        )
                        return False
                self._processor.wait_for(*job_ids)
        finally:
            self._is_running = False
step_inputs_changed(self, event)¶

Method to override if the implementing controller needs to react to events where one or several step inputs have changed.

Parameters:

Name | Type | Description | Default
---|---|---|---
event | StepInputEvent | the step input event | required
Source code in kiara/pipeline/controller/batch.py

    def step_inputs_changed(self, event: "StepInputEvent"):

        if self._is_running:
            log.debug("Pipeline running, doing nothing.")
            return

        if not self.pipeline_is_ready():
            log.debug(f"Pipeline not ready after input event: {event}")
            return