User task programming patterns#

User task code commonly follows a handful of patterns. Below we illustrate a few of them with Python code fragments; in practice you will often find these patterns used in combination. See the tutorials for worked examples of the patterns below.

Processing only new data#

A user task often only needs to perform calculations on new data. The following code fragment shows how to save the date up to which data has already been processed and, on subsequent runs, read only the data beyond that date.

from kappa_sdk.user_tasks.user_task_environment import services
from datetime import datetime

last_date_string = services.context['last_date_read']  # <== String representation of the last processed date; None if the key is not in the context
last_date = datetime.fromisoformat(last_date_string) if last_date_string is not None else None  # <== Convert the string back to a datetime object

rate_vector_id = str(services.parameters.input['Rate'])  # <== This is how we obtain the ID of the input rate vector

rate_data = next(x for x in services.well.data if x.vector_id == rate_vector_id)

if last_date is None:
    rate_vector = rate_data.read()
else:
    rate_vector = rate_data.read(from_time=last_date)

dates = rate_vector.dates

#
#  Do user task stuff ....
#

services.context['last_date_read'] = dates[-1].isoformat()
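The context round-trip above relies only on standard-library ISO-8601 parsing. A minimal stand-alone sketch of that round-trip, using a plain dict in place of services.context (an assumption for illustration only; note that the real context returns None for a missing key, which a plain dict's `.get()` mimics):

```python
from datetime import datetime

# Plain dict standing in for services.context (illustration only)
context = {}

# First run: the key is absent, so we fall back to None and read everything
last_date_string = context.get('last_date_read')
last_date = datetime.fromisoformat(last_date_string) if last_date_string is not None else None

# After processing, store the last processed date as an ISO-8601 string
dates = [datetime(2024, 1, 1, 12, 0), datetime(2024, 1, 2, 8, 30)]
context['last_date_read'] = dates[-1].isoformat()

# On the next run the stored string round-trips back to an identical datetime
restored = datetime.fromisoformat(context['last_date_read'])
```

Storing the date as an ISO string keeps the context value serialisable while losing no precision on the round-trip.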

Delayed processing of new data#

Remember that a user task is triggered whenever one of its inputs (a dataset or a well property) is updated. One may wish to execute a user task only once a sufficient amount of new data has accumulated. For example, if a user task performs a production forecast, we may wish to run that forecast only on a monthly basis.

In the code fragment below, the user task calculation takes place only once a full week of new data has accumulated.

from kappa_sdk.user_tasks.user_task_environment import services
from datetime import datetime

rate_vector_id = str(services.parameters.input['Rate'])  # <== This is how we obtain the ID of the input rate vector
rate_data = next(x for x in services.well.data if x.vector_id == rate_vector_id)

last_date_string = services.context['last_date_read']  # <== String representation of the last processed date; None if the key is not in the context
last_date = datetime.fromisoformat(last_date_string) if last_date_string is not None else None  # <== Convert the string back to a datetime object

gauge_has_enough_data = False
if last_date is not None and rate_data.last_x is not None:
    ONE_WEEK = 7 * 24  # Time is in hours
    accumulated_hours_of_rate_data = (rate_data.last_x - last_date).total_seconds() / 3600.0  # Time is in hours
    gauge_has_enough_data = accumulated_hours_of_rate_data >= ONE_WEEK  # <== True only when at least one week of data has accumulated

if gauge_has_enough_data:
    ##################################################################
    #  Only do processing if there is enough data
    ##################################################################
    if last_date is None:
        rate_vector = rate_data.read()
    else:
        rate_vector = rate_data.read(from_time=last_date)

    dates = rate_vector.dates

    #
    #  Do user task stuff ....
    #

    services.context['last_date_read'] = dates[-1].isoformat()
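The one-week guard above reduces to plain datetime arithmetic. A stand-alone sketch of just that test, with the two dates passed in directly (the function name and signature are illustrative, not part of the SDK):

```python
from datetime import datetime

ONE_WEEK = 7 * 24  # Time is in hours

def has_enough_data(last_date, last_x):
    """Return True when at least one week of data has accumulated.

    last_date: last processed date (datetime or None)
    last_x:    date of the newest available data point (datetime or None)
    """
    if last_date is None or last_x is None:
        return False  # Nothing processed yet, or no data at all
    accumulated_hours = (last_x - last_date).total_seconds() / 3600.0
    return accumulated_hours >= ONE_WEEK
```

Note that exactly one week of data already counts as "enough", matching the `<` comparison used in the fragment above.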

Reading multiple input datasets using kappa_sdk.DataEnumerator#

In Tutorial 1 and the examples above, the method kappa_sdk.Data.read() is used to read data. An alternative way of reading data is kappa_sdk.DataEnumerator. When reading a single dataset, this class behaves in the same manner as kappa_sdk.Data.read(); however, it becomes very useful when employed to read multiple datasets, as we will see in Tutorial 2. The code fragment below uses a kappa_sdk.DataEnumeratorSettings instance to control the output (which dates to output, how to interpolate, etc.).

Listing 1 Example use of the data_enumerator class#
from kappa_sdk.user_tasks.user_task_environment import services
import kappa_sdk as ka

data_enumerator_settings = services.data_enumerator.get_default_settings()  # Get settings
ref_vector_id = str(services.parameters.input["OilRate"])
data_enumerator_settings.reference_vector_id = ref_vector_id  # Set reference vector
data_enumerator_settings.extrapolation_method = ka.ExtrapolationMethodEnum.absent_value  # Do not extrapolate
data_enumerator_settings.absent_data_treatment = ka.InterpolationMethodEnum.linear_interpolation  # Use linear interpolation
data_enumerator_settings.time_range = ka.TimeRangeEnum.common  # Use only time range which all datasets have in common

#  Create list of inputs
input_list = [
    services.parameters.input["Pressure"],
    services.parameters.input["OilRate"],
    services.parameters.input["GasRate"],
    services.parameters.input["WaterRate"],
]
data = services.data_enumerator.to_enumerable(input_list, data_enumerator_settings)
dates, oil_rates, gas_rates, water_rates, pressures = [], [], [], [], []
for point in data:
    dates.append(point.date)
    pressures.append(point.values[0])
    oil_rates.append(point.values[1])
    gas_rates.append(point.values[2])
    water_rates.append(point.values[3])

#
#  Process data in user task ..
#

Use of Scheduler class to process user task logic in small “chunks”#

The key concept in this code snippet is the use of kappa_sdk.user_tasks.Scheduler.reschedule_immediate(). This method places the user task back in the background processing queue, so that after a few seconds the user task is called again. This approach divides the user task workflow into smaller parts, allowing the workflow engine to schedule other processes and free up resources in between. The example below simply reads the rate data 100 values at a time.

from kappa_sdk.user_tasks.user_task_environment import services
from datetime import datetime

last_date_string = services.context['last_date_read']  # <== String representation of the last processed date; None if the key is not in the context
last_date = datetime.fromisoformat(last_date_string) if last_date_string is not None else None  # <== Convert the string back to a datetime object

CHUNK_SIZE = 100

rate_vector_id = str(services.parameters.input['Rate'])  # <== This is how we obtain the ID of the input rate vector
rate_data = next(x for x in services.well.data if x.vector_id == rate_vector_id)
if last_date is None:
    rate_vector = rate_data.read(count=CHUNK_SIZE)
else:
    rate_vector = rate_data.read(from_time=last_date, count=CHUNK_SIZE)

dates = rate_vector.dates

#
#  Process data in user task ..
#

services.context['last_date_read'] = dates[-1].isoformat()

if len(dates) == CHUNK_SIZE:  # Assume that there is more data available to read if this is true
    services.scheduler.reschedule_immediate()  # <--  Tells the user task scheduler to call this task again
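The control flow above generalises: read at most CHUNK_SIZE values, and if a full chunk came back, assume more data remains and reschedule. A stand-alone sketch of the same loop over a plain list, with a run counter standing in for the number of scheduler invocations (the function and its name are illustrative only):

```python
CHUNK_SIZE = 100

def process_in_chunks(values):
    """Mimic the reschedule loop: each 'run' handles at most CHUNK_SIZE values."""
    runs = 0
    position = 0  # Stands in for the 'last_date_read' bookmark kept in the context
    while True:
        runs += 1
        chunk = values[position:position + CHUNK_SIZE]
        position += len(chunk)
        #
        #  Do user task stuff on `chunk` ...
        #
        if len(chunk) < CHUNK_SIZE:
            break  # A partial chunk means there is no more data to read
        # A full chunk: assume more data remains, i.e. reschedule_immediate()
    return runs
```

As in the fragment above, a dataset whose size is an exact multiple of CHUNK_SIZE triggers one extra run that finds no new data, which is harmless as long as that final empty read is handled.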