kiara.pipeline.controller¶
PipelineController
¶
An object that controls how a Pipeline should react to events related to its inputs/outputs.
This is the base for the central controller class that needs to be implemented by a Kiara frontend. The default implementation
that is used if no `PipelineController` is provided in a `Pipeline` constructor is the
[BatchController][kiara.pipeline.controller.BatchController], which basically waits until all required inputs are
set, and then processes all pipeline steps in one go (in the right order).
The pipeline object to control can be set either in the constructor, or via the `set_pipeline`
method. But only once: every subsequent attempt to set a pipeline will raise an Exception.
If you want to implement your own controller, you have to override at least one of the (empty) event hook methods:
- [`pipeline_inputs_changed`][kiara.pipeline.controller.PipelineController.pipeline_inputs_changed]
- [`pipeline_outputs_changed`][kiara.pipeline.controller.PipelineController.pipeline_outputs_changed]
- [`step_inputs_changed`][kiara.pipeline.controller.PipelineController.step_inputs_changed]
- [`step_outputs_changed`][kiara.pipeline.controller.PipelineController.step_outputs_changed]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline |
Pipeline |
the pipeline object to control |
required |
pipeline: Pipeline
property
readonly
¶
Return the pipeline this controller, well, ...controls...
pipeline_inputs: ValueSet
property
writable
¶
Return the inputs object for this pipeline.
pipeline_outputs: ValueSet
property
readonly
¶
Return the (current) pipeline outputs object for this pipeline.
processing_stages: List[List[str]]
property
readonly
¶
Return the processing stage order of the pipeline.
Returns:
Type | Description |
---|---|
List[List[str]] |
a list of lists of step ids |
can_be_processed(self, step_id)
¶
Check whether the step with the provided id is ready to be processed.
Source code in kiara/pipeline/controller/__init__.py
def can_be_processed(self, step_id: str) -> bool:
    """Check whether the step with the provided id is ready to be processed.

    A step is considered processable when every one of its input fields holds a
    value whose ``item_is_valid()`` check succeeds. Consequently, a step without
    any input fields is always processable.

    Arguments:
        step_id: the id of the step to check

    Returns:
        ``True`` if all input values of the step are valid, ``False`` otherwise
    """
    step_inputs = self.get_step_inputs(step_id=step_id)
    # ``all`` stops at the first invalid value, exactly like the explicit
    # flag-and-break loop it replaces.
    return all(
        step_inputs.get_value_obj(input_name).item_is_valid()
        for input_name in step_inputs.get_all_field_names()
    )
can_be_skipped(self, step_id)
¶
Check whether the processing of a step can be skipped.
Source code in kiara/pipeline/controller/__init__.py
def can_be_skipped(self, step_id: str) -> bool:
    """Check whether the processing of a step can be skipped.

    Steps that are not marked as ``required`` always yield ``True``. For
    required steps the answer is delegated to ``can_be_processed``.

    NOTE(review): for required steps this returns whether the step *can be
    processed*, which is not what the method name suggests. Callers such as
    ``check_inputs_status`` appear to treat the result as "ok to proceed".

    Arguments:
        step_id: the id of the step to check

    Returns:
        ``True`` for non-required steps, otherwise the result of ``can_be_processed``
    """
    step = self.get_step(step_id)
    if not step.required:
        return True
    return self.can_be_processed(step_id)
check_inputs_status(self)
¶
Check the status of all stages/steps, and whether they are ready to be processed.
The result dictionary uses stage numbers as keys, and a secondary dictionary as values which in turn uses the step_id as key, and a dict with details (keys: 'ready', 'invalid_inputs', 'required') as values.
Source code in kiara/pipeline/controller/__init__.py
def check_inputs_status(
    self,
) -> typing.Mapping[int, typing.Mapping[str, typing.Mapping[str, typing.Any]]]:
    """Check the status of all stages/steps, and whether they are ready to be processed.

    The result maps 1-based stage numbers to a dictionary that maps each step id
    to a details dict with the keys ``ready``, ``required`` and ``invalid_inputs``.
    Entries in ``invalid_inputs`` are formatted as ``"<step_id>.<input_name>"``.

    A stage only appears in the result if at least one of its steps was recorded.
    A processable step is only recorded if ``can_be_skipped`` also returns ``True``.

    NOTE(review): processable steps are always reported with ``required=True``,
    independently of the step's own ``required`` attribute. Also, with the base
    ``can_be_skipped`` implementation the skip check for processable steps is
    always ``True``. Confirm both are intended.

    Returns:
        the nested status mapping described above
    """
    status_by_stage: typing.Dict[int, typing.Dict[str, typing.Dict[str, typing.Any]]] = {}
    for stage_nr, step_ids in enumerate(self.processing_stages, start=1):
        for step_id in step_ids:
            processable = self.can_be_processed(step_id)
            skippable = self.can_be_skipped(step_id)

            details: typing.Dict[str, typing.Any]
            if processable:
                if not skippable:
                    continue
                details = {"ready": True, "required": True, "invalid_inputs": []}
            else:
                invalid = [
                    f"{step_id}.{field}" for field in self.invalid_inputs(step_id)
                ]
                if skippable:
                    details = {
                        "ready": True,
                        "required": False,
                        "invalid_inputs": invalid,
                    }
                else:
                    details = {
                        "ready": False,
                        "invalid_inputs": invalid,
                        "required": True,
                    }

            status_by_stage.setdefault(stage_nr, {})[step_id] = details

    return status_by_stage
get_current_pipeline_state(self)
¶
Return a description of the current pipeline state.
This method creates a new [PipelineState][kiara.pipeline.pipeline.PipelineState] object when called, containing the pipeline structure as well as metadata about the pipeline and its steps' inputs and outputs.
Returns:
Type | Description |
---|---|
PipelineState |
an object outlining the current pipeline state |
Source code in kiara/pipeline/controller/__init__.py
def get_current_pipeline_state(self) -> "PipelineState":
    """Return a description of the current pipeline state.

    Every call builds a fresh state object by delegating to the controlled
    pipeline's ``get_current_state``. That object contains the pipeline
    structure as well as metadata about the pipeline and its steps' inputs and
    outputs.

    Returns:
        an object outlining the current pipeline state
    """
    controlled = self.pipeline
    snapshot = controlled.get_current_state()
    return snapshot
get_job_details(self, step_or_job_id)
¶
Returns job details for a job id, or in case a step_id was provided, the last execution of this step.
Source code in kiara/pipeline/controller/__init__.py
def get_job_details(self, step_or_job_id: str) -> typing.Optional[Job]:
    """Return job details for a job id, or for the last job started for a step id.

    If ``step_or_job_id`` is a key in the controller's step-to-job mapping (and
    maps to a non-``None`` job id), the job recorded for that step is looked
    up. Otherwise the argument itself is passed to the processor as a job id.

    Arguments:
        step_or_job_id: either a step id or a job id

    Returns:
        whatever the processor's ``get_job_details`` returns for the resolved id
    """
    resolved_id = self._job_ids.get(step_or_job_id, None)
    if resolved_id is None:
        # Nothing recorded for this key: treat it as a job id directly.
        resolved_id = step_or_job_id
    return self._processor.get_job_details(resolved_id)
get_step(self, step_id)
¶
Return the step object for the provided id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
str |
the step id |
required |
Returns:
Type | Description |
---|---|
PipelineStep |
the step object |
Source code in kiara/pipeline/controller/__init__.py
def get_step(self, step_id: str) -> PipelineStep:
    """Look up a step of the controlled pipeline by its id.

    Arguments:
        step_id: the step id

    Returns:
        the step object, as returned by the pipeline's ``get_step``
    """
    controlled = self.pipeline
    return controlled.get_step(step_id)
get_step_input(self, step_id, input_name)
¶
Get the (current) input value for a specified step and input field name.
Source code in kiara/pipeline/controller/__init__.py
def get_step_input(self, step_id: str, input_name: str) -> Value:
    """Get the (current) input value for a specified step and input field name.

    Arguments:
        step_id: the id of the step
        input_name: the name of the input field of that step

    Returns:
        the value object stored in the step's inputs under ``input_name``
    """
    inputs_of_step = self.get_step_inputs(step_id)
    value_obj = inputs_of_step.get_value_obj(input_name)
    # Narrowing for type checkers; not a substitute for input validation.
    assert value_obj is not None
    return value_obj
get_step_inputs(self, step_id)
¶
Return the inputs object for the step with the provided id.
Source code in kiara/pipeline/controller/__init__.py
def get_step_inputs(self, step_id: str) -> ValueSet:
    """Return the inputs object for the step with the provided id.

    Arguments:
        step_id: the id of the step

    Returns:
        the step's input value set, as returned by the pipeline's ``get_step_inputs``
    """
    controlled = self.pipeline
    return controlled.get_step_inputs(step_id)
get_step_output(self, step_id, output_name)
¶
Get the (current) output value for a specified step and output field name.
Source code in kiara/pipeline/controller/__init__.py
def get_step_output(self, step_id: str, output_name: str) -> Value:
    """Get the (current) output value for a specified step and output field name.

    Arguments:
        step_id: the id of the step
        output_name: the name of the output field of that step

    Returns:
        the value object stored in the step's outputs under ``output_name``
    """
    outputs_of_step = self.get_step_outputs(step_id)
    value_obj = outputs_of_step.get_value_obj(output_name)
    # Narrowing for type checkers; not a substitute for input validation.
    assert value_obj is not None
    return value_obj
get_step_outputs(self, step_id)
¶
Return the outputs object for the step with the provided id.
Source code in kiara/pipeline/controller/__init__.py
def get_step_outputs(self, step_id: str) -> ValueSet:
    """Return the outputs object for the step with the provided id.

    Arguments:
        step_id: the id of the step

    Returns:
        the step's output value set, as returned by the pipeline's ``get_step_outputs``
    """
    controlled = self.pipeline
    return controlled.get_step_outputs(step_id)
pipeline_is_finished(self)
¶
Return whether the pipeline has been processed successfully.
A `True` result means that every step of the pipeline has been processed successfully, and no pipeline input
has changed since that happened.
Returns:
Type | Description |
---|---|
bool |
whether the pipeline was processed successfully (`True`) or not (`False`) |
Source code in kiara/pipeline/controller/__init__.py
def pipeline_is_finished(self) -> bool:
    """Return whether the pipeline has been processed successfully.

    This is the case when every output of the controlled pipeline currently
    holds a valid value, i.e. all steps ran successfully and no pipeline input
    has changed since.

    Returns:
        ``True`` if the pipeline outputs are all valid, ``False`` otherwise
    """
    outputs = self.pipeline.outputs
    return outputs.items_are_valid()
pipeline_is_ready(self)
¶
Return whether the pipeline is ready to be processed.
A `True` result means that all pipeline inputs are set with valid values, and therefore every step within the
pipeline can be processed.
Returns:
Type | Description |
---|---|
bool |
whether the pipeline can be processed as a whole (`True`) or not (`False`) |
Source code in kiara/pipeline/controller/__init__.py
def pipeline_is_ready(self) -> bool:
    """Return whether the pipeline is ready to be processed.

    This is the case when every input of the controlled pipeline currently
    holds a valid value, so every step within the pipeline can be processed.

    Returns:
        ``True`` if the pipeline inputs are all valid, ``False`` otherwise
    """
    inputs = self.pipeline.inputs
    return inputs.items_are_valid()
process_pipeline(self)
¶
Execute the connected pipeline end-to-end.
Controllers can elect to override this method, but this is optional.
Source code in kiara/pipeline/controller/__init__.py
def process_pipeline(self):
    """Execute the connected pipeline end-to-end.

    Controllers can elect to override this method, but this is optional. The
    base implementation does not process anything.

    Raises:
        NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
process_step(self, step_id, raise_exception=False, wait=False)
¶
Kick off processing for the step with the provided id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
str |
the id of the step that should be started |
required |
Source code in kiara/pipeline/controller/__init__.py
def process_step(
    self, step_id: str, raise_exception: bool = False, wait: bool = False
) -> str:
    """Kick off processing for the step with the provided id.

    The step's input and output value sets, together with the step's module,
    are handed to the controller's processor. The resulting job id is recorded
    for the step, so ``get_job_details(step_id)`` can resolve it later.

    Arguments:
        step_id: the id of the step that should be started
        raise_exception: accepted for interface compatibility; not used by this implementation
        wait: if ``True``, call ``wait_for_jobs(job_id, sync_outputs=True)`` before returning

    Returns:
        the job id returned by the processor

    Raises:
        Exception: if any input of the step is not valid (the step is not started)
    """
    inputs_of_step = self.get_step_inputs(step_id)

    # Invalid inputs are an error: the step is refused, not silently skipped.
    if not inputs_of_step.items_are_valid():
        invalid_fields = inputs_of_step.check_invalid()
        assert invalid_fields is not None
        field_list = ", ".join(invalid_fields.keys())
        raise Exception(
            f"Can't execute step '{step_id}', invalid inputs: {field_list}"
        )

    # The output 'holder' objects the module will write its results into.
    outputs_of_step = self.get_step_outputs(step_id)
    # The step wraps the module that contains the actual processing code.
    step = self.get_step(step_id)

    new_job_id = self._processor.start(
        pipeline_id=self.pipeline.id,
        pipeline_name=self.pipeline.title,
        step_id=step_id,
        module=step.module,
        inputs=inputs_of_step,
        outputs=outputs_of_step,
    )
    self._job_ids[step_id] = new_job_id

    if wait:
        self.wait_for_jobs(new_job_id, sync_outputs=True)

    return new_job_id
set_pipeline(self, pipeline)
¶
Set the pipeline object for this controller.
Only one pipeline can be set, once.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline |
Pipeline |
the pipeline object |
required |
Source code in kiara/pipeline/controller/__init__.py
def set_pipeline(self, pipeline: "Pipeline"):
    """Attach the pipeline object this controller manages.

    A pipeline can only be attached once; any later attempt is rejected and
    leaves the already attached pipeline in place.

    Arguments:
        pipeline: the pipeline object

    Raises:
        Exception: if a pipeline has already been set on this controller
    """
    already_set = self._pipeline is not None
    if already_set:
        raise Exception("Pipeline already set.")
    self._pipeline = pipeline
set_pipeline_inputs(self, **inputs)
¶
Set one, several or all inputs for this pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**inputs |
Any |
the input values to set |
{} |
Source code in kiara/pipeline/controller/__init__.py
def set_pipeline_inputs(self, **inputs: typing.Any):
    """Set one, several or all inputs for this pipeline.

    The keyword arguments are first passed through ``self._pipeline_input_hook``.
    The mapping it returns is then applied with ``self.pipeline_inputs.set_values``.

    Arguments:
        **inputs: the input values to set
    """
    processed_inputs = self._pipeline_input_hook(**inputs)
    target_values = self.pipeline_inputs
    target_values.set_values(**processed_inputs)
step_is_finished(self, step_id)
¶
Return whether the step with the provided id has been processed successfully.
A `True` result means that all output fields are currently set with valid values, and the inputs haven't changed
since the last time processing was done.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
str |
the id of the step to check |
required |
Returns:
Type | Description |
---|---|
bool |
whether the step result is valid (`True`) or not (`False`) |
Source code in kiara/pipeline/controller/__init__.py
def step_is_finished(self, step_id: str) -> bool:
    """Return whether the step with the provided id has been processed successfully.

    This is the case when every output field of the step currently holds a
    valid value, meaning the inputs have not changed since the last processing.

    Arguments:
        step_id: the id of the step to check

    Returns:
        ``True`` if all outputs of the step are valid, ``False`` otherwise
    """
    outputs_of_step = self.get_step_outputs(step_id)
    return outputs_of_step.items_are_valid()
step_is_ready(self, step_id)
¶
Return whether the step with the provided id is ready to be processed.
A `True` result means that all input fields are currently set with valid values.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
str |
the id of the step to check |
required |
Returns:
Type | Description |
---|---|
bool |
whether the step is ready (`True`) or not (`False`) |
Source code in kiara/pipeline/controller/__init__.py
def step_is_ready(self, step_id: str) -> bool:
    """Return whether the step with the provided id is ready to be processed.

    This is the case when every input field of the step currently holds a
    valid value.

    Arguments:
        step_id: the id of the step to check

    Returns:
        ``True`` if all inputs of the step are valid, ``False`` otherwise
    """
    inputs_of_step = self.get_step_inputs(step_id)
    return inputs_of_step.items_are_valid()