Skip to content

dict

SaveDictModule (StoreValueTypeModule)

Source code in core/dict.py
class SaveDictModule(StoreValueTypeModule):

    _config_cls = JsonSerializationConfig
    _module_type_name = "store"

    @classmethod
    def retrieve_supported_types(cls) -> typing.Union[str, typing.Iterable[str]]:
        return "dict"

    def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:

        import orjson

        options = self.get_config_value("options")
        file_name = self.get_config_value("file_name")
        json_str = orjson.dumps(value.get_value_data(), option=options)

        bp = Path(base_path)
        bp.mkdir(parents=True, exist_ok=True)

        full_path = bp / file_name
        full_path.write_bytes(json_str)

        load_config = {
            "module_type": "generic.restore_from_json",
            "base_path_input_name": "base_path",
            "inputs": {
                "base_path": base_path,
                "file_name": self.get_config_value("file_name"),
            },
            "output_name": "value_item",
        }

        return load_config

store_value(self, value, base_path)

Save the value, and return the load config needed to load it again.

Source code in core/dict.py
def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:

    import orjson

    options = self.get_config_value("options")
    file_name = self.get_config_value("file_name")
    json_str = orjson.dumps(value.get_value_data(), option=options)

    bp = Path(base_path)
    bp.mkdir(parents=True, exist_ok=True)

    full_path = bp / file_name
    full_path.write_bytes(json_str)

    load_config = {
        "module_type": "generic.restore_from_json",
        "base_path_input_name": "base_path",
        "inputs": {
            "base_path": base_path,
            "file_name": self.get_config_value("file_name"),
        },
        "output_name": "value_item",
    }

    return load_config