Skip to content

messages

Attributes

ReqMsg = namedtuple('ReqMsg', ['version', 'endpoint', 'args']) module-attribute

Classes

KiaraApiMsgBuilder

Bases: object

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/kiara/zmq/messages/__init__.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class KiaraApiMsgBuilder(object):
    def __init__(self):
        self._version_nr_mayor = 0
        self._version_nr_minor = 0
        self._version = int.to_bytes(
            self._version_nr_mayor, length=1, byteorder="big"
        ) + int.to_bytes(self._version_nr_minor, length=1, byteorder="big")

    def encode_msg(self, endpoint_name: str, args: Any) -> List[bytes]:

        try:
            if args:
                if hasattr(args, "json"):
                    _args = args.json().encode()
                else:
                    _args = orjson.dumps(args, option=DEFAULT_ORJSON_OPTIONS)
                return [self._version, endpoint_name.encode(), _args]
            else:
                return [self._version, endpoint_name.encode()]
        except Exception as e:
            return [
                self._version,
                endpoint_name.encode(),
                orjson.dumps({"error": str(e)}),
            ]

    def decode_msg(self, msg: List[bytes]) -> ReqMsg:

        version, endpoint = msg[0], msg[1]
        if len(msg) == 3:
            args = orjson.loads(msg[2])
        else:
            args = {}

        return ReqMsg(version, endpoint.decode(), args)

Functions

encode_msg(endpoint_name: str, args: Any) -> List[bytes]
Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/kiara/zmq/messages/__init__.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def encode_msg(self, endpoint_name: str, args: Any) -> List[bytes]:

    try:
        if args:
            if hasattr(args, "json"):
                _args = args.json().encode()
            else:
                _args = orjson.dumps(args, option=DEFAULT_ORJSON_OPTIONS)
            return [self._version, endpoint_name.encode(), _args]
        else:
            return [self._version, endpoint_name.encode()]
    except Exception as e:
        return [
            self._version,
            endpoint_name.encode(),
            orjson.dumps({"error": str(e)}),
        ]
decode_msg(msg: List[bytes]) -> ReqMsg
Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/kiara/zmq/messages/__init__.py
38
39
40
41
42
43
44
45
46
def decode_msg(self, msg: List[bytes]) -> ReqMsg:

    version, endpoint = msg[0], msg[1]
    if len(msg) == 3:
        args = orjson.loads(msg[2])
    else:
        args = {}

    return ReqMsg(version, endpoint.decode(), args)