Source code for kappa_sdk.user_tasks.simulation.services
import datetime
from ..context import Context
from .event_publisher import EventPublisherSimulated
from ..scheduler import Scheduler
from .log import LogSimulated
from ..data_query_service import DataQueryService
from ..data_command_service import DataCommandService
from ..parameters import Parameters
from ..task_settings import TaskSettings
from ...connection import Connection
from ...field import Field
from ...well import Well
from ...well_group import WellGroup
from ...unit_converter import UnitConverter
from ...rest_api import RestAPI
from ...enumerator import TimeRangeEnum, ExtrapolationMethodEnum, InterpolationMethodEnum, DataEnumerator, DataEnumeratorSettings
from .simulated_user_task import SimulatedUserTask
from ...user_task import UserTask
from typing import Optional, List
from ...data import Data
from ..services import Services
from ..redefinition import Redefinition
[docs]
class ServicesSimulated(Services):
    """:class:`Services` implementation wired with simulated collaborators.

    The event publisher, the log and the user task are the simulated variants
    (:class:`EventPublisherSimulated`, :class:`LogSimulated`, :class:`SimulatedUserTask`),
    while the REST API, the unit converter and the data API are taken from the
    :class:`Connection` passed to the constructor.
    """

    def __init__(
        self,
        connection: Connection,
        context: Context,
        field: Field,
        well_group: Optional[WellGroup],
        well: Well,
        parameters: Parameters,
        name: str,
        output_data: List[Data],
        redefinition: Optional[Redefinition] = None,
    ):
        """Build every service object exposed by the properties of this class.

        Parameters
        ----------
        connection:
            Source of the REST API, the unit converter and the data API.
        context:
            Stored as-is and returned by :attr:`context`.
        field:
            Stored as-is and returned by :attr:`field`.
        well_group:
            Optional well group. When it is provided, :attr:`well` raises instead of
            returning ``well``.
        well:
            Always forwarded to :class:`SimulatedUserTask`; only exposed through
            :attr:`well` when ``well_group`` is ``None``.
        parameters:
            Stored as-is and returned by :attr:`parameters`.
        name:
            Forwarded to :class:`SimulatedUserTask`.
        output_data:
            Forwarded to :class:`SimulatedUserTask`.
        redefinition:
            Optional redefinition, returned by :attr:`redefinition`.
        """
        self.__connection: Connection = connection
        self.__event_publisher: EventPublisherSimulated = EventPublisherSimulated()
        self.__context: Context = context
        self.__redefinition: Optional[Redefinition] = redefinition
        self.__field: Field = field
        self.__well_group: Optional[WellGroup] = well_group
        # The well is only kept when no well group was supplied.
        self.__well: Optional[Well] = None if well_group is not None else well
        self.__rest_api: RestAPI = self.__connection.rest_api
        self.__settings: TaskSettings = TaskSettings(
            TimeRangeEnum.total,
            ExtrapolationMethodEnum.zero,
            InterpolationMethodEnum.absent_value,
        )

        # Private (name-mangled) members of Connection are reached directly, hence the type ignores.
        cluster_apis = connection._Connection__cluster_apis  # type:ignore[attr-defined]
        command_data_api = cluster_apis.data_api
        connection_dto_converter = connection._Connection__connection_dto_converter  # type:ignore[attr-defined]
        data_dto_converter = connection_dto_converter.field_dto_converter.well_dto_converter.data_dto_converter
        self.__data_command_service: DataCommandService = DataCommandService(command_data_api, data_dto_converter)

        self.__scheduler: Scheduler = Scheduler()
        self.__log: LogSimulated = LogSimulated()

        task_settings = self.__settings
        enumerator_settings = DataEnumeratorSettings(
            task_settings.time_range,
            datetime.datetime.min,
            datetime.datetime.max,
            task_settings.extrapolation_method,
            task_settings.absent_data_treatment,
            100000,  # NOTE(review): meaning of this constant is not visible here - confirm against DataEnumeratorSettings
        )
        self.__data_enumerator: DataEnumerator = DataEnumerator(enumerator_settings, self.__rest_api)

        self.__parameters: Parameters = parameters
        self.__data_query_service: DataQueryService = DataQueryService(cluster_apis.data_api, connection.unit_converter)
        self.__user_task: UserTask = SimulatedUserTask(well, name, output_data, well_group)

    @property
    def rest_api(self) -> RestAPI:
        """Return the REST API of the underlying connection."""
        return self.__connection.rest_api

    @property
    def unit_converter(self) -> UnitConverter:
        """Return the unit converter of the underlying connection."""
        return self.__connection.unit_converter

    @property
    def context(self) -> Context:
        """Return the context given at construction."""
        return self.__context

    @property
    def redefinition(self) -> Optional[Redefinition]:
        """Return the redefinition given at construction, or ``None`` when there is none."""
        return self.__redefinition

    @property
    def field(self) -> Field:
        """Return the field given at construction."""
        return self.__field

    @property
    def well(self) -> Well:
        """Return the well of the user task.

        Raises
        ------
        ValueError
            If no well is kept, i.e. a well group was supplied at construction.
        """
        attached_well = self.__well
        if attached_well is not None:
            return attached_well
        raise ValueError("This User Task is not associated with a Well.")

    @property
    def well_group(self) -> WellGroup:
        """Return the well group of the user task.

        Raises
        ------
        ValueError
            If no well group was supplied at construction.
        """
        attached_group = self.__well_group
        if attached_group is not None:
            return attached_group
        raise ValueError("This User Task is not associated with a Well Group.")

    @property
    def event_publisher(self) -> EventPublisherSimulated:
        """Return the simulated event publisher created at construction."""
        return self.__event_publisher

    @property
    def data_enumerator(self) -> DataEnumerator:
        """Return the data enumerator created at construction."""
        return self.__data_enumerator

    @property
    def data_command_service(self) -> DataCommandService:
        """Return the data command service created at construction."""
        return self.__data_command_service

    @property
    def log(self) -> LogSimulated:
        """Return the simulated log created at construction."""
        return self.__log

    @property
    def scheduler(self) -> Scheduler:
        """Return the scheduler created at construction."""
        return self.__scheduler

    @property
    def parameters(self) -> Parameters:
        """Return the parameters given at construction."""
        return self.__parameters

    @property
    def data_query_service(self) -> DataQueryService:
        """Return the data query service created at construction."""
        return self.__data_query_service

    @property
    def user_task(self) -> UserTask:
        """Return the simulated user task created at construction."""
        return self.__user_task

    @property
    def trigger(self) -> None:
        """Return the trigger associated with this user task.

        Always ``None`` in simulation mode.
        """
        return None