import yaml
import datetime
import os
from typing import Optional, List, Dict, Any
from pathlib import Path
import importlib.util
from ..context import Context
from .parameters_dictionary import ParametersDictionarySimulation
from .._parameters_group_enum import ParametersGroupEnum
from ..parameters import Parameters
from ..input_parameters import InputParameters
from ..output_parameters import OutputParameters
from .services import ServicesSimulated
from ...user_tasks import user_task_environment
from ...connection import Connection
from ...field import Field
from ...well import Well
from ...data import Data
from ..redefinition import Redefinition
[docs]
class UserTaskInstance:
"""Simulated user task instance.
Used to configure and run a user task in simulation mode.
.. note:: Should not be instantiated directly.
"""
def __init__(self, connection: Connection, context: Context, redefinition: Optional[Redefinition] = None, python_path: Optional[str] = None):
if python_path is None:
import __main__
python_path = Path(__main__.__file__).resolve().stem.replace('_simulation', '') + '.py'
self.__python_path: str = python_path
self.__yaml_filename: str = str(Path(python_path).with_suffix('.yaml'))
if not os.path.exists(self.__python_path):
raise ValueError(
"User task script cannot be found: {}. Please make sure you have named the simulation script properly and you are running it from the same folder.".format(
self.__python_path))
if not os.path.exists(self.__yaml_filename):
raise ValueError(
"User task input definition cannot be found: {}. Please make sure you have named it properly and it is located in the same folder with the user task script.".format(
self.__yaml_filename))
self.__connection: Connection = connection
self.__context: Context = context
self.__redefinition: Optional[Redefinition] = redefinition
with open(self.__yaml_filename) as file:
yaml_definition = file.read()
self.__inputs = ParametersDictionarySimulation(yaml_definition, ParametersGroupEnum.input)
self.__outputs = ParametersDictionarySimulation(yaml_definition, ParametersGroupEnum.output)
if self.__context is not None and self.__context.field_id is not None:
try:
self.__field: Field = next(x for x in self.__connection.get_fields() if x.id == self.__context.field_id)
except StopIteration:
raise ValueError('Field {} cannot be found'.format(self.__context.field_id))
else:
raise ValueError('Field id must be specified')
if self.__context is not None and self.__context.field_id is not None and self.__context.well_id is not None:
try:
self.__well: Well = next(x for x in self.__field.wells if x.id == self.__context.well_id)
except StopIteration:
raise ValueError('Well {} cannot be found'.format(self.__context.well_id))
else:
raise ValueError('Well id must be specified')
def __create_output(self, name: str, data_type: str, first_x: Optional[datetime.datetime] = None, labels: Optional[List[str]] = None) -> Data:
""" Creates a simulated task output if it doesn't exist, or cleans and returns an existing one.
"""
try:
data = next(x for x in self.well.gauges if x.data_type == data_type and x.name.lower() == name.lower())
data.clean()
return data
except StopIteration:
if first_x is None:
# point data
return self.well.create_data(name, data_type, labels)
else:
# step data
return self.well.create_step_data(name, data_type, first_x, labels)
def __bind_outputs(self) -> Optional[Dict[str, Dict[str, str]]]:
""" Automatically creates and configures time-series task outputs.
"""
with open(self.__yaml_filename) as file:
yaml_parameters = yaml.load(file, Loader=yaml.FullLoader)
if yaml_parameters is None:
return None
try:
outputs: Dict[str, Dict[str, Any]] = yaml_parameters["outputs"]
except KeyError:
return None
if outputs is None:
return None
for parameter_name in outputs:
if outputs[parameter_name]['type'].lower() == 'dataset':
reference_date = self.field.reference_date if outputs[parameter_name].get('isByStep', False) else None
labels = outputs[parameter_name]['labels'] if outputs[parameter_name].get('labels') is not None else None
data = self.__create_output(parameter_name, outputs[parameter_name]['dataType'], reference_date, labels)
self.__outputs[parameter_name] = data.vector_id
elif outputs[parameter_name]['type'].lower() == 'wellproperty':
alias = outputs[parameter_name]['alias']
new_container = next((x for x in self.__well.well_property_containers if x.name == self.__python_path.split("/")[-1]), None)
if new_container is None:
new_container = self.well.create_well_property_container(self.__python_path.split("/")[-1])
if len(new_container.get_well_property_values(alias).values) > 0:
new_container.delete_well_property_values(alias)
new_container.set_well_property_value(alias, None, None)
self.__outputs[parameter_name] = new_container.id + '/' + alias
return outputs
def bind_input(self, input_name: str, data_type: Optional[str], label: Optional[str] = None) -> None:
"""
Automatically binds a user task input to data contained in an associated well using provided hints (data type, label).
Raises an exception if data cannot be matched and bound to the input.
Parameters
----------
data_type:
A data-type to search for. If match by data-type is not found, it will try to match by label with the same value.
label:
A label to search for.
"""
data = self.__match_gauge(data_type, label)
if data is None:
raise ValueError("Cannot bind input parameter <{}>: no matching data of type {} found ".format(input_name, data_type))
self.inputs[input_name] = data.vector_id
def __match_gauge(self, data_type: Optional[str], label: Optional[str] = None) -> Optional[Data]:
"""
Finds a first matching gauge by data-type and/or label.
Parameters
----------
data_type:
A data-type to search for. If match by data-type is not found, it will try to match by label with the same value.
label:
A label to search for.
"""
# Data
data_list = [x for x in self.well.data]
data = self.__match_gauge_internal(data_list, data_type, label)
if data is not None:
return data
# Functions
data_list = [x for x in self.well.functions]
data = self.__match_gauge_internal(data_list, data_type, label)
if data is not None:
return data
# Filters
data_list = [x for x in [f for filter_list in [d.filters for d in self.well.data] for f in filter_list]]
data = self.__match_gauge_internal(data_list, data_type, label)
if data is not None:
return data
# User task outputs
data_list = [x for x in [o for output_list in [u.outputs for u in self.well.user_tasks] for o in output_list]]
data = self.__match_gauge_internal(data_list, data_type, label)
if data is not None:
return data
# Shut-in
data_list = [self.well.shut_in] if self.well.shut_in is not None else []
data = self.__match_gauge_internal(data_list, data_type, label)
if data is not None:
return data
if data_type is not None:
return self.__match_gauge(None, data_type)
else:
return None
@staticmethod
def __match_gauge_internal(data_list: List[Data], data_type: Optional[str], label: Optional[str] = None) -> Optional[Data]:
"""
Finds a first matching gauge by data-type and/or label.
Parameters
----------
data_list:
A list of data to search for matching data.
data_type:
A data-type to search for.
label:
A label to search for.
"""
try:
if data_type is not None and label is not None:
try:
return next(x for x in data_list if x.data_type == data_type and x.is_reference and label in x.labels)
except StopIteration:
return next(x for x in data_list if x.data_type == data_type and label in x.labels)
elif data_type is not None and label is None:
return next(x for x in data_list if x.data_type == data_type and x.is_reference)
elif label is not None:
return next(x for x in data_list if data_type in x.labels)
else:
return None
except StopIteration:
return None
def run(self) -> None:
"""
Runs the user task in simulation mode.
Honors the schedule instructions given by user task (i.e. automatically re-runs the task upon completion if the scheduler was instructed to do so).
"""
parameters = Parameters(InputParameters(self.__inputs), OutputParameters(self.__outputs))
outputs = self.__bind_outputs()
executed_once = False
while True:
user_task_environment.services = ServicesSimulated(self.__connection, self.__context, parameters, self.__python_path, outputs, self.__redefinition if not executed_once else None)
user_task_environment.services.scheduler._Scheduler__reschedule_immediate = False # type:ignore[attr-defined]
user_task_environment.services.scheduler._Scheduler__cron_schedule = None # type:ignore[attr-defined]
module_name = 'user_task'
spec = importlib.util.spec_from_file_location(module_name, self.__python_path)
if spec:
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module) # type:ignore[union-attr]
executed_once = True
if not user_task_environment.services.scheduler.is_reschedule_immediate and user_task_environment.services.scheduler.cron_schedule is None:
break
@property
def field(self) -> Field:
""" Gets the field associated with the user task simulation.
"""
return self.__field
@property
def well(self) -> Well:
""" Gets the well associated with the user task simulation.
"""
return self.__well
@property
def context(self) -> Context:
""" Gets the simulated user task context.
"""
return self.__context
@property
def inputs(self) -> ParametersDictionarySimulation:
""" Gets an editable input parameters container of the simulated user task.
"""
return self.__inputs
@property
def outputs(self) -> ParametersDictionarySimulation:
""" Gets an editable output parameters container of the simulated user task.
"""
return self.__outputs