Skip to content

destinies

Classes

DestinyArchive (BaseArchive)

Source code in kiara/registries/destinies/__init__.py
class DestinyArchive(BaseArchive):
    """Abstract interface for archives that hold destinies attached to values."""

    @classmethod
    def supported_item_types(cls) -> Iterable[str]:
        """Return the names of the item types this kind of archive handles."""
        return ["destiny"]

    def __init__(self, archive_id: uuid.UUID, config: ARCHIVE_CONFIG_CLS):
        """Pass the archive id and its configuration on to the base archive."""
        super().__init__(archive_id=archive_id, config=config)

    @abc.abstractmethod
    def get_all_value_ids(self) -> Set[uuid.UUID]:
        """Return the ids of all values that have destinies stored in this archive."""

    @abc.abstractmethod
    def get_destiny_aliases_for_value(
        self, value_id: uuid.UUID
    ) -> Union[Set[str], None]:
        """Retrieve the aliases of all destinies stored for the specified value.

        In case this archive discovers its value destinies dynamically, this can return 'None'.
        """

    @abc.abstractmethod
    def get_destiny(self, value_id: uuid.UUID, destiny: str) -> Destiny:
        """Return the destiny registered under the alias `destiny` for the value `value_id`."""

Methods

get_all_value_ids(self)

Return the set of all value ids that have destinies stored in this archive.

Source code in kiara/registries/destinies/__init__.py
@abc.abstractmethod
def get_all_value_ids(self) -> Set[uuid.UUID]:
    """Return the ids of all values that have destinies stored in this archive."""
get_destiny(self, value_id, destiny)
Source code in kiara/registries/destinies/__init__.py
@abc.abstractmethod
def get_destiny(self, value_id: uuid.UUID, destiny: str) -> Destiny:
    """Return the destiny registered under the alias `destiny` for the value `value_id`."""
get_destiny_aliases_for_value(self, value_id)

Retrieve all the destinies for the specified value within this archive.

In case this archive discovers its value destinies dynamically, this can return 'None'.

Source code in kiara/registries/destinies/__init__.py
@abc.abstractmethod
def get_destiny_aliases_for_value(
    self, value_id: uuid.UUID
) -> Union[Set[str], None]:
    """Retrieve the aliases of all destinies stored for the specified value within this archive.

    In case this archive discovers its value destinies dynamically, this can return 'None'.
    """
supported_item_types() classmethod
Source code in kiara/registries/destinies/__init__.py
@classmethod
def supported_item_types(cls) -> Iterable[str]:
    """Return the names of the item types this kind of archive handles."""
    item_types = ["destiny"]
    return item_types

DestinyStore (DestinyArchive)

Source code in kiara/registries/destinies/__init__.py
class DestinyStore(DestinyArchive):
    """A destiny archive that can also be written to."""

    @abc.abstractmethod
    def persist_destiny(self, destiny: Destiny):
        """Write the provided destiny into this store."""
persist_destiny(self, destiny)
Source code in kiara/registries/destinies/__init__.py
@abc.abstractmethod
def persist_destiny(self, destiny: Destiny):
    """Write the provided destiny into this store."""

Modules

filesystem_store

logger

Classes

FileSystemDestinyArchive (DestinyArchive)
Source code in kiara/registries/destinies/filesystem_store.py
class FileSystemDestinyArchive(DestinyArchive):
    """Read-only destiny archive backed by a folder on the local file system.

    Layout below `destiny_store_path`:

    - `destinies/<id tokens...>.json`: one file per destiny. The dash-separated
      tokens of the destiny id become nested folders, the last token is the
      file name.
    - `value_ids/<id tokens...>/<alias>.json`: one entry per value id and
      destiny alias. All dash-separated tokens of the value id become folders.
    """

    _archive_type_name = "filesystem_destiny_archive"
    _config_cls = FileSystemArchiveConfig

    @classmethod
    def is_writeable(cls) -> bool:
        """Return `False`: this archive type only reads."""
        return False

    def __init__(self, archive_id: uuid.UUID, config: FileSystemArchiveConfig):
        """Create the archive; the base folder is resolved lazily on first access."""

        super().__init__(archive_id=archive_id, config=config)
        # Filled in (and created on disk) by `destiny_store_path`.
        self._base_path: Union[Path, None] = None

    @property
    def destiny_store_path(self) -> Path:
        """Absolute base folder of this archive, created on first access if missing."""

        if self._base_path is not None:
            return self._base_path

        self._base_path = Path(self.config.archive_path).absolute()  # type: ignore
        self._base_path.mkdir(parents=True, exist_ok=True)
        return self._base_path

    def get_archive_details(self) -> ArchiveDetails:
        """Return archive details containing the summed size of all files in the archive folder."""

        size = sum(
            f.stat().st_size
            for f in self.destiny_store_path.glob("**/*")
            if f.is_file()
        )
        return ArchiveDetails(size=size)

    @property
    def destinies_path(self) -> Path:
        """Folder that holds the destiny files."""
        return self.destiny_store_path / "destinies"

    @property
    def value_id_path(self) -> Path:
        """Folder that holds the per-value alias entries."""
        return self.destiny_store_path / "value_ids"

    def _translate_destiny_id_to_path(self, destiny_id: uuid.UUID) -> Path:
        """Map a destiny id to the path of its json file below `destinies_path`."""

        tokens = str(destiny_id).split("-")
        destiny_path = (
            self.destinies_path.joinpath(*tokens[0:-1]) / f"{tokens[-1]}.json"
        )
        return destiny_path

    def _translate_destinies_path_to_id(self, destinies_path: Path) -> uuid.UUID:
        """Inverse of `_translate_destiny_id_to_path`."""

        # Drop the trailing '.json' (5 characters) from the relative path.
        relative = destinies_path.relative_to(self.destinies_path).as_posix()[:-5]

        # `as_posix()` always uses '/', so split on that and not on the
        # platform separator (which is '\\' on Windows).
        destinies_id = "-".join(relative.split("/"))

        return uuid.UUID(destinies_id)

    def _translate_value_id(self, value_id: uuid.UUID, destiny_alias: str) -> Path:
        """Map a value id and destiny alias to the path of the alias entry."""

        tokens = str(value_id).split("-")
        value_id_path = self.value_id_path.joinpath(*tokens)

        full_path = value_id_path / f"{destiny_alias}.json"
        return full_path

    def _translate_value_id_path(self, value_path: Path) -> uuid.UUID:
        """Map a value folder below `value_id_path` back to the value id."""

        relative = value_path.relative_to(self.value_id_path)

        # `parts` is separator-agnostic, unlike splitting a posix string on
        # `os.path.sep`.
        value_id_str = "-".join(relative.parts)
        return uuid.UUID(value_id_str)

    def _translate_alias_path(self, alias_path: Path) -> Tuple[uuid.UUID, str]:
        """Split the path of an alias entry into its value id and alias."""

        value_id = self._translate_value_id_path(alias_path.parent)

        # strip the '.json' extension
        alias = alias_path.name[0:-5]

        return value_id, alias

    def get_all_value_ids(self) -> Set[uuid.UUID]:
        """Return the ids of all values that have a folder below `value_id_path`."""

        # A uuid string has five dash-separated tokens, so value folders sit
        # exactly five levels below the root.
        all_root_folders = self.value_id_path.glob("*/*/*/*/*")

        return {
            self._translate_value_id_path(folder)
            for folder in all_root_folders
            if folder.is_dir()
        }

    def get_destiny_aliases_for_value(self, value_id: uuid.UUID) -> Set[str]:
        """Return the destiny aliases found in the folder of the specified value."""

        tokens = str(value_id).split("-")
        value_id_path = self.value_id_path.joinpath(*tokens)

        aliases = value_id_path.glob("*.json")

        return {a.name[0:-5] for a in aliases}

    def get_destiny(self, value_id: uuid.UUID, destiny_alias: str) -> Destiny:
        """Load the destiny stored under `destiny_alias` for the specified value.

        The file content is parsed as json and handed to `Destiny.construct`.
        Errors from reading a missing file propagate to the caller.
        """

        destiny_path = self._translate_value_id(
            value_id=value_id, destiny_alias=destiny_alias
        )

        destiny_data = orjson.loads(destiny_path.read_text())

        destiny = Destiny.construct(**destiny_data)
        return destiny
destinies_path: Path property readonly
destiny_store_path: Path property readonly
value_id_path: Path property readonly
Classes
_config_cls (ArchiveConfig) private pydantic-model
Source code in kiara/registries/destinies/filesystem_store.py
class FileSystemArchiveConfig(ArchiveConfig):
    # Configuration for file-system backed archives. A '#' comment is used
    # on purpose here, so no docstring text is added to the model class.

    # Required setting: the folder that holds the archive's data.
    archive_path: str = Field(
        description="The path where the data for this archive is stored."
    )
Attributes
archive_path: str pydantic-field required

The path where the data for this archive is stored.

Methods
get_all_value_ids(self)

Return the set of all value ids that have destinies stored in this archive.

Source code in kiara/registries/destinies/filesystem_store.py
def get_all_value_ids(self) -> Set[uuid.UUID]:
    """Return the ids of all values that have a folder below `value_id_path`.

    Only directories exactly five levels below the root are considered; each
    one is converted back into an id via `_translate_value_id_path`.
    """
    candidates = self.value_id_path.glob("*/*/*/*/*")
    return {
        self._translate_value_id_path(entry)
        for entry in candidates
        if entry.is_dir()
    }
get_archive_details(self)
Source code in kiara/registries/destinies/filesystem_store.py
def get_archive_details(self) -> ArchiveDetails:
    """Return archive details containing the summed size of all files in the archive folder."""
    total_bytes = 0
    for entry in self.destiny_store_path.glob("**/*"):
        if entry.is_file():
            total_bytes += entry.stat().st_size
    return ArchiveDetails(size=total_bytes)
get_destiny(self, value_id, destiny_alias)
Source code in kiara/registries/destinies/filesystem_store.py
def get_destiny(self, value_id: uuid.UUID, destiny_alias: str) -> Destiny:
    """Load the destiny stored under `destiny_alias` for the specified value.

    The entry is read from `<value_id_path>/<id tokens...>/<alias>.json`,
    parsed as json and handed to `Destiny.construct`.
    """
    value_folder = self.value_id_path.joinpath(*str(value_id).split("-"))
    raw_content = (value_folder / f"{destiny_alias}.json").read_text()
    return Destiny.construct(**orjson.loads(raw_content))
get_destiny_aliases_for_value(self, value_id)

Retrieve all the destinies for the specified value within this archive.

In case this archive discovers its value destinies dynamically, this can return 'None'.

Source code in kiara/registries/destinies/filesystem_store.py
def get_destiny_aliases_for_value(self, value_id: uuid.UUID) -> Set[str]:
    """Return the destiny aliases found in the folder of the specified value.

    Every `*.json` entry in the value's folder counts as one alias; the
    extension is stripped from the name.
    """
    value_folder = self.value_id_path.joinpath(*str(value_id).split("-"))
    return {entry.name[:-5] for entry in value_folder.glob("*.json")}
is_writeable() classmethod
Source code in kiara/registries/destinies/filesystem_store.py
@classmethod
def is_writeable(cls) -> bool:
    """Return `False`: this archive type only reads."""
    writeable = False
    return writeable
FileSystemDestinyStore (FileSystemDestinyArchive, DestinyStore)
Source code in kiara/registries/destinies/filesystem_store.py
class FileSystemDestinyStore(FileSystemDestinyArchive, DestinyStore):
    """Writable variant of the file-system destiny archive."""

    _archive_type_name = "filesystem_destiny_store"

    @classmethod
    def is_writeable(cls) -> bool:
        """Return `True`: this store can persist destinies."""
        return True

    def persist_destiny(self, destiny: Destiny):
        """Write the destiny to disk and link it from each of its input values.

        The destiny is serialized via `destiny.json()` into its file below
        the destinies folder. For every value id in `destiny.fixed_inputs`, a
        symlink named after the destiny alias is then created in that value's
        folder, replacing any entry that is already there.
        """

        destiny_path = self._translate_destiny_id_to_path(destiny_id=destiny.destiny_id)
        destiny_path.parent.mkdir(parents=True, exist_ok=True)
        destiny_path.write_text(destiny.json())

        for value_id in destiny.fixed_inputs.values():

            path = self._translate_value_id(
                value_id=value_id, destiny_alias=destiny.destiny_alias
            )
            # `exists()` follows symlinks, so it is False for a dangling
            # link; check for the link itself too, otherwise `symlink_to`
            # below fails with `FileExistsError`.
            if path.is_symlink() or path.exists():
                logger.debug("replace.destiny.file", path=path.as_posix())
                path.unlink()

            path.parent.mkdir(parents=True, exist_ok=True)
            path.symlink_to(destiny_path)
is_writeable() classmethod
Source code in kiara/registries/destinies/filesystem_store.py
@classmethod
def is_writeable(cls) -> bool:
    """Return `True`: this store can persist destinies."""
    writeable = True
    return writeable
persist_destiny(self, destiny)
Source code in kiara/registries/destinies/filesystem_store.py
def persist_destiny(self, destiny: Destiny):
    """Write the destiny to disk and link it from each of its input values.

    The destiny is serialized via `destiny.json()` into its file below the
    destinies folder. For every value id in `destiny.fixed_inputs`, a symlink
    named after the destiny alias is then created in that value's folder,
    replacing any entry that is already there.
    """

    destiny_path = self._translate_destiny_id_to_path(destiny_id=destiny.destiny_id)
    destiny_path.parent.mkdir(parents=True, exist_ok=True)
    destiny_path.write_text(destiny.json())

    for value_id in destiny.fixed_inputs.values():

        path = self._translate_value_id(
            value_id=value_id, destiny_alias=destiny.destiny_alias
        )
        # `exists()` follows symlinks, so it is False for a dangling link;
        # check for the link itself too, otherwise `symlink_to` below fails
        # with `FileExistsError`.
        if path.is_symlink() or path.exists():
            logger.debug("replace.destiny.file", path=path.as_posix())
            path.unlink()

        path.parent.mkdir(parents=True, exist_ok=True)
        path.symlink_to(destiny_path)

registry

Classes

DestinyRegistry
Source code in kiara/registries/destinies/registry.py
class DestinyRegistry(object):
    """Keeps track of destinies and of the archives/stores that hold them.

    Destinies created via `add_destiny` are held in memory only; calling
    `store_destiny` persists them into the store they were assigned to.
    """

    def __init__(self, kiara: "Kiara"):
        """Create an empty registry and register it as event producer with `kiara`."""

        self._kiara: Kiara = kiara

        self._event_callback: Callable = self._kiara.event_registry.add_producer(self)

        # registered archives/stores, keyed by their alias
        self._destiny_archives: Dict[str, DestinyArchive] = {}
        # alias of the store that is used if a destiny alias has no store prefix
        self._default_destiny_store: Union[str, None] = None

        # lazily built cache: value id -> aliases of the archives that know it
        self._all_values: Union[Dict[uuid.UUID, Set[str]], None] = None
        self._cached_value_aliases: Dict[
            uuid.UUID, Dict[str, Union[Destiny, None]]
        ] = {}

        # in-memory destinies, by destiny id and by value id / alias
        self._destinies: Dict[uuid.UUID, Destiny] = {}
        self._destinies_by_value: Dict[uuid.UUID, Dict[str, Destiny]] = {}
        # destiny id -> alias of the store the destiny is assigned to
        self._destiny_store_map: Dict[uuid.UUID, str] = {}

    @property
    def default_destiny_store(self) -> DestinyStore:
        """The store used for destinies without an explicit store prefix.

        Raises:
            Exception: if no default store has been set yet.
        """

        if self._default_destiny_store is None:
            raise Exception("No default destiny store set (yet).")

        return self._destiny_archives[self._default_destiny_store]  # type: ignore

    @property
    def destiny_archives(self) -> Mapping[str, DestinyArchive]:
        """All registered archives, keyed by alias."""
        return self._destiny_archives

    def register_destiny_archive(
        self,
        archive: DestinyArchive,
        alias: Union[str, None] = None,
        set_as_default_store: Union[bool, None] = None,
    ):
        """Register an archive (or store) under an alias and emit an event for it.

        Args:
            archive: the archive to register
            alias: the alias to register it under, defaults to the string
                form of the archive id
            set_as_default_store: whether to make this the default store;
                the first registered store becomes the default in any case

        Raises:
            Exception: if the alias is already taken, or if the archive should
                become the default store but a default store is already set.
        """

        destiny_store_id = archive.archive_id
        archive.register_archive(kiara=self._kiara)
        if alias is None:
            alias = str(destiny_store_id)

        if alias in self._destiny_archives:
            raise Exception(
                f"Can't add destiny archive, alias '{alias}' already registered."
            )

        self._destiny_archives[alias] = archive
        # The value -> archives cache was built without this archive, so
        # force a rebuild on next access.
        self._all_values = None

        is_store = False
        is_default_store = False

        if isinstance(archive, DestinyStore):
            is_store = True

            if set_as_default_store and self._default_destiny_store is not None:
                raise Exception(
                    f"Can't set data store '{alias}' as default store: default store already set."
                )

            if self._default_destiny_store is None or set_as_default_store:
                is_default_store = True
                self._default_destiny_store = alias

        event = DestinyArchiveAddedEvent.construct(
            kiara_id=self._kiara.id,
            destiny_archive_id=archive.archive_id,
            destiny_archive_alias=alias,
            is_store=is_store,
            is_default_store=is_default_store,
        )
        self._event_callback(event)

    def _extract_archive(self, alias: str) -> Tuple[str, str]:
        """Split a destiny alias into (store alias, alias within that store).

        The part before the first '.' is taken as store alias if an archive
        is registered under it; otherwise the whole string is used as alias
        within the default store.
        """

        if "." not in alias:
            assert self._default_destiny_store is not None
            return (self._default_destiny_store, alias)

        store_id, rest = alias.split(".", maxsplit=1)

        if store_id not in self._destiny_archives:
            assert self._default_destiny_store is not None
            return (self._default_destiny_store, alias)
        else:
            return (store_id, rest)

    def add_destiny(
        self,
        destiny_alias: str,
        values: Dict[str, uuid.UUID],
        manifest: Manifest,
        result_field_name: Union[str, None] = None,
    ) -> Destiny:
        """Add a destiny for one (or in some rare cases several) values.

        A destiny alias must be unique for every one of the involved input values.

        The destiny is only registered in memory, together with the alias of
        the store it belongs to.

        Raises:
            Exception: if `values` is empty.
        """

        if not values:
            raise Exception("Can't add destiny, no values provided.")

        store_id, alias = self._extract_archive(destiny_alias)

        destiny = Destiny.create_from_values(
            kiara=self._kiara,
            destiny_alias=alias,
            manifest=manifest,
            result_field_name=result_field_name,
            values=values,
        )

        # Register the destiny itself independently of its inputs, so this
        # map always agrees with `_destiny_store_map`.
        self._destinies[destiny.destiny_id] = destiny
        for value_id in destiny.fixed_inputs.values():
            # TODO: store history?
            # note: keyed by the alias as provided, including any store prefix
            self._destinies_by_value.setdefault(value_id, {})[destiny_alias] = destiny
            self._cached_value_aliases.setdefault(value_id, {})[destiny_alias] = destiny

        self._destiny_store_map[destiny.destiny_id] = store_id

        return destiny

    def get_destiny(self, value_id: uuid.UUID, destiny_alias: str) -> Destiny:
        """Return the in-memory destiny registered for the value under the alias.

        Raises:
            Exception: if no such destiny is registered.
        """

        destiny = self._destinies_by_value.get(value_id, {}).get(destiny_alias, None)
        if destiny is None:
            raise Exception(
                f"No destiny '{destiny_alias}' available for value '{value_id}'."
            )

        return destiny

    @property
    def _all_values_store_map(self) -> Dict[uuid.UUID, Set[str]]:
        """Map of value id -> aliases of the archives that hold destinies for it.

        Built once by querying every registered archive, then cached.
        """

        if self._all_values is not None:
            return self._all_values

        all_values: Dict[uuid.UUID, Set[str]] = {}
        for archive_id, archive in self._destiny_archives.items():

            all_value_ids = archive.get_all_value_ids()
            for v_id in all_value_ids:
                all_values.setdefault(v_id, set()).add(archive_id)

        self._all_values = all_values
        return self._all_values

    @property
    def all_values(self) -> Iterable[uuid.UUID]:
        """Ids of all values that have a destiny, in an archive or in memory."""

        all_stored_values = set(self._all_values_store_map.keys())
        all_stored_values.update(self._destinies_by_value.keys())
        return all_stored_values

    def get_destiny_aliases_for_value(
        self, value_id: uuid.UUID, alias_filter: Union[str, None] = None
    ) -> Iterable[str]:
        """Return the sorted destiny aliases that are available for the value.

        Aliases that come from an archive are prefixed with the archive alias
        ('<archive>.<alias>'); aliases of in-memory destinies are returned as
        they were registered.

        Raises:
            NotImplementedError: if an `alias_filter` is provided.
        """

        # TODO: cache the result of this

        if alias_filter is not None:
            raise NotImplementedError()

        all_stores = self._all_values_store_map.get(value_id)
        aliases: Set[str] = set()
        if all_stores:
            for prefix in all_stores:
                all_aliases = self._destiny_archives[
                    prefix
                ].get_destiny_aliases_for_value(value_id=value_id)
                # archives may return 'None' if they can't list their aliases
                if all_aliases is not None:
                    aliases.update(f"{prefix}.{a}" for a in all_aliases)

        current = self._destinies_by_value.get(value_id, None)
        if current:
            aliases.update(current.keys())

        return sorted(aliases)

    def resolve_destiny(self, destiny: Destiny) -> Value:
        """Run the destiny as a job and return its result value.

        The id of the result value is recorded on the destiny object.
        """

        results = self._kiara.job_registry.execute_and_retrieve(
            manifest=destiny, inputs=destiny.merged_inputs
        )
        value = results.get_value_obj(field_name=destiny.result_field_name)

        destiny.result_value_id = value.value_id

        return value

    def attach_as_property(
        self,
        destiny: Union[uuid.UUID, Destiny],
        field_names: Union[Iterable[str], None] = None,
    ):
        """Attach the result of a destiny as a property to each of its input values.

        Args:
            destiny: the destiny, or the id of a destiny known to this registry
            field_names: not supported yet

        Raises:
            NotImplementedError: if `field_names` is provided.
            Exception: if any of the input values is already stored.
        """

        if field_names:
            raise NotImplementedError()

        if isinstance(destiny, uuid.UUID):
            destiny = self._destinies[destiny]

        values = self._kiara.data_registry.load_values(destiny.fixed_inputs)

        already_stored: List[uuid.UUID] = [
            v.value_id for v in values.values() if v.is_stored
        ]

        if already_stored:
            stored = (str(v) for v in already_stored)
            raise Exception(
                f"Can't attach destiny as property, value(s) already stored: {', '.join(stored)}"
            )

        store_id = self._destiny_store_map[destiny.destiny_id]

        full_path = f"{store_id}.{destiny.destiny_alias}"

        for v in values.values():
            assert destiny.result_value_id is not None
            v.add_property(
                value_id=destiny.result_value_id,
                property_path=full_path,
                add_origin_to_property_value=True,
            )

    def store_destiny(self, destiny_id: Union[Destiny, uuid.UUID]):
        """Persist a destiny known to this registry into the store it is assigned to.

        Args:
            destiny_id: the destiny id, or a destiny object

        Raises:
            Exception: if the archive the destiny is assigned to is not a store.
        """

        # Accept both: a 'Destiny' object carries its id in 'destiny_id',
        # anything else is taken to be the id itself.
        _destiny_id: uuid.UUID = getattr(destiny_id, "destiny_id", destiny_id)  # type: ignore

        store_id = self._destiny_store_map[_destiny_id]
        destiny = self._destinies[_destiny_id]
        store: DestinyStore = self._destiny_archives[store_id]  # type: ignore

        if not isinstance(store, DestinyStore):
            full_alias = f"{store_id}.{destiny.destiny_alias}"
            raise Exception(
                f"Can't store destiny '{full_alias}': prefix '{store_id}' not writable in this kiara context."
            )

        store.persist_destiny(destiny=destiny)
all_values: Iterable[uuid.UUID] property readonly
default_destiny_store: DestinyStore property readonly
destiny_archives: Mapping[str, kiara.registries.destinies.DestinyArchive] property readonly
Methods
add_destiny(self, destiny_alias, values, manifest, result_field_name=None)

Add a destiny for one (or in some rare cases several) values.

A destiny alias must be unique for every one of the involved input values.

Source code in kiara/registries/destinies/registry.py
def add_destiny(
    self,
    destiny_alias: str,
    values: Dict[str, uuid.UUID],
    manifest: Manifest,
    result_field_name: Union[str, None] = None,
) -> Destiny:
    """Add a destiny for one (or in some rare cases several) values.

    A destiny alias must be unique for every one of the involved input values.

    The destiny is only registered in memory, together with the alias of the
    store it belongs to.

    Raises:
        Exception: if `values` is empty.
    """

    if not values:
        raise Exception("Can't add destiny, no values provided.")

    store_id, alias = self._extract_archive(destiny_alias)

    destiny = Destiny.create_from_values(
        kiara=self._kiara,
        destiny_alias=alias,
        manifest=manifest,
        result_field_name=result_field_name,
        values=values,
    )

    # Register the destiny itself independently of its inputs, so this map
    # always agrees with `_destiny_store_map`.
    self._destinies[destiny.destiny_id] = destiny
    for value_id in destiny.fixed_inputs.values():
        # TODO: store history?
        # note: keyed by the alias as provided, including any store prefix
        self._destinies_by_value.setdefault(value_id, {})[destiny_alias] = destiny
        self._cached_value_aliases.setdefault(value_id, {})[destiny_alias] = destiny

    self._destiny_store_map[destiny.destiny_id] = store_id

    return destiny
attach_as_property(self, destiny, field_names=None)
Source code in kiara/registries/destinies/registry.py
def attach_as_property(
    self,
    destiny: Union[uuid.UUID, Destiny],
    field_names: Union[Iterable[str], None] = None,
):
    """Attach the result of a destiny as a property to each of its input values.

    `destiny` may be a destiny object or the id of a destiny known to this
    registry. Providing `field_names` is not supported yet and raises
    `NotImplementedError`. An exception is raised if any of the input values
    is already stored.
    """
    if field_names:
        raise NotImplementedError()

    if isinstance(destiny, uuid.UUID):
        destiny = self._destinies[destiny]

    values = self._kiara.data_registry.load_values(destiny.fixed_inputs)

    already_stored: List[uuid.UUID] = [
        val.value_id for val in values.values() if val.is_stored
    ]
    if already_stored:
        stored = (str(v) for v in already_stored)
        raise Exception(
            f"Can't attach destiny as property, value(s) already stored: {', '.join(stored)}"
        )

    # the property path is '<store alias>.<destiny alias>'
    full_path = f"{self._destiny_store_map[destiny.destiny_id]}.{destiny.destiny_alias}"

    for val in values.values():
        assert destiny.result_value_id is not None
        val.add_property(
            value_id=destiny.result_value_id,
            property_path=full_path,
            add_origin_to_property_value=True,
        )
get_destiny(self, value_id, destiny_alias)
Source code in kiara/registries/destinies/registry.py
def get_destiny(self, value_id: uuid.UUID, destiny_alias: str) -> Destiny:
    """Return the in-memory destiny registered for the value under the alias.

    Raises an exception if no such destiny is registered.
    """
    for_value = self._destinies_by_value.get(value_id, {})
    found = for_value.get(destiny_alias, None)
    if found is not None:
        return found

    raise Exception(
        f"No destiny '{destiny_alias}' available for value '{value_id}'."
    )
get_destiny_aliases_for_value(self, value_id, alias_filter=None)
Source code in kiara/registries/destinies/registry.py
def get_destiny_aliases_for_value(
    self, value_id: uuid.UUID, alias_filter: Union[str, None] = None
) -> Iterable[str]:
    """Return the sorted destiny aliases that are available for the value.

    Aliases that come from an archive are prefixed with the archive alias
    ('<archive>.<alias>'); aliases of in-memory destinies are returned as
    they were registered. Providing `alias_filter` is not supported yet and
    raises `NotImplementedError`.
    """

    # TODO: cache the result of this

    if alias_filter is not None:
        raise NotImplementedError()

    collected: Set[str] = set()

    archive_aliases = self._all_values_store_map.get(value_id)
    for prefix in archive_aliases or ():
        archive = self._destiny_archives[prefix]
        found = archive.get_destiny_aliases_for_value(value_id=value_id)
        # archives may return 'None' if they can't list their aliases
        if found is None:
            continue
        collected.update(f"{prefix}.{a}" for a in found)

    in_memory = self._destinies_by_value.get(value_id, None)
    if in_memory:
        collected.update(in_memory.keys())

    return sorted(collected)
register_destiny_archive(self, archive, alias=None, set_as_default_store=None)
Source code in kiara/registries/destinies/registry.py
def register_destiny_archive(
    self,
    archive: DestinyArchive,
    alias: Union[str, None] = None,
    set_as_default_store: Union[bool, None] = None,
):
    """Register an archive (or store) under an alias and emit an event for it.

    Args:
        archive: the archive to register
        alias: the alias to register it under, defaults to the string form
            of the archive id
        set_as_default_store: whether to make this the default store; the
            first registered store becomes the default in any case

    Raises:
        Exception: if the alias is already taken, or if the archive should
            become the default store but a default store is already set.
    """

    destiny_store_id = archive.archive_id
    archive.register_archive(kiara=self._kiara)
    if alias is None:
        alias = str(destiny_store_id)

    if alias in self._destiny_archives:
        raise Exception(
            f"Can't add destiny archive, alias '{alias}' already registered."
        )

    self._destiny_archives[alias] = archive
    # The value -> archives cache was built without this archive, so force a
    # rebuild on next access.
    self._all_values = None

    is_store = False
    is_default_store = False

    if isinstance(archive, DestinyStore):
        is_store = True

        if set_as_default_store and self._default_destiny_store is not None:
            raise Exception(
                f"Can't set data store '{alias}' as default store: default store already set."
            )

        if self._default_destiny_store is None or set_as_default_store:
            is_default_store = True
            self._default_destiny_store = alias

    event = DestinyArchiveAddedEvent.construct(
        kiara_id=self._kiara.id,
        destiny_archive_id=archive.archive_id,
        destiny_archive_alias=alias,
        is_store=is_store,
        is_default_store=is_default_store,
    )
    self._event_callback(event)
resolve_destiny(self, destiny)
Source code in kiara/registries/destinies/registry.py
def resolve_destiny(self, destiny: Destiny) -> Value:
    """Run the destiny as a job and return its result value.

    The destiny itself is used as manifest, its merged inputs as job inputs.
    The id of the result value is recorded on the destiny object.
    """
    job_registry = self._kiara.job_registry
    job_results = job_registry.execute_and_retrieve(
        manifest=destiny, inputs=destiny.merged_inputs
    )

    result_value = job_results.get_value_obj(field_name=destiny.result_field_name)
    destiny.result_value_id = result_value.value_id
    return result_value
store_destiny(self, destiny_id)
Source code in kiara/registries/destinies/registry.py
def store_destiny(self, destiny_id: Union[Destiny, uuid.UUID]):
    """Persist a destiny known to this registry into the store it is assigned to.

    Args:
        destiny_id: the destiny id, or a destiny object

    Raises:
        Exception: if the archive the destiny is assigned to is not a store.
    """

    # Accept both: a 'Destiny' object carries its id in 'destiny_id',
    # anything else is taken to be the id itself.
    _destiny_id: uuid.UUID = getattr(destiny_id, "destiny_id", destiny_id)  # type: ignore

    store_id = self._destiny_store_map[_destiny_id]
    destiny = self._destinies[_destiny_id]
    store: DestinyStore = self._destiny_archives[store_id]  # type: ignore

    if not isinstance(store, DestinyStore):
        full_alias = f"{store_id}.{destiny.destiny_alias}"
        raise Exception(
            f"Can't store destiny '{full_alias}': prefix '{store_id}' not writable in this kiara context."
        )

    store.persist_destiny(destiny=destiny)