Skip to content

batch

Classes

BatchOperation (BaseModel) pydantic-model

Source code in kiara/interfaces/python_api/batch.py
class BatchOperation(BaseModel):
    """A pre-configured batch run of a pipeline.

    Bundles a pipeline configuration, a set of (base) inputs, and default
    save aliases; ``run()`` executes the pipeline and optionally persists
    the resulting output values.
    """

    class Config:
        # Re-validate fields whenever they are assigned after construction.
        validate_assignment = True

    @classmethod
    def from_file(
        cls,
        path: str,
        kiara: Optional["Kiara"] = None,
    ):
        """Create a batch operation from a pipeline config file (json/yaml).

        If the file data does not contain a 'pipeline_name' key, the file's
        base name (minus a recognized .json/.yaml/.yml suffix) is used as
        the pipeline name.
        """

        data = get_data_from_file(path)
        if data.get("pipeline_name", None) is None:
            name = os.path.basename(path)
            # Strip a recognized config-file suffix so the pipeline name is clean.
            for suffix in (".json", ".yaml", ".yml"):
                if name.endswith(suffix):
                    name = name[: -len(suffix)]
                    break
            data["pipeline_name"] = name

        return cls.from_config(data=data, kiara=kiara)

    @classmethod
    def from_config(
        cls,
        data: Mapping[str, Any],
        kiara: Optional["Kiara"],
    ):
        """Create a batch operation from a config mapping.

        The mapping may contain 'inputs', 'save' and 'pipeline_name' keys
        (all optional); everything else is treated as pipeline configuration.
        """

        data = dict(data)
        inputs = data.pop("inputs", {})
        save = data.pop("save", False)
        pipeline_id = data.pop("pipeline_name", None)
        if pipeline_id is None:
            # No explicit name: use a random one so the pipeline id is unique.
            pipeline_id = str(uuid.uuid4())

        if kiara is None:
            kiara = Kiara.instance()

        pipeline_config = PipelineConfig.from_config(
            pipeline_name=pipeline_id, data=data, kiara=kiara
        )

        # NOTE: this was previously passed as 'save=save', which pydantic
        # silently dropped (the model has no 'save' field and extra kwargs
        # are ignored), so a config's 'save' option never took effect.
        # Pass it to the real field; its pre-validator ('validate_save')
        # normalizes bool/str/mapping values.
        result = cls(
            pipeline_config=pipeline_config, inputs=inputs, save_defaults=save
        )
        result._kiara = kiara
        return result

    # The batch name/alias; defaulted from the pipeline name by 'add_alias'.
    alias: str = Field(description="The batch name/alias.")
    pipeline_config: PipelineConfig = Field(
        description="The configuration of the underlying pipeline."
    )
    inputs: Dict[str, Any] = Field(
        description="The (base) inputs to use. Can be augmented before running the operation."
    )

    save_defaults: Dict[str, List[str]] = Field(
        description="Configuration which values to save, under which alias(es).",
        default_factory=dict,
    )

    # Set by the factory methods; not part of the pydantic schema.
    _kiara: Kiara = PrivateAttr(default=None)

    @root_validator(pre=True)
    def add_alias(cls, values):
        """Default 'alias' from the pipeline config's name when not provided."""

        if not values.get("alias", None):
            pc = values.get("pipeline_config", None)
            if not pc:
                raise ValueError("No pipeline config provided.")
            if isinstance(pc, PipelineConfig):
                alias = pc.pipeline_name
            else:
                # Raw mapping: the name may simply be absent.
                alias = pc.get("pipeline_name", None)
            values["alias"] = alias

        return values

    @validator("save_defaults", always=True, pre=True)
    def validate_save(cls, save, values):
        """Normalize raw 'save_defaults' input (bool/str/mapping) to a dict."""

        alias = values["alias"]
        pipeline_config = values["pipeline_config"]
        return cls.create_save_aliases(
            save=save, alias=alias, pipeline_config=pipeline_config
        )

    @classmethod
    def create_save_aliases(
        cls,
        save: Union[bool, None, str, Mapping],
        alias: str,
        pipeline_config: PipelineConfig,
    ) -> Mapping[str, Any]:
        """Translate a 'save' directive into a field-name -> alias(es) mapping.

        - False/None: save nothing (empty mapping)
        - True: save every pipeline output under the batch alias
        - str: save every pipeline output under that string as alias
        - Mapping: used as-is (copied)

        Raises:
            ValueError: if 'save' is of any other type.
        """

        assert isinstance(pipeline_config, PipelineConfig)

        if save in [False, None]:
            save_new: Dict[str, Any] = {}
        elif save is True:
            field_names = pipeline_config.structure.pipeline_outputs_schema.keys()
            save_new = create_save_config(field_names=field_names, aliases=alias)
        elif isinstance(save, str):
            field_names = pipeline_config.structure.pipeline_outputs_schema.keys()
            save_new = create_save_config(field_names=field_names, aliases=save)
        elif isinstance(save, Mapping):
            save_new = dict(save)
        else:
            raise ValueError(
                f"Invalid type '{type(save)}' for 'save' attribute: must be None, bool, string or Mapping."
            )

        return save_new

    def run(
        self,
        inputs: Optional[Mapping[str, Any]] = None,
        save: Union[None, bool, str, Mapping[str, Any]] = None,
    ) -> ValueMap:
        """Run the pipeline using the stored base inputs, optionally augmented.

        Args:
            inputs: extra inputs, overriding the stored base inputs per field.
            save: None to save nothing, True to use 'save_defaults', or a
                str/mapping normalized via 'create_save_aliases'.

        Returns:
            The pipeline output values.
        """

        pipeline = Pipeline(
            structure=self.pipeline_config.structure,
            kiara=self._kiara,
        )
        pipeline_controller = SinglePipelineBatchController(
            pipeline=pipeline, job_registry=self._kiara.job_registry
        )

        # Per-call inputs take precedence over the stored base inputs.
        run_inputs = dict(self.inputs)
        if inputs:
            run_inputs.update(inputs)

        pipeline.set_pipeline_inputs(inputs=run_inputs)
        pipeline_controller.process_pipeline()

        result = self._kiara.data_registry.load_values(
            pipeline.get_current_pipeline_outputs()
        )

        if save is not None:
            if save is True:
                save = self.save_defaults
            else:
                save = self.__class__.create_save_aliases(
                    save=save, alias=self.alias, pipeline_config=self.pipeline_config
                )

            self._kiara.save_values(values=result, alias_map=save)

        return result

Attributes

alias: str pydantic-field required

The batch name/alias.

inputs: Dict[str, Any] pydantic-field required

The (base) inputs to use. Can be augmented before running the operation.

pipeline_config: PipelineConfig pydantic-field required

The configuration of the underlying pipeline.

save_defaults: Dict[str, List[str]] pydantic-field

Configuration specifying which values to save, and under which alias(es).

Config
Source code in kiara/interfaces/python_api/batch.py
class Config:
    # Re-validate fields whenever they are assigned after construction.
    validate_assignment = True
add_alias(values) classmethod
Source code in kiara/interfaces/python_api/batch.py
@root_validator(pre=True)
def add_alias(cls, values):
    """Default the 'alias' field from the pipeline config's name when absent."""

    if values.get("alias", None):
        return values

    pc = values.get("pipeline_config", None)
    if not pc:
        raise ValueError("No pipeline config provided.")

    if isinstance(pc, PipelineConfig):
        values["alias"] = pc.pipeline_name
    else:
        # Raw mapping: the name may simply be absent.
        values["alias"] = pc.get("pipeline_name", None)
    return values
create_save_aliases(save, alias, pipeline_config) classmethod
Source code in kiara/interfaces/python_api/batch.py
@classmethod
def create_save_aliases(
    cls,
    save: Union[bool, None, str, Mapping],
    alias: str,
    pipeline_config: PipelineConfig,
) -> Mapping[str, Any]:
    """Normalize a 'save' directive into a field-name -> alias(es) mapping.

    False/None saves nothing; True saves all pipeline outputs under 'alias';
    a string saves all outputs under that string; a mapping is used as-is.

    Raises:
        ValueError: if 'save' is of any other type.
    """

    assert isinstance(pipeline_config, PipelineConfig)

    if save in (False, None):
        return {}

    if save is True or isinstance(save, str):
        target_alias = alias if save is True else save
        field_names = pipeline_config.structure.pipeline_outputs_schema.keys()
        return create_save_config(field_names=field_names, aliases=target_alias)

    if isinstance(save, Mapping):
        return dict(save)

    raise ValueError(
        f"Invalid type '{type(save)}' for 'save' attribute: must be None, bool, string or Mapping."
    )
from_config(data, kiara) classmethod
Source code in kiara/interfaces/python_api/batch.py
@classmethod
def from_config(
    cls,
    data: Mapping[str, Any],
    kiara: Optional["Kiara"],
):
    """Create a batch operation from a config mapping.

    The mapping may contain 'inputs', 'save' and 'pipeline_name' keys
    (all optional); everything else is treated as pipeline configuration.
    """

    data = dict(data)
    inputs = data.pop("inputs", {})
    save = data.pop("save", False)
    pipeline_id = data.pop("pipeline_name", None)
    if pipeline_id is None:
        # No explicit name: use a random one so the pipeline id is unique.
        pipeline_id = str(uuid.uuid4())

    if kiara is None:
        kiara = Kiara.instance()

    pipeline_config = PipelineConfig.from_config(
        pipeline_name=pipeline_id, data=data, kiara=kiara
    )

    # NOTE: this was previously passed as 'save=save', which pydantic
    # silently dropped (the model has no 'save' field and extra kwargs are
    # ignored), so a config's 'save' option never took effect. Pass it to
    # the real field; its pre-validator normalizes bool/str/mapping values.
    result = cls(pipeline_config=pipeline_config, inputs=inputs, save_defaults=save)
    result._kiara = kiara
    return result
from_file(path, kiara=None) classmethod
Source code in kiara/interfaces/python_api/batch.py
@classmethod
def from_file(
    cls,
    path: str,
    kiara: Optional["Kiara"] = None,
):
    """Create a batch operation from a pipeline config file (json/yaml).

    If the file data does not contain a 'pipeline_name' key, the file's
    base name (minus a recognized .json/.yaml/.yml suffix) is used as the
    pipeline name.
    """

    data = get_data_from_file(path)
    if data.get("pipeline_name", None) is None:
        name = os.path.basename(path)
        # Strip a recognized config-file suffix so the pipeline name is
        # clean; previously '.yml' files kept their suffix in the name.
        for suffix in (".json", ".yaml", ".yml"):
            if name.endswith(suffix):
                name = name[: -len(suffix)]
                break
        data["pipeline_name"] = name

    return cls.from_config(data=data, kiara=kiara)
run(self, inputs=None, save=None)
Source code in kiara/interfaces/python_api/batch.py
def run(
    self,
    inputs: Optional[Mapping[str, Any]] = None,
    save: Union[None, bool, str, Mapping[str, Any]] = None,
) -> ValueMap:
    """Execute the pipeline with the stored base inputs, optionally augmented.

    Args:
        inputs: extra inputs, overriding the stored base inputs per field.
        save: None to save nothing, True to use 'save_defaults', or a
            str/mapping normalized via 'create_save_aliases'.

    Returns:
        The pipeline output values.
    """

    pipeline = Pipeline(
        structure=self.pipeline_config.structure,
        kiara=self._kiara,
    )
    controller = SinglePipelineBatchController(
        pipeline=pipeline, job_registry=self._kiara.job_registry
    )

    # Per-call inputs take precedence over the stored base inputs.
    merged_inputs: Dict[str, Any] = {**self.inputs, **(inputs or {})}

    pipeline.set_pipeline_inputs(inputs=merged_inputs)
    controller.process_pipeline()

    result = self._kiara.data_registry.load_values(
        pipeline.get_current_pipeline_outputs()
    )

    if save is not None:
        alias_map = (
            self.save_defaults
            if save is True
            else self.__class__.create_save_aliases(
                save=save, alias=self.alias, pipeline_config=self.pipeline_config
            )
        )
        self._kiara.save_values(values=result, alias_map=alias_map)

    return result
validate_save(save, values) classmethod
Source code in kiara/interfaces/python_api/batch.py
@validator("save_defaults", always=True, pre=True)
def validate_save(cls, save, values):
    """Normalize the raw 'save_defaults' input (bool/str/mapping) to a dict."""

    return cls.create_save_aliases(
        save=save,
        alias=values["alias"],
        pipeline_config=values["pipeline_config"],
    )