kiara.module_mgmt.pipelines¶
PipelineModuleManager
¶
Module manager that discovers pipeline descriptions and creates modules out of them.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
folders |
|
a list of folders to search for pipeline descriptions, if 'None', 'kiara.pipelines' entrypoints are searched, as well as the user config |
required |
ignore_errors |
|
ignore any errors that occur during pipeline discovery (as much as that is possible) |
required |
add_pipelines_path(self, namespace, path, base_module)
¶
Add a pipeline description file or folder containing some to this manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
namespace |
str |
the namespace the pipeline modules found under this path will be part of, if it starts with '_' it will be omitted |
required |
path |
Union[str, pathlib.Path] |
the path to a pipeline description file, or folder which contains some |
required |
base_module |
Optional[str] |
the base module the assembled pipeline modules under this path will be located at in the Python namespace |
required |
Returns:
Type | Description |
---|---|
Iterable[str] |
a list of module type names that were added |
Source code in kiara/module_mgmt/pipelines.py
def add_pipelines_path(
    self,
    namespace: str,
    path: typing.Union[str, Path],
    base_module: typing.Optional[str],
) -> typing.Iterable[str]:
    """Add a pipeline description file, or a folder containing some, to this manager.

    Arguments:
        namespace: the namespace the pipeline modules found under this path will be part of, if it starts with '_' the first namespace token will be omitted
        path: the path to a pipeline description file, or folder which contains some
        base_module: the base module the assembled pipeline modules under this path will be located at in the Python namespace

    Returns:
        the module type names that were added (empty if the path does not exist)

    Raises:
        TypeError: if 'path' is a non-string iterable
        Exception: if the path is neither a file nor a folder, if no pipeline
            name can be determined for a single file, or if a resulting module
            type name is already registered
    """
    if isinstance(path, str):
        path = Path(os.path.expanduser(path))
    elif isinstance(path, typing.Iterable):
        raise TypeError(f"Invalid type for path: {path}")

    if not path.exists():
        log.warning(f"Can't add pipeline path '{path}': path does not exist")
        return []

    # Always bound, so the registration loop below never hits an unbound local.
    files: typing.Dict[str, typing.Mapping[str, typing.Any]] = {}

    if path.is_dir():
        for root, dirnames, filenames in os.walk(path, topdown=True):
            # Prune excluded folders in place so os.walk does not descend into them.
            dirnames[:] = [d for d in dirnames if d not in DEFAULT_EXCLUDE_DIRS]

            for filename in filenames:
                full_path = os.path.join(root, filename)
                if not os.path.isfile(full_path):
                    continue
                if not any(
                    filename.endswith(ext) for ext in VALID_PIPELINE_FILE_EXTENSIONS
                ):
                    continue

                # Discovery in a folder is best-effort: a broken file is logged
                # and skipped instead of aborting the whole scan.
                try:
                    name, data = get_pipeline_details_from_path(
                        path=full_path, base_module=base_module
                    )
                    data = check_doc_sidecar(full_path, data)

                    # Sub-folders (relative to the search root) become
                    # dot-separated namespace tokens in front of the name.
                    rel_path = os.path.relpath(os.path.dirname(full_path), path)
                    if not rel_path or rel_path == ".":
                        ns_name = name
                    else:
                        _rel_path = rel_path.replace(os.path.sep, ".")
                        ns_name = f"{_rel_path}.{name}"

                    if not ns_name:
                        raise Exception(
                            f"Could not determine namespace for pipeline file '{filename}'."
                        )
                    if ns_name in files:
                        raise Exception(
                            f"Duplicate workflow name in namespace '{namespace}': {ns_name}"
                        )
                    files[ns_name] = data
                except Exception as e:
                    log.warning(
                        f"Ignoring invalid pipeline file '{full_path}': {e}"
                    )
    elif path.is_file():
        name, data = get_pipeline_details_from_path(
            path=path, base_module=base_module
        )
        data = check_doc_sidecar(path, data)
        if not name:
            raise Exception(f"Could not determine pipeline name for: {path}")
        files[name] = data
    else:
        # Path exists, but is something else (socket, device, ...).
        raise Exception(
            f"Can't add pipeline path '{path}': neither a file nor a folder"
        )

    # The effective namespace is the same for every file, so compute it once.
    # A leading token starting with '_' is dropped from the namespace.
    if namespace.startswith("_"):
        _namespace = ".".join(namespace.split(".")[1:])
    else:
        _namespace = namespace

    result = {}
    for k, v in files.items():
        full_name = f"{_namespace}.{k}" if _namespace else k

        # Pipelines in the 'core' namespace are registered without the prefix.
        if full_name.startswith("core."):
            full_name = full_name[5:]

        # Also check 'result': two discovered names can collapse to the same
        # key once the 'core.' prefix has been stripped.
        if full_name in self._pipeline_descs or full_name in result:
            raise Exception(f"Duplicate workflow name: {full_name}")

        result[full_name] = v

    # Only register once every name has been validated.
    self._pipeline_descs.update(result)
    return result.keys()
register_pipeline(self, data, module_type_name=None, namespace=None)
¶
Register a pipeline description to the pipeline pool.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Union[str, pathlib.Path, Mapping[str, Any]] |
the pipeline data (a dict, or a path to a file) |
required |
module_type_name |
Optional[str] |
the type name this pipeline should be registered as |
None |
Source code in kiara/module_mgmt/pipelines.py
def register_pipeline(
    self,
    data: typing.Union[str, Path, typing.Mapping[str, typing.Any]],
    module_type_name: typing.Optional[str] = None,
    namespace: typing.Optional[str] = None,
) -> str:
    """Register a pipeline description to the pipeline pool.

    Arguments:
        data: the pipeline data (a dict, or a path to a file)
        module_type_name: the type name this pipeline should be registered as
        namespace: optional namespace that is prepended (dot-separated) to the pipeline name

    Returns:
        the full name the pipeline was registered under

    Raises:
        Exception: if 'data' is neither a path nor dict-like, if no pipeline
            name can be determined, or if the full name is already registered
    """
    # TODO: verify that there is no conflict with module_type_name

    # Strings are treated as file paths (with '~' expansion).
    if isinstance(data, str):
        data = Path(os.path.expanduser(data))

    if isinstance(data, Path):
        _name, _data = get_pipeline_details_from_path(data)
        _data = check_doc_sidecar(data, _data)
        # An explicit type name wins over the one derived from the file.
        if module_type_name:
            _name = module_type_name
    elif isinstance(data, typing.Mapping):
        # Work on a copy so the caller's mapping is not modified.
        _data = dict(data)
        if module_type_name:
            _data[MODULE_TYPE_NAME_KEY] = module_type_name
        _name = _data.get(MODULE_TYPE_NAME_KEY, None)
        if not _name:
            raise Exception(
                f"Can't register pipeline, no module type name available: {data}"
            )
        _data = {"data": _data, "source": data, "source_type": "dict"}
    else:
        raise Exception(
            f"Can't register pipeline, must be dict-like data, not {type(data)}"
        )

    if not _name:
        raise Exception("No pipeline name set.")

    full_name = f"{namespace}.{_name}" if namespace else _name

    # Pipelines in the 'core' namespace are registered without the prefix.
    if full_name.startswith("core."):
        full_name = full_name[5:]

    if full_name in self._pipeline_descs:
        # Report the name that actually collided (including the namespace).
        raise Exception(
            f"Can't register pipeline: duplicate workflow name '{full_name}'"
        )

    self._pipeline_descs[full_name] = _data
    return full_name
PipelineModuleManagerConfig
pydantic-model
¶
folders: List[str]
pydantic-field
¶
A list of folders that contain pipeline descriptions.
get_pipeline_details_from_path(path, module_type_name=None, base_module=None)
¶
Load a pipeline description, save its content, and determine the pipeline base name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[str, pathlib.Path] |
the path to the pipeline file |
required |
module_type_name |
Optional[str] |
if specified, overwrites any auto-detected or assigned pipeline name |
None |
base_module |
Optional[str] |
overrides the base module the assembled pipeline module will be located in the python hierarchy |
None |
Source code in kiara/module_mgmt/pipelines.py
def get_pipeline_details_from_path(
    path: typing.Union[str, Path],
    module_type_name: typing.Optional[str] = None,
    base_module: typing.Optional[str] = None,
) -> typing.Tuple[typing.Optional[str], typing.Mapping[str, typing.Any]]:
    """Load a pipeline description, save its content, and determine the pipeline base name.

    Arguments:
        path: the path to the pipeline file
        module_type_name: if specified, overwrites any auto-detected or assigned pipeline name
        base_module: overrides the base module the assembled pipeline module will be located in the python hierarchy

    Returns:
        a tuple of the pipeline name and a dict with the keys 'data' (the
        loaded content), 'source' (the posix path of the file), 'source_type'
        ('file') and, if 'base_module' was provided, 'base_module'

    Raises:
        Exception: if the path is not a file, the file has no content, or the
            content is not a dictionary
    """
    if isinstance(path, str):
        path = Path(os.path.expanduser(path))

    if not path.is_file():
        raise Exception(
            f"Can't add pipeline description '{path.as_posix()}': not a file"
        )

    data = get_data_from_file(path)
    if not data:
        raise Exception(
            f"Can't register pipeline file '{path.as_posix()}': no content."
        )

    # Validate the content type before touching it, so non-dict content
    # yields this error and not an unrelated failure from the assignment below.
    if not isinstance(data, typing.Mapping):
        raise Exception("Not a dictionary type.")

    if module_type_name:
        data[MODULE_TYPE_NAME_KEY] = module_type_name

    name = data.get(MODULE_TYPE_NAME_KEY, None)
    if name is None:
        # Fall back to the file name up to its first dot.
        name = path.name.split(".", maxsplit=1)[0]

    result = {"data": data, "source": path.as_posix(), "source_type": "file"}
    if base_module:
        result["base_module"] = base_module
    return (name, result)