Skip to content

msgpack

DeserializeFromMsgPackModule (KiaraModule)

Source code in core/bytes/msgpack.py
class DeserializeFromMsgPackModule(KiaraModule):

    _module_type_name = "to_value"
    _config_cls = SerializeToMsgPackModuleConfig

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:

        return {"bytes": {"type": "bytes", "doc": "The msgpack-serialized value."}}

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:

        return {
            "value_type": {"type": "string", "doc": "The type of the value."},
            "value_data": {
                "type": "any",
                "doc": f"The {self.get_config_value('value_type')} value.",
            },
            "value_metadata": {
                "type": "dict",
                "doc": "A dictionary with metadata of the serialized table. The result dict has the metadata key as key, and two sub-values under each key: 'metadata_item' (the actual metadata) and 'metadata_item_schema' (the schema for the metadata).",
            },
        }

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:

        import msgpack

        msg = inputs.get_value_data("bytes")

        unpacked = msgpack.unpackb(msg, raw=False)

        value_type = unpacked["value_type"]
        outputs.set_value("value_type", value_type)

        metadata = unpacked["metadata"]
        outputs.set_value("value_metadata", metadata)

        new_data = unpacked["data"]

        if not hasattr(self, f"to_{value_type}"):
            raise KiaraProcessingException(
                f"Value type not supported for msgpack deserialization: {value_type}"
            )

        func = getattr(self, f"to_{value_type}")
        obj = func(data=new_data)
        outputs.set_value("value_data", obj)

    def to_table(self, data: bytes) -> typing.Any:

        import pyarrow as pa

        reader = pa.ipc.open_stream(data)

        batches = [b for b in reader]
        new_table = pa.Table.from_batches(batches)

        return new_table

    def to_boolean(self, data: bytes):

        return data

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/bytes/msgpack.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    return {"bytes": {"type": "bytes", "doc": "The msgpack-serialized value."}}

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/bytes/msgpack.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    return {
        "value_type": {"type": "string", "doc": "The type of the value."},
        "value_data": {
            "type": "any",
            "doc": f"The {self.get_config_value('value_type')} value.",
        },
        "value_metadata": {
            "type": "dict",
            "doc": "A dictionary with metadata of the serialized table. The result dict has the metadata key as key, and two sub-values under each key: 'metadata_item' (the actual metadata) and 'metadata_item_schema' (the schema for the metadata).",
        },
    }

SerializeToMsgPackModule (KiaraModule)

Source code in core/bytes/msgpack.py
class SerializeToMsgPackModule(KiaraModule):

    _module_type_name = "from_value"
    _config_cls = SerializeToMsgPackModuleConfig

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:

        return {
            "value_item": {
                "type": self.config.get("value_type"),
                "doc": f"A {self.get_config_value('value_type')} value.",
            }
        }

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:

        return {
            "bytes": {
                "type": "bytes",
                "doc": f"The msgpack-serialized {self.get_config_value('value_type')} value.",
            }
        }

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:

        import msgpack

        type_name: str = self.get_config_value("value_type")

        if not hasattr(self, f"from_{type_name}"):
            raise KiaraProcessingException(
                f"Value type not supported for msgpack serialization: {type_name}"
            )

        func = getattr(self, f"from_{type_name}")

        value = inputs.get_value_obj("value_item")

        metadata = value.get_metadata(also_return_schema=True)

        msg = func(value=value)
        data = {"value_type": value.type_name, "metadata": metadata, "data": msg}

        msg = msgpack.packb(data, use_bin_type=True)

        outputs.set_value("bytes", msg)

    def from_table(self, value: Value) -> bytes:

        import pyarrow as pa

        table_val: Value = value
        table: pa.Table = table_val.get_value_data()

        sink = pa.BufferOutputStream()
        writer = pa.ipc.new_stream(sink, table.schema)

        writer.write(table)
        writer.close()

        buf: pa.Buffer = sink.getvalue()
        return memoryview(buf)

    def from_boolean(self, value: Value) -> bytes:

        return value.get_value_data()

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]: { "type": "[value_type]", "doc*": "[a description of this input]", "optional*': [boolean whether this input is optional or required (defaults to 'False')] "[other_input_field_name]: { "type: ... ... }

Source code in core/bytes/msgpack.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    return {
        "value_item": {
            "type": self.config.get("value_type"),
            "doc": f"A {self.get_config_value('value_type')} value.",
        }
    }

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]: { "type": "[value_type]", "doc*": "[a description of this output]" "[other_input_field_name]: { "type: ... ... }

Source code in core/bytes/msgpack.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    return {
        "bytes": {
            "type": "bytes",
            "doc": f"The msgpack-serialized {self.get_config_value('value_type')} value.",
        }
    }

SerializeToMsgPackModuleConfig (ModuleTypeConfigSchema) pydantic-model

Source code in core/bytes/msgpack.py
class SerializeToMsgPackModuleConfig(ModuleTypeConfigSchema):

    value_type: str = Field(description="The value type to serialize/deserialize.")

value_type: str pydantic-field required

The value type to serialize/deserialize.