Skip to content

pipelines

Attributes

Classes

Functions

create_step_value_address(value_address_config: Union[str, Mapping[str, Any]], default_field_name: str) -> StepValueAddress

Source code in kiara/utils/pipelines.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def create_step_value_address(
    value_address_config: Union[str, Mapping[str, Any]],
    default_field_name: str,
) -> "StepValueAddress":

    if isinstance(value_address_config, StepValueAddress):
        return value_address_config

    sub_value: Union[Mapping[str, Any], None] = None

    if isinstance(value_address_config, str):

        tokens = value_address_config.split(".")
        if len(tokens) == 1:
            step_id = value_address_config
            output_name = default_field_name
        elif len(tokens) == 2:
            step_id = tokens[0]
            output_name = tokens[1]
        elif len(tokens) == 3:
            step_id = tokens[0]
            output_name = tokens[1]
            sub_value = {"config": tokens[2]}
        else:
            raise NotImplementedError()

    elif isinstance(value_address_config, Mapping):

        step_id = value_address_config["step_id"]
        output_name = value_address_config["value_name"]
        sub_value = value_address_config.get("sub_value", None)
    else:
        raise TypeError(
            f"Invalid type for creating step value address: {type(value_address_config)}"
        )

    if sub_value is not None and not isinstance(sub_value, Mapping):
        raise ValueError(
            f"Invalid type '{type(sub_value)}' for sub_value (step_id: {step_id}, value name: {output_name}): {sub_value}"
        )

    input_link = StepValueAddress(
        step_id=step_id, value_name=output_name, sub_value=sub_value
    )
    return input_link

ensure_step_value_addresses(link: Union[str, Mapping, Iterable], default_field_name: str) -> List[StepValueAddress]

Source code in kiara/utils/pipelines.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def ensure_step_value_addresses(
    link: Union[str, Mapping, Iterable], default_field_name: str
) -> List["StepValueAddress"]:

    if isinstance(link, (str, Mapping)):
        input_links: List[StepValueAddress] = [
            create_step_value_address(
                value_address_config=link, default_field_name=default_field_name
            )
        ]

    elif isinstance(link, Iterable):
        input_links = []
        for o in link:
            il = create_step_value_address(
                value_address_config=o, default_field_name=default_field_name
            )
            input_links.append(il)
    else:
        raise TypeError(f"Can't parse input map, invalid type for output: {link}")

    return input_links

get_pipeline_details_from_path(path: Union[str, Path], module_type_name: Union[str, None] = None, base_module: Union[str, None] = None) -> Dict[str, Any]

Load a pipeline description, save it's content, and determine it the pipeline base name.

Parameters:

Name Type Description Default
path Union[str, Path]

the path to the pipeline file

required
module_type_name Union[str, None]

if specifies, overwrites any auto-detected or assigned pipeline name

None
base_module Union[str, None]

overrides the base module the assembled pipeline module will be located in the python hierarchy

None
Source code in kiara/utils/pipelines.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def get_pipeline_details_from_path(
    path: Union[str, Path],
    module_type_name: Union[str, None] = None,
    base_module: Union[str, None] = None,
) -> Dict[str, Any]:
    """Load a pipeline description, save it's content, and determine it the pipeline base name.

    Arguments:
        path: the path to the pipeline file
        module_type_name: if specifies, overwrites any auto-detected or assigned pipeline name
        base_module: overrides the base module the assembled pipeline module will be located in the python hierarchy

    """

    if isinstance(path, str):
        path = Path(os.path.expanduser(path))

    if not path.is_file():
        raise Exception(
            f"Can't add pipeline description '{path.as_posix()}': not a file"
        )

    data = get_data_from_file(path)

    if not data:
        raise Exception(
            f"Can't register pipeline file '{path.as_posix()}': no content."
        )

    if module_type_name:
        data[MODULE_TYPE_NAME_KEY] = module_type_name

    if not isinstance(data, Mapping):
        raise Exception("Not a dictionary type.")

    # filename = path.name
    # name = data.get(MODULE_TYPE_NAME_KEY, None)
    # if name is None:
    #     name = filename.split(".", maxsplit=1)[0]

    result = {"data": data, "source": path.as_posix(), "source_type": "file"}
    if base_module:
        result["base_module"] = base_module
    return result

check_doc_sidecar(path: Union[Path, str], data: Dict[str, Any]) -> Dict[str, Any]

Source code in kiara/utils/pipelines.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def check_doc_sidecar(path: Union[Path, str], data: Dict[str, Any]) -> Dict[str, Any]:

    if isinstance(path, str):
        path = Path(os.path.expanduser(path))

    _doc = data["data"].get("documentation", None)
    if _doc is None:
        _doc_path = Path(path.as_posix() + ".md")
        if _doc_path.is_file():
            doc = _doc_path.read_text()
            if doc:
                data["data"]["documentation"] = doc

    return data

get_pipeline_config(pipeline: str, kiara: typing.Union[Kiara, None] = None) -> PipelineConfig

Extract a pipeline config from the item specified.

The lookup of the 'pipeline' reference happens in this order (first match returns the result): - check whether there is an operation with that name that is a pipeline - check whether the provided string is a path to an existing file - check whether the provided string starts with 'workflow:' and matches a workflow alias (or id), in which case it returns the pipeline config for the workflows current state

Parameters:

Name Type Description Default
pipeline str

a reference to the desired pipeline

required
kiara typing.Union[Kiara, None]

the kiara context

None

Returns:

Type Description
PipelineConfig

a pipeline config object

Source code in kiara/utils/pipelines.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def get_pipeline_config(
    pipeline: str, kiara: typing.Union["Kiara", None] = None
) -> "PipelineConfig":
    """Extract a pipeline config from the item specified.

    The lookup of the 'pipeline' reference happens in this order (first match returns the result):
    - check whether there is an operation with that name that is a pipeline
    - check whether the provided string is a path to an existing file
    - check whether the provided string starts with 'workflow:' and matches a workflow alias (or id), in which case it returns the pipeline config for the workflows current state

    Arguments:
        pipeline: a reference to the desired pipeline
        kiara: the kiara context

    Returns:
        a pipeline config object
    """

    if kiara is None:
        from kiara.context import Kiara

        kiara = Kiara.instance()

    pc: Union[PipelineConfig, None] = None
    try:
        _operation = kiara.operation_registry.get_operation(pipeline)

        pipeline_module: PipelineModule = _operation.module  # type: ignore
        if pipeline_module.is_pipeline():
            pc = pipeline_module.config
    except NoSuchOperationException:
        pass

    if pc is None:
        if os.path.isfile(pipeline):
            from kiara.models.module.pipeline import PipelineConfig

            pc = PipelineConfig.from_file(pipeline, kiara=kiara)

    if pc is None and pipeline.startswith("workflow:"):
        try:
            workflow = pipeline[9:]
            if "@" in workflow:
                raise NotImplementedError()

            wfm = kiara.workflow_registry.get_workflow_metadata(workflow=workflow)
            if wfm.current_state:
                state = kiara.workflow_registry.get_workflow_state(
                    workflow_state_id=wfm.current_state, workflow=wfm.workflow_id
                )
                pc = state.pipeline_config
        except Exception as e:
            log_exception(e)

    if pc is None:
        raise Exception(f"Could not resolve pipeline reference '{pipeline}'.")

    return pc