Skip to content

service

Attributes

DEFAULT_LISTEN_HOST = '*' module-attribute

DEFAULT_PORT = 8000 module-attribute

Classes

KiaraZmqAPI

Bases: object

Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/kiara/zmq/service/__init__.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
class KiaraZmqAPI(object):
    """Expose a kiara API over a ZeroMQ REP socket, served from a background thread.

    Creating an instance reserves the service for the kiara context by writing
    a '<context_name>.zmq' info file (removed again at interpreter exit) and
    opens the service log files. The socket itself is only bound once
    'service_loop' runs, which 'start' does in a separate thread.
    """

    def __init__(
        self,
        api_wrap: KiaraAPIWrap,
        stdout: Union[str, None] = None,
        stderr: Union[str, None] = None,
        host: Union[str, None] = None,
        port: Union[int, None] = None,
        listen_timout_in_ms: Union[int, None] = None,
    ):
        """Resolve host and port, reserve the service for the context, open the logs.

        Args:
            api_wrap: wrapper that provides the kiara api and the context name
            stdout: path of the service log file, a default path derived from the context name is used if not set
            stderr: path of the service error log file, a default path derived from the context name is used if not set
            host: the host to listen on; None, '*' and 'localhost' are all mapped to '127.0.0.1'
            port: the port to listen on, a free port is picked if not set
            listen_timout_in_ms: poll timeout of the service loop, None or 0 means the loop waits without a timeout

        Raises:
            KiaraException: if a service info file for this context already exists
        """

        if listen_timout_in_ms is None:
            listen_timout_in_ms = 0

        if host in [None, "*", "localhost"]:
            host_ip = "127.0.0.1"
        else:
            host_ip = host  # type: ignore

        if not port:
            import socketserver

            # bind to port 0 so the OS picks a free port, then release it again
            with socketserver.TCPServer((host_ip, 0), None) as s:  # type: ignore
                port = s.server_address[1]

        self._api_wrap: KiaraAPIWrap = api_wrap
        self._api_wrap.exit_process = False

        self._listen_host: str = host_ip
        self._port: int = int(port)
        self._service_thread = None
        self._msg_builder = KiaraApiMsgBuilder()
        self._api_endpoints: ApiEndpoints = ApiEndpoints(api_cls=KiaraAPI)

        self._initial_timeout = listen_timout_in_ms
        self._allow_timeout_change = False

        if stdout is None:
            stdout = get_default_stdout_zmq_service_log_path(
                context_name=api_wrap.kiara_context_name
            )

        if stderr is None:
            stderr = get_default_stderr_zmq_service_log_path(
                context_name=api_wrap.kiara_context_name
            )

        # reserving host and port, cross-process
        zmq_base = os.path.join(KIARA_MAIN_CONTEXT_LOCKS_PATH, "zmq")
        service_info_file = os.path.join(
            zmq_base, f"{self._api_wrap.kiara_context_name}.zmq"
        )

        os.makedirs(os.path.dirname(service_info_file), exist_ok=True)

        details = KiaraZmqServiceDetails.construct(
            context_name=self._api_wrap.kiara_context_name,
            process_id=os.getpid(),
            stdout=stdout,
            stderr=stderr,
            newly_started=None,
            host=host_ip,
            port=port,
        )
        # serialize before touching the file, so a failure here leaves no empty info file behind
        payload = orjson.dumps(details.dict())

        try:
            # 'x' mode fails if the file already exists, which makes check-and-create
            # a single step instead of an 'exists' test followed by a write
            with open(service_info_file, "xb") as f:
                f.write(payload)
        except FileExistsError:
            raise KiaraException(
                f"Zmq service port for context '{self._api_wrap.kiara_context_name}' already reserved: {service_info_file}"
            ) from None

        def delete_info_file():
            try:
                os.unlink(service_info_file)
            except FileNotFoundError:
                # already removed, nothing left to clean up
                pass

        # the log files are opened (and truncated) only after the reservation
        # succeeded, otherwise a rejected second instance would wipe the logs
        # of the service that is already running for this context
        try:
            self._stdout = self._open_log(stdout)
            self._stderr = self._open_log(stderr)
        except Exception:
            # do not keep the context reserved for a service that can't start
            delete_info_file()
            raise

        atexit.register(delete_info_file)

    @staticmethod
    def _open_log(target):
        """Open 'target' for writing if it is a path (creating its folder), otherwise return it unchanged."""

        if isinstance(target, str):
            os.makedirs(os.path.dirname(target), exist_ok=True)
            return open(target, "w")
        return target

    def service_loop(self):
        """Run the blocking request/reply loop of the service.

        Binds a REP socket to the configured host and port and answers one
        request at a time until a 'shutdown'/'stop' request arrives, the poll
        times out, or an error occurs. Errors are not raised: the traceback and
        a short message are written to the stderr log and the loop ends.
        The socket and the zmq context are closed in every case.
        """

        context = None
        context_rep_socket = None

        try:

            api = self._api_wrap.kiara_api

            timeout = self._initial_timeout

            context = zmq.Context()
            context_rep_socket = context.socket(zmq.REP)
            context_rep_socket.bind(f"tcp://{self._listen_host}:{self._port}")

            poller = zmq.Poller()
            poller.register(context_rep_socket, zmq.POLLIN)

            stop = False
            while not stop:

                # a falsy timeout (None/0) means: poll without a timeout
                if timeout:
                    socks = dict(poller.poll(timeout))
                else:
                    socks = dict(poller.poll())

                if not socks:
                    print(
                        "Socket timed out, shutting down service...", file=self._stdout
                    )
                    stop = True

                if (
                    context_rep_socket in socks
                    and socks[context_rep_socket] == zmq.POLLIN
                ):

                    #  Wait for next request from client
                    msg = context_rep_socket.recv_multipart()
                    print("Received request: ", msg, file=self._stdout)
                    decoded = self._msg_builder.decode_msg(msg)

                    if decoded.endpoint == "ping":
                        result = "pong"
                    elif decoded.endpoint in ["shutdown", "stop"]:
                        print("Shutting down...", file=self._stdout)
                        result = "ok"
                        stop = True
                    elif decoded.endpoint == "cli":
                        result = self.call_cli(api=api, **decoded.args)
                    elif decoded.endpoint == "control":
                        # NOTE(review): this ends the whole service via the handler
                        # below, and the requesting client never gets a reply
                        raise NotImplementedError()
                    else:
                        result = self.call_endpoint(
                            api=api, endpoint=decoded.endpoint, **decoded.args
                        )

                    resp_msg = self._msg_builder.encode_msg(decoded.endpoint, result)
                    context_rep_socket.send_multipart(resp_msg)

        except Exception as e:
            import traceback

            # keep the traceback together with the rest of the error report
            traceback.print_exc(file=self._stderr)
            print(f"ERROR IN ZMQ SERVICE: {e}", file=self._stderr)
            print("Stopping...", file=self._stderr)
        finally:
            # release the port deterministically instead of relying on garbage
            # collection; the default linger is kept so a reply that was just
            # queued (e.g. the 'ok' for a stop request) can still be delivered
            if context_rep_socket is not None:
                context_rep_socket.close()
            if context is not None:
                context.term()

    def call_cli(self, api: KiaraAPI, **kwargs) -> Mapping[str, str]:
        """Run a kiara cli sub-command in-process and capture its console output.

        Reads 'console_width', 'color_system' and 'sub-command' from the keyword
        arguments, 'api' is not used.

        Returns:
            a mapping with the keys 'stdout' (the captured output, empty if an error was recorded) and 'stderr' (the error message, empty if there was none)
        """

        console = get_console()
        old_width = console.width

        console_width = kwargs.get("console_width", old_width)
        color_system = kwargs.get("color_system", None)

        sub_command = kwargs.get("sub-command")

        # NOTE(review): the width of the default console is changed here and not set back afterwards
        console.width = console_width
        stdout = ""
        stderr = ""
        try:
            with get_proxy_console(
                width=console_width,
                color_system=color_system,
                restore_default_console=False,
            ) as proxy_console:
                with proxy_console.capture() as capture:

                    try:
                        proxy_cli.main(
                            args=sub_command,
                            prog_name="kiara",
                            obj=self._api_wrap,
                            standalone_mode=False,
                        )
                    except Exception as e:
                        stderr = str(e)

                # captured output is only reported if the command did not fail
                if not stderr:
                    stdout = capture.get()
        except Exception as oe:
            stderr = str(oe)

        return {"stdout": stdout, "stderr": stderr}

    def call_endpoint(self, api: KiaraAPI, endpoint: str, **kwargs) -> Any:
        """Look up the api endpoint with the given name and execute it on 'api'.

        Returns:
            the result of the endpoint, or a dict with the single key 'error' if the endpoint lookup failed (errors raised while executing the endpoint are not caught here)
        """

        try:
            endpoint_proxy = self._api_endpoints.get_api_endpoint(
                endpoint_name=endpoint
            )
        except Exception as e:
            msg = str(e)
            return {"error": msg}

        result = endpoint_proxy.execute(instance=api, **kwargs)
        return result

    def start(self):
        """Start the service loop in a new thread and return that thread.

        Raises:
            Exception: if the service thread is still running
        """

        # a thread that already finished (timeout, remote stop request, error)
        # does not count as a running service and must not block a restart
        if self._service_thread is not None and self._service_thread.is_alive():
            raise Exception("Service already running")

        self._service_thread = Thread(target=self.service_loop)
        self._service_thread.start()

        return self._service_thread

    def stop(self):
        """Stop the service loop and wait for the service thread to finish.

        Raises:
            Exception: if the service was never started (or was already stopped via this method)
        """

        if self._service_thread is None:
            raise Exception("Service not running")

        # only talk to the service if its loop is still there to answer: once
        # the thread has exited nothing is left to reply to a stop request
        if self._service_thread.is_alive():

            if self._listen_host in ["0.0.0.0", "*"]:  # noqa
                c_host = "localhost"
            else:
                c_host = self._listen_host

            from kiara.zmq.client import KiaraZmqClient

            zmq_client = KiaraZmqClient(host=c_host, port=self._port)
            zmq_client.request(endpoint_name="stop", args={})

        self._service_thread.join()
        self._service_thread = None

Functions

service_loop()
Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/kiara/zmq/service/__init__.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
def service_loop(self):
    """Run the blocking request/reply loop of the service.

    Binds a REP socket to the configured host and port and answers one
    request at a time until a 'shutdown'/'stop' request arrives, the poll
    times out, or an error occurs. Errors are not raised: the traceback and
    a short message are written to the stderr log and the loop ends.
    The socket and the zmq context are closed in every case.
    """

    context = None
    context_rep_socket = None

    try:

        api = self._api_wrap.kiara_api

        timeout = self._initial_timeout

        context = zmq.Context()
        context_rep_socket = context.socket(zmq.REP)
        context_rep_socket.bind(f"tcp://{self._listen_host}:{self._port}")

        poller = zmq.Poller()
        poller.register(context_rep_socket, zmq.POLLIN)

        stop = False
        while not stop:

            # a falsy timeout (None/0) means: poll without a timeout
            if timeout:
                socks = dict(poller.poll(timeout))
            else:
                socks = dict(poller.poll())

            if not socks:
                print(
                    "Socket timed out, shutting down service...", file=self._stdout
                )
                stop = True

            if (
                context_rep_socket in socks
                and socks[context_rep_socket] == zmq.POLLIN
            ):

                #  Wait for next request from client
                msg = context_rep_socket.recv_multipart()
                print("Received request: ", msg, file=self._stdout)
                decoded = self._msg_builder.decode_msg(msg)

                if decoded.endpoint == "ping":
                    result = "pong"
                elif decoded.endpoint in ["shutdown", "stop"]:
                    print("Shutting down...", file=self._stdout)
                    result = "ok"
                    stop = True
                elif decoded.endpoint == "cli":
                    result = self.call_cli(api=api, **decoded.args)
                elif decoded.endpoint == "control":
                    # NOTE(review): this ends the whole service via the handler
                    # below, and the requesting client never gets a reply
                    raise NotImplementedError()
                else:
                    result = self.call_endpoint(
                        api=api, endpoint=decoded.endpoint, **decoded.args
                    )

                resp_msg = self._msg_builder.encode_msg(decoded.endpoint, result)
                context_rep_socket.send_multipart(resp_msg)

    except Exception as e:
        import traceback

        # keep the traceback together with the rest of the error report
        traceback.print_exc(file=self._stderr)
        print(f"ERROR IN ZMQ SERVICE: {e}", file=self._stderr)
        print("Stopping...", file=self._stderr)
    finally:
        # release the port deterministically instead of relying on garbage
        # collection; the default linger is kept so a reply that was just
        # queued (e.g. the 'ok' for a stop request) can still be delivered
        if context_rep_socket is not None:
            context_rep_socket.close()
        if context is not None:
            context.term()
call_cli(api: KiaraAPI, **kwargs) -> Mapping[str, str]
Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/kiara/zmq/service/__init__.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def call_cli(self, api: KiaraAPI, **kwargs) -> Mapping[str, str]:
    """Execute a kiara cli sub-command in-process and hand back what it printed.

    The keyword arguments 'console_width', 'color_system' and 'sub-command'
    are read, 'api' is not used.

    Returns:
        a mapping with the keys 'stdout' (captured output, left empty when an error was recorded) and 'stderr' (the error message, empty without an error)
    """

    default_console = get_console()

    width = kwargs.get("console_width", default_console.width)
    colors = kwargs.get("color_system")
    cli_args = kwargs.get("sub-command")

    # the default console gets the requested width as well
    default_console.width = width

    outcome = {"stdout": "", "stderr": ""}
    try:
        with get_proxy_console(
            width=width, color_system=colors, restore_default_console=False
        ) as proxy_console:
            with proxy_console.capture() as capture:
                try:
                    proxy_cli.main(
                        args=cli_args,
                        prog_name="kiara",
                        obj=self._api_wrap,
                        standalone_mode=False,
                    )
                except Exception as cli_error:
                    outcome["stderr"] = str(cli_error)

            # output is only reported for commands that did not fail
            if not outcome["stderr"]:
                outcome["stdout"] = capture.get()
    except Exception as console_error:
        outcome["stderr"] = str(console_error)

    return outcome
call_endpoint(api: KiaraAPI, endpoint: str, **kwargs) -> Any
Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/kiara/zmq/service/__init__.py
219
220
221
222
223
224
225
226
227
228
229
230
def call_endpoint(self, api: KiaraAPI, endpoint: str, **kwargs) -> Any:
    """Resolve the api endpoint called 'endpoint' and run it against 'api'.

    Returns:
        whatever the endpoint returns, or a dict with the single key 'error' when the endpoint could not be resolved (errors raised while the endpoint executes are not caught here)
    """

    try:
        proxy = self._api_endpoints.get_api_endpoint(endpoint_name=endpoint)
    except Exception as lookup_error:
        return {"error": str(lookup_error)}

    return proxy.execute(instance=api, **kwargs)
start()
Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/kiara/zmq/service/__init__.py
232
233
234
235
236
237
238
239
240
def start(self):
    """Start the service loop in a new thread and return that thread.

    Raises:
        Exception: if the service thread is still running
    """

    # a thread that already finished (timeout, remote stop request, error)
    # does not count as a running service and must not block a restart
    if self._service_thread is not None and self._service_thread.is_alive():
        raise Exception("Service already running")

    self._service_thread = Thread(target=self.service_loop)
    self._service_thread.start()

    return self._service_thread
stop()
Source code in /opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/site-packages/kiara/zmq/service/__init__.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
def stop(self):
    """Stop the service loop and wait for the service thread to finish.

    Raises:
        Exception: if the service was never started (or was already stopped via this method)
    """

    if self._service_thread is None:
        raise Exception("Service not running")

    # only talk to the service if its loop is still there to answer: once
    # the thread has exited nothing is left to reply to a stop request
    if self._service_thread.is_alive():

        if self._listen_host in ["0.0.0.0", "*"]:  # noqa
            c_host = "localhost"
        else:
            c_host = self._listen_host

        from kiara.zmq.client import KiaraZmqClient

        zmq_client = KiaraZmqClient(host=c_host, port=self._port)
        zmq_client.request(endpoint_name="stop", args={})

    self._service_thread.join()
    self._service_thread = None

Functions