Skip to content

registry

Attributes

Classes

AllEvents

Bases: KiaraEvent

Source code in src/kiara/registries/events/registry.py
21
22
class AllEvents(KiaraEvent):
    # Adds no fields or behaviour of its own on top of KiaraEvent.
    # NOTE(review): presumably a marker type standing for "every event" --
    # confirm against the code that references it.
    ...

EventRegistry

Bases: object

Source code in src/kiara/registries/events/registry.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class EventRegistry(object):
    """Route events from producers to the listeners subscribed to them.

    Listeners register with ``fnmatch``-style patterns that are compared
    against ``event.get_event_type()``. Producers obtain a callback from
    ``add_producer`` and invoke it with the events they want to publish.
    """

    def __init__(self, kiara: "Kiara"):
        self._kiara: Kiara = kiara
        # NOTE(review): nothing in this class ever writes to `_producers`;
        # `add_producer` only generates an id. Confirm whether producers are
        # meant to be tracked here.
        self._producers: Dict[uuid.UUID, EventProducer] = {}
        # listener id -> listener object
        self._listeners: Dict[uuid.UUID, EventListener] = {}
        # listener id -> subscription patterns registered for that listener
        self._subscriptions: Dict[uuid.UUID, List[str]] = {}

    def add_producer(self, producer: EventProducer) -> Callable:
        """Create the callback a producer uses to publish events.

        Args:
            producer: the object that will emit events.

        Returns:
            ``self.handle_events`` with the freshly generated producer id
            already bound as first argument; call it with the events.
        """
        producer_id = ID_REGISTRY.generate(
            obj=producer, comment="adding event producer"
        )
        return partial(self.handle_events, producer_id)

    def add_listener(self, listener, *subscriptions: str):
        """Register a listener together with its subscription patterns.

        Args:
            listener: object whose ``handle_events(*events)`` is called for
                matching events.
            *subscriptions: ``fnmatch``-style patterns matched against the
                event type; when none are given the listener is subscribed
                to ``"*"``.
        """
        _subscriptions = list(subscriptions) if subscriptions else ["*"]

        listener_id = ID_REGISTRY.generate(
            obj=listener, comment="adding event listener"
        )
        self._listeners[listener_id] = listener
        self._subscriptions[listener_id] = _subscriptions

    def _matches_subscription(
        self, events: Iterable[KiaraEvent], subscriptions: Iterable[str]
    ) -> Iterable[KiaraEvent]:
        """Select the events whose type matches at least one subscription.

        Every event appears at most once per occurrence in ``events`` and the
        input order is preserved, even when several patterns match the same
        event.

        Args:
            events: candidate events; consumed exactly once, so a one-shot
                iterator is fine.
            subscriptions: ``fnmatch``-style patterns.

        Returns:
            A list with the matching events.
        """
        # Materialized because the patterns are re-scanned for every event.
        patterns = list(subscriptions)

        result = []
        for event in events:
            event_type = event.get_event_type()
            if any(fnmatch.fnmatch(event_type, pattern) for pattern in patterns):
                result.append(event)

        return result

    def handle_events(self, producer_id: uuid.UUID, *events: KiaraEvent):
        """Deliver events to every listener with a matching subscription.

        Delivery happens in three passes: work out which events each listener
        gets, hand the events to all listeners, and only then wait on the
        listeners that returned a non-``None`` response.

        Args:
            producer_id: id of the emitting producer (bound by
                ``add_producer``); not used for routing.
            *events: the events to publish.

        Raises:
            Exception: if a listener returns a non-``None`` response but has
                no ``wait_for_processing`` method.
        """
        event_targets: Dict[uuid.UUID, List[KiaraEvent]] = {}

        for l_id in self._listeners:
            matches = self._matches_subscription(
                events=events, subscriptions=self._subscriptions[l_id]
            )
            if matches:
                event_targets.setdefault(l_id, []).extend(matches)

        # Dispatch to every listener before waiting on any of them, so that
        # listeners which process in the background can overlap.
        responses = {}
        for l_id, l_events in event_targets.items():
            responses[l_id] = self._listeners[l_id].handle_events(*l_events)

        for l_id, response in responses.items():
            if response is None:
                # Nothing to wait for.
                continue

            a_listener: AsyncEventListener = self._listeners[l_id]  # type: ignore
            if not hasattr(a_listener, "wait_for_processing"):
                raise Exception(
                    "Can't wait for processing of event for listener: listener does not provide 'wait_for_processing' method."
                )
            a_listener.wait_for_processing(response)

Functions

add_producer(producer: EventProducer) -> Callable
Source code in src/kiara/registries/events/registry.py
32
33
34
35
36
37
def add_producer(self, producer: EventProducer) -> Callable:
    """Hand a producer the callable it should use to publish events.

    An id is generated for ``producer`` via ``ID_REGISTRY`` and bound as the
    first argument of ``self.handle_events``; the resulting partial is
    returned, so the producer only has to pass the events themselves.
    """
    new_id = ID_REGISTRY.generate(obj=producer, comment="adding event producer")
    return partial(self.handle_events, new_id)
add_listener(listener, *subscriptions: str)
Source code in src/kiara/registries/events/registry.py
39
40
41
42
43
44
45
46
47
48
49
def add_listener(self, listener, *subscriptions: str):
    """Register ``listener`` under a newly generated id.

    The given subscription patterns are stored alongside it; a listener
    added without any pattern is subscribed to ``"*"``.
    """
    # Fall back to the catch-all pattern when the caller passed none.
    patterns = list(subscriptions) if subscriptions else ["*"]

    new_id = ID_REGISTRY.generate(obj=listener, comment="adding event listener")
    self._listeners[new_id] = listener
    self._subscriptions[new_id] = patterns
handle_events(producer_id: uuid.UUID, *events: KiaraEvent)
Source code in src/kiara/registries/events/registry.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def handle_events(self, producer_id: uuid.UUID, *events: KiaraEvent):
    """Publish ``events`` to all listeners whose subscriptions match.

    Works in three passes: collect the matching events per listener, call
    ``handle_events`` on each of those listeners, then wait on every listener
    that returned something other than ``None``.

    Raises:
        Exception: when a listener returned a non-``None`` response but does
            not offer a ``wait_for_processing`` method.
    """
    # Pass 1: figure out what each listener should receive.
    deliveries: Dict[uuid.UUID, List[KiaraEvent]] = {}
    for listener_id in self._listeners:
        matched = self._matches_subscription(
            events=events, subscriptions=self._subscriptions[listener_id]
        )
        if matched:
            deliveries[listener_id] = list(matched)

    # Pass 2: dispatch to everyone before waiting on anyone.
    pending = {
        listener_id: self._listeners[listener_id].handle_events(*batch)
        for listener_id, batch in deliveries.items()
    }

    # Pass 3: block on the listeners that handed back a response.
    for listener_id, token in pending.items():
        if token is not None:
            waiter: AsyncEventListener = self._listeners[listener_id]  # type: ignore
            if not hasattr(waiter, "wait_for_processing"):
                raise Exception(
                    "Can't wait for processing of event for listener: listener does not provide 'wait_for_processing' method."
                )
            waiter.wait_for_processing(token)