Skip to content

pipelines

Attributes

logger = structlog.get_logger() module-attribute

Classes

Functions

create_step_value_address(value_address_config: Union[str, Mapping[str, Any]], default_field_name: str) -> StepValueAddress

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/kiara/utils/pipelines.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def create_step_value_address(
    value_address_config: Union[str, Mapping[str, Any]],
    default_field_name: str,
) -> "StepValueAddress":
    """Build a ``StepValueAddress`` from a string or mapping specification.

    String specs use dot notation: ``step_id``, ``step_id.output_name`` or
    ``step_id.output_name.sub_value_config``. When only the step id is given,
    ``default_field_name`` is used as the output name. Mapping specs must
    provide ``step_id`` and ``value_name`` keys (``sub_value`` is optional).

    Raises:
        NotImplementedError: if a string spec has more than three tokens
        TypeError: if the spec is neither a string nor a mapping
        ValueError: if the resolved sub_value is not a mapping
    """

    from kiara.models.module.pipeline.value_refs import StepValueAddress

    # Already a resolved address: pass it through unchanged.
    if isinstance(value_address_config, StepValueAddress):
        return value_address_config

    sub_value: Union[Mapping[str, Any], None] = None

    if isinstance(value_address_config, str):
        parts = value_address_config.split(".")
        if len(parts) > 3:
            # deeper addressing is not supported (yet)
            raise NotImplementedError()
        step_id = parts[0]
        # Fall back to the default field name when no output name was given.
        output_name = parts[1] if len(parts) >= 2 else default_field_name
        if len(parts) == 3:
            sub_value = {"config": parts[2]}

    elif isinstance(value_address_config, Mapping):
        step_id = value_address_config["step_id"]
        output_name = value_address_config["value_name"]
        sub_value = value_address_config.get("sub_value", None)
    else:
        raise TypeError(
            f"Invalid type for creating step value address: {type(value_address_config)}"
        )

    if sub_value is not None and not isinstance(sub_value, Mapping):
        raise ValueError(
            f"Invalid type '{type(sub_value)}' for sub_value (step_id: {step_id}, value name: {output_name}): {sub_value}"
        )

    return StepValueAddress(
        step_id=step_id, value_name=output_name, sub_value=sub_value
    )

ensure_step_value_addresses(link: Union[str, Mapping, Iterable], default_field_name: str) -> List[StepValueAddress]

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/kiara/utils/pipelines.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def ensure_step_value_addresses(
    link: Union[str, Mapping, Iterable], default_field_name: str
) -> List["StepValueAddress"]:
    """Normalize a link spec into a list of ``StepValueAddress`` objects.

    A single string or mapping spec yields a one-element list; any other
    iterable is converted element-wise.

    Raises:
        TypeError: if 'link' is neither a string, a mapping, nor an iterable
    """

    # A single spec (string or mapping) becomes a one-element list.
    if isinstance(link, (str, Mapping)):
        return [
            create_step_value_address(
                value_address_config=link, default_field_name=default_field_name
            )
        ]

    if isinstance(link, Iterable):
        # Convert each entry of the iterable independently.
        return [
            create_step_value_address(
                value_address_config=item, default_field_name=default_field_name
            )
            for item in link
        ]

    raise TypeError(f"Can't parse input map, invalid type for output: {link}")

get_pipeline_details_from_path(path: Union[str, Path], module_type_name: Union[str, None] = None, base_module: Union[str, None] = None) -> Dict[str, Any]

Load a pipeline description, save its content, and determine the pipeline base name.


path: the path to the pipeline file
module_type_name: if specified, overwrites any auto-detected or assigned pipeline name
base_module: overrides the base module the assembled pipeline module will be located in the python hierarchy
Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/kiara/utils/pipelines.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def get_pipeline_details_from_path(
    path: Union[str, Path],
    module_type_name: Union[str, None] = None,
    base_module: Union[str, None] = None,
) -> Dict[str, Any]:
    """
    Load a pipeline description, save its content, and determine the pipeline base name.

    Arguments:
    ---------
        path: the path to the pipeline file
        module_type_name: if specified, overwrites any auto-detected or assigned pipeline name
        base_module: overrides the base module the assembled pipeline module will be located in the python hierarchy

    Raises:
    ------
        Exception: if the path is not a file, the file has no content, or its content is not a mapping

    """
    if isinstance(path, str):
        path = Path(os.path.expanduser(path))

    if not path.is_file():
        raise Exception(
            f"Can't add pipeline description '{path.as_posix()}': not a file"
        )

    data = get_data_from_file(path)

    if not data:
        raise Exception(
            f"Can't register pipeline file '{path.as_posix()}': no content."
        )

    # Validate the structure *before* mutating it: assigning a key into a
    # non-mapping payload (e.g. a YAML list) would otherwise raise an
    # unhelpful TypeError instead of the intended error below.
    if not isinstance(data, Mapping):
        raise Exception("Not a dictionary type.")

    if module_type_name:
        # Explicit name wins over anything in the file.
        data[MODULE_TYPE_NAME_KEY] = module_type_name

    result = {"data": data, "source": path.as_posix(), "source_type": "file"}
    if base_module:
        result["base_module"] = base_module
    return result

check_doc_sidecar(path: Union[Path, str], data: Dict[str, Any]) -> Dict[str, Any]

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/kiara/utils/pipelines.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def check_doc_sidecar(path: Union[Path, str], data: Dict[str, Any]) -> Dict[str, Any]:
    """Attach documentation from a '<path>.md' sidecar file, if one exists.

    Only consulted when the pipeline data carries no 'documentation' entry
    already; the (possibly updated) 'data' dict is returned.
    """

    if isinstance(path, str):
        path = Path(os.path.expanduser(path))

    # Embedded documentation always takes precedence over the sidecar file.
    if data["data"].get("documentation", None) is None:
        sidecar = Path(path.as_posix() + ".md")
        if sidecar.is_file():
            content = sidecar.read_text()
            if content:
                data["data"]["documentation"] = content

    return data

get_pipeline_config(pipeline: str, kiara: typing.Union[Kiara, None] = None) -> PipelineConfig

Extract a pipeline config from the item specified.

The lookup of the 'pipeline' reference happens in this order (first match returns the result): - check whether there is an operation with that name that is a pipeline - check whether the provided string is a path to an existing file - check whether the provided string starts with 'workflow:' and matches a workflow alias (or id), in which case it returns the pipeline config for the workflow's current state

Parameters:

Name Type Description Default
pipeline str

a reference to the desired pipeline

required
kiara typing.Union[Kiara, None]

the kiara context

None

Returns:

Type Description
PipelineConfig

a pipeline config object

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/kiara/utils/pipelines.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def get_pipeline_config(
    pipeline: str, kiara: typing.Union["Kiara", None] = None
) -> "PipelineConfig":
    """
    Extract a pipeline config from the item specified.

    The lookup of the 'pipeline' reference happens in this order (first match returns the result):
    - check whether there is an operation with that name that is a pipeline
    - check whether the provided string is a path to an existing file
    - check whether the provided string starts with 'workflow:' and matches a workflow alias (or id), in which case it returns the pipeline config for the workflow's current state

    Arguments:
        pipeline: a reference to the desired pipeline
        kiara: the kiara context

    Returns:
        a pipeline config object

    Raises:
        Exception: if the reference can not be resolved by any of the lookups
    """
    # Fall back to the default kiara context when none was supplied.
    if kiara is None:
        from kiara.context import Kiara

        kiara = Kiara.instance()

    pc: Union["PipelineConfig", None] = None
    error: Union[Exception, None] = None
    # Lookup 1: an operation registered under this name whose module is a pipeline.
    try:
        _operation = kiara.operation_registry.get_operation(pipeline)

        pipeline_module: PipelineModule = _operation.module  # type: ignore
        if pipeline_module.is_pipeline():
            pc = pipeline_module.config
    except NoSuchOperationException as nsoe:
        # Remember the error; it is re-raised only if no later lookup succeeds.
        error = nsoe
    except InvalidOperationException as ioe:
        error = ioe

    # Lookup 2: treat the reference as a path to a pipeline description file.
    if pc is None:
        if os.path.isfile(pipeline):
            from kiara.models.module.pipeline import PipelineConfig

            pc = PipelineConfig.from_file(pipeline, kiara=kiara)

    # Lookup 3: 'workflow:<alias>' -> config of that workflow's current state.
    if pc is None and pipeline.startswith("workflow:"):
        try:
            workflow = pipeline[9:]
            if "@" in workflow:
                # addressing a specific (non-current) workflow state is not supported yet
                raise NotImplementedError()

            wfm = kiara.workflow_registry.get_workflow_metadata(workflow=workflow)
            if wfm.current_state:
                state = kiara.workflow_registry.get_workflow_state(
                    workflow_state_id=wfm.current_state, workflow=wfm.workflow_id
                )
                pc = state.pipeline_config
        except Exception as e:
            # best-effort lookup: log the failure and fall through to the final error handling
            log_exception(e)

    if pc is None:
        # Prefer the more specific operation-lookup error when one was captured.
        if error:
            raise error
        else:
            raise Exception(f"Could not resolve pipeline reference '{pipeline}'.")

    return pc

find_pipeline_data_in_paths(pipeline_paths: Dict[str, Union[Dict[str, Any], None]]) -> Mapping[str, Mapping[str, Any]]

Find pipeline data in the provided paths.

The 'pipeline_paths' argument has a local path as key, and a mapping as value that contains optional metadata about the context for all the pipelines that are found under the path.


pipeline_paths: a mapping of local paths to optional context metadata for the pipelines found under each path
Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/kiara/utils/pipelines.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
def _load_pipeline_data(
    full_path: Union[str, Path], path_metadata: Union[Dict[str, Any], None]
) -> Dict[str, Any]:
    """Load one pipeline file and merge the per-path context metadata into it.

    Metadata embedded in the pipeline file itself wins over the per-path
    metadata on key conflicts.
    """
    data = get_pipeline_details_from_path(path=full_path)
    data = check_doc_sidecar(full_path, data)
    existing_metadata = data.pop("metadata", {})
    md = {} if path_metadata is None else dict(path_metadata)
    md.update(existing_metadata)
    data["metadata"] = md
    return data


def find_pipeline_data_in_paths(
    pipeline_paths: Dict[str, Union[Dict[str, Any], None]]
) -> Mapping[str, Mapping[str, Any]]:
    """
    Find pipeline data in the provided paths.

    The 'pipeline_paths' argument has a local path as key, and a mapping as value that contains optional metadata about the context for all the pipelines that are found under the path.

    Arguments:
    ---------
        pipeline_paths: a mapping of local paths to optional context metadata
    """
    all_pipelines = []

    for _path, _path_metadata in pipeline_paths.items():
        path = Path(_path)
        if not path.exists():
            logger.warning(
                "ignore.pipeline_path", path=path, reason="path does not exist"
            )
            continue

        if path.is_dir():
            for root, dirnames, filenames in os.walk(path, topdown=True):
                # prune excluded directories in-place so os.walk skips them
                dirnames[:] = [d for d in dirnames if d not in DEFAULT_EXCLUDE_DIRS]

                for filename in filenames:
                    full_path = os.path.join(root, filename)
                    if not os.path.isfile(full_path) or not any(
                        filename.endswith(ext)
                        for ext in VALID_PIPELINE_FILE_EXTENSIONS
                    ):
                        continue

                    # Best-effort per file: a broken pipeline file is logged
                    # and skipped instead of aborting the whole scan.
                    try:
                        all_pipelines.append(
                            _load_pipeline_data(full_path, _path_metadata)
                        )
                    except Exception as e:
                        log_exception(e)
                        logger.warning(
                            "ignore.pipeline_file", path=full_path, reason=str(e)
                        )

        elif path.is_file():
            all_pipelines.append(_load_pipeline_data(path, _path_metadata))

    pipelines = {}
    for pipeline in all_pipelines:
        name = pipeline["data"].get("pipeline_name", None)
        if name is None:
            # Fall back to the file name (without extension) as the pipeline name.
            source = pipeline["source"]
            name = os.path.basename(source)
            if "." in name:
                name, _ = name.rsplit(".", maxsplit=1)
            pipeline["data"]["pipeline_name"] = name
        # NOTE(review): later pipelines silently shadow earlier ones with the
        # same name -- presumably intentional; confirm with callers.
        pipelines[name] = pipeline

    return pipelines