Source code for kappa_sdk.user_tasks.simulation.user_task_instance

import yaml
import datetime
import os
from typing import Optional, List, Dict, Any
from pathlib import Path
import importlib.util
from ..context import Context
from .parameters_dictionary import ParametersDictionarySimulation
from .._parameters_group_enum import ParametersGroupEnum
from ..parameters import Parameters
from ..input_parameters import InputParameters
from ..output_parameters import OutputParameters
from .services import ServicesSimulated
from ...user_tasks import user_task_environment
from ...connection import Connection
from ...field import Field
from ...well import Well
from ...data import Data
from ..redefinition import Redefinition


class UserTaskInstance:
    """Simulated user task instance.

    Used to configure and run a user task in simulation mode.

    .. note:: Should not be instantiated directly.
    """

    def __init__(self, connection: Connection, context: Context, redefinition: Optional[Redefinition] = None, python_path: Optional[str] = None):
        """
        Locates the user task script and its YAML definition, then resolves the field and well given by the context.

        Parameters
        ----------
        connection:
            Connection used to look up the field.
        context:
            Simulated user task context; both its field id and well id must be set.
        redefinition:
            Optional redefinition, handed to the simulated services on the first execution only.
        python_path:
            Path of the user task script. When omitted, it is derived from the name of the running
            ``__main__`` script by removing ``_simulation`` from its stem and appending ``.py``.

        Raises
        ------
        ValueError
            If the script or its ``.yaml`` definition does not exist, if the field id or well id is
            missing, or if the field or well cannot be found.
        """
        if python_path is None:
            import __main__
            # Only the stem is kept, so the derived path is relative to the current working directory.
            python_path = Path(__main__.__file__).resolve().stem.replace('_simulation', '') + '.py'
        self.__python_path: str = python_path
        self.__yaml_filename: str = str(Path(python_path).with_suffix('.yaml'))
        if not os.path.exists(self.__python_path):
            raise ValueError(
                "User task script cannot be found: {}. Please make sure you have named the simulation script properly and you are running it from the same folder.".format(
                    self.__python_path))
        if not os.path.exists(self.__yaml_filename):
            raise ValueError(
                "User task input definition cannot be found: {}. Please make sure you have named it properly and it is located in the same folder with the user task script.".format(
                    self.__yaml_filename))
        self.__connection: Connection = connection
        self.__context: Context = context
        self.__redefinition: Optional[Redefinition] = redefinition
        with open(self.__yaml_filename) as file:
            yaml_definition = file.read()
        self.__inputs = ParametersDictionarySimulation(yaml_definition, ParametersGroupEnum.input)
        self.__outputs = ParametersDictionarySimulation(yaml_definition, ParametersGroupEnum.output)

        if self.__context is None or self.__context.field_id is None:
            raise ValueError('Field id must be specified')
        field = next((x for x in self.__connection.get_fields() if x.id == self.__context.field_id), None)
        if field is None:
            raise ValueError('Field {} cannot be found'.format(self.__context.field_id))
        self.__field: Field = field

        # The context and its field id were validated above, so only the well id remains to be checked.
        if self.__context.well_id is None:
            raise ValueError('Well id must be specified')
        well = next((x for x in self.__field.wells if x.id == self.__context.well_id), None)
        if well is None:
            raise ValueError('Well {} cannot be found'.format(self.__context.well_id))
        self.__well: Well = well

    def __create_output(self, name: str, data_type: str, first_x: Optional[datetime.datetime] = None, labels: Optional[List[str]] = None) -> Data:
        """
        Creates a simulated task output if it doesn't exist, or cleans and returns an existing one.

        Parameters
        ----------
        name:
            Name of the output; an existing gauge is matched case-insensitively on it.
        data_type:
            Data-type of the output; an existing gauge must have exactly this data-type.
        first_x:
            When given, a missing output is created as step data starting at this date;
            otherwise it is created as point data.
        labels:
            Labels passed on when a new output is created.
        """
        existing = next(
            (x for x in self.well.gauges if x.data_type == data_type and x.name.lower() == name.lower()),
            None)
        if existing is not None:
            existing.clean()
            return existing
        if first_x is None:
            # point data
            return self.well.create_data(name, data_type, labels)
        # step data
        return self.well.create_step_data(name, data_type, first_x, labels)

    def __bind_outputs(self) -> Optional[Dict[str, Dict[str, str]]]:
        """
        Automatically creates and configures time-series task outputs.

        Returns the ``outputs`` section of the YAML definition, or ``None`` when the file is empty
        or has no (or an empty) ``outputs`` section.
        """
        with open(self.__yaml_filename) as file:
            # NOTE(review): FullLoader is kept as in the original; yaml.safe_load would be the safer
            # choice if the definition file could ever come from an untrusted source.
            yaml_parameters = yaml.load(file, Loader=yaml.FullLoader)
        if yaml_parameters is None:
            return None
        try:
            outputs: Dict[str, Dict[str, Any]] = yaml_parameters["outputs"]
        except KeyError:
            return None
        if outputs is None:
            return None

        # Well-property outputs are stored in a container named after the script.
        # NOTE(review): splitting on "/" does not strip directories from a backslash-separated path.
        container_name = self.__python_path.split("/")[-1]
        for parameter_name in outputs:
            definition = outputs[parameter_name]
            output_type = definition['type'].lower()
            if output_type == 'dataset':
                reference_date = self.field.reference_date if definition.get('isByStep', False) else None
                labels = definition.get('labels')
                data = self.__create_output(parameter_name, definition['dataType'], reference_date, labels)
                self.__outputs[parameter_name] = data.vector_id
            elif output_type == 'wellproperty':
                alias = definition['alias']
                container = next((x for x in self.__well.well_property_containers if x.name == container_name), None)
                if container is None:
                    container = self.well.create_well_property_container(container_name)
                if len(container.get_well_property_values(alias).values) > 0:
                    container.delete_well_property_values(alias)
                # NOTE(review): the empty value is written whether or not old values were deleted —
                # confirm against the upstream source, whose indentation was lost in this listing.
                container.set_well_property_value(alias, None, None)
                self.__outputs[parameter_name] = container.id + '/' + alias
        return outputs

    def bind_input(self, input_name: str, data_type: Optional[str], label: Optional[str] = None) -> None:
        """
        Automatically binds a user task input to data contained in an associated well using provided hints (data type, label).
        Raises an exception if data cannot be matched and bound to the input.

        Parameters
        ----------
        input_name:
            Name of the input parameter to bind.
        data_type:
            A data-type to search for. If match by data-type is not found, it will try to match by label with the same value.
        label:
            A label to search for.

        Raises
        ------
        ValueError
            If no matching data is found.
        """
        data = self.__match_gauge(data_type, label)
        if data is None:
            raise ValueError("Cannot bind input parameter <{}>: no matching data of type {} found ".format(input_name, data_type))
        self.inputs[input_name] = data.vector_id

    def __match_gauge(self, data_type: Optional[str], label: Optional[str] = None) -> Optional[Data]:
        """
        Finds a first matching gauge by data-type and/or label.

        The well's data, functions, filters, user task outputs and shut-in are searched in that order.

        Parameters
        ----------
        data_type:
            A data-type to search for. If match by data-type is not found, it will try to match by label with the same value.
        label:
            A label to search for.
        """
        # Each source is wrapped in a callable so it is only read from the well when every
        # earlier source has failed to produce a match.
        sources = (
            lambda: list(self.well.data),                                        # Data
            lambda: list(self.well.functions),                                   # Functions
            lambda: [f for d in self.well.data for f in d.filters],              # Filters
            lambda: [o for u in self.well.user_tasks for o in u.outputs],        # User task outputs
            lambda: [self.well.shut_in] if self.well.shut_in is not None else [],  # Shut-in
        )
        for load_source in sources:
            data = self.__match_gauge_internal(load_source(), data_type, label)
            if data is not None:
                return data

        if data_type is not None:
            # Nothing matched by data-type: retry using the data-type value as a label.
            return self.__match_gauge(None, data_type)
        return None

    @staticmethod
    def __match_gauge_internal(data_list: List[Data], data_type: Optional[str], label: Optional[str] = None) -> Optional[Data]:
        """
        Finds a first matching gauge by data-type and/or label.

        Parameters
        ----------
        data_list:
            A list of data to search for matching data.
        data_type:
            A data-type to search for.
        label:
            A label to search for.

        Returns
        -------
        The first match, or ``None``. With both hints, a reference gauge is preferred over a
        non-reference one; with a data-type only, only reference gauges match; with a label only,
        any gauge carrying the label matches.
        """
        if data_type is not None and label is not None:
            preferred = next(
                (x for x in data_list if x.data_type == data_type and x.is_reference and label in x.labels),
                None)
            if preferred is not None:
                return preferred
            return next((x for x in data_list if x.data_type == data_type and label in x.labels), None)
        if data_type is not None:
            return next((x for x in data_list if x.data_type == data_type and x.is_reference), None)
        if label is not None:
            # data_type is None on this branch, so the label itself must be the value tested.
            return next((x for x in data_list if label in x.labels), None)
        return None

    def run(self) -> None:
        """
        Runs the user task in simulation mode.
        Honors the schedule instructions given by user task (i.e. automatically re-runs the task upon completion if the scheduler was instructed to do so).
        """
        parameters = Parameters(InputParameters(self.__inputs), OutputParameters(self.__outputs))
        outputs = self.__bind_outputs()
        executed_once = False
        while True:
            # The redefinition is only passed to the services for the first execution.
            user_task_environment.services = ServicesSimulated(self.__connection, self.__context, parameters, self.__python_path, outputs, self.__redefinition if not executed_once else None)
            # Reset the scheduling flags so that only instructions from this execution are honored.
            user_task_environment.services.scheduler._Scheduler__reschedule_immediate = False  # type:ignore[attr-defined]
            user_task_environment.services.scheduler._Scheduler__cron_schedule = None  # type:ignore[attr-defined]
            module_name = 'user_task'
            spec = importlib.util.spec_from_file_location(module_name, self.__python_path)
            if spec:
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)  # type:ignore[union-attr]
            executed_once = True
            if not user_task_environment.services.scheduler.is_reschedule_immediate and user_task_environment.services.scheduler.cron_schedule is None:
                break

    @property
    def field(self) -> Field:
        """
        Gets the field associated with the user task simulation.
        """
        return self.__field

    @property
    def well(self) -> Well:
        """
        Gets the well associated with the user task simulation.
        """
        return self.__well

    @property
    def context(self) -> Context:
        """
        Gets the simulated user task context.
        """
        return self.__context

    @property
    def inputs(self) -> ParametersDictionarySimulation:
        """
        Gets an editable input parameters container of the simulated user task.
        """
        return self.__inputs

    @property
    def outputs(self) -> ParametersDictionarySimulation:
        """
        Gets an editable output parameters container of the simulated user task.
        """
        return self.__outputs