Source code for kappa_sdk.user_tasks.simulation.services

import datetime
from ..context import Context
from .event_publisher import EventPublisherSimulated
from ..scheduler import Scheduler
from .log import LogSimulated
from ..data_query_service import DataQueryService
from ..data_command_service import DataCommandService
from ..parameters import Parameters
from ..task_settings import TaskSettings
from ...connection import Connection
from ...field import Field
from ...well import Well
from ...well_group import WellGroup
from ...unit_converter import UnitConverter
from ...rest_api import RestAPI
from ...enumerator import TimeRangeEnum, ExtrapolationMethodEnum, InterpolationMethodEnum, DataEnumerator, DataEnumeratorSettings
from .simulated_user_task import SimulatedUserTask
from ...user_task import UserTask
from typing import Optional, List
from ...data import Data
from ..services import Services
from ..redefinition import Redefinition


[docs] class ServicesSimulated(Services):
[docs] def __init__(self, connection: Connection, context: Context, field: Field, well_group: Optional[WellGroup], well: Well, parameters: Parameters, name: str, output_data: List[Data], redefinition: Optional[Redefinition] = None): self.__connection: Connection = connection self.__event_publisher: EventPublisherSimulated = EventPublisherSimulated() self.__context: Context = context self.__redefinition: Optional[Redefinition] = redefinition self.__field: Field = field self.__well_group: Optional[WellGroup] = well_group self.__well: Optional[Well] = well if well_group is None else None self.__rest_api: RestAPI = self.__connection.rest_api self.__settings: TaskSettings = TaskSettings(TimeRangeEnum.total, ExtrapolationMethodEnum.zero, InterpolationMethodEnum.absent_value) self.__data_command_service: DataCommandService = DataCommandService(connection._Connection__cluster_apis.data_api, connection._Connection__connection_dto_converter.field_dto_converter.well_dto_converter.data_dto_converter) # type:ignore[attr-defined] self.__scheduler: Scheduler = Scheduler() self.__log: LogSimulated = LogSimulated() data_enumerator_settings = DataEnumeratorSettings(self.__settings.time_range, datetime.datetime.min, datetime.datetime.max, self.__settings.extrapolation_method, self.__settings.absent_data_treatment, 100000) self.__data_enumerator: DataEnumerator = DataEnumerator(data_enumerator_settings, self.__rest_api) self.__parameters: Parameters = parameters self.__data_query_service: DataQueryService = DataQueryService(connection._Connection__cluster_apis.data_api, connection.unit_converter) # type:ignore[attr-defined] self.__user_task: UserTask = SimulatedUserTask(well, name, output_data, self.__well_group)
@property def rest_api(self) -> RestAPI: """ Gets the rest_api of the :class:`Service`.""" return self.__connection.rest_api @property def unit_converter(self) -> UnitConverter: """ Gets the unit converter of the :class:`Service`.""" return self.__connection.unit_converter @property def context(self) -> Context: """ Gets the context of the :class:`Service`.""" return self.__context @property def redefinition(self) -> Optional[Redefinition]: """ Gets the redefine setting of the user task.""" return self.__redefinition @property def field(self) -> Field: """ Gets the field of the :class:`Service`.""" return self.__field @property def well(self) -> Well: """ Gets the well of the :class:`Service`.""" if self.__well is None: raise ValueError("This User Task is not associated with a Well.") return self.__well @property def well_group(self) -> WellGroup: """ Get the well group of the :class:`Service`.""" if self.__well_group is None: raise ValueError("This User Task is not associated with a Well Group.") return self.__well_group @property def event_publisher(self) -> EventPublisherSimulated: """ Gets the event_publisher of the :class:`Service`.""" return self.__event_publisher @property def data_enumerator(self) -> DataEnumerator: """ Gets the data_enumerator of the :class:`Service`.""" return self.__data_enumerator @property def data_command_service(self) -> DataCommandService: """ Gets the well of the :class:`Service`.""" return self.__data_command_service @property def log(self) -> LogSimulated: """ Gets the log of the :class:`Service`.""" return self.__log @property def scheduler(self) -> Scheduler: """ Gets the scheduler of the :class:`Service`.""" return self.__scheduler @property def parameters(self) -> Parameters: """ Gets the parameters of this :class:`Service`.""" return self.__parameters @property def data_query_service(self) -> DataQueryService: """ Gets the data query service of this :class:'Service'.""" return self.__data_query_service @property def user_task(self) -> UserTask: """ Gets the user task of this :class:'Service'.""" return self.__user_task @property def trigger(self) -> None: """ Gets the trigger associated of this user task. Returns None in simulation mode""" return None