# Source code for kappa_sdk.event_bus

import uuid
from typing import Callable, Deque, Dict, Optional, Any, List
from threading import Thread, Lock
import logging
from collections import deque
from signalrcore.hub_connection_builder import HubConnectionBuilder
from .event import Event

logger = logging.getLogger(__name__)


class EventSubscriber:
    """Plain record describing one subscription.

    Attributes are stored exactly as passed in:

    * ``event_handler`` -- callable invoked with the event and the subscription token.
    * ``event_filter`` -- predicate deciding whether an event is passed to the handler.
    * ``field_id`` -- optional field identifier associated with the subscription.
    """

    def __init__(self, event_handler: Callable[[Event, str], None], event_filter: Callable[[Event], bool], field_id: Optional[str]):
        self.event_handler, self.event_filter, self.field_id = event_handler, event_filter, field_id


def default_event_filter(event: Event) -> bool:
    """Accept every event; used when a subscriber supplies no filter of its own."""
    return True


class EventBus(Thread):
    """
    KAPPA Automate event bus.

    SignalR-based event bus, running in a separate thread.

    .. note:: Should not be instantiated directly.

    Parameters
    ----------
    service_address:
        The address of the Signal-R event bus.
    access_token:
        Access token handed to the SignalR connection through its ``access_token_factory`` option.
    buffer_size:
        The size of the message buffer for retrospective subscription. The event bus will keep a given number of
        messages and will feed them to each new subscriber, so the subscriber wouldn't miss the events due to the
        eventual timing consistency. Values below zero are treated as zero, in which case no events are retained.
    verify_ssl:
        Value of the ``verify_ssl`` option passed to the SignalR connection.
    """

    def __init__(self, service_address: str, access_token: Optional[str], buffer_size: int = 1000, verify_ssl: bool = True):
        # Thread.setDaemon() is deprecated since Python 3.10; the constructor argument is the supported spelling.
        super().__init__(daemon=True)
        connection_options: Dict[Any, Any] = {"verify_ssl": verify_ssl, "access_token_factory": lambda: access_token}
        self.__lock = Lock()
        self.__buffer_size = max(0, buffer_size)
        self.__hub_connection = HubConnectionBuilder().with_url(service_address, options=connection_options).configure_logging(
            logging.ERROR).with_automatic_reconnect({"type": "raw", "keep_alive_interval": 15, "reconnect_interval": 5, "max_attempts": 5}).build()
        # A bounded deque drops the oldest event by itself once it is full, and with maxlen=0 it simply keeps
        # nothing (a manual popleft() on an empty queue would raise IndexError for a zero-sized buffer).
        self.__events_queue: Deque[Event] = deque(maxlen=self.__buffer_size)
        self.__subscribers: Dict[str, EventSubscriber] = dict()
        self.__hub_connection.on("notify", self.__on_event)

    def run(self) -> None:
        """
        Starts the event bus.

        Starts the thread's worker of this event bus.
        """
        self.__hub_connection.start()

    def stop(self) -> None:
        """
        Stops the event bus.

        Stops the event bus and exits its thread.
        """
        with self.__lock:
            self.__hub_connection.stop()
        self.join()

    def subscribe(self, event_handler: Callable[[Event, str], None], event_filter: Optional[Callable[[Event], bool]], field_id: Optional[str] = None) -> str:
        """
        Subscribes to events.

        Subscribes to KAPPA Automate events. Events currently held in the retrospective buffer are replayed
        to the new handler before this method returns.

        Parameters
        ----------
        event_handler:
            Your event handler. When called, your handler will be provided with the :class:`kappa_sdk.Event` and subscription token.
        event_filter:
            Event filter that allows you to subscribe to single or multiple events, based on condition.
            If None, every event is accepted.
        field_id:
            Identifier of the field that is used to receive field-level events.

        Returns
        -------
        str:
            Subscription token that can be used to unsubscribe your handler.
        """
        if event_filter is None:
            event_filter = default_event_filter

        with self.__lock:
            if field_id is not None:
                self.__hub_connection.send('openField', [field_id])
            subscription_token = str(uuid.uuid4())
            subscriber = EventSubscriber(event_handler, event_filter, field_id)
            self.__subscribers[subscription_token] = subscriber
            # Registration and the buffer snapshot share one critical section with the append in __on_event,
            # so every event reaches this subscriber either through the replay below or through live dispatch.
            events = list(self.__events_queue)

        # Replay without holding the lock so the handler may call subscribe()/unsubscribe() itself.
        for event in events:
            if subscriber.event_filter(event):
                subscriber.event_handler(event, subscription_token)

        return subscription_token

    def subscribe_topic(self, event_handler: Callable[[Event, str], None], event_topic: str, event_filter: Optional[Callable[[Event], bool]] = None,
                        field_id: Optional[str] = None) -> str:
        """
        Subscribes to events by topic.

        Subscribes to KAPPA Automate events based on event topic.

        Parameters
        ----------
        event_handler:
            Your event handler. When called, your handler will be provided with the :class:`kappa_sdk.Event` and subscription token.
        event_topic:
            Event topic to subscribe to.
        event_filter:
            Optional event filter that allows to specify an additional subscription condition.
        field_id:
            Identifier of the field that is used to receive field-level events.

        Returns
        -------
        str:
            Subscription token that can be used to unsubscribe the handler.
        """
        event_filter_lambda = event_filter if event_filter is not None else default_event_filter
        return self.subscribe(event_handler, (lambda e: e.topic == event_topic and event_filter_lambda(e)), field_id)

    def unsubscribe(self, subscription_token: str) -> None:
        """
        Unsubscribes the event handler.

        Unsubscribes your event handler.

        Parameters
        ----------
        subscription_token:
            Subscription token that was returned when calling "subscribe" or "subscribe_topic" methods.

        Raises
        ------
        KeyError:
            If the subscription token is not registered.
        """
        with self.__lock:
            subscriber = self.__subscribers.pop(subscription_token)
            field_id = subscriber.field_id
            # Close the field only when no remaining subscriber still refers to it.
            if field_id is not None and all(x.field_id != field_id for x in self.__subscribers.values()):
                self.__hub_connection.send('closeField', [field_id])

    def __on_event(self, messages: List[Dict[str, Any]]) -> None:
        # Callback registered for the hub's "notify" message: buffers each event and forwards it to subscribers.
        for message in messages:
            event = Event(message["eventId"], message["objectId"], message["objectType"], message["eventType"], message["topic"], message["context"],
                          message["jsonContent"], message["creationTimeUtc"], message["transactionId"])
            with self.__lock:
                self.__events_queue.append(event)
                logger.info("SignalR retrospective queue size: %s", len(self.__events_queue))
                # Snapshot the subscribers in the same critical section as the append: a subscriber registered
                # later finds the event in the buffer, one registered earlier is in this snapshot - never both.
                subscribers = list(self.__subscribers.items())

            # Handlers run outside the lock because a handler might subscribe or unsubscribe from inside.
            for subscription_token, subscriber in subscribers:
                if subscriber.event_filter(event):
                    subscriber.event_handler(event, subscription_token)