Source code for kappa_sdk.user_tasks.simulation.event_publisher

from datetime import datetime
from ..event_publisher import EventPublisher


[docs] class EventPublisherSimulated(EventPublisher): """Event publishing simulation. .. note:: Should not be instantiated directly. """ def publish(self, from_date: datetime, to_date: datetime, value: str) -> None: """Simulates an event publishing. Simulates publishing of an event associated with the currently running User Task by doing.. nothing (except for parameter type-checking). Parameters ---------- from_date: The date when the event started. to_date: The date when the event has finished. value: Any additional information that should be published with this User Task event. Raises ------ TypeError If parameters have unexpected types. """ if not isinstance(from_date, datetime) or not isinstance(to_date, datetime): raise TypeError('"publish" expects Python parameters "from_date" and "to_date" of "datetime" type, but an instance of a different type was given.') if not isinstance(value, str): raise TypeError('"publish" expects a Python paramete "value" of "str" type, but an instance of a different type was given.') pass