import uuid
from typing import Callable, Deque, Dict, Optional, Any, List
from threading import Thread, Lock
import logging
from collections import deque
from signalrcore.hub_connection_builder import HubConnectionBuilder
from .event import Event
logger = logging.getLogger(__name__)
class EventSubscriber:
    """ A single subscription record.

    Plain holder for the three values that describe one subscription.

    Parameters
    ----------
    event_handler:
        Callable invoked with an event and a subscription token.
    event_filter:
        Predicate that receives an event and returns a bool.
    field_id:
        Optional field identifier attached to the subscription.
    """

    def __init__(
        self,
        event_handler: Callable[[Event, str], None],
        event_filter: Callable[[Event], bool],
        field_id: Optional[str],
    ) -> None:
        # Stored as-is; no validation or copying is performed.
        self.event_handler, self.event_filter, self.field_id = event_handler, event_filter, field_id
def default_event_filter(event: Event) -> bool:
    """ Accept every event.

    Parameters
    ----------
    event:
        The event being tested; it is not inspected.

    Returns
    -------
    bool:
        Always ``True``.
    """
    # Unconditional pass-through: the argument is ignored on purpose.
    return True
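# NOTE(review): stray "[docs]" marker commented out; it is not module code (looks like a documentation-viewer artifact).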
class EventBus(Thread):
    """ KAPPA Automate event bus.

    SignalR-based event bus, running in a separate thread.

    .. note:: Should not be instantiated directly.

    Parameters
    ----------
    service_address:
        The address of the Signal-R event bus.
    access_token:
        Access token supplied to the SignalR connection through its ``access_token_factory`` option.
        When ``None``, no token factory is configured.
    buffer_size:
        The size of the message buffer for retrospective subscription.
        The event bus will keep a given number of messages and will feed them to each new subscriber,
        so the subscriber wouldn't miss the events due to the eventual timing consistency.
        Values below zero are treated as ``0``, which disables buffering.
    verify_ssl:
        Forwarded to the SignalR connection as its ``verify_ssl`` option.
    """

    def __init__(self, service_address: str, access_token: Optional[str],  buffer_size: int = 1000, verify_ssl: bool = True):
        # Daemon thread; ``daemon=True`` replaces the deprecated ``setDaemon(True)`` call.
        super().__init__(daemon=True)
        connection_options: Dict[Any, Any] = {"verify_ssl": verify_ssl}
        if access_token is not None:
            # Only install a token factory when there is a token to hand out;
            # a factory that returns None gives the SignalR client nothing usable.
            connection_options["access_token_factory"] = lambda: access_token
        self.__lock = Lock()
        self.__buffer_size = max(0, buffer_size)
        self.__hub_connection = (
            HubConnectionBuilder()
            .with_url(service_address, options=connection_options)
            .configure_logging(logging.ERROR)
            .with_automatic_reconnect({"type": "raw", "keep_alive_interval": 15, "reconnect_interval": 5, "max_attempts": 5})
            .build()
        )
        # Bounded buffer: appending to a full deque drops the oldest event, and
        # maxlen=0 keeps nothing at all (instead of failing on an empty popleft()).
        self.__events_queue: Deque[Event] = deque(maxlen=self.__buffer_size)
        self.__subscribers: Dict[str, EventSubscriber] = dict()
        self.__hub_connection.on("notify", self.__on_event)

    def run(self) -> None:
        """ Starts the event bus.

        Starts the thread's worker of this event bus.
        """
        self.__hub_connection.start()

    def stop(self) -> None:
        """ Stops the event bus.

        Stops the event bus and exits its thread.
        Calling this method on a bus whose thread was never started only stops the hub connection.
        """
        with self.__lock:
            self.__hub_connection.stop()
        # Thread.join() raises RuntimeError for a thread that has not been started,
        # so only wait for the worker when it is actually running. Joining does not
        # touch the shared state, hence it happens after the lock is released.
        if self.is_alive():
            self.join()

    def subscribe(self, event_handler: Callable[[Event, str], None], event_filter: Optional[Callable[[Event], bool]], field_id: Optional[str] = None) -> str:
        """ Subscribes to events.

        Subscribes to KAPPA Automate events.
        Events currently held in the retrospective buffer that pass the filter are delivered
        to the handler before this method returns.

        Parameters
        ----------
        event_handler:
            Your event handler. When called, your handler will be provided with the :class:`kappa_sdk.Event` and subscription token.
        event_filter:
            Event filter that allows you to subscribe to single or multiple events,
            based on condition. ``None`` accepts every event.
        field_id:
            Identifier of the field that is used to receive field-level events.

        Returns
        -------
        str:
            Subscription token that can be used to unsubscribe your handler.
        """
        if event_filter is None:
            event_filter = default_event_filter
        with self.__lock:
            if field_id is not None:
                self.__hub_connection.send('openField', [field_id])
            subscription_token = str(uuid.uuid4())
            subscriber = EventSubscriber(event_handler, event_filter, field_id)
            self.__subscribers[subscription_token] = subscriber
            buffered_events = list(self.__events_queue)
        # Replay outside the lock: the lock is not reentrant, so a handler that
        # subscribes or unsubscribes from inside its callback would otherwise deadlock.
        for event in buffered_events:
            if subscriber.event_filter(event):
                subscriber.event_handler(event, subscription_token)
        return subscription_token

    def subscribe_topic(self, event_handler: Callable[[Event, str], None], event_topic: str, event_filter: Optional[Callable[[Event], bool]] = None,
                        field_id: Optional[str] = None) -> str:
        """ Subscribes to events by topic.

        Subscribes to KAPPA Automate events based on event topic.

        Parameters
        ----------
        event_handler:
            Your event handler. When called, your handler will be provided with the :class:`kappa_sdk.Event` and subscription token.
        event_topic:
            Event topic to subscribe to.
        event_filter:
            Optional event filter that allows to specify an additional subscription condition.
        field_id:
            Identifier of the field that is used to receive field-level events.

        Returns
        -------
        str:
            Subscription token that can be used to unsubscribe the handler.
        """
        event_filter_lambda = event_filter if event_filter is not None else default_event_filter
        # The topic is compared first; the additional filter only runs for matching topics.
        return self.subscribe(event_handler, (lambda e: e.topic == event_topic and event_filter_lambda(e)), field_id)

    def unsubscribe(self, subscription_token: str) -> None:
        """ Unsubscribes the event handler.

        Unsubscribes your event handler.

        Parameters
        ----------
        subscription_token:
            Subscription token that was returned when calling "subscribe" or "subscribe_topic" methods.

        Raises
        ------
        KeyError:
            If the token does not belong to an active subscription.
        """
        with self.__lock:
            subscriber = self.__subscribers.pop(subscription_token)
            field_id = subscriber.field_id
            # Send 'closeField' only once the last subscriber of that field is gone.
            if field_id is not None and all(other.field_id != field_id for other in self.__subscribers.values()):
                self.__hub_connection.send('closeField', [field_id])

    def __on_event(self, messages: List[Dict[str, Any]]) -> None:
        """ Handles a batch of "notify" messages from the hub connection.

        Each message is converted to an :class:`Event`, stored in the retrospective buffer
        and passed to every subscriber whose filter accepts it.
        """
        for message in messages:
            event = Event(message["eventId"],
                          message["objectId"],
                          message["objectType"],
                          message["eventType"],
                          message["topic"],
                          message["context"],
                          message["jsonContent"],
                          message["creationTimeUtc"],
                          message["transactionId"])
            with self.__lock:
                # The deque is bounded, so the oldest event is evicted automatically.
                self.__events_queue.append(event)
                logger.info("SignalR retrospective queue size: {}".format(len(self.__events_queue)))
                # Snapshot taken under the lock so a concurrent subscribe/unsubscribe
                # cannot resize the dictionary while it is being copied.
                subscribers = list(self.__subscribers.items())
            # Handlers run outside the lock because they may subscribe or unsubscribe themselves.
            for (subscription_token, subscriber) in subscribers:
                if subscriber.event_filter(event):
                    subscriber.event_handler(event, subscription_token)