controller

Attributes

logger = structlog.getLogger() module-attribute

Classes

PipelineController

Bases: PipelineListener

Source code in kiara/models/module/pipeline/controller.py
class PipelineController(PipelineListener):

    pass
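
Since PipelineController adds nothing on top of PipelineListener, a custom controller only has to implement the listener callback it cares about. A minimal, hypothetical sketch (the PipelineEvent import path is an assumption; the callback name is taken from SinglePipelineController below):

from kiara.models.events.pipeline import PipelineEvent  # assumed import path
from kiara.models.module.pipeline.controller import PipelineController

class LoggingPipelineController(PipelineController):
    """A hypothetical controller that prints every event of the pipelines it listens to."""

    def _pipeline_event_occurred(self, event: PipelineEvent):
        # React to any event emitted by a pipeline this controller is attached to.
        print(f"pipeline event: {event}")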

SinglePipelineController

Bases: PipelineController

Source code in kiara/models/module/pipeline/controller.py
class SinglePipelineController(PipelineController):
    def __init__(
        self, job_registry: JobRegistry, pipeline: Union[Pipeline, None] = None
    ):

        self._pipeline: Union[Pipeline, None] = None
        self._job_registry: JobRegistry = job_registry
        self._pipeline_details: Union[PipelineState, None] = None

        if pipeline is not None:
            self.pipeline = pipeline

    @property
    def pipeline(self) -> Pipeline:

        if self._pipeline is None:
            raise Exception("Pipeline not set (yet).")
        return self._pipeline

    @pipeline.setter
    def pipeline(self, pipeline: Pipeline):

        if self._pipeline is not None:
            # TODO: destroy object?
            self._pipeline._listeners.clear()

        self._pipeline = pipeline
        if self._pipeline is not None:
            self._pipeline.add_listener(self)

    def current_pipeline_state(self) -> PipelineState:

        if self._pipeline_details is None:
            self._pipeline_details = self.pipeline.get_pipeline_details()
        return self._pipeline_details

    def can_be_processed(self, step_id: str) -> bool:
        """Check whether the step with the provided id is ready to be processed."""
        pipeline_state = self.current_pipeline_state()
        step_state = pipeline_state.step_states[step_id]

        return not step_state.invalid_details

    def can_be_skipped(self, step_id: str) -> bool:
        """Check whether the processing of a step can be skipped."""
        required = self.pipeline.structure.step_is_required(step_id=step_id)
        if required:
            required = self.can_be_processed(step_id)
        return required

    def _pipeline_event_occurred(self, event: PipelineEvent):

        if event.pipeline_id != self.pipeline.pipeline_id:
            return

        self._pipeline_details = None

    def set_processing_results(
        self, job_ids: Mapping[str, uuid.UUID]
    ) -> Mapping[uuid.UUID, uuid.UUID]:
        """
        Set the processing results as values of the appropriate step outputs.

        Returns:
        -------
            a dict with the result value id as key, and the id of the job that produced it as value
        """
        self._job_registry.wait_for(*job_ids.values())

        result: Dict[uuid.UUID, uuid.UUID] = {}
        combined_outputs = {}
        for step_id, job_id in job_ids.items():
            record = self._job_registry.get_job_record(job_id=job_id)
            if record is None:
                continue
            combined_outputs[step_id] = record.outputs
            for output_id in record.outputs.values():
                result[output_id] = job_id

        self.pipeline.set_multiple_step_outputs(
            changed_outputs=combined_outputs, notify_listeners=True
        )

        return result

    def pipeline_is_ready(self) -> bool:
        """
        Return whether the pipeline is ready to be processed.

        A ``True`` result means that all pipeline inputs are set with valid values, and therefore every step within the
        pipeline can be processed.

        Returns:
        -------
            whether the pipeline can be processed as a whole (``True``) or not (``False``)
        """
        pipeline_inputs = self.pipeline._all_values.get_alias("pipeline.inputs")
        assert pipeline_inputs is not None
        return pipeline_inputs.all_items_valid

    def process_step(self, step_id: str, wait: bool = False) -> uuid.UUID:
        """
        Kick off processing for the step with the provided id.

        Arguments:
        ---------
            step_id: the id of the step that should be started
        """
        job_config = self.pipeline.create_job_config_for_step(step_id)

        job_metadata = {"is_pipeline_step": True, "step_id": step_id}
        job_id = self._job_registry.execute_job(
            job_config=job_config, job_metadata=job_metadata
        )
        # job_id = self._processor.create_job(job_config=job_config)
        # self._processor.queue_job(job_id=job_id)

        if wait:
            self._job_registry.wait_for(job_id)

        return job_id

Attributes

pipeline: Pipeline property writable

Functions

current_pipeline_state() -> PipelineState
Source code in kiara/models/module/pipeline/controller.py
def current_pipeline_state(self) -> PipelineState:

    if self._pipeline_details is None:
        self._pipeline_details = self.pipeline.get_pipeline_details()
    return self._pipeline_details
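
The returned PipelineState is cached until the next pipeline event invalidates it (see _pipeline_event_occurred in the class source above), so repeated calls between events are cheap. A usage sketch, assuming controller is an already set up SinglePipelineController:

# Walk the cached state; `step_states` and `invalid_details` are the same
# attributes `can_be_processed` uses below.
state = controller.current_pipeline_state()
for step_id, step_state in state.step_states.items():
    if step_state.invalid_details:
        print(f"{step_id}: not ready -> {step_state.invalid_details}")
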
can_be_processed(step_id: str) -> bool

Check whether the step with the provided id is ready to be processed.

Source code in kiara/models/module/pipeline/controller.py
def can_be_processed(self, step_id: str) -> bool:
    """Check whether the step with the provided id is ready to be processed."""
    pipeline_state = self.current_pipeline_state()
    step_state = pipeline_state.step_states[step_id]

    return not step_state.invalid_details

can_be_skipped(step_id: str) -> bool

Check whether the processing of a step can be skipped.

Source code in kiara/models/module/pipeline/controller.py
def can_be_skipped(self, step_id: str) -> bool:
    """Check whether the processing of a step can be skipped."""
    required = self.pipeline.structure.step_is_required(step_id=step_id)
    if required:
        required = self.can_be_processed(step_id)
    return required
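
Together these checks let a caller decide what to run next. A sketch, assuming controller is a SinglePipelineController and that the pipeline structure exposes its step ids as step_ids (an assumption; only step_is_required appears in the source above):

for step_id in controller.pipeline.structure.step_ids:  # `step_ids` is an assumed attribute
    if controller.can_be_processed(step_id):
        print(f"{step_id}: ready to be processed")
    elif controller.can_be_skipped(step_id):
        print(f"{step_id}: can be skipped")
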
set_processing_results(job_ids: Mapping[str, uuid.UUID]) -> Mapping[uuid.UUID, uuid.UUID]

Set the processing results as values of the appropriate step outputs.

Returns:
    a dict with the result value id as key, and the id of the job that produced it as value
Source code in kiara/models/module/pipeline/controller.py
def set_processing_results(
    self, job_ids: Mapping[str, uuid.UUID]
) -> Mapping[uuid.UUID, uuid.UUID]:
    """
    Set the processing results as values of the appropriate step outputs.

    Returns:
    -------
        a dict with the result value id as key, and the id of the job that produced it as value
    """
    self._job_registry.wait_for(*job_ids.values())

    result: Dict[uuid.UUID, uuid.UUID] = {}
    combined_outputs = {}
    for step_id, job_id in job_ids.items():
        record = self._job_registry.get_job_record(job_id=job_id)
        if record is None:
            continue
        combined_outputs[step_id] = record.outputs
        for output_id in record.outputs.values():
            result[output_id] = job_id

    self.pipeline.set_multiple_step_outputs(
        changed_outputs=combined_outputs, notify_listeners=True
    )

    return result
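
In manual use this is the second half of a process/collect cycle: kick off jobs with process_step (below), then hand the step-to-job mapping back so the outputs are written to the right steps. A hedged sketch with hypothetical step ids:

# "step_a" and "step_b" are hypothetical step ids.
job_ids = {
    "step_a": controller.process_step("step_a"),
    "step_b": controller.process_step("step_b"),
}
value_to_job = controller.set_processing_results(job_ids=job_ids)
for value_id, job_id in value_to_job.items():
    print(f"value {value_id} was produced by job {job_id}")
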
pipeline_is_ready() -> bool

Return whether the pipeline is ready to be processed.

A True result means that all pipeline inputs are set with valid values, and therefore every step within the pipeline can be processed.

Returns:
    whether the pipeline can be processed as a whole (True) or not (False)
Source code in kiara/models/module/pipeline/controller.py
def pipeline_is_ready(self) -> bool:
    """
    Return whether the pipeline is ready to be processed.

    A ``True`` result means that all pipeline inputs are set with valid values, and therefore every step within the
    pipeline can be processed.

    Returns:
    -------
        whether the pipeline can be processed as a whole (``True``) or not (``False``)
    """
    pipeline_inputs = self.pipeline._all_values.get_alias("pipeline.inputs")
    assert pipeline_inputs is not None
    return pipeline_inputs.all_items_valid
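
A typical guard before batch processing; how pipeline inputs are set is not shown on this page, so the input-setting call below is an assumption:

pipeline.set_pipeline_inputs(inputs={"some_input": 42})  # assumed method name
if controller.pipeline_is_ready():
    print("all pipeline inputs are valid, every step can be processed")
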
process_step(step_id: str, wait: bool = False) -> uuid.UUID

Kick off processing for the step with the provided id.

Arguments:
    step_id: the id of the step that should be started
    wait: whether to wait for the job to finish before returning
Source code in kiara/models/module/pipeline/controller.py
def process_step(self, step_id: str, wait: bool = False) -> uuid.UUID:
    """
    Kick off processing for the step with the provided id.

    Arguments:
    ---------
        step_id: the id of the step that should be started
    """
    job_config = self.pipeline.create_job_config_for_step(step_id)

    job_metadata = {"is_pipeline_step": True, "step_id": step_id}
    job_id = self._job_registry.execute_job(
        job_config=job_config, job_metadata=job_metadata
    )
    # job_id = self._processor.create_job(job_config=job_config)
    # self._processor.queue_job(job_id=job_id)

    if wait:
        self._job_registry.wait_for(job_id)

    return job_id
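
The returned job id can be collected for set_processing_results above; with wait=True the call blocks until the job has finished. A sketch with a hypothetical step id:

job_id = controller.process_step("step_a", wait=True)  # "step_a" is hypothetical
controller.set_processing_results(job_ids={"step_a": job_id})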

SinglePipelineBatchController

Bases: SinglePipelineController

A PipelineController that executes all pipeline steps non-interactively.

This is the default implementation of a PipelineController, and probably the simplest implementation of one. It waits until all inputs are set, after which it executes all pipeline steps in the required order.

Arguments:
    pipeline: the pipeline to control
    job_registry: the job registry used to execute the step jobs
    auto_process: whether to automatically start processing the pipeline as soon as the input set is valid
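
A minimal end-to-end sketch, assuming pipeline and job_registry instances are already available from a kiara context and that inputs are set via a set_pipeline_inputs call (an assumed method name):

from kiara.models.module.pipeline.controller import SinglePipelineBatchController

controller = SinglePipelineBatchController(
    pipeline=pipeline, job_registry=job_registry, auto_process=True
)
pipeline.set_pipeline_inputs(inputs={"some_input": 42})  # assumed method name
if controller.pipeline_is_ready():
    job_ids = controller.process_pipeline()
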
Source code in kiara/models/module/pipeline/controller.py
class SinglePipelineBatchController(SinglePipelineController):

    """
    A [PipelineController][kiara.models.modules.pipeline.controller.PipelineController] that executes all pipeline steps non-interactively.

    This is the default implementation of a ``PipelineController``, and probably the most simple implementation of one.
    It waits until all inputs are set, after which it executes all pipeline steps in the required order.

    Arguments:
    ---------
        pipeline: the pipeline to control
        auto_process: whether to automatically start processing the pipeline as soon as the input set is valid
    """

    def __init__(
        self,
        pipeline: Pipeline,
        job_registry: JobRegistry,
        auto_process: bool = True,
    ):

        self._auto_process: bool = auto_process
        self._is_running: bool = False
        super().__init__(pipeline=pipeline, job_registry=job_registry)

    @property
    def auto_process(self) -> bool:
        return self._auto_process

    @auto_process.setter
    def auto_process(self, auto_process: bool):
        self._auto_process = auto_process

    def process_pipeline(
        self, event_callback: Union[Callable, None] = None
    ) -> Mapping[str, Union[uuid.UUID, Exception]]:

        log = logger.bind(pipeline_id=self.pipeline.pipeline_id)
        if self._is_running:
            log.debug(
                "ignore.pipeline_process",
                reason="Pipeline already running.",
            )
            raise Exception("Pipeline already running.")

        log.debug("execute.pipeline")
        self._is_running = True
        all_job_ids: Dict[str, Union[Exception, uuid.UUID]] = {}
        try:
            stages = PipelineStage.extract_stages(
                self.pipeline.structure, stages_extraction_type="early"
            )
            for idx, stage in enumerate(stages, start=1):

                if event_callback:
                    event_callback(f"start processing pipeline stage: {idx}")

                log.debug(
                    "execute.pipeline.stage",
                    stage=idx,
                )

                job_ids = {}
                for step_id in stage:
                    if event_callback:
                        event_callback(f"start processing pipeline step: {step_id}")

                    log.debug(
                        "execute.pipeline.step",
                        step_id=step_id,
                    )

                    try:
                        job_id = self.process_step(step_id)
                        job_ids[step_id] = job_id
                        if event_callback:
                            event_callback(f"finished processing step '{step_id}'")
                    except Exception as e:
                        all_job_ids[step_id] = e
                        # TODO: cancel running jobs?
                        log_exception(e)
                        log.error(
                            "error.processing.pipeline",
                            step_id=step_id,
                            error=e,
                        )
                        if event_callback:
                            event_callback(f"Error processing step '{step_id}': {e}")

                self.set_processing_results(job_ids=job_ids)
                log.debug(
                    "execute_finished.pipeline.stage",
                    stage=idx,
                )
                if event_callback:
                    event_callback(f"finished processing pipeline stage: {idx}")
                all_job_ids.update(job_ids)

        finally:
            self._is_running = False

        log.debug("execute_finished.pipeline")
        if event_callback:
            event_callback("finished processing pipeline")
        return all_job_ids

Attributes

auto_process: bool property writable

Functions

process_pipeline(event_callback: Union[Callable, None] = None) -> Mapping[str, Union[uuid.UUID, Exception]]
Source code in kiara/models/module/pipeline/controller.py
def process_pipeline(
    self, event_callback: Union[Callable, None] = None
) -> Mapping[str, Union[uuid.UUID, Exception]]:

    log = logger.bind(pipeline_id=self.pipeline.pipeline_id)
    if self._is_running:
        log.debug(
            "ignore.pipeline_process",
            reason="Pipeline already running.",
        )
        raise Exception("Pipeline already running.")

    log.debug("execute.pipeline")
    self._is_running = True
    all_job_ids: Dict[str, Union[Exception, uuid.UUID]] = {}
    try:
        stages = PipelineStage.extract_stages(
            self.pipeline.structure, stages_extraction_type="early"
        )
        for idx, stage in enumerate(stages, start=1):

            if event_callback:
                event_callback(f"start processing pipeline stage: {idx}")

            log.debug(
                "execute.pipeline.stage",
                stage=idx,
            )

            job_ids = {}
            for step_id in stage:
                if event_callback:
                    event_callback(f"start processing pipeline step: {step_id}")

                log.debug(
                    "execute.pipeline.step",
                    step_id=step_id,
                )

                try:
                    job_id = self.process_step(step_id)
                    job_ids[step_id] = job_id
                    if event_callback:
                        event_callback(f"finished processing step '{step_id}'")
                except Exception as e:
                    all_job_ids[step_id] = e
                    # TODO: cancel running jobs?
                    log_exception(e)
                    log.error(
                        "error.processing.pipeline",
                        step_id=step_id,
                        error=e,
                    )
                    if event_callback:
                        event_callback(f"Error processing step '{step_id}': {e}")

            self.set_processing_results(job_ids=job_ids)
            log.debug(
                "execute_finished.pipeline.stage",
                stage=idx,
            )
            if event_callback:
                event_callback(f"finished processing pipeline stage: {idx}")
            all_job_ids.update(job_ids)

    finally:
        self._is_running = False

    log.debug("execute_finished.pipeline")
    if event_callback:
        event_callback("finished processing pipeline")
    return all_job_ids
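
The optional event_callback receives plain progress strings, which makes it easy to wire into any progress display; failed steps show up as Exception values in the returned mapping. A sketch:

messages: list[str] = []
results = controller.process_pipeline(event_callback=messages.append)
for step_id, outcome in results.items():
    status = "failed" if isinstance(outcome, Exception) else "ok"
    print(f"{step_id}: {status} ({outcome})")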
