kiara.interfaces.python_api.controller¶
ApiController
¶
A pipeline controller for pipelines generated by the kiara Python API.
ensure_step(self, step_id)
¶
Ensure a step has valid outputs.
Returns:
Type | Description |
---|---|
bool |
'True' if the step now has valid outputs, 'False` if that's not possible because of invalid/missing inputs |
Source code in kiara/interfaces/python_api/controller.py
def ensure_step(self, step_id: str) -> bool:
"""Ensure a step has valid outputs.
Returns:
'True' if the step now has valid outputs, 'False` if that's not possible because of invalid/missing inputs
"""
if step_id not in self.pipeline.step_ids:
raise Exception(f"No step with id: {step_id}")
outputs = self.get_step_outputs(step_id=step_id)
if outputs.items_are_valid():
return True
if self._is_running:
log.debug("Pipeline running, doing nothing.")
raise Exception("Pipeline already running.")
self._is_running = True
try:
for stage in self.processing_stages:
job_ids: typing.List[str] = []
finished = False
if step_id in stage:
finished = True
job_id = self._process(step_id=step_id)
if job_id is None:
return True
if not job_id:
return False
job_ids.append(job_id) # type: ignore
else:
for s_id in stage:
job_id = self._process(step_id=s_id)
if job_id is None:
continue
elif job_id is False:
return False
job_ids.append(job_id) # type: ignore
self._processor.wait_for(*job_ids)
if finished:
break
finally:
self._is_running = False
return True
pipeline_inputs_changed(self, event)
¶
Method to override if the implementing controller needs to react to events where one or several pipeline inputs have changed.
Note
Whenever pipeline inputs change, the connected step inputs also change and an (extra) event will be fired for those. Which means
you can choose to only implement the step_inputs_changed
method if you want to. This behaviour might change in the future.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
PipelineInputEvent |
the pipeline input event |
required |
Source code in kiara/interfaces/python_api/controller.py
def pipeline_inputs_changed(self, event: PipelineInputEvent):
if self._is_running:
raise NotImplementedError()
pipeline_outputs_changed(self, event)
¶
Method to override if the implementing controller needs to react to events where one or several pipeline outputs have changed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
PipelineOutputEvent |
the pipeline output event |
required |
Source code in kiara/interfaces/python_api/controller.py
def pipeline_outputs_changed(self, event: "PipelineOutputEvent"):
if self.pipeline_is_finished():
# TODO: check if something is running
self._is_running = False
process_pipeline(self)
¶
Execute the connected pipeline end-to-end.
Controllers can elect to overwrite this method, but this is optional.
Source code in kiara/interfaces/python_api/controller.py
def process_pipeline(self):
if self._is_running:
log.debug("Pipeline running, doing nothing.")
raise Exception("Pipeline already running.")
self._is_running = True
try:
for stage in self.processing_stages:
job_ids = []
for step_id in stage:
if not self.can_be_processed(step_id):
if self.can_be_skipped(step_id):
continue
else:
# from kiara.utils.output import rich_print
# rich_print(self.pipeline.get_current_state())
raise Exception(
f"Required pipeline step '{step_id}' can't be processed, inputs not ready yet: {', '.join(self.invalid_inputs(step_id))}"
)
try:
if not self.get_step_outputs(step_id).items_are_valid():
job_id = self.process_step(step_id)
job_ids.append(job_id)
except Exception as e:
# TODO: cancel running jobs?
log.error(
f"Processing of step '{step_id}' from pipeline '{self.pipeline.id}' failed: {e}"
)
return False
self._processor.wait_for(*job_ids)
finally:
self._is_running = False
step_inputs_changed(self, event)
¶
Method to override if the implementing controller needs to react to events where one or several step inputs have changed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
StepInputEvent |
the step input event |
required |
Source code in kiara/interfaces/python_api/controller.py
def step_inputs_changed(self, event: StepInputEvent):
for step_id in event.updated_step_inputs.keys():
# outputs = self.get_step_outputs(step_id)
raise NotImplementedError()
# outputs.invalidate()