import uuid
from typing import Callable, Deque, Dict, Optional, Any, List
from threading import Thread, Lock
import logging
from collections import deque
from signalrcore.hub_connection_builder import HubConnectionBuilder
from .event import Event
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class EventSubscriber:
    """Record describing a single event subscription.

    Bundles the callback, the predicate deciding which events reach the
    callback, and the optional field identifier the subscription was made for.
    """

    def __init__(
        self,
        event_handler: Callable[[Event, str], None],
        event_filter: Callable[[Event], bool],
        field_id: Optional[str],
    ):
        """Store the subscription parameters unchanged.

        Parameters
        ----------
        event_handler:
            Callback taking an event and a string (the subscription token).
        event_filter:
            Predicate taking an event and returning a bool.
        field_id:
            Field identifier attached to the subscription, or ``None``.
        """
        self.event_handler, self.event_filter = event_handler, event_filter
        self.field_id = field_id
def default_event_filter(event: Event) -> bool:
    """Accept every event.

    Used as the filter when a subscriber does not provide one; the argument
    is ignored and the result is always ``True``.
    """
    accept_all = True
    return accept_all
# [docs] -- link marker left over from the rendered Sphinx source listing; not part of the code.
class EventBus(Thread):
    """ KAPPA Automate event bus.

    SignalR-based event bus, running in a separate thread.

    .. note:: Should not be instantiated directly.

    Parameters
    ----------
    service_address:
        The address of the Signal-R event bus.
    access_token:
        Value returned by the connection's access token factory (may be ``None``).
    buffer_size:
        The size of the message buffer for retrospective subscription.
        The event bus will keep a given number of messages and will feed them to each new subscriber,
        so the subscriber wouldn't miss the events due to the eventual timing consistency.
        Negative values are treated as 0, which disables the buffer.
    verify_ssl:
        Forwarded to the hub connection as its ``verify_ssl`` option.
    """

    def __init__(self, service_address: str, access_token: Optional[str], buffer_size: int = 1000, verify_ssl: bool = True):
        # ``daemon=True`` replaces the deprecated ``setDaemon(True)`` call.
        super().__init__(daemon=True)
        connection_options: Dict[Any, Any] = {"verify_ssl": verify_ssl, "access_token_factory": lambda: access_token}
        self.__lock = Lock()
        self.__buffer_size = max(0, buffer_size)
        self.__hub_connection = (
            HubConnectionBuilder()
            .with_url(service_address, options=connection_options)
            .configure_logging(logging.ERROR)
            .with_automatic_reconnect({"type": "raw", "keep_alive_interval": 15, "reconnect_interval": 5, "max_attempts": 5})
            .build()
        )
        # A bounded deque drops the oldest event by itself once full and stays
        # empty when the size is 0, so no manual (and crash-prone) trimming is needed.
        self.__events_queue: Deque[Event] = deque(maxlen=self.__buffer_size)
        self.__subscribers: Dict[str, EventSubscriber] = dict()
        self.__hub_connection.on("notify", self.__on_event)

    def run(self) -> None:
        """ Starts the event bus.

        Starts the thread's worker of this event bus.
        """
        self.__hub_connection.start()

    def stop(self) -> None:
        """ Stops the event bus.

        Stops the event bus and exits its thread.
        """
        with self.__lock:
            self.__hub_connection.stop()
        # NOTE(review): the join is placed outside the lock; the original nesting
        # could not be recovered from the flattened source -- confirm.
        self.join()

    def subscribe(self, event_handler: Callable[[Event, str], None], event_filter: Optional[Callable[[Event], bool]], field_id: Optional[str] = None) -> str:
        """ Subscribes to events.

        Subscribes to KAPPA Automate events. Events still held in the
        retrospective buffer are replayed to the new handler before this
        method returns.

        Parameters
        ----------
        event_handler:
            Your event handler. When called, your handler will be provided with the :class:`kappa_sdk.Event` and subscription token.
        event_filter:
            Event filter that allows you to subscribe to single or multiple events,
            based on condition. ``None`` accepts every event.
        field_id:
            Identifier of the field that is used to receive field-level events.

        Returns
        -------
        str:
            Subscription token that can be used to unsubscribe your handler.
        """
        if event_filter is None:
            event_filter = default_event_filter

        subscription_token = str(uuid.uuid4())
        subscriber = EventSubscriber(event_handler, event_filter, field_id)

        with self.__lock:
            if field_id is not None:
                self.__hub_connection.send('openField', [field_id])
            self.__subscribers[subscription_token] = subscriber
            # Snapshot the buffer so the replay below runs without the lock held.
            buffered_events = list(self.__events_queue)

        # Handlers run outside the (non-reentrant) lock so they may call
        # subscribe/unsubscribe themselves without deadlocking.
        for event in buffered_events:
            if subscriber.event_filter(event):
                subscriber.event_handler(event, subscription_token)

        return subscription_token

    def subscribe_topic(self, event_handler: Callable[[Event, str], None], event_topic: str, event_filter: Optional[Callable[[Event], bool]] = None,
                        field_id: Optional[str] = None) -> str:
        """ Subscribes to events by topic.

        Subscribes to KAPPA Automate events based on event topic.

        Parameters
        ----------
        event_handler:
            Your event handler. When called, your handler will be provided with the :class:`kappa_sdk.Event` and subscription token.
        event_topic:
            Event topic to subscribe to.
        event_filter:
            Optional event filter that allows to specify an additional subscription condition.
        field_id:
            Identifier of the field that is used to receive field-level events.

        Returns
        -------
        str:
            Subscription token that can be used to unsubscribe the handler.
        """
        extra_filter = event_filter if event_filter is not None else default_event_filter

        def topic_filter(event: Event) -> bool:
            # The topic check comes first so the extra filter only sees matching events.
            return event.topic == event_topic and extra_filter(event)

        return self.subscribe(event_handler, topic_filter, field_id)

    def unsubscribe(self, subscription_token: str) -> None:
        """ Unsubscribes the event handler.

        Unsubscribes your event handler. When the removed subscription was the
        last one for its field, a ``closeField`` message is sent for that field.

        Parameters
        ----------
        subscription_token:
            Subscription token that was returned when calling "subscribe" or "subscribe_topic" methods.

        Raises
        ------
        KeyError:
            If the token does not belong to an active subscription.
        """
        with self.__lock:
            subscriber = self.__subscribers.pop(subscription_token)
            field_id = subscriber.field_id
            if field_id is not None and all(other.field_id != field_id for other in self.__subscribers.values()):
                self.__hub_connection.send('closeField', [field_id])

    def __on_event(self, messages: List[Dict[str, Any]]) -> None:
        """Handle a "notify" payload: buffer each event and dispatch it to matching subscribers."""
        for message in messages:
            event = Event(message["eventId"],
                          message["objectId"],
                          message["objectType"],
                          message["eventType"],
                          message["topic"],
                          message["context"],
                          message["jsonContent"],
                          message["creationTimeUtc"],
                          message["transactionId"])

            with self.__lock:
                # The bounded deque evicts the oldest entry automatically.
                self.__events_queue.append(event)
                logger.info("SignalR retrospective queue size: %s", len(self.__events_queue))
                # Take the subscriber snapshot in the same critical section as the
                # append: a concurrent subscribe() then sees the event either in its
                # replay or in this live dispatch, never both. The copy also guards
                # against handlers that subscribe/unsubscribe while we iterate.
                subscribers = list(self.__subscribers.items())

            # Dispatch outside the lock so handlers may re-enter the bus.
            for subscription_token, subscriber in subscribers:
                if subscriber.event_filter(event):
                    subscriber.event_handler(event, subscription_token)