Skip to content

kiara.module_mgmt.pipelines

PipelineModuleManager

Module manager that discovers pipeline descriptions and creates modules out of them.

Parameters:

Name Type Description Default
folders

a list of folders to search for pipeline descriptions, if 'None', 'kiara.pipelines' entrypoints are searched, as well as the user config

required
ignore_errors

ignore any errors that occur during pipeline discovery (as much as that is possible)

required

add_pipelines_path(self, namespace, path, base_module)

Add a pipeline description file or folder containing some to this manager.

Parameters:

Name Type Description Default
namespace str

the namespace the pipeline modules found under this path will be part of; if it starts with '_', its first dot-separated token is omitted

required
path Union[str, pathlib.Path]

the path to a pipeline description file, or folder which contains some

required
base_module Optional[str]

the base module the assembled pipeline modules under this path will be located at in the Python namespace

required

Returns:

Type Description
Iterable[str]

a list of module type names that were added

Source code in kiara/module_mgmt/pipelines.py
def add_pipelines_path(
    self,
    namespace: str,
    path: typing.Union[str, Path],
    base_module: typing.Optional[str],
) -> typing.Iterable[str]:
    """Add a pipeline description file or folder containing some to this manager.

    When 'path' is a folder, it is walked recursively (skipping any folder in
    'DEFAULT_EXCLUDE_DIRS'), and every file ending with one of the
    'VALID_PIPELINE_FILE_EXTENSIONS' is loaded; files that fail to load are
    logged and skipped. A pipeline found in a sub-folder is named after that
    sub-folder's path relative to 'path' (dot-separated), followed by its own name.

    Arguments:
        namespace: the namespace the pipeline modules found under this path will be part of; if it starts with '_', its first dot-separated token is omitted
        path: the path to a pipeline description file, or folder which contains some
        base_module: the base module the assembled pipeline modules under this path will be located at in the Python namespace
    Returns:
        a list of module type names that were added (empty if 'path' does not exist, or is neither a file nor a folder)
    Raises:
        TypeError: if 'path' is a non-string iterable (e.g. a list of paths)
        Exception: if the name of a single pipeline file can't be determined, or if a resulting module type name is already registered (or occurs twice in this call)
    """

    if isinstance(path, str):
        path = Path(os.path.expanduser(path))
    elif isinstance(path, typing.Iterable):
        # only a single path is supported, not a collection of paths
        raise TypeError(f"Invalid type for path: {path}")

    if not path.exists():
        log.warning(f"Can't add pipeline path '{path}': path does not exist")
        return []

    files: typing.Dict[str, typing.Mapping[str, typing.Any]] = {}
    if path.is_dir():
        for root, dirnames, filenames in os.walk(path, topdown=True):

            # prune in place, so os.walk does not descend into excluded folders
            dirnames[:] = [d for d in dirnames if d not in DEFAULT_EXCLUDE_DIRS]

            for filename in filenames:
                full_path = os.path.join(root, filename)
                if not os.path.isfile(full_path) or not any(
                    filename.endswith(ext) for ext in VALID_PIPELINE_FILE_EXTENSIONS
                ):
                    continue

                try:
                    name, data = get_pipeline_details_from_path(
                        path=full_path, base_module=base_module
                    )
                    data = check_doc_sidecar(full_path, data)
                    rel_path = os.path.relpath(os.path.dirname(full_path), path)
                    if not rel_path or rel_path == ".":
                        ns_name = name
                    else:
                        # sub-folders become dot-separated name prefixes
                        _rel_path = rel_path.replace(os.path.sep, ".")
                        ns_name = f"{_rel_path}.{name}"
                    if not ns_name:
                        raise Exception(
                            f"Could not determine namespace for pipeline file '{filename}'."
                        )
                    if ns_name in files.keys():
                        raise Exception(
                            f"Duplicate workflow name in namespace '{namespace}': {ns_name}"
                        )
                    files[ns_name] = data
                except Exception as e:
                    # best-effort discovery: one broken file must not stop the walk
                    log.warning(f"Ignoring invalid pipeline file '{full_path}': {e}")
    elif path.is_file():
        name, data = get_pipeline_details_from_path(
            path=path, base_module=base_module
        )
        data = check_doc_sidecar(path, data)
        if not name:
            raise Exception(f"Could not determine pipeline name for: {path}")
        files = {name: data}
    else:
        # exists, but is neither a regular file nor a folder (e.g. a socket)
        log.warning(
            f"Can't add pipeline path '{path}': neither a file nor a directory"
        )
        return []

    # the namespace is the same for every file, so resolve it once:
    # '_x' -> '', '_x.y.z' -> 'y.z', anything else is used as-is
    if namespace.startswith("_"):
        _namespace = ".".join(namespace.split(".")[1:])
    else:
        _namespace = namespace

    result: typing.Dict[str, typing.Mapping[str, typing.Any]] = {}
    for k, v in files.items():
        full_name = f"{_namespace}.{k}" if _namespace else k

        # a leading 'core.' prefix is dropped from the registered name
        if full_name.startswith("core."):
            full_name = full_name[5:]
        # also check 'result': stripping 'core.' can make two keys collide
        if full_name in self._pipeline_descs.keys() or full_name in result:
            raise Exception(f"Duplicate workflow name: {full_name}")
        result[full_name] = v

    self._pipeline_descs.update(result)
    return result.keys()

register_pipeline(self, data, module_type_name=None, namespace=None)

Register a pipeline description to the pipeline pool.

Parameters:

Name Type Description Default
data Union[str, pathlib.Path, Mapping[str, Any]]

the pipeline data (a dict, or a path to a file)

required
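module_type_name Optional[str]

the type name this pipeline should be registered as

None
namespace Optional[str]

an optional namespace the type name is prefixed with (a resulting leading 'core.' is stripped)

None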
Source code in kiara/module_mgmt/pipelines.py
def register_pipeline(
    self,
    data: typing.Union[str, Path, typing.Mapping[str, typing.Any]],
    module_type_name: typing.Optional[str] = None,
    namespace: typing.Optional[str] = None,
) -> str:
    """Register a pipeline description to the pipeline pool.

    Arguments:
        data: the pipeline data (a dict, or a path to a file)
        module_type_name: the type name this pipeline should be registered as; overrides any name found in the data
        namespace: an optional namespace the type name is prefixed with; a resulting leading 'core.' is stripped
    Returns:
        the full module type name the pipeline was registered under
    Raises:
        Exception: if 'data' has an unsupported type, no type name can be determined, or the resulting name is already registered
    """

    # TODO: verify that there is no conflict with module_type_name
    if isinstance(data, str):
        data = Path(os.path.expanduser(data))

    if isinstance(data, Path):
        _name, _data = get_pipeline_details_from_path(data)
        _data = check_doc_sidecar(data, _data)
        if module_type_name:
            _name = module_type_name

    elif isinstance(data, typing.Mapping):
        # copy, so the caller's mapping is not modified
        _data = dict(data)
        if module_type_name:
            _data[MODULE_TYPE_NAME_KEY] = module_type_name

        _name = _data.get(MODULE_TYPE_NAME_KEY, None)
        if not _name:
            raise Exception(
                f"Can't register pipeline, no module type name available: {data}"
            )

        # wrap in the same envelope shape that file-based descriptions use
        _data = {"data": _data, "source": data, "source_type": "dict"}
    else:
        raise Exception(
            f"Can't register pipeline, must be dict-like data, not {type(data)}"
        )

    if not _name:
        raise Exception("No pipeline name set.")

    full_name = f"{namespace}.{_name}" if namespace else _name

    # a leading 'core.' prefix is dropped from the registered name
    if full_name.startswith("core."):
        full_name = full_name[5:]
    if full_name in self._pipeline_descs.keys():
        # report the key that actually collides, not the unqualified name
        raise Exception(
            f"Can't register pipeline: duplicate workflow name '{full_name}'"
        )

    self._pipeline_descs[full_name] = _data

    return full_name

PipelineModuleManagerConfig pydantic-model

folders: List[str] pydantic-field

A list of folders that contain pipeline descriptions.

get_pipeline_details_from_path(path, module_type_name=None, base_module=None)

Load a pipeline description, store its content, and determine the pipeline base name.

Parameters:

Name Type Description Default
path Union[str, pathlib.Path]

the path to the pipeline file

required
module_type_name Optional[str]

if specified, overrides any auto-detected or assigned pipeline name

None
base_module Optional[str]

overrides the base module the assembled pipeline module will be located in the python hierarchy

None
Source code in kiara/module_mgmt/pipelines.py
def get_pipeline_details_from_path(
    path: typing.Union[str, Path],
    module_type_name: typing.Optional[str] = None,
    base_module: typing.Optional[str] = None,
) -> typing.Tuple[typing.Optional[str], typing.Mapping[str, typing.Any]]:
    """Load a pipeline description, store its content, and determine the pipeline base name.

    Arguments:
        path: the path to the pipeline file
        module_type_name: if specified, overrides any auto-detected or assigned pipeline name
        base_module: overrides the base module the assembled pipeline module will be located in the python hierarchy

    Returns:
        a tuple of the pipeline name and a dict with the keys 'data' (the loaded
        file content), 'source' (the posix path of the file), 'source_type'
        (always 'file') and, if given, 'base_module'

    Raises:
        Exception: if 'path' is not a file, the file has no content, or its content is not a mapping
    """

    if isinstance(path, str):
        path = Path(os.path.expanduser(path))

    if not path.is_file():
        raise Exception(
            f"Can't add pipeline description '{path.as_posix()}': not a file"
        )

    data = get_data_from_file(path)

    if not data:
        raise Exception(
            f"Can't register pipeline file '{path.as_posix()}': no content."
        )

    # check the type before writing into it: non-mapping content (e.g. a list)
    # must produce this error, not a TypeError from the item assignment below
    if not isinstance(data, typing.Mapping):
        raise Exception("Not a dictionary type.")

    if module_type_name:
        data[MODULE_TYPE_NAME_KEY] = module_type_name

    name = data.get(MODULE_TYPE_NAME_KEY, None)
    if name is None:
        # fall back to the file name up to its first '.',
        # e.g. 'my_pipeline.yaml' -> 'my_pipeline'
        name = path.name.split(".", maxsplit=1)[0]

    result = {"data": data, "source": path.as_posix(), "source_type": "file"}
    if base_module:
        result["base_module"] = base_module
    return (name, result)