Skip to content

pipeline

Attributes

logger = structlog.getLogger() module-attribute

yaml = YAML(typ='safe') module-attribute

Classes

PipelineOperationDetails

Bases: OperationDetails

Source code in kiara/operations/included_core_operations/pipeline.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class PipelineOperationDetails(OperationDetails):
    # Operation details for an operation backed by a pipeline: carries the
    # pipeline's input/output schemas and its config, and derives the
    # operation schema from them on demand.

    pipeline_inputs_schema: Mapping[str, ValueSchema] = Field(
        description="The input schema for the pipeline."
    )
    pipeline_outputs_schema: Mapping[str, ValueSchema] = Field(
        description="The output schema for the pipeline."
    )
    pipeline_config: PipelineConfig = Field(description="The pipeline config.")

    # Holds the operation schema once it has been built; None until then.
    _op_schema: OperationSchema = PrivateAttr(default=None)

    def get_operation_schema(self) -> OperationSchema:
        """Return the operation schema, building it on first access.

        The schema uses this operation's id as its alias and the pipeline
        input/output schemas as its inputs/outputs. The built object is kept
        on the instance and returned unchanged on later calls.
        """
        if self._op_schema is None:
            self._op_schema = OperationSchema(
                alias=self.operation_id,
                inputs_schema=self.pipeline_inputs_schema,
                outputs_schema=self.pipeline_outputs_schema,
            )

        return self._op_schema

Attributes

pipeline_inputs_schema: Mapping[str, ValueSchema] = Field(description='The input schema for the pipeline.') class-attribute
pipeline_outputs_schema: Mapping[str, ValueSchema] = Field(description='The output schema for the pipeline.') class-attribute
pipeline_config: PipelineConfig = Field(description='The pipeline config.') class-attribute

Functions

get_operation_schema() -> OperationSchema
Source code in kiara/operations/included_core_operations/pipeline.py
47
48
49
50
51
52
53
54
55
56
57
def get_operation_schema(self) -> OperationSchema:
    """Return the operation schema, building it on first access.

    The schema uses this operation's id as its alias and the pipeline
    input/output schemas as its inputs/outputs. The built object is kept
    on the instance and returned unchanged on later calls.
    """
    if self._op_schema is None:
        self._op_schema = OperationSchema(
            alias=self.operation_id,
            inputs_schema=self.pipeline_inputs_schema,
            outputs_schema=self.pipeline_outputs_schema,
        )

    return self._op_schema

PipelineOperationType

Bases: OperationType[PipelineOperationDetails]

Source code in kiara/operations/included_core_operations/pipeline.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class PipelineOperationType(OperationType[PipelineOperationDetails]):
    """Operation type for operations that are backed by pipelines.

    Pipeline data is discovered lazily via the `pipeline_data` property and
    turned into one `PipelineOperationConfig` per discovered pipeline.
    """

    _operation_type_name = "pipeline"

    def __init__(self, kiara: "Kiara", op_type_name: str) -> None:
        """Initialize the operation type with an empty pipeline-data cache.

        Arguments:
            kiara: the kiara context, forwarded to the parent class
            op_type_name: the operation type name, forwarded to the parent class
        """
        super().__init__(kiara=kiara, op_type_name=op_type_name)
        # Populated on first access of the `pipeline_data` property.
        self._pipelines: Union[None, Mapping[str, Mapping[str, Any]]] = None

    @property
    def pipeline_data(self) -> Mapping[str, Mapping[str, Any]]:
        """The data of all discovered pipelines, keyed by pipeline.

        Paths come from `find_all_kiara_pipeline_paths` plus the
        `extra_pipelines` entries of the context config (resolved with
        `os.path.realpath`). The lookup runs once per instance; the result is
        cached and returned unchanged on subsequent accesses.
        """
        if self._pipelines is not None:
            return self._pipelines

        # Discovery errors are not skipped here.
        pipeline_paths: Dict[
            str, Union[Dict[str, Any], None]
        ] = find_all_kiara_pipeline_paths(skip_errors=False)

        for extra_path in self._kiara.context_config.extra_pipelines:
            resolved = os.path.realpath(extra_path)
            # Never overwrite an entry that discovery already produced.
            if resolved not in pipeline_paths:
                pipeline_paths[resolved] = None

        # Store the result so the guard above actually short-circuits;
        # previously the cache attribute was checked but never written.
        self._pipelines = find_pipeline_data_in_paths(pipeline_paths)
        return self._pipelines

    def retrieve_included_operation_configs(
        self,
    ) -> Iterable[Union[Mapping, OperationConfig]]:
        """Create one operation config for every entry in `pipeline_data`.

        Returns:
            a list of `PipelineOperationConfig` objects
        """
        op_configs = []
        for pipeline_data in self.pipeline_data.values():
            # Work on a copy so that popping the name does not mutate the
            # (cached) pipeline data.
            pipeline_config: Dict[str, Any] = dict(pipeline_data["data"])
            pipeline_id = pipeline_config.pop("pipeline_name", None)
            doc = pipeline_config.get("doc", None)
            pipeline_metadata = pipeline_data["metadata"]

            op_configs.append(
                PipelineOperationConfig(
                    pipeline_name=pipeline_id,
                    pipeline_config=pipeline_config,
                    doc=doc,
                    metadata=pipeline_metadata,
                )
            )
        return op_configs

    def check_matching_operation(
        self, module: "KiaraModule"
    ) -> Union[PipelineOperationDetails, None]:
        """Return operation details if the module is a pipeline module.

        Arguments:
            module: the module to check

        Returns:
            details built from the module's config and input/output schemas if
            `module` is a `PipelineModule`, otherwise `None`
        """
        if not isinstance(module, PipelineModule):
            return None

        return PipelineOperationDetails.create_operation_details(
            operation_id=module.config.pipeline_name,
            pipeline_inputs_schema=module.inputs_schema,
            pipeline_outputs_schema=module.outputs_schema,
            pipeline_config=module.config,
        )

Attributes

pipeline_data: Mapping[str, Mapping[str, Any]] property

Functions

retrieve_included_operation_configs() -> Iterable[Union[Mapping, OperationConfig]]
Source code in kiara/operations/included_core_operations/pipeline.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def retrieve_included_operation_configs(
    self,
) -> Iterable[Union[Mapping, OperationConfig]]:
    """Create one operation config for every entry in `pipeline_data`.

    Returns:
        a list of `PipelineOperationConfig` objects
    """
    configs = []
    for _, entry in self.pipeline_data.items():
        # Copy first, so removing the name leaves the source mapping intact.
        raw_config: Dict[str, Any] = dict(entry["data"])
        name = raw_config.pop("pipeline_name", None)
        configs.append(
            PipelineOperationConfig(
                pipeline_name=name,
                pipeline_config=raw_config,
                doc=raw_config.get("doc"),
                metadata=entry["metadata"],
            )
        )
    return configs
check_matching_operation(module: KiaraModule) -> Union[PipelineOperationDetails, None]
Source code in kiara/operations/included_core_operations/pipeline.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def check_matching_operation(
    self, module: "KiaraModule"
) -> Union[PipelineOperationDetails, None]:
    """Return operation details if the module is a pipeline module.

    Arguments:
        module: the module to check

    Returns:
        details built from the module's config and input/output schemas if
        `module` is a `PipelineModule`, otherwise `None`
    """
    if not isinstance(module, PipelineModule):
        return None

    return PipelineOperationDetails.create_operation_details(
        operation_id=module.config.pipeline_name,
        pipeline_inputs_schema=module.inputs_schema,
        pipeline_outputs_schema=module.outputs_schema,
        pipeline_config=module.config,
    )

Functions