Source code for kappa_sdk.user_tasks.simulation.user_task_instance

import yaml
import datetime
import os
from typing import Optional, List, Dict, Any
from pathlib import Path
import importlib.util
from ..context import Context
from .parameters_dictionary import ParametersDictionarySimulation
from .._parameters_group_enum import ParametersGroupEnum
from ..parameters import Parameters
from ..input_parameters import InputParameters
from ..output_parameters import OutputParameters
from .services import ServicesSimulated
from ...user_tasks import user_task_environment
from ...connection import Connection
from ...field import Field
from ...well import Well
from ...well_group import WellGroup
from ...data import Data
from ..redefinition_mode_enum import RedefinitionModeEnum
from ..redefinition import Redefinition


class UserTaskInstance:
    """Simulated user task instance.

    Used to configure and run a user task in simulation mode.

    .. note::
        Should not be instantiated directly.
    """

    def __init__(self,
                 connection: Connection,
                 context: Context,
                 redefinition: Optional[Redefinition] = None,
                 python_path: Optional[str] = None):
        """
        Validates the simulation set-up and resolves the field / well / well group
        referenced by the context.

        Parameters
        ----------
        connection:
            Connection used to look up the field.
        context:
            Simulated user task context. ``field_id`` is mandatory, and exactly one of
            ``well_id`` / ``well_group_id`` must be set.
        redefinition:
            Optional redefinition; when given, its mode decides whether existing
            output data is cleaned before the run.
        python_path:
            Path of the user task script. When omitted it is derived from the name of
            the running ``__main__`` script with ``_simulation`` removed from its stem.

        Raises
        ------
        ValueError
            If the script or its ``.yaml`` definition is missing, if the context is
            inconsistent, or if the referenced field / well / well group is not found.
        """
        if python_path is None:
            import __main__
            python_path = Path(__main__.__file__).resolve().stem.replace('_simulation', '') + '.py'
        self.__python_path: str = python_path
        # The input/output definition sits next to the script, same name, '.yaml' suffix.
        self.__yaml_filename: str = str(Path(python_path).with_suffix('.yaml'))

        if not os.path.exists(self.__python_path):
            raise ValueError(
                "User task script cannot be found: {}. Please make sure you have named the simulation script properly and you are running it from the same folder.".format(
                    self.__python_path))
        if not os.path.exists(self.__yaml_filename):
            raise ValueError(
                "User task input definition cannot be found: {}. Please make sure you have named it properly and it is located in the same folder with the user task script.".format(
                    self.__yaml_filename))
        if context.well_id is not None and context.well_group_id is not None:
            raise ValueError('Well id and Well Group id cannot be specified simultaneously in the user task context')

        self.__connection: Connection = connection
        self.__context: Context = context
        self.__redefinition: Optional[Redefinition] = redefinition

        with open(self.__yaml_filename) as file:
            yaml_definition = file.read()
        self.__inputs = ParametersDictionarySimulation(yaml_definition, ParametersGroupEnum.input)
        self.__outputs = ParametersDictionarySimulation(yaml_definition, ParametersGroupEnum.output)

        # --- field -----------------------------------------------------------------
        if self.__context.field_id is None:
            raise ValueError('Field id must be specified in the user task context')
        field = next((x for x in self.__connection.get_fields() if x.id == self.__context.field_id), None)
        if field is None:
            raise ValueError('Field {} cannot be found'.format(self.__context.field_id))
        self.__field: Field = field

        # --- well / well group -----------------------------------------------------
        self.__well: Well
        self.__well_group: Optional[WellGroup]
        if self.__context.well_id is not None:
            well = next((x for x in self.__field.wells if x.id == self.__context.well_id), None)
            if well is None:
                raise ValueError('Well {} cannot be found'.format(self.__context.well_id))
            self.__well = well
            self.__well_group = None
        elif self.__context.well_group_id is not None:
            well_group = next((x for x in self.__field.well_groups if x.id == self.__context.well_group_id), None)
            if well_group is None:
                raise ValueError('Well group {} cannot be found'.format(self.__context.well_group_id))
            self.__well_group = well_group
            # Outputs of a well-group task go to a dedicated simulated well, which is
            # reused when it already exists.
            # NOTE(review): the name is built from the whole python_path (not only the file
            # name), so a path with directories or a leading './' ends up in the well name.
            # Kept as is so that already created simulated wells are still found.
            well_name = f"Simulated {self.__python_path.split('.')[0]}"
            existing_well = next((x for x in well_group.wells if x.name == well_name), None)
            if existing_well is None:
                self.__well = well_group.create_well(well_name, labels=["Simulated User Task"])
            else:
                self.__well = existing_well
        else:
            raise ValueError('Well id or well group id must be specified in the user task context')

    def __script_file_name(self) -> str:
        """ Returns the last component of the script path, accepting both '/' and '\\' separators. """
        return self.__python_path.replace('\\', '/').split('/')[-1]

    def __create_output(self,
                        name: str,
                        data_type: str,
                        first_x: Optional[datetime.datetime] = None,
                        labels: Optional[List[str]] = None) -> Data:
        """
        Creates a simulated task output if it doesn't exist, or cleans and returns an existing one.

        Parameters
        ----------
        name:
            Name of the output data (compared case-insensitively with existing data).
        data_type:
            Data-type of the output data.
        first_x:
            When given, the output is created as step data starting at this date;
            otherwise it is created as point data.
        labels:
            Labels passed to the data creation call.
        """
        data_folder_name = f"Simulated {self.__script_file_name().split('.')[0]} results"
        data_folder = self.__well.create_data_folder(data_folder_name)

        existing = next((x for x in data_folder.data
                         if x.data_type == data_type and x.name.lower() == name.lower()), None)
        if existing is not None:
            # Existing data is wiped unless a redefinition with a mode other than 'start' is in progress.
            if self.__redefinition is None or self.__redefinition.mode == RedefinitionModeEnum.start:
                existing.clean()
            return existing

        if first_x is None:
            # point data
            return self.well.create_data(name, data_type, labels, data_folder.id)
        # step data
        return self.well.create_step_data(name, data_type, first_x, labels, data_folder.id)

    def __bind_outputs(self) -> List[Data]:
        """
        Automatically creates and configures time-series task outputs.

        Reads the ``outputs`` section of the yaml definition. ``dataset`` outputs are
        created (or reused) and bound by vector id; ``wellproperty`` outputs are bound
        to a well property container named after the script file.

        Returns
        -------
        The data objects created or reused for the ``dataset`` outputs.
        """
        with open(self.__yaml_filename) as file:
            # NOTE(review): yaml.FullLoader is kept as in the original; consider yaml.safe_load
            # if the definition file can come from an untrusted source.
            yaml_parameters = yaml.load(file, Loader=yaml.FullLoader)
        if yaml_parameters is None:
            return list()
        try:
            outputs: Dict[str, Dict[str, Any]] = yaml_parameters["outputs"]
        except KeyError:
            return list()
        if outputs is None:
            return list()

        output_data: List[Data] = list()
        for parameter_name, definition in outputs.items():
            output_type = definition['type'].lower()
            if output_type == 'dataset':
                # By-step outputs start at the field reference date.
                reference_date = self.field.reference_date if definition.get('isByStep', False) else None
                data = self.__create_output(parameter_name, definition['dataType'], reference_date,
                                            definition.get('labels'))
                self.__outputs[parameter_name] = data.vector_id
                output_data.append(data)
            elif output_type == 'wellproperty':
                alias = definition['alias']
                # Same file-name extraction as for the results data folder, so that a
                # Windows-style path does not leak into the container name.
                container_name = self.__script_file_name()
                container = next((x for x in self.__well.well_property_containers if x.name == container_name), None)
                if container is None:
                    container = self.well.create_well_property_container(container_name)
                if len(container.get_well_property_values(alias).values) > 0:
                    container.delete_well_property_values(alias)
                container.set_well_property_value(alias, None, None)
                self.__outputs[parameter_name] = container.id + '/' + alias
        return output_data

    def bind_input(self, input_name: str, data_type: Optional[str], label: Optional[str] = None) -> None:
        """
        Automatically binds a user task input to data contained in an associated well using provided hints (data type, label).
        Raises an exception if data cannot be matched and bound to the input.

        The data is searched in all wells of the well group when the task runs on a
        well group, otherwise in the associated well.

        Parameters
        ----------
        input_name:
            The name of the input parameter to bind.
        data_type:
            A data-type to search for. When no label is given, only reference data of this type is matched.
        label:
            A label to search for. Combined with the data-type when both are given
            (reference data is preferred); used alone when the data-type is None.

        Raises
        ------
        ValueError
            If no matching data is found.
        """
        if self.__well_group is not None:
            available_data = [y for x in self.__well_group.wells for y in x.data]
        else:
            available_data = self.__well.data
        data = self.__match_gauge_internal(available_data, data_type, label)
        if data is None:
            raise ValueError("Cannot bind input parameter <{}>: no matching data of type {} found ".format(input_name, data_type))
        self.inputs[input_name] = data.vector_id

    @staticmethod
    def __match_gauge_internal(data_list: List[Data], data_type: Optional[str], label: Optional[str] = None) -> Optional[Data]:
        """
        Finds a first matching gauge by data-type and/or label.

        Parameters
        ----------
        data_list:
            A list of data to search for matching data.
        data_type:
            A data-type to search for.
        label:
            A label to search for.

        Returns
        -------
        The first matching data, or None when nothing matches or no hint is given.
        """
        if data_type is not None and label is not None:
            # Prefer reference data, then fall back to any data of that type with the label.
            candidates = [x for x in data_list if x.data_type == data_type and label in x.labels]
            return next((x for x in candidates if x.is_reference), candidates[0] if candidates else None)
        if data_type is not None:
            return next((x for x in data_list if x.data_type == data_type and x.is_reference), None)
        if label is not None:
            # data_type is None here, so the match has to be done on the label itself.
            return next((x for x in data_list if label in x.labels), None)
        return None

    def run(self) -> None:
        """
        Runs the user task in simulation mode.
        Honors the schedule instructions given by user task (i.e. automatically re-runs the task upon completion if the scheduler was instructed to do so).
        """
        parameters = Parameters(InputParameters(self.__inputs), OutputParameters(self.__outputs))
        output_data = self.__bind_outputs()
        while True:
            user_task_environment.services = ServicesSimulated(self.__connection, self.__context, self.__field,
                                                               self.__well_group, self.__well, parameters,
                                                               self.__python_path, output_data, self.__redefinition)
            # Reset the scheduler flags so that only instructions given during this run are seen below.
            user_task_environment.services.scheduler._Scheduler__reschedule_immediate = False  # type:ignore[attr-defined]
            user_task_environment.services.scheduler._Scheduler__cron_schedule = None  # type:ignore[attr-defined]

            # Execute the user task script as a fresh module.
            module_name = 'user_task'
            spec = importlib.util.spec_from_file_location(module_name, self.__python_path)
            # NOTE(review): when no spec can be built for the path the script is silently not executed.
            if spec:
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)  # type:ignore[union-attr]

            # The redefinition is only passed to the first run.
            self.__redefinition = None

            scheduler = user_task_environment.services.scheduler
            if not scheduler.is_reschedule_immediate and scheduler.cron_schedule is None:
                break

    @property
    def field(self) -> Field:
        """ Gets the field associated with the user task simulation. """
        return self.__field

    @property
    def well(self) -> Well:
        """ Gets the well associated with the user task simulation. """
        return self.__well

    @property
    def context(self) -> Context:
        """ Gets the simulated user task context. """
        return self.__context

    @property
    def inputs(self) -> ParametersDictionarySimulation:
        """ Gets an editable input parameters container of the simulated user task. """
        return self.__inputs

    @property
    def outputs(self) -> ParametersDictionarySimulation:
        """ Gets an editable output parameters container of the simulated user task. """
        return self.__outputs