class PipelineOperationType(OperationType[PipelineOperationDetails]):
    """Operation type that exposes pipelines as operations.

    Pipeline data is looked up lazily via the `pipeline_data` property and
    kept on the instance afterwards, so the lookup runs at most once per
    instance.
    """

    _operation_type_name = "pipeline"

    def __init__(self, kiara: "Kiara", op_type_name: str) -> None:
        super().__init__(kiara=kiara, op_type_name=op_type_name)
        # Cache for `pipeline_data`; `None` means "not looked up yet".
        self._pipelines: Union[None, Mapping[str, Mapping[str, Any]]] = None

    @property
    def pipeline_data(self) -> Mapping[str, Mapping[str, Any]]:
        """Return the pipeline data found in all known pipeline paths.

        The paths come from `find_all_kiara_pipeline_paths` plus the
        `extra_pipelines` entries of the context config (resolved with
        `os.path.realpath`). The result is cached on the instance, so
        later changes to `extra_pipelines` are not picked up.
        """
        if self._pipelines is not None:
            return self._pipelines

        pipeline_paths: Dict[
            str, Union[Dict[str, Any], None]
        ] = find_all_kiara_pipeline_paths(skip_errors=False)

        for extra_path in self._kiara.context_config.extra_pipelines:
            # Only add paths that are not registered yet, so an entry that
            # already carries a value is never overwritten with `None`.
            pipeline_paths.setdefault(os.path.realpath(extra_path), None)

        # Store the result; without this the guard above never triggers and
        # the whole lookup is repeated on every property access.
        self._pipelines = find_pipeline_data_in_paths(pipeline_paths)
        return self._pipelines

    def retrieve_included_operation_configs(
        self,
    ) -> Iterable[Union[Mapping, OperationConfig]]:
        """Build one `PipelineOperationConfig` per entry in `pipeline_data`.

        Each entry is expected to provide a "data" mapping (the pipeline
        config) and a "metadata" item.
        """
        op_configs = []
        for pipeline_entry in self.pipeline_data.values():
            # Copy, so removing "pipeline_name" does not touch the cached data.
            pipeline_config: Dict[str, Any] = dict(pipeline_entry["data"])
            pipeline_id = pipeline_config.pop("pipeline_name", None)
            op_configs.append(
                PipelineOperationConfig(
                    pipeline_name=pipeline_id,
                    pipeline_config=pipeline_config,
                    doc=pipeline_config.get("doc", None),
                    metadata=pipeline_entry["metadata"],
                )
            )
        return op_configs

    def check_matching_operation(
        self, module: "KiaraModule"
    ) -> Union[PipelineOperationDetails, None]:
        """Return operation details if `module` is a `PipelineModule`.

        Returns `None` for every other module type.
        """
        if not isinstance(module, PipelineModule):
            return None

        return PipelineOperationDetails.create_operation_details(
            operation_id=module.config.pipeline_name,
            pipeline_inputs_schema=module.inputs_schema,
            pipeline_outputs_schema=module.outputs_schema,
            pipeline_config=module.config,
        )