Skip to content

events

Classes

KiaraEvent (BaseModel) pydantic-model

Source code in kiara/models/events/__init__.py
class KiaraEvent(BaseModel):
    class Config:
        json_loads = orjson.loads
        json_dumps = orjson_dumps

    def get_event_type(self) -> str:

        if hasattr(self, "event_type"):
            return self.event_type  # type: ignore

        name = camel_case_to_snake_case(self.__class__.__name__)
        return name
Config
Source code in kiara/models/events/__init__.py
class Config:
    json_loads = orjson.loads
    json_dumps = orjson_dumps
json_loads
json_dumps(v, *, default=None, **args)
Source code in kiara/models/events/__init__.py
def orjson_dumps(v, *, default=None, **args):
    # orjson.dumps returns bytes, to match standard json.dumps we need to decode

    try:
        return orjson.dumps(v, default=default, **args).decode()
    except Exception as e:
        if is_debug():
            print(f"Error dumping json data: {e}")
            from kiara import dbg

            dbg(v)

        raise e
get_event_type(self)
Source code in kiara/models/events/__init__.py
def get_event_type(self) -> str:

    if hasattr(self, "event_type"):
        return self.event_type  # type: ignore

    name = camel_case_to_snake_case(self.__class__.__name__)
    return name

RegistryEvent (KiaraEvent) pydantic-model

Source code in kiara/models/events/__init__.py
class RegistryEvent(KiaraEvent):

    kiara_id: uuid.UUID = Field(
        description="The id of the kiara context the value was created in."
    )

Attributes

kiara_id: UUID pydantic-field required

The id of the kiara context the value was created in.

Modules

alias_registry

Classes

AliasArchiveAddedEvent (RegistryEvent) pydantic-model
Source code in kiara/models/events/alias_registry.py
class AliasArchiveAddedEvent(RegistryEvent):

    event_type: Literal["alias_archive_added"] = "alias_archive_added"
    alias_archive_id: uuid.UUID = Field(
        description="The unique id of this data archive."
    )
    alias_archive_alias: str = Field(
        description="The alias this data archive was added as."
    )
    is_store: bool = Field(
        description="Whether this archive supports write operations (aka implements the 'DataStore' interface)."
    )
    is_default_store: bool = Field(
        description="Whether this store acts as default store."
    )
Attributes
alias_archive_alias: str pydantic-field required

The alias this data archive was added as.

alias_archive_id: UUID pydantic-field required

The unique id of this data archive.

event_type: Literal['alias_archive_added'] pydantic-field
is_default_store: bool pydantic-field required

Whether this store acts as default store.

is_store: bool pydantic-field required

Whether this archive supports write operations (aka implements the 'DataStore' interface).

AliasPreStoreEvent (RegistryEvent) pydantic-model
Source code in kiara/models/events/alias_registry.py
class AliasPreStoreEvent(RegistryEvent):

    event_type: Literal["alias_pre_store"] = "alias_pre_store"
    aliases: Iterable[str] = Field(description="The alias.")
Attributes
aliases: Iterable[str] pydantic-field required

The alias.

event_type: Literal['alias_pre_store'] pydantic-field
AliasStoredEvent (RegistryEvent) pydantic-model
Source code in kiara/models/events/alias_registry.py
class AliasStoredEvent(RegistryEvent):

    event_type: Literal["alias_stored"] = "alias_stored"
    alias: str = Field(description="The alias.")
Attributes
alias: str pydantic-field required

The alias.

event_type: Literal['alias_stored'] pydantic-field

data_registry

Classes

DataArchiveAddedEvent (RegistryEvent) pydantic-model
Source code in kiara/models/events/data_registry.py
class DataArchiveAddedEvent(RegistryEvent):

    event_type: Literal["data_archive_added"] = "data_archive_added"
    data_archive_id: uuid.UUID = Field(
        description="The unique id of this data archive."
    )
    data_archive_alias: str = Field(
        description="The alias this data archive was added as."
    )
    is_store: bool = Field(
        description="Whether this archive supports write operations (aka implements the 'DataStore' interface)."
    )
    is_default_store: bool = Field(
        description="Whether this store acts as default store."
    )
Attributes
data_archive_alias: str pydantic-field required

The alias this data archive was added as.

data_archive_id: UUID pydantic-field required

The unique id of this data archive.

event_type: Literal['data_archive_added'] pydantic-field
is_default_store: bool pydantic-field required

Whether this store acts as default store.

is_store: bool pydantic-field required

Whether this archive supports write operations (aka implements the 'DataStore' interface).

ValueCreatedEvent (RegistryEvent) pydantic-model
Source code in kiara/models/events/data_registry.py
class ValueCreatedEvent(RegistryEvent):

    event_type: Literal["value_created"] = "value_created"
    value: Value = Field(description="The value metadata.")
Attributes
event_type: Literal['value_created'] pydantic-field
value: Value pydantic-field required

The value metadata.

ValuePreStoreEvent (RegistryEvent) pydantic-model
Source code in kiara/models/events/data_registry.py
class ValuePreStoreEvent(RegistryEvent):

    event_type: Literal["value_pre_store"] = "value_pre_store"
    value: Value = Field(description="The value metadata.")
Attributes
event_type: Literal['value_pre_store'] pydantic-field
value: Value pydantic-field required

The value metadata.

ValueRegisteredEvent (RegistryEvent) pydantic-model
Source code in kiara/models/events/data_registry.py
class ValueRegisteredEvent(RegistryEvent):

    event_type: Literal["value_registered"] = "value_registered"
    value: Value = Field(description="The value metadata.")
Attributes
event_type: Literal['value_registered'] pydantic-field
value: Value pydantic-field required

The value metadata.

ValueStoredEvent (RegistryEvent) pydantic-model
Source code in kiara/models/events/data_registry.py
class ValueStoredEvent(RegistryEvent):

    event_type: Literal["value_stored"] = "value_stored"
    value: Value = Field(description="The value metadata.")
Attributes
event_type: Literal['value_stored'] pydantic-field
value: Value pydantic-field required

The value metadata.

destiny_registry

Classes

DestinyArchiveAddedEvent (RegistryEvent) pydantic-model
Source code in kiara/models/events/destiny_registry.py
class DestinyArchiveAddedEvent(RegistryEvent):

    event_type: Literal["destiny_archive_added"] = "destiny_archive_added"
    destiny_archive_id: uuid.UUID = Field(
        description="The unique id of this destiny archive."
    )
    destiny_archive_alias: str = Field(
        description="The alias this destiny archive was added as."
    )
    is_store: bool = Field(
        description="Whether this archive supports write operations (aka implements the 'DestinyStore' interface)."
    )
    is_default_store: bool = Field(
        description="Whether this store acts as default store."
    )
Attributes
destiny_archive_alias: str pydantic-field required

The alias this destiny archive was added as.

destiny_archive_id: UUID pydantic-field required

The unique id of this destiny archive.

event_type: Literal['destiny_archive_added'] pydantic-field
is_default_store: bool pydantic-field required

Whether this store acts as default store.

is_store: bool pydantic-field required

Whether this archive supports write operations (aka implements the 'DestinyStore' interface).

job_registry

Classes

JobArchiveAddedEvent (RegistryEvent) pydantic-model
Source code in kiara/models/events/job_registry.py
class JobArchiveAddedEvent(RegistryEvent):

    event_type: Literal["job_archive_added"] = "job_archive_added"

    job_archive_id: uuid.UUID = Field(description="The unique id of this job archive.")
    job_archive_alias: str = Field(
        description="The alias this job archive was added as."
    )
    is_store: bool = Field(
        description="Whether this archive supports write operations (aka implements the 'JobStore' interface)."
    )
    is_default_store: bool = Field(
        description="Whether this store acts as default store."
    )
Attributes
event_type: Literal['job_archive_added'] pydantic-field
is_default_store: bool pydantic-field required

Whether this store acts as default store.

is_store: bool pydantic-field required

Whether this archive supports write operations (aka implements the 'JobStore' interface).

job_archive_alias: str pydantic-field required

The alias this job archive was added as.

job_archive_id: UUID pydantic-field required

The unique id of this job archive.

JobRecordPreStoreEvent (RegistryEvent) pydantic-model
Source code in kiara/models/events/job_registry.py
class JobRecordPreStoreEvent(RegistryEvent):

    event_type: Literal["job_record_pre_store"] = "job_record_pre_store"
    job_record: JobRecord = Field(description="The job record.")
Attributes
event_type: Literal['job_record_pre_store'] pydantic-field
job_record: JobRecord pydantic-field required

The job record.

JobRecordStoredEvent (RegistryEvent) pydantic-model
Source code in kiara/models/events/job_registry.py
class JobRecordStoredEvent(RegistryEvent):

    event_type: Literal["job_record_stored"] = "job_record_stored"
    job_record: JobRecord = Field(description="The job record.")
Attributes
event_type: Literal['job_record_stored'] pydantic-field
job_record: JobRecord pydantic-field required

The job record.

pipeline

Classes

ChangedValue (BaseModel) pydantic-model
Source code in kiara/models/events/pipeline.py
class ChangedValue(BaseModel):

    old: Union[uuid.UUID, None]
    new: Union[uuid.UUID, None]
new: UUID pydantic-field
old: UUID pydantic-field
PipelineDetails (BaseModel) pydantic-model
Source code in kiara/models/events/pipeline.py
class PipelineDetails(BaseModel):
    class Config:
        json_loads = orjson.loads
        json_dumps = orjson_dumps

    kiara_id: uuid.UUID = Field(description="The id of the kiara context.")
    pipeline_id: uuid.UUID = Field(description="The id of the pipeline.")

    pipeline_status: StepStatus = Field(
        description="The current status of this pipeline."
    )
    invalid_details: Dict[str, str] = Field(
        description="Details about fields that are invalid (if status < 'INPUTS_READY'.",
        default_factory=dict,
    )

    pipeline_inputs: Dict[str, uuid.UUID] = Field(
        description="The current pipeline inputs."
    )
    pipeline_outputs: Dict[str, uuid.UUID] = Field(
        description="The current pipeline outputs."
    )

    step_states: Dict[str, StepDetails] = Field(
        description="The state of each step within this pipeline."
    )

    def get_steps_by_processing_stage(self) -> MutableMapping[int, List[StepDetails]]:

        result: MutableMapping[int, List[StepDetails]] = SortedDict()
        for step_details in self.step_states.values():
            result.setdefault(step_details.processing_stage, []).append(step_details)
        return result
Attributes
invalid_details: Dict[str, str] pydantic-field

Details about fields that are invalid (if status < 'INPUTS_READY'.

kiara_id: UUID pydantic-field required

The id of the kiara context.

pipeline_id: UUID pydantic-field required

The id of the pipeline.

pipeline_inputs: Dict[str, uuid.UUID] pydantic-field required

The current pipeline inputs.

pipeline_outputs: Dict[str, uuid.UUID] pydantic-field required

The current pipeline outputs.

pipeline_status: StepStatus pydantic-field required

The current status of this pipeline.

step_states: Dict[str, kiara.models.events.pipeline.StepDetails] pydantic-field required

The state of each step within this pipeline.

Config
Source code in kiara/models/events/pipeline.py
class Config:
    json_loads = orjson.loads
    json_dumps = orjson_dumps
json_loads
json_dumps(v, *, default=None, **args)
Source code in kiara/models/events/pipeline.py
def orjson_dumps(v, *, default=None, **args):
    # orjson.dumps returns bytes, to match standard json.dumps we need to decode

    try:
        return orjson.dumps(v, default=default, **args).decode()
    except Exception as e:
        if is_debug():
            print(f"Error dumping json data: {e}")
            from kiara import dbg

            dbg(v)

        raise e
get_steps_by_processing_stage(self)
Source code in kiara/models/events/pipeline.py
def get_steps_by_processing_stage(self) -> MutableMapping[int, List[StepDetails]]:

    result: MutableMapping[int, List[StepDetails]] = SortedDict()
    for step_details in self.step_states.values():
        result.setdefault(step_details.processing_stage, []).append(step_details)
    return result
PipelineEvent (KiaraEvent) pydantic-model
Source code in kiara/models/events/pipeline.py
class PipelineEvent(KiaraEvent):
    @classmethod
    def create_event(
        cls,
        pipeline: "Pipeline",
        changed: Mapping[str, Mapping[str, Mapping[str, ChangedValue]]],
    ) -> Union["PipelineEvent", None]:

        pipeline_inputs = changed.get("__pipeline__", {}).get("inputs", {})
        pipeline_outputs = changed.get("__pipeline__", {}).get("outputs", {})

        step_inputs = {}
        step_outputs = {}

        invalidated_steps: Set[str] = set()

        for step_id, change_details in changed.items():
            if step_id == "__pipeline__":
                continue
            inputs = change_details.get("inputs", None)
            if inputs:
                invalidated_steps.add(step_id)
                step_inputs[step_id] = inputs
            outputs = change_details.get("outputs", None)
            if outputs:
                invalidated_steps.add(step_id)
                step_outputs[step_id] = outputs

        if (
            not pipeline_inputs
            and not pipeline_outputs
            and not step_inputs
            and not step_outputs
            and not invalidated_steps
        ):
            return None

        event = PipelineEvent(
            kiara_id=pipeline.kiara_id,
            pipeline_id=pipeline.pipeline_id,
            pipeline_inputs_changed=pipeline_inputs,
            pipeline_outputs_changed=pipeline_outputs,
            step_inputs_changed=step_inputs,
            step_outputs_changed=step_outputs,
            changed_steps=sorted(invalidated_steps),
        )
        return event

    class Config:
        allow_mutation = False

    kiara_id: uuid.UUID = Field(
        description="The id of the kiara context that created the pipeline."
    )
    pipeline_id: uuid.UUID = Field(description="The pipeline id.")

    pipeline_inputs_changed: Dict[str, ChangedValue] = Field(
        description="Details about changed pipeline input values.", default_factory=dict
    )
    pipeline_outputs_changed: Dict[str, ChangedValue] = Field(
        description="Details about changed pipeline output values.",
        default_factory=dict,
    )

    step_inputs_changed: Dict[str, Mapping[str, ChangedValue]] = Field(
        description="Details about changed step input values.", default_factory=dict
    )
    step_outputs_changed: Dict[str, Mapping[str, ChangedValue]] = Field(
        description="Details about changed step output values.", default_factory=dict
    )

    changed_steps: List[str] = Field(
        description="A list of all step ids that have newly invalidated outputs."
    )

    def __repr__(self):
        return f"{self.__class__.__name__}(pipeline_id={self.pipeline_id}, invalidated_steps={', '.join(self.changed_steps)})"

    def __str__(self):
        return self.__repr__()
Attributes
changed_steps: List[str] pydantic-field required

A list of all step ids that have newly invalidated outputs.

kiara_id: UUID pydantic-field required

The id of the kiara context that created the pipeline.

pipeline_id: UUID pydantic-field required

The pipeline id.

pipeline_inputs_changed: Dict[str, kiara.models.events.pipeline.ChangedValue] pydantic-field

Details about changed pipeline input values.

pipeline_outputs_changed: Dict[str, kiara.models.events.pipeline.ChangedValue] pydantic-field

Details about changed pipeline output values.

step_inputs_changed: Dict[str, Mapping[str, kiara.models.events.pipeline.ChangedValue]] pydantic-field

Details about changed step input values.

step_outputs_changed: Dict[str, Mapping[str, kiara.models.events.pipeline.ChangedValue]] pydantic-field

Details about changed step output values.

Config
Source code in kiara/models/events/pipeline.py
class Config:
    allow_mutation = False
create_event(pipeline, changed) classmethod
Source code in kiara/models/events/pipeline.py
@classmethod
def create_event(
    cls,
    pipeline: "Pipeline",
    changed: Mapping[str, Mapping[str, Mapping[str, ChangedValue]]],
) -> Union["PipelineEvent", None]:

    pipeline_inputs = changed.get("__pipeline__", {}).get("inputs", {})
    pipeline_outputs = changed.get("__pipeline__", {}).get("outputs", {})

    step_inputs = {}
    step_outputs = {}

    invalidated_steps: Set[str] = set()

    for step_id, change_details in changed.items():
        if step_id == "__pipeline__":
            continue
        inputs = change_details.get("inputs", None)
        if inputs:
            invalidated_steps.add(step_id)
            step_inputs[step_id] = inputs
        outputs = change_details.get("outputs", None)
        if outputs:
            invalidated_steps.add(step_id)
            step_outputs[step_id] = outputs

    if (
        not pipeline_inputs
        and not pipeline_outputs
        and not step_inputs
        and not step_outputs
        and not invalidated_steps
    ):
        return None

    event = PipelineEvent(
        kiara_id=pipeline.kiara_id,
        pipeline_id=pipeline.pipeline_id,
        pipeline_inputs_changed=pipeline_inputs,
        pipeline_outputs_changed=pipeline_outputs,
        step_inputs_changed=step_inputs,
        step_outputs_changed=step_outputs,
        changed_steps=sorted(invalidated_steps),
    )
    return event
StepDetails (BaseModel) pydantic-model
Source code in kiara/models/events/pipeline.py
class StepDetails(BaseModel):

    kiara_id: uuid.UUID = Field(description="The id of the kiara context.")
    pipeline_id: uuid.UUID = Field(description="The id of the pipeline.")
    step: PipelineStep = Field(description="The pipeline step details.")
    step_id: str = Field(description="The id of the step.")
    processing_stage: int = Field(
        description="The execution stage where this step is executed."
    )
    status: StepStatus = Field(description="The current status of this step.")
    invalid_details: Dict[str, str] = Field(
        description="Details about fields that are invalid (if status < 'INPUTS_READY'.",
        default_factory=dict,
    )
    inputs: Dict[str, uuid.UUID] = Field(description="The current inputs of this step.")
    outputs: Dict[str, uuid.UUID] = Field(
        description="The current outputs of this step."
    )

    @validator("inputs")
    def replace_none_values_inputs(cls, value):

        result = {}
        for k, v in value.items():
            if v is None:
                v = NONE_VALUE_ID
            result[k] = v
        return result

    @validator("outputs")
    def replace_none_values_outputs(cls, value):

        result = {}
        for k, v in value.items():
            if v is None:
                v = NOT_SET_VALUE_ID
            result[k] = v
        return result

    def _retrieve_data_to_hash(self) -> Any:
        return f"{self.kiara_id}.{self.pipeline_id}.{self.step_id}"

    def _retrieve_id(self) -> str:
        return f"{self.kiara_id}.{self.pipeline_id}.{self.step_id}"
Attributes
inputs: Dict[str, uuid.UUID] pydantic-field required

The current inputs of this step.

invalid_details: Dict[str, str] pydantic-field

Details about fields that are invalid (if status < 'INPUTS_READY'.

kiara_id: UUID pydantic-field required

The id of the kiara context.

outputs: Dict[str, uuid.UUID] pydantic-field required

The current outputs of this step.

pipeline_id: UUID pydantic-field required

The id of the pipeline.

processing_stage: int pydantic-field required

The execution stage where this step is executed.

status: StepStatus pydantic-field required

The current status of this step.

step: PipelineStep pydantic-field required

The pipeline step details.

step_id: str pydantic-field required

The id of the step.

replace_none_values_inputs(value) classmethod
Source code in kiara/models/events/pipeline.py
@validator("inputs")
def replace_none_values_inputs(cls, value):

    result = {}
    for k, v in value.items():
        if v is None:
            v = NONE_VALUE_ID
        result[k] = v
    return result
replace_none_values_outputs(value) classmethod
Source code in kiara/models/events/pipeline.py
@validator("outputs")
def replace_none_values_outputs(cls, value):

    result = {}
    for k, v in value.items():
        if v is None:
            v = NOT_SET_VALUE_ID
        result[k] = v
    return result

workflow_registry

Classes

WorkflowArchiveAddedEvent (RegistryEvent) pydantic-model
Source code in kiara/models/events/workflow_registry.py
class WorkflowArchiveAddedEvent(RegistryEvent):

    event_type: Literal["workflow_archive_added"] = "workflow_archive_added"
    workflow_archive_id: uuid.UUID = Field(
        description="The unique id of this data archive."
    )
    workflow_archive_alias: str = Field(
        description="The alias this workflow archive was added as."
    )
    is_store: bool = Field(
        description="Whether this archive supports write operations (aka implements the 'WorkflowStore' interface)."
    )
    is_default_store: bool = Field(
        description="Whether this store acts as default store."
    )
Attributes
event_type: Literal['workflow_archive_added'] pydantic-field
is_default_store: bool pydantic-field required

Whether this store acts as default store.

is_store: bool pydantic-field required

Whether this archive supports write operations (aka implements the 'WorkflowStore' interface).

workflow_archive_alias: str pydantic-field required

The alias this workflow archive was added as.

workflow_archive_id: UUID pydantic-field required

The unique id of this data archive.