Skip to content

value

Attributes

log = logging.getLogger('kiara') module-attribute

yaml = StringYAML() module-attribute

SERIALIZE_TYPES = {'chunk': SerializedBytes, 'chunks': SerializedListOfBytes, 'file': SerializedFile, 'files': SerializedFiles, 'inline-json': SerializedInlineJson, 'chunk-ids': SerializedChunkIDs} module-attribute

ORPHAN = ValuePedigree(kiara_id=VOID_KIARA_ID, environments={}, module_type=NO_MODULE_TYPE, inputs={}) module-attribute

Classes

SerializedChunks

Bases: BaseModel, abc.ABC

Source code in kiara/models/values/value.py
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
class SerializedChunks(BaseModel, abc.ABC):
    # Abstract base for models that hold, or point to, one or more chunks of
    # serialized bytes. Subclasses provide chunk access, size calculation and
    # CID creation; this class adds caching for the size and the CIDs and two
    # small file helpers.
    class Config:
        json_loads = orjson.loads
        json_dumps = orjson_dumps
        extra = Extra.forbid

    # lazily computed result of '_get_size()'
    _size_cache: Union[int, None] = PrivateAttr(default=None)
    # lazily computed results of '_create_cids()', keyed by hash codec name
    _hashes_cache: Dict[str, Sequence[CID]] = PrivateAttr(default_factory=dict)

    @abc.abstractmethod
    def get_chunks(
        self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
    ) -> Iterable[Union[str, BytesLike]]:
        """
        Retrieve the chunks belonging to this data instance.

        If 'as_files' is False, return the data as bytes. If set to 'True' store it to an arbitrary location (or use
        an existing one), and return the path to that file. If 'as_files' is a string, write the data (bytes) into
        a new file using the string as path. If 'symlink_ok' is set to True, symlinking an existing file to the value of
        'as_files' is also ok, otherwise copy the content.
        """

    @abc.abstractmethod
    def get_number_of_chunks(self) -> int:
        """Return how many chunks this instance consists of."""

    @abc.abstractmethod
    def _get_size(self) -> int:
        """Compute the combined size of all chunks (uncached)."""

    @abc.abstractmethod
    def _create_cids(self, hash_codec: str) -> Sequence[CID]:
        """Compute one CID per chunk using the given hash codec (uncached)."""

    def get_size(self) -> int:
        """Return the combined size of all chunks, computing it only once."""
        if self._size_cache is None:
            self._size_cache = self._get_size()
        return self._size_cache

    def get_cids(self, hash_codec: str) -> Sequence[CID]:
        """Return the CIDs for the given hash codec, computing them only once per codec."""
        if self._hashes_cache.get(hash_codec, None) is None:
            self._hashes_cache[hash_codec] = self._create_cids(hash_codec=hash_codec)
        return self._hashes_cache[hash_codec]

    def _store_bytes_to_file(
        self, chunks: Iterable[bytes], file: Union[str, None] = None
    ) -> str:
        """
        Utility method to store bytes to a file.

        All 'chunks' are written, in order, into a single file. If 'file' is None, a temporary
        file is created and registered for removal at interpreter exit. Otherwise 'file' is
        created; an Exception is raised if a file already exists at that path.

        Returns the path of the file that was written.
        """
        if file is None:
            file_desc, file = tempfile.mkstemp()

            def del_temp_file():
                # the path is handed to the caller, who might already have
                # moved or deleted the file by the time the interpreter exits
                try:
                    os.remove(file)
                except FileNotFoundError:
                    pass

            atexit.register(del_temp_file)

        else:
            if os.path.exists(file):
                raise Exception(f"Can't write to file, file exists: {file}")
            # The second positional argument of 'os.open' is the flag set, the
            # permission bits are the third one. O_EXCL makes the creation fail
            # if the file appeared after the check above.
            flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL | getattr(os, "O_BINARY", 0)
            file_desc = os.open(file, flags, 0o600)

        with os.fdopen(file_desc, "wb") as tmp:
            for chunk in chunks:
                tmp.write(chunk)

        return file

    def _read_bytes_from_file(self, file: str) -> bytes:
        """Read and return the whole content of 'file' as bytes."""
        with open(file, "rb") as f:
            content = f.read()

        return content

Classes

Config
Source code in kiara/models/values/value.py
72
73
74
75
class Config:
    # reject fields that are not declared on the model
    extra = Extra.forbid
    # use orjson for both directions of the JSON round trip
    json_dumps = orjson_dumps
    json_loads = orjson.loads
Attributes
json_loads = orjson.loads class-attribute
json_dumps = orjson_dumps class-attribute
extra = Extra.forbid class-attribute

Functions

get_chunks(as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True) -> Iterable[Union[str, BytesLike]] abstractmethod

Retrieve the chunks belonging to this data instance.

If 'as_files' is False, return the data as bytes. If it is True, store the data in an arbitrary location (or use an existing one) and return the path to that file. If 'as_files' is a string, write the data (bytes) into a new file, using the string as its path. If 'symlink_ok' is True, symlinking an existing file to the value of 'as_files' is also acceptable; otherwise the content is copied.

Source code in kiara/models/values/value.py
80
81
82
83
84
85
86
87
88
89
90
91
@abc.abstractmethod
def get_chunks(
    self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
) -> Iterable[Union[str, BytesLike]]:
    """
    Retrieve the chunks belonging to this data instance.

    If 'as_files' is False, return the data as bytes. If set to 'True' store it to an arbitrary location (or use
    an existing one), and return the path to that file. If 'as_files' is a string, write the data (bytes) into
    a new file using the string as path. If 'symlink_ok' is set to True, symlinking an existing file to the value of
    'as_files' is also ok, otherwise copy the content.
    """
get_number_of_chunks() -> int abstractmethod
Source code in kiara/models/values/value.py
93
94
95
@abc.abstractmethod
def get_number_of_chunks(self) -> int:
    """Return how many chunks this instance consists of (implemented by subclasses)."""
get_size() -> int
Source code in kiara/models/values/value.py
105
106
107
108
109
def get_size(self) -> int:
    """Return the combined size of all chunks, calling '_get_size()' only on first use."""
    cached = self._size_cache
    if cached is None:
        cached = self._get_size()
        self._size_cache = cached
    return cached
get_cids(hash_codec: str) -> Sequence[CID]
Source code in kiara/models/values/value.py
111
112
113
114
115
def get_cids(self, hash_codec: str) -> Sequence[CID]:
    """Return the CIDs for 'hash_codec', calling '_create_cids()' only once per codec."""
    cache = self._hashes_cache
    if cache.get(hash_codec) is None:
        cache[hash_codec] = self._create_cids(hash_codec=hash_codec)
    return cache[hash_codec]

SerializedPreStoreChunks

Bases: SerializedChunks

Source code in kiara/models/values/value.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
class SerializedPreStoreChunks(SerializedChunks):
    # Base for chunk models that carry a 'codec' and can derive CIDs either
    # from in-memory bytes or from the content of a file.

    codec: str = Field(
        description="The codec used to encode the chunks in this model. Using the [multicodecs](https://github.com/multiformats/multicodec) codec table."
    )

    def _create_cid_from_chunk(self, chunk: bytes, hash_codec: str) -> CID:
        """Hash 'chunk' with 'hash_codec' and wrap the digest into a CID using this model's codec."""
        digest = Multihash(codec=hash_codec).digest(chunk)
        return create_cid_digest(digest=digest, codec=self.codec)

    def _create_cid_from_file(self, file: str, hash_codec: str) -> CID:
        """Hash the content of 'file' (sha2-256 only) and wrap the digest into a CID."""
        assert hash_codec == "sha2-256"

        hasher = hashlib.sha256()

        # feed the file to the hasher in fixed-size blocks so it never has to
        # be held in memory as a whole
        block_size = 65536
        with open(file, "rb") as handle:
            for block in iter(lambda: handle.read(block_size), b""):
                hasher.update(block)

        wrapped = multihash.wrap(hasher.digest(), "sha2-256")
        return create_cid_digest(digest=wrapped, codec=self.codec)

Attributes

codec: str = Field(description='The codec used to encode the chunks in this model. Using the [multicodecs](https://github.com/multiformats/multicodec) codec table.') class-attribute

SerializedBytes

Bases: SerializedPreStoreChunks

Source code in kiara/models/values/value.py
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class SerializedBytes(SerializedPreStoreChunks):
    # A single chunk of serialized data, held in memory as bytes.

    type: Literal["chunk"] = "chunk"
    chunk: bytes = Field(description="A byte-array")

    def get_chunks(
        self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
    ) -> Iterable[Union[str, BytesLike]]:
        """
        Return the single chunk, either as bytes or as a path to a file containing it.

        If 'as_files' is False the result is a one-item list with the raw bytes. If it is True
        the bytes are written to a temporary file; if it is a string (or a one-item sequence of
        strings) they are written to that path. In all file cases the result is a one-item list
        containing the path. 'symlink_ok' is not used by this implementation.
        """
        if as_files is False:
            return [self.chunk]

        if as_files is True:
            file = None
        elif isinstance(as_files, str):
            file = as_files
        else:
            assert len(as_files) == 1
            file = as_files[0]
        path = self._store_bytes_to_file([self.chunk], file=file)
        # wrap the path in a list: callers iterate over the result, and
        # iterating over a bare string would yield single characters
        return [path]

    def get_number_of_chunks(self) -> int:
        """Always 1, this model holds exactly one chunk."""
        return 1

    def _get_size(self) -> int:
        """Return the length of the chunk in bytes."""
        return len(self.chunk)

    def _create_cids(self, hash_codec: str) -> Sequence[CID]:
        """Return a one-item list with the CID of the chunk."""
        return [self._create_cid_from_chunk(self.chunk, hash_codec=hash_codec)]

Attributes

type: Literal['chunk'] = 'chunk' class-attribute
chunk: bytes = Field(description='A byte-array') class-attribute

Functions

get_chunks(as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True) -> Iterable[Union[str, BytesLike]]
Source code in kiara/models/values/value.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def get_chunks(
    self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
) -> Iterable[Union[str, BytesLike]]:
    """
    Return the single chunk, either as bytes or as a path to a file containing it.

    If 'as_files' is False the result is a one-item list with the raw bytes. If it is True
    the bytes are written to a temporary file; if it is a string (or a one-item sequence of
    strings) they are written to that path. In all file cases the result is a one-item list
    containing the path. 'symlink_ok' is not used by this implementation.
    """
    if as_files is False:
        return [self.chunk]

    if as_files is True:
        file = None
    elif isinstance(as_files, str):
        file = as_files
    else:
        assert len(as_files) == 1
        file = as_files[0]
    path = self._store_bytes_to_file([self.chunk], file=file)
    # wrap the path in a list: callers iterate over the result, and
    # iterating over a bare string would yield single characters
    return [path]
get_number_of_chunks() -> int
Source code in kiara/models/values/value.py
200
201
def get_number_of_chunks(self) -> int:
    """Always 1, this model holds exactly one chunk."""
    return 1

SerializedListOfBytes

Bases: SerializedPreStoreChunks

Source code in kiara/models/values/value.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
class SerializedListOfBytes(SerializedPreStoreChunks):
    # Several chunks of serialized data, held in memory as a list of bytes.

    type: Literal["chunks"] = "chunks"
    chunks: List[bytes] = Field(description="A list of byte arrays.")

    def get_chunks(
        self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
    ) -> Iterable[Union[str, BytesLike]]:
        """
        Return the chunks as bytes, or write them to file(s) and return the path(s).

        With 'as_files' False the list of byte arrays is returned. With True, None or a single
        string all chunks end up in one file. With a sequence of paths each chunk gets its own file.
        """
        if as_files is False:
            return self.chunks

        if as_files is None or as_files is True or isinstance(as_files, str):
            # one target (or none at all): every chunk goes into the same file
            single_target = None if as_files is True else as_files
            return [self._store_bytes_to_file(self.chunks, file=single_target)]

        # one target per chunk
        assert len(as_files) == self.get_number_of_chunks()
        return [
            self._store_bytes_to_file([chunk], file=as_files[idx])
            for idx, chunk in enumerate(self.chunks)
        ]

    def get_number_of_chunks(self) -> int:
        """Return the number of byte arrays held."""
        return len(self.chunks)

    def _get_size(self) -> int:
        """Return the summed length of all byte arrays."""
        return sum(len(chunk) for chunk in self.chunks)

    def _create_cids(self, hash_codec: str) -> Sequence[CID]:
        """Return one CID per byte array, in order."""
        cids = []
        for chunk in self.chunks:
            cids.append(self._create_cid_from_chunk(chunk, hash_codec=hash_codec))
        return cids

Attributes

type: Literal['chunks'] = 'chunks' class-attribute
chunks: List[bytes] = Field(description='A list of byte arrays.') class-attribute

Functions

get_chunks(as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True) -> Iterable[Union[str, BytesLike]]
Source code in kiara/models/values/value.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def get_chunks(
    self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
) -> Iterable[Union[str, BytesLike]]:
    """
    Return the chunks as bytes, or write them to file(s) and return the path(s).

    With 'as_files' False the list of byte arrays is returned. With True, None or a single
    string all chunks end up in one file. With a sequence of paths each chunk gets its own file.
    """
    if as_files is False:
        return self.chunks

    if as_files is None or as_files is True or isinstance(as_files, str):
        # one target (or none at all): every chunk goes into the same file
        single_target = None if as_files is True else as_files
        return [self._store_bytes_to_file(self.chunks, file=single_target)]

    # one target per chunk
    assert len(as_files) == self.get_number_of_chunks()
    return [
        self._store_bytes_to_file([chunk], file=as_files[idx])
        for idx, chunk in enumerate(self.chunks)
    ]
get_number_of_chunks() -> int
Source code in kiara/models/values/value.py
235
236
def get_number_of_chunks(self) -> int:
    """Return the number of byte arrays held in 'chunks'."""
    chunk_count = len(self.chunks)
    return chunk_count

SerializedFile

Bases: SerializedPreStoreChunks

Source code in kiara/models/values/value.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
class SerializedFile(SerializedPreStoreChunks):
    # A single chunk of serialized data that lives in a file on disk.

    type: Literal["file"] = "file"
    file: str = Field(description="A path to a file containing the serialized data.")

    def get_chunks(
        self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
    ) -> Iterable[Union[str, BytesLike]]:
        """
        Return the file content as bytes, or a path under which the content is available.

        With 'as_files' False the file is read and returned as a one-item list of bytes. With
        True the existing path is returned. With a string (or a one-item sequence of strings)
        the content is made available under that path: as a symlink if 'symlink_ok' is True,
        as a copy otherwise. An Exception is raised if the target path already exists.
        """
        if as_files is False:
            chunk = self._read_bytes_from_file(self.file)
            return [chunk]
        if as_files is True:
            return [self.file]

        if isinstance(as_files, str):
            file = as_files
        else:
            assert len(as_files) == 1
            file = as_files[0]
        if os.path.exists(file):
            raise Exception(f"Can't write to file '{file}': file exists.")
        if symlink_ok:
            os.symlink(self.file, file)
        else:
            # imported locally so the fix does not depend on the module's import block
            import shutil

            shutil.copyfile(self.file, file)
        return [file]

    def get_number_of_chunks(self) -> int:
        """Always 1, this model points to exactly one file."""
        return 1

    def _get_size(self) -> int:
        """Return the size of the (symlink-resolved) file in bytes."""
        return os.path.getsize(os.path.realpath(self.file))

    def _create_cids(self, hash_codec: str) -> Sequence[CID]:
        """Return a one-item list with the CID of the file content."""
        return [self._create_cid_from_file(self.file, hash_codec=hash_codec)]

Attributes

type: Literal['file'] = 'file' class-attribute
file: str = Field(description='A path to a file containing the serialized data.') class-attribute

Functions

get_chunks(as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True) -> Iterable[Union[str, BytesLike]]
Source code in kiara/models/values/value.py
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def get_chunks(
    self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
) -> Iterable[Union[str, BytesLike]]:
    """
    Return the file content as bytes, or a path under which the content is available.

    With 'as_files' False the file is read and returned as a one-item list of bytes. With
    True the existing path is returned. With a string (or a one-item sequence of strings)
    the content is made available under that path: as a symlink if 'symlink_ok' is True,
    as a copy otherwise. An Exception is raised if the target path already exists.
    """
    if as_files is False:
        chunk = self._read_bytes_from_file(self.file)
        return [chunk]
    if as_files is True:
        return [self.file]

    if isinstance(as_files, str):
        file = as_files
    else:
        assert len(as_files) == 1
        file = as_files[0]
    if os.path.exists(file):
        raise Exception(f"Can't write to file '{file}': file exists.")
    if symlink_ok:
        os.symlink(self.file, file)
    else:
        # imported locally so the fix does not depend on the module's import block
        import shutil

        shutil.copyfile(self.file, file)
    return [file]
get_number_of_chunks() -> int
Source code in kiara/models/values/value.py
280
281
def get_number_of_chunks(self) -> int:
    """Always 1, this model points to exactly one file."""
    return 1

SerializedFiles

Bases: SerializedPreStoreChunks

Source code in kiara/models/values/value.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
class SerializedFiles(SerializedPreStoreChunks):
    # Several chunks of serialized data, each living in its own file on disk.

    type: Literal["files"] = "files"
    files: List[str] = Field(
        description="A list of strings, pointing to files containing parts of the serialized data."
    )

    def get_chunks(
        self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
    ) -> Iterable[Union[str, BytesLike]]:
        """Not implemented for this model: always raises NotImplementedError."""
        raise NotImplementedError()

    def get_number_of_chunks(self) -> int:
        """Return the number of files referenced."""
        return len(self.files)

    def _get_size(self) -> int:
        """Return the summed size of all (symlink-resolved) files in bytes."""
        return sum(os.path.getsize(os.path.realpath(path)) for path in self.files)

    def _create_cids(self, hash_codec: str) -> Sequence[CID]:
        """Return one CID per file, in order."""
        cids = []
        for path in self.files:
            cids.append(self._create_cid_from_file(path, hash_codec=hash_codec))
        return cids

Attributes

type: Literal['files'] = 'files' class-attribute
files: List[str] = Field(description='A list of strings, pointing to files containing parts of the serialized data.') class-attribute

Functions

get_chunks(as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True) -> Iterable[Union[str, BytesLike]]
Source code in kiara/models/values/value.py
297
298
299
300
def get_chunks(
    self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
) -> Iterable[Union[str, BytesLike]]:
    """Not implemented for this model: always raises NotImplementedError."""
    raise NotImplementedError()
get_number_of_chunks() -> int
Source code in kiara/models/values/value.py
302
303
def get_number_of_chunks(self) -> int:
    """Return the number of files referenced in 'files'."""
    file_count = len(self.files)
    return file_count

SerializedInlineJson

Bases: SerializedPreStoreChunks

Source code in kiara/models/values/value.py
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
class SerializedInlineJson(SerializedPreStoreChunks):
    # Serialized data that is kept inline in the model and turned into JSON
    # bytes on demand.

    type: Literal["inline-json"] = "inline-json"
    inline_data: Any = Field(
        description="Data that will not be stored externally, but inline in the containing model. This should only contain data types that can be serialized reliably using json (scalars, etc.)."
    )
    # JSON bytes produced by 'as_json()', kept so the dump happens only once
    _json_cache: Union[bytes, None] = PrivateAttr(default=None)

    def as_json(self) -> bytes:
        """Return 'inline_data' dumped to JSON bytes (non-string keys allowed), cached after the first call."""
        assert self.inline_data is not None
        cached = self._json_cache
        if cached is None:
            cached = orjson.dumps(self.inline_data, option=orjson.OPT_NON_STR_KEYS)
            self._json_cache = cached
        return cached

    def get_chunks(
        self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
    ) -> Iterable[Union[str, BytesLike]]:
        """Return the JSON bytes as a one-item list; only 'as_files=False' is supported."""
        if as_files is not False:
            raise NotImplementedError()
        return [self.as_json()]

    def get_number_of_chunks(self) -> int:
        """Always 1, the inline data forms a single chunk."""
        return 1

    def _get_size(self) -> int:
        """Return the length of the JSON bytes."""
        return len(self.as_json())

    def _create_cids(self, hash_codec: str) -> Sequence[CID]:
        """Return a one-item list with the CID of the JSON bytes."""
        json_bytes = self.as_json()
        return [self._create_cid_from_chunk(json_bytes, hash_codec=hash_codec)]

Attributes

type: Literal['inline-json'] = 'inline-json' class-attribute
inline_data: Any = Field(description='Data that will not be stored externally, but inline in the containing model. This should only contain data types that can be serialized reliably using json (scalars, etc.).') class-attribute

Functions

as_json() -> bytes
Source code in kiara/models/values/value.py
327
328
329
330
331
332
333
334
def as_json(self) -> bytes:
    """Return 'inline_data' dumped to JSON bytes (non-string keys allowed), cached after the first call."""
    assert self.inline_data is not None
    cached = self._json_cache
    if cached is None:
        cached = orjson.dumps(self.inline_data, option=orjson.OPT_NON_STR_KEYS)
        self._json_cache = cached
    return cached
get_chunks(as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True) -> Iterable[Union[str, BytesLike]]
Source code in kiara/models/values/value.py
336
337
338
339
340
341
342
343
def get_chunks(
    self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
) -> Iterable[Union[str, BytesLike]]:
    """Return the JSON bytes as a one-item list; only 'as_files=False' is supported."""
    if as_files is not False:
        raise NotImplementedError()
    return [self.as_json()]
get_number_of_chunks() -> int
Source code in kiara/models/values/value.py
345
346
def get_number_of_chunks(self) -> int:
    """Always 1, the inline data forms a single chunk."""
    return 1

SerializedChunkIDs

Bases: SerializedChunks

Source code in kiara/models/values/value.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
class SerializedChunkIDs(SerializedChunks):
    # Chunks that are referenced by id only; the bytes are fetched through the
    # data registry attached to the instance.

    type: Literal["chunk-ids"] = "chunk-ids"
    chunk_id_list: List[str] = Field(
        description="A list of chunk ids, which will be resolved via the attached data registry."
    )
    archive_id: Union[uuid.UUID, None] = Field(
        description="The preferred data archive to get the chunks from."
    )
    size: int = Field(description="The size of all chunks combined.")
    # registry used by 'get_chunks'; not set by this class itself
    _data_registry: "DataRegistry" = PrivateAttr(default=None)

    def get_chunks(
        self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
    ) -> Iterable[Union[str, BytesLike]]:
        """
        Retrieve every chunk in 'chunk_id_list' through the data registry.

        For a bool or string 'as_files' a lazy generator of the registry results is returned.
        For a sequence of paths each chunk is retrieved into the path at the same index and the
        list of those paths is returned.
        """
        if not isinstance(as_files, (bool, str)):
            written = []
            for idx, chunk_id in enumerate(self.chunk_id_list):
                target = as_files[idx]
                self._data_registry.retrieve_chunk(
                    chunk_id=chunk_id,
                    archive_id=self.archive_id,
                    as_file=target,
                    symlink_ok=symlink_ok,
                )
                written.append(target)
            return written

        # generator expression on purpose: chunks are only retrieved while the
        # caller iterates
        return (
            self._data_registry.retrieve_chunk(
                chunk_id=chunk_id,
                archive_id=self.archive_id,
                as_file=as_files,
                symlink_ok=symlink_ok,
            )
            for chunk_id in self.chunk_id_list
        )

    def get_number_of_chunks(self) -> int:
        """Return the number of referenced chunk ids."""
        return len(self.chunk_id_list)

    def _get_size(self) -> int:
        """Return the stored 'size' value."""
        return self.size

    def _create_cids(self, hash_codec: str) -> Sequence[CID]:
        """Decode each chunk id into a CID; 'hash_codec' is not used here."""
        return [CID.decode(chunk_id) for chunk_id in self.chunk_id_list]

Attributes

type: Literal['chunk-ids'] = 'chunk-ids' class-attribute
chunk_id_list: List[str] = Field(description='A list of chunk ids, which will be resolved via the attached data registry.') class-attribute
archive_id: Union[uuid.UUID, None] = Field(description='The preferred data archive to get the chunks from.') class-attribute
size: int = Field(description='The size of all chunks combined.') class-attribute

Functions

get_chunks(as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True) -> Iterable[Union[str, BytesLike]]
Source code in kiara/models/values/value.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def get_chunks(
    self, as_files: Union[bool, str, Sequence[str]] = True, symlink_ok: bool = True
) -> Iterable[Union[str, BytesLike]]:
    """
    Retrieve every chunk in 'chunk_id_list' through the data registry.

    For a bool or string 'as_files' a lazy generator of the registry results is returned.
    For a sequence of paths each chunk is retrieved into the path at the same index and the
    list of those paths is returned.
    """
    if not isinstance(as_files, (bool, str)):
        written = []
        for idx, chunk_id in enumerate(self.chunk_id_list):
            target = as_files[idx]
            self._data_registry.retrieve_chunk(
                chunk_id=chunk_id,
                archive_id=self.archive_id,
                as_file=target,
                symlink_ok=symlink_ok,
            )
            written.append(target)
        return written

    # generator expression on purpose: chunks are only retrieved while the
    # caller iterates
    return (
        self._data_registry.retrieve_chunk(
            chunk_id=chunk_id,
            archive_id=self.archive_id,
            as_file=as_files,
            symlink_ok=symlink_ok,
        )
        for chunk_id in self.chunk_id_list
    )
get_number_of_chunks() -> int
Source code in kiara/models/values/value.py
394
395
def get_number_of_chunks(self) -> int:
    """Return the number of chunk ids in 'chunk_id_list'."""
    id_count = len(self.chunk_id_list)
    return id_count

SerializationMetadata

Bases: KiaraModel

Source code in kiara/models/values/value.py
420
421
422
423
424
425
426
427
428
429
430
431
class SerializationMetadata(KiaraModel):
    # Metadata attached to serialized data: environment hashes and suggested
    # manifests for de-serialization. Both mappings default to empty.

    _kiara_model_id = "metadata.serialized_data"

    environment: Mapping[str, int] = Field(
        default_factory=dict,
        description="Hash(es) for the environments the value was created/serialized.",
    )
    deserialize: Mapping[str, Manifest] = Field(
        default_factory=dict,
        description="Suggested manifest configs to use to de-serialize the data.",
    )

Attributes

environment: Mapping[str, int] = Field(description='Hash(es) for the environments the value was created/serialized.', default_factory=dict) class-attribute
deserialize: Mapping[str, Manifest] = Field(description='Suggested manifest configs to use to de-serialize the data.', default_factory=dict) class-attribute

SerializedData

Bases: KiaraModel

Source code in kiara/models/values/value.py
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
class SerializedData(KiaraModel):
    # Base model for a serialized value: describes the data type and the
    # serialization profile, and derives size and CIDs from the chunk models
    # that subclasses expose via 'get_keys' / 'get_serialized_data'.

    data_type: str = Field(
        description="The name of the data type for this serialized value."
    )
    data_type_config: Mapping[str, Any] = Field(
        description="The (optional) config for the data type for this serialized value.",
        default_factory=dict,
    )
    serialization_profile: str = Field(
        description="An identifying name for the serialization method used."
    )
    metadata: SerializationMetadata = Field(
        description="Optional metadata describing aspects of the serialization used.",
        default_factory=SerializationMetadata,
    )

    hash_codec: str = Field(
        description="The codec used to hash the value.", default="sha2-256"
    )
    # CIDs per key, filled by 'get_cids_for_key'
    _cids_cache: Dict[str, Sequence[CID]] = PrivateAttr(default_factory=dict)

    # results of the 'data_size' and 'dag' properties, computed once
    _cached_data_size: Union[int, None] = PrivateAttr(default=None)
    _cached_dag: Union[Dict[str, Sequence[CID]], None] = PrivateAttr(default=None)
    # _cached_cid: Optional[CID] = PrivateAttr(default=None)

    def _retrieve_data_to_hash(self) -> Any:
        """Return the dag (key -> CIDs), which is the data used for hashing this model."""
        return self.dag

    @property
    def data_size(self) -> int:
        """The summed size of the chunk models of all keys, computed once."""
        if self._cached_data_size is None:
            self._cached_data_size = sum(
                self.get_serialized_data(key).get_size() for key in self.get_keys()
            )
        return self._cached_data_size

    @abc.abstractmethod
    def get_keys(self) -> Iterable[str]:
        """Return the keys under which serialized chunk models are available."""

    @abc.abstractmethod
    def get_serialized_data(self, key: str) -> SerializedChunks:
        """Return the chunk model stored under 'key'."""

    def get_cids_for_key(self, key) -> Sequence[CID]:
        """Return the CIDs of the chunk model under 'key', hashed with 'hash_codec' and cached per key."""
        if key not in self._cids_cache:
            chunk_model = self.get_serialized_data(key)
            self._cids_cache[key] = chunk_model.get_cids(hash_codec=self.hash_codec)
        return self._cids_cache[key]

    @property
    def dag(self) -> Mapping[str, Sequence[CID]]:
        """Mapping of every key to its CIDs, computed once."""
        if self._cached_dag is None:
            self._cached_dag = {
                key: self.get_cids_for_key(key) for key in self.get_keys()
            }
        return self._cached_dag

Attributes

data_type: str = Field(description='The name of the data type for this serialized value.') class-attribute
data_type_config: Mapping[str, Any] = Field(description='The (optional) config for the data type for this serialized value.', default_factory=dict) class-attribute
serialization_profile: str = Field(description='An identifying name for the serialization method used.') class-attribute
metadata: SerializationMetadata = Field(description='Optional metadata describing aspects of the serialization used.', default_factory=SerializationMetadata) class-attribute
hash_codec: str = Field(description='The codec used to hash the value.', default='sha2-256') class-attribute
data_size: int property
dag: Mapping[str, Sequence[CID]] property

Functions

get_keys() -> Iterable[str] abstractmethod
Source code in kiara/models/values/value.py
476
477
478
@abc.abstractmethod
def get_keys(self) -> Iterable[str]:
    """Return the keys under which serialized chunk models are available (implemented by subclasses)."""
get_serialized_data(key: str) -> SerializedChunks abstractmethod
Source code in kiara/models/values/value.py
480
481
482
@abc.abstractmethod
def get_serialized_data(self, key: str) -> SerializedChunks:
    """Return the chunk model stored under 'key' (implemented by subclasses)."""
get_cids_for_key(key) -> Sequence[CID]
Source code in kiara/models/values/value.py
484
485
486
487
488
489
490
491
def get_cids_for_key(self, key) -> Sequence[CID]:
    """Return the CIDs of the chunk model under 'key', hashed with 'hash_codec' and cached per key."""
    if key not in self._cids_cache:
        chunk_model = self.get_serialized_data(key)
        self._cids_cache[key] = chunk_model.get_cids(hash_codec=self.hash_codec)
    return self._cids_cache[key]

SerializationResult

Bases: SerializedData

Source code in kiara/models/values/value.py
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
class SerializationResult(SerializedData):
    # Serialized data whose chunk models are held directly in the 'data'
    # mapping (bytes, lists of bytes, files or inline json).

    _kiara_model_id = "instance.serialization_result"

    data: Dict[
        str,
        Union[
            SerializedBytes,
            SerializedListOfBytes,
            SerializedFile,
            SerializedFiles,
            SerializedInlineJson,
        ],
    ] = Field(
        description="One or several byte arrays representing the serialized state of the value."
    )

    def get_keys(self) -> Iterable[str]:
        """Return the keys of the 'data' mapping."""
        return self.data.keys()

    def get_serialized_data(self, key: str) -> SerializedChunks:
        """Return the chunk model stored under 'key' in 'data'."""
        return self.data[key]

    @root_validator(pre=True)
    def validate_data(cls, values):
        """
        Default the hash codec and turn plain mappings in 'data' into chunk models.

        Entries of 'data' that already are 'SerializedChunks' instances are kept. Mapping entries
        must contain a 'type' key naming one of SERIALIZE_TYPES (except 'chunk-ids') and are
        instantiated with the matching model class.

        Raises ValueError if a mapping entry has no 'type' or an unknown one.
        """
        # Only fall back to the default when the caller did not choose a hash
        # codec. The field is called 'hash_codec'; there is no 'codec' field.
        if values.get("hash_codec", None) is None:
            values["hash_codec"] = "sha2-256"

        v = values.get("data")
        assert isinstance(v, Mapping)

        result = {}
        for field_name, data in v.items():
            if isinstance(data, SerializedChunks):
                result[field_name] = data
            elif isinstance(data, Mapping):
                s_type = data.get("type", None)
                if not s_type:
                    raise ValueError(
                        f"Invalid serialized data config, missing 'type' key: {data}"
                    )

                if s_type not in SERIALIZE_TYPES.keys():
                    raise ValueError(
                        f"Invalid serialized data type '{s_type}'. Allowed types: {', '.join(SERIALIZE_TYPES.keys())}"
                    )

                assert s_type != "chunk-ids"
                # separate name, so the validator's own 'cls' is not rebound
                model_cls = SERIALIZE_TYPES[s_type]
                result[field_name] = model_cls(**data)
            # NOTE(review): entries that are neither a chunk model nor a mapping
            # are dropped without an error -- confirm that this is intended.

        values["data"] = result
        return values

    def create_renderable(self, **config: Any) -> RenderableType:
        """Build a two-column table with data type, config, per-field chunk types, size and instance id."""
        table = Table(show_header=False, box=box.SIMPLE)
        table.add_column("key")
        table.add_column("value")
        table.add_row("data_type", self.data_type)
        _config = Syntax(
            orjson_dumps(self.data_type_config), "json", background_color="default"
        )
        table.add_row("data_type_config", _config)

        # only the chunk model type is shown per field, not the data itself
        data_fields = {field: {"type": model.type} for field, model in self.data.items()}
        data_json = Syntax(
            orjson_dumps(data_fields), "json", background_color="default"
        )
        table.add_row("data", data_json)
        table.add_row("size", str(self.data_size))
        table.add_row("hash", self.instance_id)

        return table

    def __repr__(self):

        return f"{self.__class__.__name__}(type={self.data_type} size={self.data_size})"

    def __str__(self):
        return self.__repr__()

Attributes

data: Dict[str, Union[SerializedBytes, SerializedListOfBytes, SerializedFile, SerializedFiles, SerializedInlineJson]] = Field(description='One or several byte arrays representing the serialized state of the value.') class-attribute

Functions

get_keys() -> Iterable[str]
Source code in kiara/models/values/value.py
524
525
def get_keys(self) -> Iterable[str]:
    """Return the names of all serialized data fields (the keys of 'data')."""
    field_names = self.data.keys()
    return field_names
get_serialized_data(key: str) -> SerializedChunks
Source code in kiara/models/values/value.py
527
528
def get_serialized_data(self, key: str) -> SerializedChunks:
    """Return the serialized chunks stored in 'data' under the given key."""
    chunks = self.data[key]
    return chunks
validate_data(values)
Source code in kiara/models/values/value.py
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
@root_validator(pre=True)
def validate_data(cls, values):
    """
    Normalize the raw 'data' input into serialized chunk models.

    Every item of the 'data' mapping is either kept as-is (if it already is a
    'SerializedChunks' instance) or, if it is a mapping, turned into the model
    class registered in 'SERIALIZE_TYPES' for its 'type' key.

    Raises:
        TypeError: if 'data' is missing or not a mapping
        ValueError: if an item config has no 'type' key, an unknown type, or
            the type 'chunk-ids' (which is not accepted here)
    """

    # NOTE(review): this reads the key 'codec' but writes 'hash_codec', so a
    # provided 'hash_codec' is replaced whenever 'codec' is absent -- confirm
    # that this is intended.
    codec = values.get("codec", None)
    if codec is None:
        codec = "sha2-256"
        values["hash_codec"] = codec

    raw_data = values.get("data")
    # explicit check instead of 'assert', so it also runs with 'python -O'
    if not isinstance(raw_data, Mapping):
        raise TypeError(
            f"Invalid serialized data: 'data' must be a mapping, not '{type(raw_data).__name__}'."
        )

    result = {}
    for field_name, data in raw_data.items():
        if isinstance(data, SerializedChunks):
            result[field_name] = data
            continue

        if not isinstance(data, Mapping):
            # NOTE(review): items of any other type are dropped without an
            # error (unchanged behavior) -- confirm that this is intended.
            continue

        s_type = data.get("type", None)
        if not s_type:
            raise ValueError(
                f"Invalid serialized data config, missing 'type' key: {data}"
            )

        if s_type not in SERIALIZE_TYPES.keys():
            raise ValueError(
                f"Invalid serialized data type '{s_type}'. Allowed types: {', '.join(SERIALIZE_TYPES.keys())}"
            )

        if s_type == "chunk-ids":
            raise ValueError(
                "Invalid serialized data type 'chunk-ids': chunk id references are not allowed here."
            )

        chunks_cls = SERIALIZE_TYPES[s_type]
        result[field_name] = chunks_cls(**data)

    values["data"] = result
    return values
create_renderable(**config: Any) -> RenderableType
Source code in kiara/models/values/value.py
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
def create_renderable(self, **config: Any) -> RenderableType:
    """
    Build a two-column table that summarizes this serialized data.

    Rows, in order: the data type, the data type config (as json), the
    serialization type of every data field (as json), the data size and
    the instance id (labelled 'hash'). The 'config' keyword arguments are
    accepted but not used by this implementation.
    """

    def _json_view(payload: Any):
        # render json-serializable data as syntax-highlighted json
        return Syntax(orjson_dumps(payload), "json", background_color="default")

    summary = Table(show_header=False, box=box.SIMPLE)
    for column_name in ("key", "value"):
        summary.add_column(column_name)

    summary.add_row("data_type", self.data_type)
    summary.add_row("data_type_config", _json_view(self.data_type_config))

    # only the serialization type of each field is shown, not its content
    field_types = {
        name: {"type": chunks.type} for name, chunks in self.data.items()
    }
    summary.add_row("data", _json_view(field_types))
    summary.add_row("size", str(self.data_size))
    summary.add_row("hash", self.instance_id)

    return summary

PersistedData

Bases: SerializedData

Source code in kiara/models/values/value.py
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
class PersistedData(SerializedData):
    # Serialized data whose chunks are referenced by id ('chunk_id_map') and
    # that records which archive persisted it ('archive_id').

    _kiara_model_id = "instance.persisted_data"

    archive_id: uuid.UUID = Field(
        description="The id of the store that persisted the data."
    )
    chunk_id_map: Mapping[str, SerializedChunkIDs] = Field(
        description="Reference-ids that resolve to the values' serialized chunks."
    )

    def get_keys(self) -> Iterable[str]:
        """Return the keys of the chunk id map."""
        chunk_ids = self.chunk_id_map
        return chunk_ids.keys()

    def get_serialized_data(self, key: str) -> SerializedChunks:
        """Return the chunk id reference that is registered under 'key'."""
        chunk_ids = self.chunk_id_map
        return chunk_ids[key]

Attributes

archive_id: uuid.UUID = Field(description='The id of the store that persisted the data.') class-attribute
chunk_id_map: Mapping[str, SerializedChunkIDs] = Field(description="Reference-ids that resolve to the values' serialized chunks.") class-attribute

Functions

get_keys() -> Iterable[str]
Source code in kiara/models/values/value.py
606
607
def get_keys(self) -> Iterable[str]:
    """Return the keys of the chunk id map."""
    chunk_ids = self.chunk_id_map
    return chunk_ids.keys()
get_serialized_data(key: str) -> SerializedChunks
Source code in kiara/models/values/value.py
609
610
def get_serialized_data(self, key: str) -> SerializedChunks:
    """Return the chunk id reference that is registered under 'key'."""
    chunk_ids = self.chunk_id_map
    return chunk_ids[key]

ValuePedigree

Bases: InputsManifest

Source code in kiara/models/values/value.py
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
class ValuePedigree(InputsManifest):
    # An inputs manifest, extended with the id of the kiara context and
    # references to the environments a value was created in.

    _kiara_model_id = "instance.value_pedigree"

    kiara_id: uuid.UUID = Field(
        description="The id of the kiara context a value was created in."
    )
    environments: Dict[str, str] = Field(
        description="References to the runtime environment details a value was created in."
    )

    def _retrieve_data_to_hash(self) -> Any:
        """Return the manifest cid, inputs cid and environment references as a dict."""
        hash_input: Dict[str, Any] = {}
        hash_input["manifest"] = self.manifest_cid
        hash_input["inputs"] = self.inputs_cid
        hash_input["environments"] = self.environments
        return hash_input

    def __repr__(self):
        """Return a description containing module type, input names and instance id."""
        module_type = self.module_type
        input_names = ", ".join(self.inputs.keys())
        return f"ValuePedigree(module_type={module_type}, inputs=[{input_names}], instance_id={self.instance_id})"

    def __str__(self):
        """Use the same text as '__repr__'."""
        return repr(self)

Attributes

kiara_id: uuid.UUID = Field(description='The id of the kiara context a value was created in.') class-attribute
environments: Dict[str, str] = Field(description='References to the runtime environment details a value was created in.') class-attribute

DataTypeInfo

Bases: KiaraModel

Source code in kiara/models/values/value.py
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
class DataTypeInfo(KiaraModel):
    # Describes a configured data type: its name, config, characteristics and
    # the Python class that implements it.

    _kiara_model_id = "info.data_type_instance"

    data_type_name: str = Field(description="The registered name of this data type.")
    data_type_config: Mapping[str, Any] = Field(
        description="The (optional) configuration for this data type.",
        default_factory=dict,
    )
    characteristics: DataTypeCharacteristics = Field(
        description="Characteristics of this data type."
    )
    data_type_class: PythonClass = Field(
        description="The python class that is associated with this model."
    )
    # cache for the 'data_type_instance' property
    _data_type_instance: "DataType" = PrivateAttr(default=None)

    @property
    def data_type_instance(self) -> "DataType":
        """
        Return the data type object for this info model.

        The object is created on first access, by calling the class returned
        from 'data_type_class.get_class()' with the data type config as
        keyword arguments, and is cached afterwards.
        """

        if self._data_type_instance is None:
            data_type_cls = self.data_type_class.get_class()
            self._data_type_instance = data_type_cls(**self.data_type_config)

        return self._data_type_instance

Attributes

data_type_name: str = Field(description='The registered name of this data type.') class-attribute
data_type_config: Mapping[str, Any] = Field(description='The (optional) configuration for this data type.', default_factory=dict) class-attribute
characteristics: DataTypeCharacteristics = Field(description='Characteristics of this data type.') class-attribute
data_type_class: PythonClass = Field(description='The python class that is associated with this model.') class-attribute
data_type_instance: DataType property

ValueDetails

Bases: KiaraModel

A wrapper class that manages and retrieves value data and its details.

Source code in kiara/models/values/value.py
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
class ValueDetails(KiaraModel):

    """A wrapper class that manages and retrieves value data and its details."""

    _kiara_model_id = "instance.value_details"

    value_id: uuid.UUID = Field(description="The id of the value.")

    kiara_id: uuid.UUID = Field(
        description="The id of the kiara context this value belongs to."
    )

    value_schema: ValueSchema = Field(
        description="The schema that was used for this Value."
    )

    value_status: ValueStatus = Field(description="The set/unset status of this value.")
    value_size: int = Field(description="The size of this value, in bytes.")
    value_hash: str = Field(description="The hash of this value.")
    pedigree: ValuePedigree = Field(
        description="Information about the module and inputs that went into creating this value."
    )
    pedigree_output_name: str = Field(
        description="The output name that produced this value (using the manifest inside the pedigree)."
    )
    data_type_info: DataTypeInfo = Field(
        description="Information about the data type this value is made of."
    )

    def _retrieve_id(self) -> str:
        """Return the value id as a string."""
        return str(self.value_id)

    def _retrieve_data_to_hash(self) -> Any:
        """Return the schema type, value hash and value size as a dict."""
        hash_input: Dict[str, Any] = {}
        hash_input["value_type"] = self.value_schema.type
        hash_input["value_hash"] = self.value_hash
        hash_input["value_size"] = self.value_size
        return hash_input

    @property
    def data_type_name(self) -> str:
        """The data type name, taken from 'data_type_info'."""
        info = self.data_type_info
        return info.data_type_name

    @property
    def data_type_config(self) -> Mapping[str, Any]:
        """The data type config, taken from 'data_type_info'."""
        info = self.data_type_info
        return info.data_type_config

    @property
    def is_optional(self) -> bool:
        """Whether the value schema marks this value as optional."""
        schema = self.value_schema
        return schema.optional

    @property
    def is_valid(self) -> bool:
        """Check whether the current value is valid."""
        # optional values are always valid, all others must have status 'SET'
        return True if self.is_optional else self.value_status == ValueStatus.SET

    @property
    def is_set(self) -> bool:
        """Whether the value status is either 'SET' or 'DEFAULT'."""
        return self.value_status in (ValueStatus.SET, ValueStatus.DEFAULT)

    @property
    def value_status_string(self) -> str:
        """Return a human readable short description of this value's status."""
        status = self.value_status

        # set values are reported as-is, without the 'not required' suffix
        if status == ValueStatus.DEFAULT:
            return "set (default)"
        if status == ValueStatus.SET:
            return "set"

        if status == ValueStatus.NONE:
            label = "no value"
        elif status == ValueStatus.NOT_SET:
            label = "not set"
        else:
            raise Exception(
                f"Invalid internal status of value '{self.value_id}'. This is most likely a bug."
            )

        return f"{label} (not required)" if self.is_optional else label

    def __repr__(self):
        """Return a description containing value id, data type name and status."""
        summary = f"id={self.value_id}, type={self.data_type_name}, status={self.value_status.value}"
        return f"{self.__class__.__name__}({summary})"

    def __str__(self):
        """Use the same text as '__repr__'."""
        return repr(self)

Attributes

value_id: uuid.UUID = Field(description='The id of the value.') class-attribute
kiara_id: uuid.UUID = Field(description='The id of the kiara context this value belongs to.') class-attribute
value_schema: ValueSchema = Field(description='The schema that was used for this Value.') class-attribute
value_status: ValueStatus = Field(description='The set/unset status of this value.') class-attribute
value_size: int = Field(description='The size of this value, in bytes.') class-attribute
value_hash: str = Field(description='The hash of this value.') class-attribute
pedigree: ValuePedigree = Field(description='Information about the module and inputs that went into creating this value.') class-attribute
pedigree_output_name: str = Field(description='The output name that produced this value (using the manifest inside the pedigree).') class-attribute
data_type_info: DataTypeInfo = Field(description='Information about the data type this value is made of.') class-attribute
data_type_name: str property
data_type_config: Mapping[str, Any] property
is_optional: bool property
is_valid: bool property

Check whether the current value is valid.

is_set: bool property
value_status_string: str property

Return a human-readable short description of this value's status.

Value

Bases: ValueDetails

Source code in kiara/models/values/value.py
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
class Value(ValueDetails):
    # Value details plus lazy access to the value's data, its serialized form,
    # its properties and its lineage (all retrieved via the data registry).

    _kiara_model_id = "instance.value"

    # cached value data; 'SpecialValue.NOT_SET' means nothing was retrieved yet
    _value_data: Any = PrivateAttr(default=SpecialValue.NOT_SET)
    # None: not retrieved yet; a string: the data type can't be serialized
    # (the string is used in the error message); otherwise the serialized data
    _serialized_data: Union[None, str, SerializedData] = PrivateAttr(default=None)
    # set to True once data was retrieved from the registry
    _data_retrieved: bool = PrivateAttr(default=False)
    # the registry used to retrieve data, properties, aliases, ...
    _data_registry: "DataRegistry" = PrivateAttr(default=None)
    # _data_type: "DataType" = PrivateAttr(default=None)
    # when True, properties and destiny backlinks can't be added anymore
    _is_stored: bool = PrivateAttr(default=False)
    # cache for 'property_values', reset whenever a property is added
    _cached_properties: Union["ValueMap", None] = PrivateAttr(default=None)
    # cache for the 'lineage' property
    _lineage: Union["ValueLineage", None] = PrivateAttr(default=None)

    environment_hashes: Mapping[str, Mapping[str, str]] = Field(
        description="Hashes for the environments this value was created in."
    )
    # NOTE(review): field name is spelled 'enviroments' (missing 'n'); it is
    # part of the model interface, so renaming would need a migration
    enviroments: Union[Mapping[str, Mapping[str, Any]], None] = Field(
        description="Information about the environments this value was created in.",
        default=None,
    )
    # property path -> id of the value that holds the property
    property_links: Mapping[str, uuid.UUID] = Field(
        description="Links to values that are properties of this value.",
        default_factory=dict,
    )
    # id of the value this value is a property of -> alias (property path)
    destiny_backlinks: Mapping[uuid.UUID, str] = Field(
        description="Backlinks to values that this value acts as destiny/or property for.",
        default_factory=dict,
    )

    def add_property(
        self,
        value_id: Union[uuid.UUID, "Value"],
        property_path: str,
        add_origin_to_property_value: bool = True,
    ):
        """
        Link another value to this one as a property.

        Arguments:
            value_id: the id of the property value, or the property value object itself
            property_path: the key under which the property is registered in 'property_links'
            add_origin_to_property_value: whether to also register a backlink to this value on the property value (via 'add_destiny_details')

        Raises:
            Exception: if this value is already stored, if a backlink is requested and the property value is already stored, or if 'property_path' is already set
        """

        # Both a plain id and a value object are accepted: value objects
        # expose their id via a 'value_id' attribute, uuids don't have one.
        value: Union["Value", None] = None
        resolved_id = getattr(value_id, "value_id", None)
        if resolved_id is not None:
            value = value_id  # type: ignore
            value_id = resolved_id

        if add_origin_to_property_value:
            # the value object is only needed if a backlink must be written
            if value is None:
                value = self._data_registry.get_value(value=value_id)  # type: ignore

            if value._is_stored:  # type: ignore
                raise Exception(
                    f"Can't add property to value '{self.value_id}': referenced value '{value.value_id}' already locked, so it's not possible to add the property backlink (as requested)."  # type: ignore
                )

        if self._is_stored:
            raise Exception(
                f"Can't add property to value '{self.value_id}': value already locked."
            )

        if property_path in self.property_links:
            raise Exception(
                f"Can't add property to value '{self.value_id}': property '{property_path}' already set."
            )

        self.property_links[property_path] = value_id  # type: ignore

        if add_origin_to_property_value:
            value.add_destiny_details(  # type: ignore
                value_id=self.value_id, destiny_alias=property_path
            )

        # cached property values are outdated now
        self._cached_properties = None

    def add_destiny_details(self, value_id: uuid.UUID, destiny_alias: str):
        """
        Register a backlink from this value to the value with the given id.

        Raises:
            Exception: if this value is already stored (locked)
        """

        locked = self._is_stored
        if locked:
            raise Exception(
                f"Can't set destiny_refs to value '{self.value_id}': value already locked."
            )

        backlinks = self.destiny_backlinks
        backlinks[value_id] = destiny_alias  # type: ignore

    @property
    def is_serializable(self) -> bool:
        """
        Check whether serialized data can be obtained for this value.

        Returns False if the value is marked with 'NO_SERIALIZATION_MARKER',
        or if accessing 'serialized_data' raises any exception.
        """

        try:
            if self._serialized_data == NO_SERIALIZATION_MARKER:
                serializable = False
            else:
                # accessed only to find out whether it raises
                self.serialized_data
                serializable = True
        except Exception:
            serializable = False

        return serializable

    # @property
    # def data_type_class(self) -> "PythonClass":
    #     """Return the (Python) type of the underlying 'DataType' subclass."""
    #     return self.data_type_info.data_type_class

    @property
    def serialized_data(self) -> SerializedData:
        """
        Return the serialized form of this value.

        The result is looked up in the data registry on first access and
        cached afterwards.

        Raises:
            Exception: if a string is stored instead of serialized data, which
                marks the data type as not serializable
        """

        cached = self._serialized_data
        if cached is None:
            self._serialized_data = (
                self._data_registry.retrieve_persisted_value_details(self.value_id)
            )
            return self._serialized_data

        if isinstance(cached, str):
            raise Exception(
                f"Data type '{self.data_type_name}' does not support serializing: {self._serialized_data}"
            )

        return cached

    @property
    def data(self) -> Any:
        """
        Return the actual data of this value.

        Raises:
            Exception: if the value is not initialized yet
            DataTypeUnknownException: re-raised from the retrieval, after this
                value was attached to the exception
        """

        if self.is_initialized:
            try:
                return self._retrieve_data()
            except DataTypeUnknownException as unknown_type_error:
                # let the handler know which value caused the problem
                unknown_type_error._value = self
                raise unknown_type_error

        raise Exception(
            f"Can't retrieve data for value '{self.value_id}': value not initialized yet. This is most likely a bug."
        )

    def _retrieve_data(self) -> Any:
        """
        Return the (cached) value data, loading it from the registry if needed.

        Values with status 'NOT_SET' or 'NONE' resolve to None without asking
        the registry.

        Raises:
            Exception: if the value status is unexpected, or if the registry
                returns None or a 'SpecialValue'
        """

        already_loaded = self._value_data is not SpecialValue.NOT_SET
        if already_loaded:
            return self._value_data

        status = self.value_status
        if status in (ValueStatus.NOT_SET, ValueStatus.NONE):
            self._value_data = None
            return self._value_data

        if status not in (ValueStatus.SET, ValueStatus.DEFAULT):
            raise Exception(f"Invalid internal state of value '{self.value_id}'.")

        loaded = self._data_registry.retrieve_value_data(value=self)

        if loaded is None or isinstance(loaded, SpecialValue):
            raise Exception(
                f"Can't set value data, invalid data type: {type(loaded)}"
            )

        self._value_data = loaded
        self._data_retrieved = True
        return self._value_data

    # def retrieve_load_config(self) -> Optional[LoadConfig]:
    #     return self._data_registry.retrieve_persisted_value_details(
    #         value_id=self.value_id
    #     )

    def __repr__(self):
        """Return a description with id, type, status, initialization state and optional flag."""
        details = (
            f"id={self.value_id}, type={self.data_type_name}, "
            f"status={self.value_status.value}, "
            f"initialized={self.is_initialized} optional={self.value_schema.optional}"
        )
        return f"{self.__class__.__name__}({details})"

    def _set_registry(self, data_registry: "DataRegistry") -> None:
        """Attach the data registry this value uses for all of its lookups."""
        registry = data_registry
        self._data_registry = registry

    @property
    def is_initialized(self) -> bool:
        """Whether this value is usable: it is either not set, or has a data registry attached."""
        if not self.is_set:
            return True
        return self._data_registry is not None

    @property
    def is_stored(self) -> bool:
        """The current value of the private '_is_stored' flag."""
        stored = self._is_stored
        return stored

    @property
    def data_type(self) -> "DataType":
        """The data type instance, as provided by 'data_type_info'."""
        info = self.data_type_info
        return info.data_type_instance

    @property
    def lineage(self) -> "ValueLineage":
        """The lineage object of this value, created on first access and cached."""
        if self._lineage is None:
            # imported here, not at module level
            from kiara.models.values.lineage import ValueLineage

            kiara = self._data_registry._kiara
            self._lineage = ValueLineage(kiara=kiara, value=self)

        return self._lineage

    @property
    def property_values(self) -> "ValueMap":
        """The values linked in 'property_links', loaded via the registry and cached."""
        if self._cached_properties is None:
            self._cached_properties = self._data_registry.load_values(
                self.property_links
            )

        return self._cached_properties

    @property
    def property_names(self) -> Iterable[str]:
        """The keys of 'property_links'."""
        links = self.property_links
        return links.keys()

    def get_property_value(self, property_key) -> "Value":
        """
        Return the value object that is linked under the given property key.

        Raises:
            Exception: if no property with that key is linked to this value
        """

        links = self.property_links
        if property_key in links.keys():
            return self._data_registry.get_value(links[property_key])

        raise Exception(
            f"Value '{self.value_id}' has no property with key '{property_key}."
        )

    def get_property_data(self, property_key: str) -> Any:
        """
        Return the data of the property with the given key.

        Any exception raised during the lookup is passed to 'log_exception',
        and None is returned instead.
        """

        try:
            property_value = self.get_property_value(property_key=property_key)
            return property_value.data
        except Exception as error:
            log_exception(error)
            return None

    def get_all_property_data(self) -> Mapping[str, Any]:
        """Return a dict that maps every property name to the result of 'get_property_data'."""
        collected: Dict[str, Any] = {}
        for name in self.property_names:
            collected[name] = self.get_property_data(name)
        return collected

    def lookup_self_aliases(self) -> Set[str]:
        """
        Ask the data registry for the aliases of this value.

        Raises:
            Exception: if no data registry is set
        """

        registry = self._data_registry
        if registry:
            return registry.lookup_aliases(self)

        raise Exception(
            f"Can't lookup aliases for value '{self.value_id}': data registry not set (yet)."
        )

    def create_info(self) -> "ValueInfo":
        """
        Let the data registry create the info object for this value.

        Raises:
            Exception: if no data registry is set
        """

        registry = self._data_registry
        if registry:
            return registry.create_value_info(value=self.value_id)

        raise Exception(
            f"Can't create info object for value '{self.value_id}': data registry not set (yet)."
        )

    def create_info_data(self, **config: Any) -> Mapping[str, Any]:
        """
        Collect the details of this value into a dictionary.

        The result always starts with 'value_id', 'aliases' (only if this
        object has such an attribute) and 'kiara_id', followed by the remaining
        model fields in sorted order. Optional sections are appended after that.

        Supported config keys (all default to False, unless noted):
            show_pedigree: add 'pedigree' and 'pedigree_output_name'
            show_lineage: add 'lineage'
            show_properties: add 'properties'
            show_serialized: add 'serialized'
            show_environment_hashes: add 'environment_hashes'
            show_environment_data: not implemented, raises 'NotImplementedError'
            ignore_fields: names of fields to leave out (default: empty list)
        """

        show_pedigree = config.get("show_pedigree", False)
        show_lineage = config.get("show_lineage", False)
        show_properties = config.get("show_properties", False)
        # show_destinies = config.get("show_destinies", False)
        # show_destiny_backlinks = config.get("show_destiny_backlinks", False)
        # show_data = config.get("show_data_preview", False)
        show_serialized = config.get("show_serialized", False)
        show_env_data_hashes = config.get("show_environment_hashes", False)
        show_env_data = config.get("show_environment_data", False)

        ignore_fields = config.get("ignore_fields", [])

        table: Dict[str, Any] = {}

        # the identifying fields come first, in a fixed order
        if "value_id" not in ignore_fields:
            table["value_id"] = self.value_id
        if "aliases" not in ignore_fields:
            if hasattr(self, "aliases"):
                table["aliases"] = self.aliases  # type: ignore

        if "kiara_id" not in ignore_fields:
            table["kiara_id"] = self.kiara_id

        for k in sorted(self.__fields__.keys()):

            # skip fields that were added above, or that are handled by one
            # of the optional sections below
            # NOTE(review): the field declared on this class is spelled
            # 'enviroments', so the 'environments' entry does not match it
            if (
                k
                in [
                    "serialized",
                    "value_id",
                    "aliases",
                    "kiara_id",
                    "environments",
                    "lineage",
                    "environment_hashes",
                ]
                or k in ignore_fields
            ):
                continue

            attr = getattr(self, k)
            # pedigree details are only added on request (see 'show_pedigree')
            if k in ["pedigree_output_name", "pedigree"]:
                continue
            else:
                v = attr

            table[k] = v

        if show_pedigree:
            pedigree = getattr(self, "pedigree")

            table["pedigree"] = pedigree
            # values with the 'ORPHAN' pedigree get no output name
            if pedigree == ORPHAN:
                pedigree_output_name: Union[Any, None] = None
            else:
                pedigree_output_name = getattr(self, "pedigree_output_name")

            table["pedigree_output_name"] = pedigree_output_name

        if show_lineage:
            table["lineage"] = self.lineage

        if show_serialized:
            serialized = self._data_registry.retrieve_persisted_value_details(
                self.value_id
            )
            table["serialized"] = serialized

        if show_env_data_hashes:
            # NOTE: unlike the other entries, this one holds a rich 'Syntax'
            # renderable, not the raw hashes
            env_hashes = Syntax(
                orjson_dumps(self.environment_hashes, option=orjson.OPT_INDENT_2),
                "json",
                background_color="default",
            )
            table["environment_hashes"] = env_hashes

        if show_env_data:
            raise NotImplementedError()

        if show_properties:
            if not self.property_links:
                table["properties"] = {}
            else:
                properties = self._data_registry.load_values(self.property_links)
                table["properties"] = properties

        # if hasattr(self, "destiny_links") and show_destinies:
        #     if not self.destiny_links:  # type: ignore
        #         table["destinies"] = {}
        #     else:
        #         destinies = self._data_registry.load_values(self.destiny_links)  # type: ignore
        #         table["destinies"] = destinies
        #
        # if show_destiny_backlinks:
        #     if not self.destiny_backlinks:
        #         table["destiny backlinks"] = {}
        #     else:
        #         destiny_items: List[Any] = []
        #         for v_id, alias in self.destiny_backlinks.items():
        #             destiny_items.append(
        #                 f"[b]Value: [i]{v_id}[/i] (destiny alias: {alias})[/b]"
        #             )
        #             rendered = self._data_registry.pretty_print_data(
        #                 value_id=v_id, **config
        #             )
        #             destiny_items.append(rendered)
        #         table["destiny backlinks"] = destiny_items
        #
        # if show_data:
        #     rendered = self._data_registry.pretty_print_data(
        #         self.value_id, target_type="terminal_renderable"
        #     )
        #     table["data preview"] = rendered

        return table

    def create_renderable(self, **render_config: Any) -> RenderableType:
        """
        Render the details of this value as a two-column table.

        The content is taken from 'create_info_data' (called with the same
        config). The identifying rows come first, then all remaining details
        in sorted order, then the optional sections that were requested.

        Supported config keys (all default to False, unless noted):
            show_pedigree, show_lineage, show_properties, show_destinies,
            show_destiny_backlinks, show_serialized, show_environment_hashes:
                add the respective section
            show_data_preview: add a preview of the data as last row
            show_environment_data: not implemented, raises 'NotImplementedError'
            ignore_fields: names of fields to leave out (default: empty list)
        """

        from kiara.utils.output import extract_renderable

        show_pedigree = render_config.get("show_pedigree", False)
        show_lineage = render_config.get("show_lineage", False)
        show_properties = render_config.get("show_properties", False)
        show_destinies = render_config.get("show_destinies", False)
        show_destiny_backlinks = render_config.get("show_destiny_backlinks", False)
        show_data = render_config.get("show_data_preview", False)
        show_serialized = render_config.get("show_serialized", False)
        show_env_data_hashes = render_config.get("show_environment_hashes", False)
        show_env_data = render_config.get("show_environment_data", False)

        ignore_fields = render_config.get("ignore_fields", [])

        table = Table(show_header=False, box=box.SIMPLE)
        table.add_column("Key", style="i")
        table.add_column("Value")

        info_data = self.create_info_data(**render_config)

        # the identifying rows come first, in a fixed order
        if "value_id" not in ignore_fields:
            table.add_row("value_id", str(info_data["value_id"]))
        if "aliases" not in ignore_fields:
            if info_data.get("aliases", None):
                aliases_str = ", ".join(info_data["aliases"])  # type: ignore
                table.add_row("aliases", aliases_str)
            # else:
            #     aliases_str = "-- n/a --"
            #     table.add_row("aliases", aliases_str)

        if "kiara_id" not in ignore_fields:
            table.add_row("kiara_id", str(info_data["kiara_id"]))

        table.add_row("", "")
        table.add_row("", Rule())
        for k in sorted(info_data.keys()):

            # skip rows that were added above, or that are rendered by one of
            # the optional sections below
            if (
                k
                in [
                    "serialized",
                    "value_id",
                    "aliases",
                    "kiara_id",
                    "lineage",
                    "properties",
                    "environments",
                    "environment_hashes",
                ]
                or k in ignore_fields
            ):
                continue

            attr = info_data[k]
            # pedigree details are rendered in their own section
            if k in ["pedigree_output_name", "pedigree"]:
                continue

            elif k == "value_status":
                v: RenderableType = f"[i]-- {attr.value} --[/i]"
            elif k == "value_size":
                v = format_size(attr)
            else:
                v = extract_renderable(attr)

            table.add_row(k, v)

        # visual separator between the details and the optional sections
        if (
            show_pedigree
            or show_lineage
            or show_serialized
            or show_properties
            or show_destinies
            or show_destiny_backlinks
            or show_env_data_hashes
            or show_env_data
        ):
            table.add_row("", "")
            table.add_row("", Rule())
            table.add_row("", "")

        if show_pedigree:
            pedigree = info_data["pedigree"]

            if pedigree == ORPHAN:
                v = "[i]-- external data --[/i]"
                pedigree_output_name: Union[Any, None] = None
            else:
                v = extract_renderable(pedigree)
                pedigree_output_name = info_data["pedigree_output_name"]

            row = ["pedigree", v]
            table.add_row(*row)
            if pedigree_output_name:
                row = ["pedigree_output_name", pedigree_output_name]
                table.add_row(*row)

        if show_lineage:
            table.add_row(
                "lineage", info_data["lineage"].create_renderable(include_ids=True)
            )

        if show_serialized:
            serialized = info_data["serialized"]
            table.add_row("serialized", serialized.create_renderable())

        if show_env_data_hashes:
            # NOTE(review): 'create_info_data' stores a rich 'Syntax' object
            # under 'environment_hashes' (not the raw hashes), and that object
            # is passed to 'orjson_dumps' here -- confirm this can be serialized
            env_hashes = Syntax(
                orjson_dumps(
                    info_data["environment_hashes"], option=orjson.OPT_INDENT_2
                ),
                "json",
                background_color="default",
            )
            table.add_row("environment_hashes", env_hashes)

        if show_env_data:
            raise NotImplementedError()

        if show_properties:
            if not info_data["properties"]:
                table.add_row("properties", "{}")
            else:
                properties = info_data["properties"]
                pr = properties.create_renderable(show_header=False)
                table.add_row("properties", pr)

        # 'destiny_links' is not declared on this class, so this section is
        # only rendered for objects that provide such an attribute
        if hasattr(self, "destiny_links") and show_destinies:
            if not self.destiny_links:  # type: ignore
                table.add_row("destinies", "{}")
            else:
                destinies = self._data_registry.load_values(self.destiny_links)  # type: ignore
                dr = destinies.create_renderable(show_header=False)
                table.add_row("destinies", dr)

        if show_destiny_backlinks:
            if not self.destiny_backlinks:
                table.add_row("destiny backlinks", "{}")
            else:
                # one rule, one title line and one data rendering per backlink
                destiny_items: List[Any] = []
                for v_id, alias in self.destiny_backlinks.items():
                    destiny_items.append(Rule())
                    destiny_items.append(
                        f"[b]Value: [i]{v_id}[/i] (destiny alias: {alias})[/b]"
                    )
                    rendered = self._data_registry.pretty_print_data(
                        value_id=v_id, **render_config
                    )
                    destiny_items.append(rendered)
                table.add_row("destiny backlinks", Group(*destiny_items))

        if show_data:
            rendered = self._data_registry.pretty_print_data(
                self.value_id, target_type="terminal_renderable"
            )
            table.add_row("", "")
            table.add_row("", Rule())
            table.add_row("data preview", rendered)

        return table

Attributes

environment_hashes: Mapping[str, Mapping[str, str]] = Field(description='Hashes for the environments this value was created in.') class-attribute
enviroments: Union[Mapping[str, Mapping[str, Any]], None] = Field(description='Information about the environments this value was created in.', default=None) class-attribute
is_serializable: bool property
serialized_data: SerializedData property
data: Any property
is_initialized: bool property
is_stored: bool property
data_type: DataType property
lineage: ValueLineage property
property_values: ValueMap property
property_names: Iterable[str] property

Functions

add_property(value_id: Union[uuid.UUID, Value], property_path: str, add_origin_to_property_value: bool = True)
Source code in kiara/models/values/value.py
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
def add_property(
    self,
    value_id: Union[uuid.UUID, "Value"],
    property_path: str,
    add_origin_to_property_value: bool = True,
):
    """
    Link another value to this value as a property.

    Arguments:
        value_id: the id of the property value, or the property 'Value' object itself
        property_path: the key under which the property is registered in 'property_links'
        add_origin_to_property_value: whether to also register a backlink to this value on the property value (via 'add_destiny_details')

    Raises:
        Exception: if this value is already locked, if the property path is already in use, or (when a backlink
            was requested) if the property value is already locked
    """

    # A 'Value' object exposes its id via a 'value_id' attribute, a plain uuid does not.
    value = None
    if hasattr(value_id, "value_id"):
        value = value_id
        value_id = value.value_id  # type: ignore

    if add_origin_to_property_value:
        # the value object itself is only needed for the backlink, so only look it up in that case
        if value is None:
            value = self._data_registry.get_value(value=value_id)  # type: ignore

        if value._is_stored:  # type: ignore
            raise Exception(
                f"Can't add property to value '{self.value_id}': referenced value '{value.value_id}' already locked, so it's not possible to add the property backlink (as requested)."  # type: ignore
            )

    if self._is_stored:
        raise Exception(
            f"Can't add property to value '{self.value_id}': value already locked."
        )

    if property_path in self.property_links:
        raise Exception(
            f"Can't add property to value '{self.value_id}': property '{property_path}' already set."
        )

    self.property_links[property_path] = value_id  # type: ignore

    if add_origin_to_property_value:
        value.add_destiny_details(  # type: ignore
            value_id=self.value_id, destiny_alias=property_path
        )

    # reset the property cache, so the new link is taken into account from now on
    self._cached_properties = None
add_destiny_details(value_id: uuid.UUID, destiny_alias: str)
Source code in kiara/models/values/value.py
836
837
838
839
840
841
842
843
def add_destiny_details(self, value_id: uuid.UUID, destiny_alias: str):
    """
    Register a backlink on this value.

    The entry is stored in 'destiny_backlinks', using the provided value id as key and the alias as value.
    Raises an Exception if this value is already locked.
    """

    if not self._is_stored:
        self.destiny_backlinks[value_id] = destiny_alias  # type: ignore
        return

    raise Exception(
        f"Can't set destiny_refs to value '{self.value_id}': value already locked."
    )
get_property_value(property_key) -> Value
Source code in kiara/models/values/value.py
965
966
967
968
969
970
971
972
def get_property_value(self, property_key) -> "Value":
    """
    Return the value object that is linked to this value under the specified property key.

    The linked value id is looked up in 'property_links' and resolved via the data registry.
    Raises an Exception if no property with that key exists.
    """

    if property_key not in self.property_links:
        raise Exception(
            f"Value '{self.value_id}' has no property with key '{property_key}'."
        )

    return self._data_registry.get_value(self.property_links[property_key])
get_property_data(property_key: str) -> Any
Source code in kiara/models/values/value.py
974
975
976
977
978
979
980
def get_property_data(self, property_key: str) -> Any:
    """
    Return the data of the property value registered under the specified key.

    This is a best-effort lookup: any exception is logged (via 'log_exception') and 'None' is returned instead.
    """

    try:
        property_value = self.get_property_value(property_key=property_key)
        return property_value.data
    except Exception as e:
        log_exception(e)
        return None
get_all_property_data() -> Mapping[str, Any]
Source code in kiara/models/values/value.py
982
983
984
def get_all_property_data(self) -> Mapping[str, Any]:
    """Return a dictionary with the data of every property of this value, using the property names as keys."""

    collected = {}
    for name in self.property_names:
        collected[name] = self.get_property_data(name)
    return collected
lookup_self_aliases() -> Set[str]
Source code in kiara/models/values/value.py
986
987
988
989
990
991
992
993
def lookup_self_aliases(self) -> Set[str]:
    """
    Ask the data registry for the aliases of this value.

    Raises an Exception if no data registry is set on this value.
    """

    if self._data_registry:
        return self._data_registry.lookup_aliases(self)

    raise Exception(
        f"Can't lookup aliases for value '{self.value_id}': data registry not set (yet)."
    )
create_info() -> ValueInfo
Source code in kiara/models/values/value.py
 995
 996
 997
 998
 999
1000
1001
1002
def create_info(self) -> "ValueInfo":
    """
    Let the data registry create the info object for this value.

    Raises an Exception if no data registry is set on this value.
    """

    if self._data_registry:
        return self._data_registry.create_value_info(value=self.value_id)

    raise Exception(
        f"Can't create info object for value '{self.value_id}': data registry not set (yet)."
    )
create_info_data(**config: Any) -> Mapping[str, Any]
Source code in kiara/models/values/value.py
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
def create_info_data(self, **config: Any) -> Mapping[str, Any]:
    """
    Assemble a dictionary with details about this value.

    The result always contains 'value_id', 'kiara_id' and (if this object has such an attribute) 'aliases',
    followed by the remaining model fields. Everything else is only included on request.

    Supported config keys (all flags default to 'False'):
        show_pedigree: add 'pedigree' and 'pedigree_output_name' ('None' for orphan values)
        show_lineage: add 'lineage'
        show_serialized: add 'serialized' (the persisted value details, retrieved from the data registry)
        show_environment_hashes: add 'environment_hashes' (the plain mapping, not a rendered object)
        show_environment_data: not supported (yet), raises 'NotImplementedError'
        show_properties: add 'properties' (the property values, loaded via the data registry)
        ignore_fields: a list of field names to leave out (default: empty)

    Destinies, destiny backlinks and the data preview are not part of the result.
    """

    show_pedigree = config.get("show_pedigree", False)
    show_lineage = config.get("show_lineage", False)
    show_properties = config.get("show_properties", False)
    show_serialized = config.get("show_serialized", False)
    show_env_data_hashes = config.get("show_environment_hashes", False)
    show_env_data = config.get("show_environment_data", False)

    ignore_fields = config.get("ignore_fields", [])

    # fields that are never copied by the generic loop below: they are either added explicitly,
    # or only when the matching 'show_*' flag is set
    special_fields = (
        "serialized",
        "value_id",
        "aliases",
        "kiara_id",
        "environments",
        "lineage",
        "environment_hashes",
        "pedigree_output_name",
        "pedigree",
    )

    table: Dict[str, Any] = {}

    if "value_id" not in ignore_fields:
        table["value_id"] = self.value_id
    if "aliases" not in ignore_fields:
        if hasattr(self, "aliases"):
            table["aliases"] = self.aliases  # type: ignore

    if "kiara_id" not in ignore_fields:
        table["kiara_id"] = self.kiara_id

    for k in sorted(self.__fields__.keys()):
        if k in special_fields or k in ignore_fields:
            continue
        table[k] = getattr(self, k)

    if show_pedigree:
        pedigree = getattr(self, "pedigree")

        table["pedigree"] = pedigree
        if pedigree == ORPHAN:
            pedigree_output_name: Union[Any, None] = None
        else:
            pedigree_output_name = getattr(self, "pedigree_output_name")

        table["pedigree_output_name"] = pedigree_output_name

    if show_lineage:
        table["lineage"] = self.lineage

    if show_serialized:
        serialized = self._data_registry.retrieve_persisted_value_details(
            self.value_id
        )
        table["serialized"] = serialized

    if show_env_data_hashes:
        # keep the raw mapping here: 'create_renderable' serializes this entry itself, which would not
        # work with an already rendered object
        table["environment_hashes"] = self.environment_hashes

    if show_env_data:
        raise NotImplementedError()

    if show_properties:
        if not self.property_links:
            table["properties"] = {}
        else:
            properties = self._data_registry.load_values(self.property_links)
            table["properties"] = properties

    return table
create_renderable(**render_config: Any) -> RenderableType
Source code in kiara/models/values/value.py
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
def create_renderable(self, **render_config: Any) -> RenderableType:
    """
    Render the details of this value as a two-column (key/value) table.

    The basic details are taken from 'create_info_data' (which receives the same config). Optional sections
    are appended when the matching config flag is set (all default to 'False'):
        show_pedigree, show_lineage, show_serialized, show_environment_hashes, show_properties,
        show_destinies, show_destiny_backlinks, show_data_preview

    'show_environment_data' is not supported (yet) and raises 'NotImplementedError'. Field names listed in
    'ignore_fields' are left out.
    """

    from kiara.utils.output import extract_renderable

    show_pedigree = render_config.get("show_pedigree", False)
    show_lineage = render_config.get("show_lineage", False)
    show_properties = render_config.get("show_properties", False)
    show_destinies = render_config.get("show_destinies", False)
    show_destiny_backlinks = render_config.get("show_destiny_backlinks", False)
    show_data = render_config.get("show_data_preview", False)
    show_serialized = render_config.get("show_serialized", False)
    show_env_data_hashes = render_config.get("show_environment_hashes", False)
    show_env_data = render_config.get("show_environment_data", False)

    ignore_fields = render_config.get("ignore_fields", [])

    table = Table(show_header=False, box=box.SIMPLE)
    table.add_column("Key", style="i")
    table.add_column("Value")

    info_data = self.create_info_data(**render_config)

    if "value_id" not in ignore_fields:
        table.add_row("value_id", str(info_data["value_id"]))
    if "aliases" not in ignore_fields:
        # no row at all if there are no aliases
        if info_data.get("aliases", None):
            aliases_str = ", ".join(info_data["aliases"])  # type: ignore
            table.add_row("aliases", aliases_str)

    if "kiara_id" not in ignore_fields:
        table.add_row("kiara_id", str(info_data["kiara_id"]))

    table.add_row("", "")
    table.add_row("", Rule())

    # these entries get dedicated rows (above or below), so the generic loop must not render them
    handled_separately = (
        "serialized",
        "value_id",
        "aliases",
        "kiara_id",
        "lineage",
        "properties",
        "environments",
        "environment_hashes",
        "pedigree_output_name",
        "pedigree",
    )
    for k in sorted(info_data.keys()):

        if k in handled_separately or k in ignore_fields:
            continue

        attr = info_data[k]
        if k == "value_status":
            v: RenderableType = f"[i]-- {attr.value} --[/i]"
        elif k == "value_size":
            v = format_size(attr)
        else:
            v = extract_renderable(attr)

        table.add_row(k, v)

    # visual separator between the basic details and the optional sections
    if (
        show_pedigree
        or show_lineage
        or show_serialized
        or show_properties
        or show_destinies
        or show_destiny_backlinks
        or show_env_data_hashes
        or show_env_data
    ):
        table.add_row("", "")
        table.add_row("", Rule())
        table.add_row("", "")

    if show_pedigree:
        pedigree = info_data["pedigree"]

        if pedigree == ORPHAN:
            v = "[i]-- external data --[/i]"
            pedigree_output_name: Union[Any, None] = None
        else:
            v = extract_renderable(pedigree)
            pedigree_output_name = info_data["pedigree_output_name"]

        row = ["pedigree", v]
        table.add_row(*row)
        if pedigree_output_name:
            row = ["pedigree_output_name", pedigree_output_name]
            table.add_row(*row)

    if show_lineage:
        table.add_row(
            "lineage", info_data["lineage"].create_renderable(include_ids=True)
        )

    if show_serialized:
        serialized = info_data["serialized"]
        table.add_row("serialized", serialized.create_renderable())

    if show_env_data_hashes:
        env_hashes = info_data["environment_hashes"]
        # the info data might contain an already rendered 'Syntax' object instead of the plain mapping,
        # in which case it must not be serialized (again)
        if not isinstance(env_hashes, Syntax):
            env_hashes = Syntax(
                orjson_dumps(env_hashes, option=orjson.OPT_INDENT_2),
                "json",
                background_color="default",
            )
        table.add_row("environment_hashes", env_hashes)

    if show_env_data:
        raise NotImplementedError()

    if show_properties:
        if not info_data["properties"]:
            table.add_row("properties", "{}")
        else:
            properties = info_data["properties"]
            pr = properties.create_renderable(show_header=False)
            table.add_row("properties", pr)

    if hasattr(self, "destiny_links") and show_destinies:
        if not self.destiny_links:  # type: ignore
            table.add_row("destinies", "{}")
        else:
            destinies = self._data_registry.load_values(self.destiny_links)  # type: ignore
            dr = destinies.create_renderable(show_header=False)
            table.add_row("destinies", dr)

    if show_destiny_backlinks:
        if not self.destiny_backlinks:
            table.add_row("destiny backlinks", "{}")
        else:
            destiny_items: List[Any] = []
            for v_id, alias in self.destiny_backlinks.items():
                destiny_items.append(Rule())
                destiny_items.append(
                    f"[b]Value: [i]{v_id}[/i] (destiny alias: {alias})[/b]"
                )
                rendered = self._data_registry.pretty_print_data(
                    value_id=v_id, **render_config
                )
                destiny_items.append(rendered)
            table.add_row("destiny backlinks", Group(*destiny_items))

    if show_data:
        rendered = self._data_registry.pretty_print_data(
            self.value_id, target_type="terminal_renderable"
        )
        table.add_row("", "")
        table.add_row("", Rule())
        table.add_row("data preview", rendered)

    return table

UnloadableData

Bases: KiaraModel

A special 'marker' model, indicating that the data of a value can't be loaded.

In most cases, the reason this happens is because the current kiara context is missing some value types and/or modules.

Source code in kiara/models/values/value.py
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
class UnloadableData(KiaraModel):

    """
    A special 'marker' model, indicating that the data of value can't be loaded.

    In most cases, the reason this happens is because the current kiara context is missing some value types and/or modules.
    """

    _kiara_model_id = "instance.unloadable_data"

    value: Value = Field(description="A reference to the value.")

    def _retrieve_id(self) -> str:
        # this marker re-uses the instance id of the value it refers to
        referenced = self.value
        return referenced.instance_id

    def _retrieve_data_to_hash(self) -> Any:
        # the hash is computed from the bytes of the referenced value's id
        referenced_id = self.value.value_id
        return referenced_id.bytes

Attributes

value: Value = Field(description='A reference to the value.') class-attribute

ValueMap

Bases: KiaraModel, MutableMapping[str, Value]

Source code in kiara/models/values/value.py
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
class ValueMap(KiaraModel, MutableMapping[str, Value]):  # type: ignore
    """
    A set of values, addressed by field name, where every field is described by a schema.

    Sub-classes implement 'get_value_obj'. This base class does not support setting, changing or removing items.
    """

    values_schema: Dict[str, ValueSchema] = Field(
        description="The schemas for all the values in this set."
    )

    @property
    def field_names(self) -> Iterable[str]:
        """The names of all fields, sorted."""
        names = list(self.values_schema.keys())
        names.sort()
        return names

    @abc.abstractmethod
    def get_value_obj(self, field_name: str) -> Value:
        """Return the value object for the specified field."""

    @property
    def all_items_valid(self) -> bool:
        """Whether the 'is_valid' flag is set on every value of this map."""
        return all(
            self.get_value_obj(name).is_valid for name in self.values_schema.keys()
        )

    def _retrieve_data_to_hash(self) -> Any:
        to_hash = {}
        for name in self.values_schema.keys():
            to_hash[name] = self.get_value_obj(name).instance_cid
        return to_hash

    def check_invalid(self) -> Dict[str, str]:
        """Check whether the value set is invalid, if it is, return a description of what's wrong."""
        problems: Dict[str, str] = {}
        for name in self.values_schema.keys():

            item = self.get_value_obj(name)
            schema = self.values_schema[name]

            # optional fields and fields that are set are never reported
            if schema.optional:
                continue
            if item.value_status == ValueStatus.SET:
                continue
            if not schema.is_required():
                continue

            if not item.is_set:
                problems[name] = "not set"
            elif item.value_status == ValueStatus.NONE:
                problems[name] = "no value"

        return problems

    def get_value_data_for_fields(
        self, *field_names: str, raise_exception_when_unset: bool = False
    ) -> Dict[str, Any]:
        """
        Return the data for one or several fields of this ValueMap.

        Unset values are returned as 'None', unless 'raise_exception_when_unset' is 'True': in that case
        an Exception listing all unset fields is raised instead.
        """
        if raise_exception_when_unset:
            missing: List[str] = [
                name for name in field_names if not self.get_value_obj(name).is_set
            ]
            if missing:
                raise Exception(
                    f"Can't get data for fields, one or several of the requested fields are not set yet: {', '.join(missing)}."
                )

        data: Dict[str, Any] = {}
        for name in field_names:
            value_obj = self.get_value_obj(name)
            data[name] = value_obj.data if value_obj.is_set else None
        return data

    def get_value_data(
        self, field_name: str, raise_exception_when_unset: bool = False
    ) -> Any:
        """Return the data of a single field (see 'get_value_data_for_fields')."""
        field_data = self.get_value_data_for_fields(
            field_name, raise_exception_when_unset=raise_exception_when_unset
        )
        return field_data[field_name]

    def get_all_value_ids(self) -> Dict[str, uuid.UUID]:
        """Return the value ids of all fields, using the field names as keys."""
        ids = {}
        for name in self.field_names:
            ids[name] = self.get_value_obj(name).value_id
        return ids

    def get_all_value_data(
        self, raise_exception_when_unset: bool = False
    ) -> Dict[str, Any]:
        """Return the data of all fields (see 'get_value_data_for_fields')."""
        all_names = self.field_names
        return self.get_value_data_for_fields(
            *all_names,
            raise_exception_when_unset=raise_exception_when_unset,
        )

    def set_values(self, **values) -> None:
        """Set several values at once, by calling 'set_value' for each keyword argument."""
        for name, data in values.items():
            self.set_value(name, data)

    def set_value(self, field_name: str, data: Any) -> None:
        """Not supported in this base class, always raises an Exception."""
        impl_name = self.__class__.__name__
        raise Exception(
            f"The value set implementation '{impl_name}' is read-only, and does not support the setting or changing of values."
        )

    def __getitem__(self, item: str) -> Value:
        return self.get_value_obj(item)

    def __setitem__(self, key: str, value):
        raise NotImplementedError()

    def __delitem__(self, key: str):
        raise Exception(f"Removing items not supported: {key}")

    def __iter__(self):
        return iter(self.field_names)

    def __len__(self):
        return len(self.values_schema)

    def __repr__(self):
        impl_name = self.__class__.__name__
        return f"{impl_name}(field_names={self.field_names})"

    def __str__(self):
        return self.__repr__()

    def create_invalid_renderable(self, **config) -> Union[RenderableType, None]:
        """Render the result of 'check_invalid' as a table, or return 'None' if nothing is invalid."""
        problems = self.check_invalid()
        if not problems:
            return None

        table = Table(show_header=False, box=box.SIMPLE)
        table.add_column("field name", style="i")
        table.add_column("details", style="b red")
        for name, details in problems.items():
            table.add_row(name, details)

        return table

    def create_renderable(self, **config: Any) -> RenderableType:
        """
        Render this value map as a table with one row per field.

        Supported config keys: 'in_panel' (defaults to the result of 'is_jupyter()'), 'render_value_data'
        (default: True), 'field_title', 'value_title', 'show_header' (default: True) and
        'show_data_type' (default: False). The whole config is also forwarded to the renderer of each value.
        """
        in_panel = config.get("in_panel", None)
        if in_panel is None:
            in_panel = bool(is_jupyter())

        render_value_data = config.get("render_value_data", True)
        field_title = config.get("field_title", "field")
        value_title = config.get("value_title", "value")
        show_header = config.get("show_header", True)
        show_type = config.get("show_data_type", False)

        table = Table(show_lines=False, show_header=show_header, box=box.SIMPLE)
        table.add_column(field_title, style="b")
        if show_type:
            table.add_column("data_type")
        table.add_column(value_title, style="i")

        for name in self.field_names:

            value = self.get_value_obj(field_name=name)
            if not render_value_data:
                rendered = value.create_renderable(**config)
            else:
                rendered = value._data_registry.pretty_print_data(
                    value_id=value.value_id, target_type="terminal_renderable", **config
                )

            row = [name, rendered]
            if show_type:
                row.insert(1, value.value_schema.type)
            table.add_row(*row)

        return Panel(table) if in_panel else table

Attributes

values_schema: Dict[str, ValueSchema] = Field(description='The schemas for all the values in this set.') class-attribute
field_names: Iterable[str] property
all_items_valid: bool property

Functions

get_value_obj(field_name: str) -> Value abstractmethod
Source code in kiara/models/values/value.py
1314
1315
1316
@abc.abstractmethod
def get_value_obj(self, field_name: str) -> Value:
    """Return the value object for the specified field."""
check_invalid() -> Dict[str, str]

Check whether the value set is invalid, if it is, return a description of what's wrong.

Source code in kiara/models/values/value.py
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
def check_invalid(self) -> Dict[str, str]:
    """Check whether the value set is invalid, if it is, return a description of what's wrong."""
    problems: Dict[str, str] = {}
    for name in self.values_schema.keys():

        item = self.get_value_obj(name)
        schema = self.values_schema[name]

        # optional fields and fields that are set are never reported
        if schema.optional:
            continue
        if item.value_status == ValueStatus.SET:
            continue
        if not schema.is_required():
            continue

        if not item.is_set:
            problems[name] = "not set"
        elif item.value_status == ValueStatus.NONE:
            problems[name] = "no value"

    return problems
get_value_data_for_fields(*field_names: str, raise_exception_when_unset: bool = False) -> Dict[str, Any]

Return the data for one or several fields of this ValueMap.

If a value is unset, by default 'None' is returned for it. Unless 'raise_exception_when_unset' is set to 'True', in which case an Exception will be raised (obviously).

Source code in kiara/models/values/value.py
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
def get_value_data_for_fields(
    self, *field_names: str, raise_exception_when_unset: bool = False
) -> Dict[str, Any]:
    """
    Return the data for one or several fields of this ValueMap.

    Unset values are returned as 'None', unless 'raise_exception_when_unset' is 'True': in that case
    an Exception listing all unset fields is raised instead.
    """
    if raise_exception_when_unset:
        missing = [name for name in field_names if not self.get_value_obj(name).is_set]
        if missing:
            raise Exception(
                f"Can't get data for fields, one or several of the requested fields are not set yet: {', '.join(missing)}."
            )

    data = {}
    for name in field_names:
        value_obj = self.get_value_obj(name)
        data[name] = value_obj.data if value_obj.is_set else None
    return data
get_value_data(field_name: str, raise_exception_when_unset: bool = False) -> Any
Source code in kiara/models/values/value.py
1384
1385
1386
1387
1388
1389
def get_value_data(
    self, field_name: str, raise_exception_when_unset: bool = False
) -> Any:
    """Return the data of a single field, delegating to 'get_value_data_for_fields'."""
    field_data = self.get_value_data_for_fields(
        field_name, raise_exception_when_unset=raise_exception_when_unset
    )
    return field_data[field_name]
get_all_value_ids() -> Dict[str, uuid.UUID]
Source code in kiara/models/values/value.py
1391
1392
def get_all_value_ids(self) -> Dict[str, uuid.UUID]:
    """Return the value ids of all fields, using the field names as keys."""
    ids = {}
    for name in self.field_names:
        ids[name] = self.get_value_obj(name).value_id
    return ids
get_all_value_data(raise_exception_when_unset: bool = False) -> Dict[str, Any]
Source code in kiara/models/values/value.py
1394
1395
1396
1397
1398
1399
1400
def get_all_value_data(
    self, raise_exception_when_unset: bool = False
) -> Dict[str, Any]:
    """Return the data of all fields, delegating to 'get_value_data_for_fields'."""
    all_names = self.field_names
    return self.get_value_data_for_fields(
        *all_names,
        raise_exception_when_unset=raise_exception_when_unset,
    )
set_values(**values) -> None
Source code in kiara/models/values/value.py
1402
1403
1404
1405
def set_values(self, **values) -> None:
    """Set several values at once, by calling 'set_value' for each keyword argument (in the order provided)."""

    for name in values:
        self.set_value(name, values[name])
set_value(field_name: str, data: Any) -> None
Source code in kiara/models/values/value.py
1407
1408
1409
1410
def set_value(self, field_name: str, data: Any) -> None:
    """Not supported by this implementation, always raises an Exception."""
    impl_name = self.__class__.__name__
    raise Exception(
        f"The value set implementation '{impl_name}' is read-only, and does not support the setting or changing of values."
    )
create_invalid_renderable(**config) -> Union[RenderableType, None]
Source code in kiara/models/values/value.py
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
def create_invalid_renderable(self, **config) -> Union[RenderableType, None]:
    """Render the result of 'check_invalid' as a two-column table, or return 'None' if nothing is invalid."""

    problems = self.check_invalid()
    if not problems:
        return None

    table = Table(show_header=False, box=box.SIMPLE)
    table.add_column("field name", style="i")
    table.add_column("details", style="b red")
    for name, details in problems.items():
        table.add_row(name, details)

    return table
create_renderable(**config: Any) -> RenderableType
Source code in kiara/models/values/value.py
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
def create_renderable(self, **config: Any) -> RenderableType:
    """
    Render this value map as a table with one row per field.

    Supported config keys: 'in_panel' (defaults to the result of 'is_jupyter()'), 'render_value_data'
    (default: True), 'field_title', 'value_title', 'show_header' (default: True) and
    'show_data_type' (default: False). The whole config is also forwarded to the renderer of each value.
    """

    in_panel = config.get("in_panel", None)
    if in_panel is None:
        in_panel = bool(is_jupyter())

    render_value_data = config.get("render_value_data", True)
    field_title = config.get("field_title", "field")
    value_title = config.get("value_title", "value")
    show_header = config.get("show_header", True)
    show_type = config.get("show_data_type", False)

    table = Table(show_lines=False, show_header=show_header, box=box.SIMPLE)
    table.add_column(field_title, style="b")
    if show_type:
        table.add_column("data_type")
    table.add_column(value_title, style="i")

    for name in self.field_names:

        value = self.get_value_obj(field_name=name)
        if not render_value_data:
            rendered = value.create_renderable(**config)
        else:
            rendered = value._data_registry.pretty_print_data(
                value_id=value.value_id, target_type="terminal_renderable", **config
            )

        row = [name, rendered]
        if show_type:
            # the data type goes in between field name and rendered value
            row.insert(1, value.value_schema.type)
        table.add_row(*row)

    return Panel(table) if in_panel else table

ValueMapReadOnly

Bases: ValueMap

Source code in kiara/models/values/value.py
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
class ValueMapReadOnly(ValueMap):  # type: ignore
    """A value map that wraps a fixed dictionary of value objects."""

    _kiara_model_id = "instance.value_map.readonly"

    value_items: Dict[str, Value] = Field(
        description="The values contained in this set."
    )

    @classmethod
    def create_from_ids(cls, data_registry: "DataRegistry", **value_ids: uuid.UUID):
        """Create a value map from value ids, resolving each id via the provided data registry."""

        values = {}
        for name, v_id in value_ids.items():
            values[name] = data_registry.get_value(v_id)

        return cls._construct_from_values(values)

    @classmethod
    def create_from_values(cls, **values: Value):
        """Create a value map from value objects (one keyword argument per field)."""

        return cls._construct_from_values(values)

    @classmethod
    def _construct_from_values(cls, values: Dict[str, Value]):
        # the schemas are taken from the values themselves; 'construct' is always called on 'ValueMapReadOnly'
        schemas = {}
        for name, value in values.items():
            schemas[name] = value.value_schema

        return ValueMapReadOnly.construct(value_items=values, values_schema=schemas)

    def get_value_obj(self, field_name: str) -> Value:
        """Return the value object for the specified field, raise a 'KeyError' if there is no such field."""

        if field_name in self.value_items:
            return self.value_items[field_name]

        raise KeyError(
            f"Field '{field_name}' not available in value set. Available fields: {', '.join(self.field_names)}"
        )

Attributes

value_items: Dict[str, Value] = Field(description='The values contained in this set.') class-attribute

Functions

create_from_ids(data_registry: DataRegistry, **value_ids: uuid.UUID) classmethod
Source code in kiara/models/values/value.py
1498
1499
1500
1501
1502
1503
1504
1505
@classmethod
def create_from_ids(cls, data_registry: "DataRegistry", **value_ids: uuid.UUID):
    """Create a value map from value ids, resolving each id via the provided data registry."""

    values = {}
    for name, v_id in value_ids.items():
        values[name] = data_registry.get_value(v_id)

    # the schemas are taken from the resolved values
    schemas = {}
    for name, value in values.items():
        schemas[name] = value.value_schema

    return ValueMapReadOnly.construct(value_items=values, values_schema=schemas)
create_from_values(**values: Value) classmethod
Source code in kiara/models/values/value.py
1507
1508
1509
1510
1511
1512
1513
@classmethod
def create_from_values(cls, **values: Value):
    """Create a value map from value objects (one keyword argument per field)."""

    # the schemas are taken from the values themselves
    schemas = {}
    for name, value in values.items():
        schemas[name] = value.value_schema

    return ValueMapReadOnly.construct(value_items=values, values_schema=schemas)
get_value_obj(field_name: str) -> Value
Source code in kiara/models/values/value.py
1519
1520
1521
1522
1523
1524
1525
def get_value_obj(self, field_name: str) -> Value:
    """
    Return the value object stored under the specified field name.

    Raises:
        KeyError: if there is no item for 'field_name'; the message lists the available field names
    """
    if field_name in self.value_items:
        return self.value_items[field_name]

    available = ", ".join(self.field_names)
    raise KeyError(
        f"Field '{field_name}' not available in value set. Available fields: {available}"
    )

ValueMapWritable

Bases: ValueMap

Source code in kiara/models/values/value.py
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
class ValueMapWritable(ValueMap):  # type: ignore
    """
    A value map whose fields are filled in after the map was created.

    Data passed to 'set_value' is first parked in '_values_uncommitted'; 'get_value_obj' turns it into a value
    object and moves it into 'value_items'. With '_auto_commit' enabled (the default) this happens right away
    inside 'set_value', otherwise the first time the value object is requested.
    """

    _kiara_model_id = "instance.value_map.writeable"

    @classmethod
    def create_from_schema(
        cls,
        kiara: "Kiara",
        schema: Mapping[str, ValueSchema],
        pedigree: ValuePedigree,
        unique_value_ids: bool = False,
    ) -> "ValueMapWritable":
        """
        Create an empty writable value map for the specified schema.

        The schema mapping is copied into a new dict, and the 'kiara' object as well as its 'data_registry'
        are attached to the new instance.

        Note that 'unique_value_ids' defaults to 'False' here, whereas the field of the same name defaults
        to 'True' when the class is instantiated directly.
        """

        v = ValueMapWritable(
            values_schema=dict(schema),
            pedigree=pedigree,
            unique_value_ids=unique_value_ids,
        )
        v._kiara = kiara
        v._data_registry = kiara.data_registry
        return v

    # field name -> value object, for all fields that have been committed so far
    value_items: Dict[str, Value] = Field(
        description="The values contained in this set.", default_factory=dict
    )
    pedigree: ValuePedigree = Field(
        description="The pedigree to add to all of the result values."
    )
    # passed on (negated) as 'reuse_existing' when data is registered in 'get_value_obj'
    unique_value_ids: bool = Field(
        description="Whether this value map always creates new value(id)s, even when a dataset with matching hash is found.",
        default=True,
    )

    # data that was set via 'set_value' but not turned into a value object yet
    _values_uncommitted: Dict[str, Any] = PrivateAttr(default_factory=dict)
    _kiara: "Kiara" = PrivateAttr(default=None)
    _data_registry: "DataRegistry" = PrivateAttr(default=None)
    # if set, 'set_value' commits the data immediately by calling 'get_value_obj'
    _auto_commit: bool = PrivateAttr(default=True)

    def get_value_obj(self, field_name: str) -> Value:
        """
        Retrieve the value object for the specified field.

        This class only creates the actual value object the first time it is requested, because there is a potential
        cost to assembling it, and it might not be needed ever.

        Uncommitted data is resolved depending on its type: a 'Value' instance is used as is, a 'uuid.UUID' is
        looked up in the data registry, and anything else is registered as new data using the schema of the
        field and the pedigree of this map. The result is moved from the uncommitted items into 'value_items'.

        Raises:
            Exception: if 'field_name' is not part of the schema, or if no data was set for the field yet
        """
        if field_name not in self.values_schema:
            # this is a retrieval, so the message must not claim that data was being set
            raise Exception(
                f"Can't retrieve value for field '{field_name}': field not valid, valid field names: {', '.join(self.field_names)}."
            )

        if field_name in self.value_items:
            return self.value_items[field_name]
        elif field_name not in self._values_uncommitted:
            raise Exception(
                f"Can't retrieve value for field '{field_name}': value not set (yet)."
            )

        schema = self.values_schema[field_name]
        value_data = self._values_uncommitted[field_name]
        if isinstance(value_data, Value):
            value = value_data
        elif isinstance(value_data, uuid.UUID):
            value = self._data_registry.get_value(value_data)
        else:
            value = self._data_registry.register_data(
                data=value_data,
                schema=schema,
                pedigree=self.pedigree,
                pedigree_output_name=field_name,
                reuse_existing=not self.unique_value_ids,
            )

        # only drop the uncommitted data once a value object exists for it
        self._values_uncommitted.pop(field_name)
        self.value_items[field_name] = value
        return self.value_items[field_name]

    def sync_values(self):
        """
        Commit the values of all fields, then check them.

        Calls 'get_value_obj' for every field name (which raises if a field has no data yet). If
        'check_invalid' reports anything afterwards, an 'InvalidValuesException' is logged and raised.
        """

        for field_name in self.field_names:
            self.get_value_obj(field_name)

        invalid = self.check_invalid()
        if invalid:
            e = InvalidValuesException(invalid_values=invalid)
            try:
                raise e
            except Exception:
                # this is silly, I know
                # (presumably the exception is raised once up-front so it carries a traceback when logged)
                log_exception(e)
                raise e

    def set_value(self, field_name: str, data: Any) -> None:
        """
        Set the value for the specified field.

        A field can only be set once: setting a field that is already committed, or that already holds
        uncommitted data other than 'None', is an error. If '_auto_commit' is enabled, the data is
        committed right away via 'get_value_obj'.

        Raises:
            Exception: if the field name is not valid, or the field was already committed or set
        """
        if field_name not in self.field_names:
            raise Exception(
                f"Can't set data for field '{field_name}': field not valid, valid field names: {', '.join(self.field_names)}."
            )
        # check for the key, not for the truthiness of the stored object: a committed object that happens
        # to evaluate to 'False' must not be overwritten silently
        if field_name in self.value_items:
            raise Exception(
                f"Can't set data for field '{field_name}': field already committed."
            )
        # uncommitted 'None' does not count as 'already set', so it can still be replaced
        if self._values_uncommitted.get(field_name, None) is not None:
            raise Exception(
                f"Can't set data for field '{field_name}': field already set."
            )

        self._values_uncommitted[field_name] = data
        if self._auto_commit:
            self.get_value_obj(field_name=field_name)

Attributes

value_items: Dict[str, Value] = Field(description='The values contained in this set.', default_factory=dict) class-attribute
pedigree: ValuePedigree = Field(description='The pedigree to add to all of the result values.') class-attribute
unique_value_ids: bool = Field(description='Whether this value map always creates new value(id)s, even when a dataset with matching hash is found.', default=True) class-attribute

Functions

create_from_schema(kiara: Kiara, schema: Mapping[str, ValueSchema], pedigree: ValuePedigree, unique_value_ids: bool = False) -> ValueMapWritable classmethod
Source code in kiara/models/values/value.py
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
@classmethod
def create_from_schema(
    cls,
    kiara: "Kiara",
    schema: Mapping[str, ValueSchema],
    pedigree: ValuePedigree,
    unique_value_ids: bool = False,
) -> "ValueMapWritable":
    """
    Create an empty writable value map for the specified schema.

    The schema mapping is copied into a new dict, and the 'kiara' object as well as its 'data_registry' are
    attached to the new instance before it is returned.
    """
    init_args = {
        "values_schema": dict(schema),
        "pedigree": pedigree,
        "unique_value_ids": unique_value_ids,
    }
    value_map = ValueMapWritable(**init_args)

    value_map._kiara = kiara
    value_map._data_registry = kiara.data_registry
    return value_map
get_value_obj(field_name: str) -> Value

Retrieve the value object for the specified field.

This class only creates the actual value object the first time it is requested, because there is a potential cost to assembling it, and it might not be needed ever.

Source code in kiara/models/values/value.py
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
def get_value_obj(self, field_name: str) -> Value:
    """
    Retrieve the value object for the specified field.

    This class only creates the actual value object the first time it is requested, because there is a potential
    cost to assembling it, and it might not be needed ever.

    Uncommitted data is resolved depending on its type: a 'Value' instance is used as is, a 'uuid.UUID' is
    looked up in the data registry, and anything else is registered as new data using the schema of the field
    and the pedigree of this map. The result is moved from the uncommitted items into 'value_items'.

    Raises:
        Exception: if 'field_name' is not part of the schema, or if no data was set for the field yet
    """
    if field_name not in self.values_schema:
        # this is a retrieval, so the message must not claim that data was being set
        raise Exception(
            f"Can't retrieve value for field '{field_name}': field not valid, valid field names: {', '.join(self.field_names)}."
        )

    if field_name in self.value_items:
        return self.value_items[field_name]
    if field_name not in self._values_uncommitted:
        raise Exception(
            f"Can't retrieve value for field '{field_name}': value not set (yet)."
        )

    schema = self.values_schema[field_name]
    value_data = self._values_uncommitted[field_name]
    if isinstance(value_data, Value):
        value = value_data
    elif isinstance(value_data, uuid.UUID):
        value = self._data_registry.get_value(value_data)
    else:
        value = self._data_registry.register_data(
            data=value_data,
            schema=schema,
            pedigree=self.pedigree,
            pedigree_output_name=field_name,
            reuse_existing=not self.unique_value_ids,
        )

    # only drop the uncommitted data once a value object exists for it
    self._values_uncommitted.pop(field_name)
    self.value_items[field_name] = value
    return self.value_items[field_name]
sync_values()
Source code in kiara/models/values/value.py
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
def sync_values(self):
    """
    Commit the values of all fields, then check them.

    Calls 'get_value_obj' for every field name. If 'check_invalid' reports anything afterwards, an
    'InvalidValuesException' carrying that report is logged via 'log_exception' and raised.
    """
    for name in self.field_names:
        self.get_value_obj(name)

    invalid_fields = self.check_invalid()
    if not invalid_fields:
        return

    error = InvalidValuesException(invalid_values=invalid_fields)
    try:
        raise error
    except Exception:
        # raised and caught once before logging -- presumably so the exception carries a traceback
        log_exception(error)
        raise error
set_value(field_name: str, data: Any) -> None

Set the value for the specified field.

Source code in kiara/models/values/value.py
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
def set_value(self, field_name: str, data: Any) -> None:
    """
    Set the value for the specified field.

    A field can only be set once: setting a field that is already committed, or that already holds
    uncommitted data other than 'None', is an error. If '_auto_commit' is enabled, the data is committed
    right away via 'get_value_obj'.

    Raises:
        Exception: if the field name is not valid, or the field was already committed or set
    """
    if field_name not in self.field_names:
        raise Exception(
            f"Can't set data for field '{field_name}': field not valid, valid field names: {', '.join(self.field_names)}."
        )
    # check for the key, not for the truthiness of the stored object: a committed object that happens to
    # evaluate to 'False' must not be overwritten silently
    if field_name in self.value_items:
        raise Exception(
            f"Can't set data for field '{field_name}': field already committed."
        )
    # uncommitted 'None' does not count as 'already set', so it can still be replaced
    if self._values_uncommitted.get(field_name, None) is not None:
        raise Exception(
            f"Can't set data for field '{field_name}': field already set."
        )

    self._values_uncommitted[field_name] = data
    if self._auto_commit:
        self.get_value_obj(field_name=field_name)

Functions