events
Classes¶
KiaraEvent (BaseModel)
pydantic-model
¶
Source code in kiara/models/events/__init__.py
class KiaraEvent(BaseModel):
    class Config:
        json_loads = orjson.loads
        json_dumps = orjson_dumps

    def get_event_type(self) -> str:
        if hasattr(self, "event_type"):
            return self.event_type  # type: ignore

        name = camel_case_to_snake_case(self.__class__.__name__)
        return name
Config
¶
Source code in kiara/models/events/__init__.py
class Config:
    json_loads = orjson.loads
    json_dumps = orjson_dumps
json_loads
¶
json_dumps(v, *, default=None, **args)
¶
Source code in kiara/models/events/__init__.py
def orjson_dumps(v, *, default=None, **args):
    # orjson.dumps returns bytes, to match standard json.dumps we need to decode
    try:
        return orjson.dumps(v, default=default, **args).decode()
    except Exception as e:
        if is_debug():
            print(f"Error dumping json data: {e}")
            from kiara import dbg

            dbg(v)
        raise e
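A quick illustration of why the wrapper decodes: orjson.dumps returns bytes, while the standard library's json.dumps returns str.

import orjson

# orjson.dumps returns bytes, unlike the stdlib's json.dumps (which returns str)
raw = orjson.dumps({"event_type": "alias_stored", "alias": "my_table"})
print(type(raw))     # <class 'bytes'>
print(raw.decode())  # {"event_type":"alias_stored","alias":"my_table"}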
get_event_type(self)
¶
Source code in kiara/models/events/__init__.py
def get_event_type(self) -> str:
    if hasattr(self, "event_type"):
        return self.event_type  # type: ignore

    name = camel_case_to_snake_case(self.__class__.__name__)
    return name
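A sketch of the fallback branch: an event class without an explicit event_type field gets its type derived from its class name. The snake-case helper below is a stand-in for kiara's camel_case_to_snake_case, with assumed behavior.

import re

def camel_case_to_snake_case(name: str) -> str:
    # stand-in for kiara's helper; assumed behavior
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()

class CustomThingHappened:
    # a hypothetical event class with no explicit `event_type` field,
    # so get_event_type would fall back to deriving the name
    pass

print(camel_case_to_snake_case(CustomThingHappened.__name__))
# -> custom_thing_happened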
RegistryEvent (KiaraEvent)
pydantic-model
¶
Modules¶
alias_registry
¶
Classes¶
AliasArchiveAddedEvent (RegistryEvent)
pydantic-model
¶
Source code in kiara/models/events/alias_registry.py
class AliasArchiveAddedEvent(RegistryEvent):

    event_type: Literal["alias_archive_added"] = "alias_archive_added"

    alias_archive_id: uuid.UUID = Field(
        description="The unique id of this alias archive."
    )
    alias_archive_alias: str = Field(
        description="The alias this alias archive was added as."
    )
    is_store: bool = Field(
        description="Whether this archive supports write operations (aka implements the 'AliasStore' interface)."
    )
    is_default_store: bool = Field(
        description="Whether this store acts as the default store."
    )
Attributes¶
alias_archive_alias: str
pydantic-field
required
¶The alias this alias archive was added as.
alias_archive_id: UUID
pydantic-field
required
¶The unique id of this alias archive.
event_type: Literal['alias_archive_added']
pydantic-field
¶
is_default_store: bool
pydantic-field
required
¶Whether this store acts as the default store.
is_store: bool
pydantic-field
required
¶Whether this archive supports write operations (aka implements the 'AliasStore' interface).
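A minimal construction sketch (hypothetical values; the import path follows the source location above):

import uuid

from kiara.models.events.alias_registry import AliasArchiveAddedEvent

# hypothetical values, purely for illustration
event = AliasArchiveAddedEvent(
    alias_archive_id=uuid.uuid4(),
    alias_archive_alias="default",
    is_store=True,
    is_default_store=True,
)
print(event.get_event_type())  # alias_archive_added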
AliasPreStoreEvent (RegistryEvent)
pydantic-model
¶
Source code in kiara/models/events/alias_registry.py
class AliasPreStoreEvent(RegistryEvent):

    event_type: Literal["alias_pre_store"] = "alias_pre_store"
    aliases: Iterable[str] = Field(description="The aliases.")
AliasStoredEvent (RegistryEvent)
pydantic-model
¶
Source code in kiara/models/events/alias_registry.py
class AliasStoredEvent(RegistryEvent):

    event_type: Literal["alias_stored"] = "alias_stored"
    alias: str = Field(description="The alias.")
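Since every event carries a Literal event_type, a listener can dispatch on get_event_type(). A hypothetical handler sketch (kiara's actual subscription mechanism is not part of this module):

from kiara.models.events import KiaraEvent

def on_alias_event(event: KiaraEvent) -> None:
    # hypothetical listener; registration of handlers is not shown here
    event_type = event.get_event_type()
    if event_type == "alias_pre_store":
        # only AliasPreStoreEvent has `aliases`
        print(f"about to store aliases: {list(event.aliases)}")
    elif event_type == "alias_stored":
        # only AliasStoredEvent has `alias`
        print(f"alias stored: {event.alias}")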
data_registry
¶
Classes¶
DataArchiveAddedEvent (RegistryEvent)
pydantic-model
¶
Source code in kiara/models/events/data_registry.py
class DataArchiveAddedEvent(RegistryEvent):

    event_type: Literal["data_archive_added"] = "data_archive_added"

    data_archive_id: uuid.UUID = Field(
        description="The unique id of this data archive."
    )
    data_archive_alias: str = Field(
        description="The alias this data archive was added as."
    )
    is_store: bool = Field(
        description="Whether this archive supports write operations (aka implements the 'DataStore' interface)."
    )
    is_default_store: bool = Field(
        description="Whether this store acts as the default store."
    )
Attributes¶
data_archive_alias: str
pydantic-field
required
¶The alias this data archive was added as.
data_archive_id: UUID
pydantic-field
required
¶The unique id of this data archive.
event_type: Literal['data_archive_added']
pydantic-field
¶
is_default_store: bool
pydantic-field
required
¶Whether this store acts as the default store.
is_store: bool
pydantic-field
required
¶Whether this archive supports write operations (aka implements the 'DataStore' interface).
ValueCreatedEvent (RegistryEvent)
pydantic-model
¶
Source code in kiara/models/events/data_registry.py
class ValueCreatedEvent(RegistryEvent):

    event_type: Literal["value_created"] = "value_created"
    value: Value = Field(description="The value metadata.")
ValuePreStoreEvent (RegistryEvent)
pydantic-model
¶
Source code in kiara/models/events/data_registry.py
class ValuePreStoreEvent(RegistryEvent):

    event_type: Literal["value_pre_store"] = "value_pre_store"
    value: Value = Field(description="The value metadata.")
ValueRegisteredEvent (RegistryEvent)
pydantic-model
¶
Source code in kiara/models/events/data_registry.py
class ValueRegisteredEvent(RegistryEvent):

    event_type: Literal["value_registered"] = "value_registered"
    value: Value = Field(description="The value metadata.")
ValueStoredEvent (RegistryEvent)
pydantic-model
¶
Source code in kiara/models/events/data_registry.py
class ValueStoredEvent(RegistryEvent):

    event_type: Literal["value_stored"] = "value_stored"
    value: Value = Field(description="The value metadata.")
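Because each subclass pins event_type to a distinct Literal, the four value events can be deserialized as a tagged union: pydantic (v1) picks the member whose literal matches. A sketch, assuming payload is the dict form of a serialized event:

from typing import Union

from pydantic import parse_obj_as

from kiara.models.events.data_registry import (
    ValueCreatedEvent,
    ValuePreStoreEvent,
    ValueRegisteredEvent,
    ValueStoredEvent,
)

ValueEvent = Union[
    ValueCreatedEvent, ValueRegisteredEvent, ValuePreStoreEvent, ValueStoredEvent
]

# `payload` is assumed to be a dict obtained from a serialized event
event = parse_obj_as(ValueEvent, payload)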
destiny_registry
¶
Classes¶
DestinyArchiveAddedEvent (RegistryEvent)
pydantic-model
¶
Source code in kiara/models/events/destiny_registry.py
class DestinyArchiveAddedEvent(RegistryEvent):

    event_type: Literal["destiny_archive_added"] = "destiny_archive_added"

    destiny_archive_id: uuid.UUID = Field(
        description="The unique id of this destiny archive."
    )
    destiny_archive_alias: str = Field(
        description="The alias this destiny archive was added as."
    )
    is_store: bool = Field(
        description="Whether this archive supports write operations (aka implements the 'DestinyStore' interface)."
    )
    is_default_store: bool = Field(
        description="Whether this store acts as the default store."
    )
Attributes¶
destiny_archive_alias: str
pydantic-field
required
¶The alias this destiny archive was added as.
destiny_archive_id: UUID
pydantic-field
required
¶The unique id of this destiny archive.
event_type: Literal['destiny_archive_added']
pydantic-field
¶
is_default_store: bool
pydantic-field
required
¶Whether this store acts as the default store.
is_store: bool
pydantic-field
required
¶Whether this archive supports write operations (aka implements the 'DestinyStore' interface).
job_registry
¶
Classes¶
JobArchiveAddedEvent (RegistryEvent)
pydantic-model
¶
Source code in kiara/models/events/job_registry.py
class JobArchiveAddedEvent(RegistryEvent):

    event_type: Literal["job_archive_added"] = "job_archive_added"

    job_archive_id: uuid.UUID = Field(description="The unique id of this job archive.")
    job_archive_alias: str = Field(
        description="The alias this job archive was added as."
    )
    is_store: bool = Field(
        description="Whether this archive supports write operations (aka implements the 'JobStore' interface)."
    )
    is_default_store: bool = Field(
        description="Whether this store acts as the default store."
    )
Attributes¶
event_type: Literal['job_archive_added']
pydantic-field
¶
is_default_store: bool
pydantic-field
required
¶Whether this store acts as the default store.
is_store: bool
pydantic-field
required
¶Whether this archive supports write operations (aka implements the 'JobStore' interface).
job_archive_alias: str
pydantic-field
required
¶The alias this job archive was added as.
job_archive_id: UUID
pydantic-field
required
¶The unique id of this job archive.
JobRecordPreStoreEvent (RegistryEvent)
pydantic-model
¶
Source code in kiara/models/events/job_registry.py
class JobRecordPreStoreEvent(RegistryEvent):

    event_type: Literal["job_record_pre_store"] = "job_record_pre_store"
    job_record: JobRecord = Field(description="The job record.")
JobRecordStoredEvent (RegistryEvent)
pydantic-model
¶
Source code in kiara/models/events/job_registry.py
class JobRecordStoredEvent(RegistryEvent):

    event_type: Literal["job_record_stored"] = "job_record_stored"
    job_record: JobRecord = Field(description="The job record.")
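The pre-store/stored naming suggests these events fire before and after a job record is persisted. For serialization, the orjson-backed Config inherited from KiaraEvent means .json() and parse_raw round-trip through orjson. A sketch, assuming stored_event is an existing JobRecordStoredEvent instance:

from kiara.models.events.job_registry import JobRecordStoredEvent

# `stored_event` is assumed to exist; .json() goes through orjson_dumps
payload = stored_event.json()
restored = JobRecordStoredEvent.parse_raw(payload)
assert restored.event_type == "job_record_stored"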
pipeline
¶
Classes¶
ChangedValue (BaseModel)
pydantic-model
¶
PipelineDetails (BaseModel)
pydantic-model
¶
Source code in kiara/models/events/pipeline.py
class PipelineDetails(BaseModel):
    class Config:
        json_loads = orjson.loads
        json_dumps = orjson_dumps

    kiara_id: uuid.UUID = Field(description="The id of the kiara context.")
    pipeline_id: uuid.UUID = Field(description="The id of the pipeline.")
    pipeline_status: StepStatus = Field(
        description="The current status of this pipeline."
    )
    invalid_details: Dict[str, str] = Field(
        description="Details about fields that are invalid (if status < 'INPUTS_READY').",
        default_factory=dict,
    )
    pipeline_inputs: Dict[str, uuid.UUID] = Field(
        description="The current pipeline inputs."
    )
    pipeline_outputs: Dict[str, uuid.UUID] = Field(
        description="The current pipeline outputs."
    )
    step_states: Dict[str, StepDetails] = Field(
        description="The state of each step within this pipeline."
    )

    def get_steps_by_processing_stage(self) -> MutableMapping[int, List[StepDetails]]:

        result: MutableMapping[int, List[StepDetails]] = SortedDict()
        for step_details in self.step_states.values():
            result.setdefault(step_details.processing_stage, []).append(step_details)
        return result
Attributes¶
invalid_details: Dict[str, str]
pydantic-field
¶Details about fields that are invalid (if status < 'INPUTS_READY').
kiara_id: UUID
pydantic-field
required
¶The id of the kiara context.
pipeline_id: UUID
pydantic-field
required
¶The id of the pipeline.
pipeline_inputs: Dict[str, uuid.UUID]
pydantic-field
required
¶The current pipeline inputs.
pipeline_outputs: Dict[str, uuid.UUID]
pydantic-field
required
¶The current pipeline outputs.
pipeline_status: StepStatus
pydantic-field
required
¶The current status of this pipeline.
step_states: Dict[str, kiara.models.events.pipeline.StepDetails]
pydantic-field
required
¶The state of each step within this pipeline.
Config
¶
Source code in kiara/models/events/pipeline.py
class Config:
    json_loads = orjson.loads
    json_dumps = orjson_dumps
json_loads
¶
json_dumps(v, *, default=None, **args)
¶
Source code in kiara/models/events/pipeline.py
def orjson_dumps(v, *, default=None, **args):
    # orjson.dumps returns bytes, to match standard json.dumps we need to decode
    try:
        return orjson.dumps(v, default=default, **args).decode()
    except Exception as e:
        if is_debug():
            print(f"Error dumping json data: {e}")
            from kiara import dbg

            dbg(v)
        raise e
get_steps_by_processing_stage(self)
¶
Source code in kiara/models/events/pipeline.py
def get_steps_by_processing_stage(self) -> MutableMapping[int, List[StepDetails]]:

    result: MutableMapping[int, List[StepDetails]] = SortedDict()
    for step_details in self.step_states.values():
        result.setdefault(step_details.processing_stage, []).append(step_details)
    return result
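Since the result is a SortedDict keyed by stage number, iterating yields stages in execution order. A usage sketch, assuming details is a populated PipelineDetails instance:

# `details` is assumed to be a populated PipelineDetails instance
for stage, steps in details.get_steps_by_processing_stage().items():
    print(f"stage {stage}: {[s.step_id for s in steps]}")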
PipelineEvent (KiaraEvent)
pydantic-model
¶
Source code in kiara/models/events/pipeline.py
class PipelineEvent(KiaraEvent):
    @classmethod
    def create_event(
        cls,
        pipeline: "Pipeline",
        changed: Mapping[str, Mapping[str, Mapping[str, ChangedValue]]],
    ) -> Union["PipelineEvent", None]:

        pipeline_inputs = changed.get("__pipeline__", {}).get("inputs", {})
        pipeline_outputs = changed.get("__pipeline__", {}).get("outputs", {})

        step_inputs = {}
        step_outputs = {}
        invalidated_steps: Set[str] = set()

        for step_id, change_details in changed.items():
            if step_id == "__pipeline__":
                continue
            inputs = change_details.get("inputs", None)
            if inputs:
                invalidated_steps.add(step_id)
                step_inputs[step_id] = inputs
            outputs = change_details.get("outputs", None)
            if outputs:
                invalidated_steps.add(step_id)
                step_outputs[step_id] = outputs

        if (
            not pipeline_inputs
            and not pipeline_outputs
            and not step_inputs
            and not step_outputs
            and not invalidated_steps
        ):
            return None

        event = PipelineEvent(
            kiara_id=pipeline.kiara_id,
            pipeline_id=pipeline.pipeline_id,
            pipeline_inputs_changed=pipeline_inputs,
            pipeline_outputs_changed=pipeline_outputs,
            step_inputs_changed=step_inputs,
            step_outputs_changed=step_outputs,
            changed_steps=sorted(invalidated_steps),
        )
        return event

    class Config:
        allow_mutation = False

    kiara_id: uuid.UUID = Field(
        description="The id of the kiara context that created the pipeline."
    )
    pipeline_id: uuid.UUID = Field(description="The pipeline id.")
    pipeline_inputs_changed: Dict[str, ChangedValue] = Field(
        description="Details about changed pipeline input values.", default_factory=dict
    )
    pipeline_outputs_changed: Dict[str, ChangedValue] = Field(
        description="Details about changed pipeline output values.",
        default_factory=dict,
    )
    step_inputs_changed: Dict[str, Mapping[str, ChangedValue]] = Field(
        description="Details about changed step input values.", default_factory=dict
    )
    step_outputs_changed: Dict[str, Mapping[str, ChangedValue]] = Field(
        description="Details about changed step output values.", default_factory=dict
    )
    changed_steps: List[str] = Field(
        description="A list of all step ids that have newly invalidated outputs."
    )

    def __repr__(self):
        return f"{self.__class__.__name__}(pipeline_id={self.pipeline_id}, invalidated_steps={', '.join(self.changed_steps)})"

    def __str__(self):
        return self.__repr__()
Attributes¶
changed_steps: List[str]
pydantic-field
required
¶A list of all step ids that have newly invalidated outputs.
kiara_id: UUID
pydantic-field
required
¶The id of the kiara context that created the pipeline.
pipeline_id: UUID
pydantic-field
required
¶The pipeline id.
pipeline_inputs_changed: Dict[str, kiara.models.events.pipeline.ChangedValue]
pydantic-field
¶Details about changed pipeline input values.
pipeline_outputs_changed: Dict[str, kiara.models.events.pipeline.ChangedValue]
pydantic-field
¶Details about changed pipeline output values.
step_inputs_changed: Dict[str, Mapping[str, kiara.models.events.pipeline.ChangedValue]]
pydantic-field
¶Details about changed step input values.
step_outputs_changed: Dict[str, Mapping[str, kiara.models.events.pipeline.ChangedValue]]
pydantic-field
¶Details about changed step output values.
Config
¶
Source code in kiara/models/events/pipeline.py
class Config:
    allow_mutation = False
create_event(pipeline, changed)
classmethod
¶
Source code in kiara/models/events/pipeline.py
@classmethod
def create_event(
    cls,
    pipeline: "Pipeline",
    changed: Mapping[str, Mapping[str, Mapping[str, ChangedValue]]],
) -> Union["PipelineEvent", None]:

    pipeline_inputs = changed.get("__pipeline__", {}).get("inputs", {})
    pipeline_outputs = changed.get("__pipeline__", {}).get("outputs", {})

    step_inputs = {}
    step_outputs = {}
    invalidated_steps: Set[str] = set()

    for step_id, change_details in changed.items():
        if step_id == "__pipeline__":
            continue
        inputs = change_details.get("inputs", None)
        if inputs:
            invalidated_steps.add(step_id)
            step_inputs[step_id] = inputs
        outputs = change_details.get("outputs", None)
        if outputs:
            invalidated_steps.add(step_id)
            step_outputs[step_id] = outputs

    if (
        not pipeline_inputs
        and not pipeline_outputs
        and not step_inputs
        and not step_outputs
        and not invalidated_steps
    ):
        return None

    event = PipelineEvent(
        kiara_id=pipeline.kiara_id,
        pipeline_id=pipeline.pipeline_id,
        pipeline_inputs_changed=pipeline_inputs,
        pipeline_outputs_changed=pipeline_outputs,
        step_inputs_changed=step_inputs,
        step_outputs_changed=step_outputs,
        changed_steps=sorted(invalidated_steps),
    )
    return event
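A sketch of the nested changed mapping this factory expects: the reserved "__pipeline__" key carries pipeline-level changes, every other key is a step id, and each entry maps "inputs"/"outputs" to per-field ChangedValue details. The pipeline and ChangedValue instances below are assumed to exist:

changed = {
    "__pipeline__": {"inputs": {"a": changed_a}},       # pipeline-level change
    "step_1": {"outputs": {"result": changed_result}},  # step-level change
}
event = PipelineEvent.create_event(pipeline=pipeline, changed=changed)

# with no changes at all, the factory returns None instead of an event
assert PipelineEvent.create_event(pipeline=pipeline, changed={}) is None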
StepDetails (BaseModel)
pydantic-model
¶
Source code in kiara/models/events/pipeline.py
class StepDetails(BaseModel):

    kiara_id: uuid.UUID = Field(description="The id of the kiara context.")
    pipeline_id: uuid.UUID = Field(description="The id of the pipeline.")
    step: PipelineStep = Field(description="The pipeline step details.")
    step_id: str = Field(description="The id of the step.")
    processing_stage: int = Field(
        description="The execution stage where this step is executed."
    )
    status: StepStatus = Field(description="The current status of this step.")
    invalid_details: Dict[str, str] = Field(
        description="Details about fields that are invalid (if status < 'INPUTS_READY').",
        default_factory=dict,
    )
    inputs: Dict[str, uuid.UUID] = Field(description="The current inputs of this step.")
    outputs: Dict[str, uuid.UUID] = Field(
        description="The current outputs of this step."
    )

    @validator("inputs")
    def replace_none_values_inputs(cls, value):
        result = {}
        for k, v in value.items():
            if v is None:
                v = NONE_VALUE_ID
            result[k] = v
        return result

    @validator("outputs")
    def replace_none_values_outputs(cls, value):
        result = {}
        for k, v in value.items():
            if v is None:
                v = NOT_SET_VALUE_ID
            result[k] = v
        return result

    def _retrieve_data_to_hash(self) -> Any:
        return f"{self.kiara_id}.{self.pipeline_id}.{self.step_id}"

    def _retrieve_id(self) -> str:
        return f"{self.kiara_id}.{self.pipeline_id}.{self.step_id}"
Attributes¶
inputs: Dict[str, uuid.UUID]
pydantic-field
required
¶The current inputs of this step.
invalid_details: Dict[str, str]
pydantic-field
¶Details about fields that are invalid (if status < 'INPUTS_READY').
kiara_id: UUID
pydantic-field
required
¶The id of the kiara context.
outputs: Dict[str, uuid.UUID]
pydantic-field
required
¶The current outputs of this step.
pipeline_id: UUID
pydantic-field
required
¶The id of the pipeline.
processing_stage: int
pydantic-field
required
¶The execution stage where this step is executed.
status: StepStatus
pydantic-field
required
¶The current status of this step.
step: PipelineStep
pydantic-field
required
¶The pipeline step details.
step_id: str
pydantic-field
required
¶The id of the step.
replace_none_values_inputs(value)
classmethod
¶
Source code in kiara/models/events/pipeline.py
@validator("inputs")
def replace_none_values_inputs(cls, value):
    result = {}
    for k, v in value.items():
        if v is None:
            v = NONE_VALUE_ID
        result[k] = v
    return result
replace_none_values_outputs(value)
classmethod
¶
Source code in kiara/models/events/pipeline.py
@validator("outputs")
def replace_none_values_outputs(cls, value):
    result = {}
    for k, v in value.items():
        if v is None:
            v = NOT_SET_VALUE_ID
        result[k] = v
    return result
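The effect of both validators, as a standalone sketch. NONE_VALUE_ID and NOT_SET_VALUE_ID are kiara-defined sentinels; a zero UUID stands in for them here:

import uuid

NONE_VALUE_ID = uuid.UUID(int=0)  # stand-in for kiara's sentinel

def replace_none_values(value: dict, sentinel: uuid.UUID) -> dict:
    # mirrors the validators above: None entries become a sentinel value id
    return {k: (sentinel if v is None else v) for k, v in value.items()}

print(replace_none_values({"table": None}, NONE_VALUE_ID))
# {'table': UUID('00000000-0000-0000-0000-000000000000')}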
workflow_registry
¶
Classes¶
WorkflowArchiveAddedEvent (RegistryEvent)
pydantic-model
¶
Source code in kiara/models/events/workflow_registry.py
class WorkflowArchiveAddedEvent(RegistryEvent):

    event_type: Literal["workflow_archive_added"] = "workflow_archive_added"

    workflow_archive_id: uuid.UUID = Field(
        description="The unique id of this workflow archive."
    )
    workflow_archive_alias: str = Field(
        description="The alias this workflow archive was added as."
    )
    is_store: bool = Field(
        description="Whether this archive supports write operations (aka implements the 'WorkflowStore' interface)."
    )
    is_default_store: bool = Field(
        description="Whether this store acts as the default store."
    )
Attributes¶
event_type: Literal['workflow_archive_added']
pydantic-field
¶
is_default_store: bool
pydantic-field
required
¶Whether this store acts as the default store.
is_store: bool
pydantic-field
required
¶Whether this archive supports write operations (aka implements the 'WorkflowStore' interface).
workflow_archive_alias: str
pydantic-field
required
¶The alias this workflow archive was added as.
workflow_archive_id: UUID
pydantic-field
required
¶The unique id of this workflow archive.