Source code for kappa_sdk.vector_extensions

from datetime import datetime
from typing import Optional, List, Union
from .vector import Vector
from .enumerator import DataEnumerator, DataEnumeratorSettings, TimeRangeEnum, InterpolationMethodEnum
import bisect
from .well import Well
from .document import Document
from .user_task import UserTask


def get_value_at_date_from_vector(vector: Vector, targeted_date: datetime) -> float:
    """
    Return the value stored at exactly ``targeted_date`` in a vector.

    The lookup is an exact match on the date: no interpolation and no
    "nearest date" search is performed.

    Parameters
    ----------
    vector : Vector
        Vector whose ``dates`` and ``values`` are read position by position.
    targeted_date : datetime
        Date to look up; it must be equal to one of ``vector.dates``.

    Returns
    -------
    float
        The value at the first position whose date equals ``targeted_date``.

    Raises
    ------
    ValueError
        If the vector has no dates, or if no date equals ``targeted_date``.
    """
    dates = vector.dates
    if len(dates) == 0:
        raise ValueError("You provided an empty vector")

    # First position whose date matches exactly; None when nothing matches.
    match_index = next(
        (position for position, candidate in enumerate(dates) if candidate == targeted_date),
        None,
    )
    if match_index is None:
        raise ValueError(f"Targeted date {targeted_date} is not in the vector")
    return vector.values[match_index]
def get_closest_value_at_date_from_vector(vector: Vector, targeted_date: datetime) -> float:
    """
    Return the value whose date is nearest to ``targeted_date``.

    A binary search locates the two samples surrounding the targeted date and
    the value of the nearer one is returned. The search relies on
    ``vector.dates`` being sorted in ascending order.

    Parameters
    ----------
    vector : Vector
        Vector whose ``dates`` and ``values`` are read.
    targeted_date : datetime
        The date for which the nearest sample is wanted.

    Returns
    -------
    float
        * the first value when ``targeted_date`` is on or before the first date;
        * the last value when ``targeted_date`` is on or after the last date;
        * otherwise the value of the sample nearest in time. When the date is
          exactly halfway between two samples, the earlier one is returned.

    Raises
    ------
    ValueError
        If the vector has no dates.
    """
    dates = vector.dates
    if len(dates) == 0:
        raise ValueError("You provided an empty vector")
    if targeted_date <= dates[0]:
        return vector.values[0]
    if targeted_date >= dates[-1]:
        return vector.values[-1]

    # Here dates[0] < targeted_date < dates[-1], so both neighbours exist:
    # dates[after - 1] <= targeted_date < dates[after].
    after = bisect.bisect_right(dates, targeted_date)
    before = after - 1
    gap_before = targeted_date - dates[before]
    gap_after = dates[after] - targeted_date

    # "<=" keeps exact matches (gap_before == 0) and ties on the earlier sample.
    if gap_before <= gap_after:
        return vector.values[before]
    return vector.values[after]
def calculate_cumulative_from_step_data_vector(vector: Vector) -> float:
    """
    Calculate the cumulative of a step data vector.

    Each value is treated as constant over the interval that ends at its date
    and starts at the previous date (``vector.first_x`` for the first value).
    The cumulative is the sum of ``value * interval duration in seconds``.

    Parameters
    ----------
    vector : Vector
        Step vector; ``first_x``, ``dates`` and ``values`` are read.

    Returns
    -------
    float
        The cumulative, i.e. values integrated over time expressed in seconds.

    Raises
    ------
    ValueError
        If ``vector.first_x`` is None.
    """
    step_start = vector.first_x
    if step_start is None:
        raise ValueError("Missing rate first x")

    total = 0.0
    for step_end, rate in zip(vector.dates, vector.values):
        total += rate * (step_end - step_start).total_seconds()
        # The end of this step is the start of the next one.
        step_start = step_end
    return total


def calculate_cumulative_from_point_vector(vector: Vector) -> float:
    """
    Calculate the cumulative of a point vector with the trapezoidal rule.

    For every pair of consecutive samples, the mean of the two values is
    multiplied by the time between them in seconds, and the results are summed.

    Parameters
    ----------
    vector : Vector
        Point vector; ``dates`` and ``values`` are read.

    Returns
    -------
    float
        The cumulative; ``0.0`` when the vector holds a single sample.

    Raises
    ------
    ValueError
        If the vector has no dates.
    """
    dates = vector.dates
    rates = vector.values
    if len(dates) == 0:
        raise ValueError("You provided an empty vector")

    # A single sample yields no pair below, so the result stays at 0.0.
    total = 0.0
    for index, (interval_start, interval_end) in enumerate(zip(dates, dates[1:])):
        mean_rate = (rates[index] + rates[index + 1]) / 2.0
        total += mean_rate * (interval_end - interval_start).total_seconds()
    return total
def remove_nan_from_vector(vector: Vector, value: Optional[float] = None) -> Vector:
    """
    Build a new vector without NaN or None values.

    A sample is considered missing when it is ``None`` or when its string
    representation, lower-cased, is ``'nan'``. Missing samples are replaced by
    ``value`` when one is supplied; otherwise they are dropped together with
    their date. Valid samples are kept unchanged and in their original order.

    Parameters
    ----------
    vector : Vector
        The input vector; ``dates`` and ``values`` are read in lockstep and
        ``first_x`` is carried over to the result.
    value : float, optional
        Replacement for missing samples. When None (the default), missing
        samples are removed instead of replaced.

    Returns
    -------
    Vector
        A new vector holding the cleaned dates and values, with the same
        ``first_x`` as the input.
    """
    kept_dates, kept_values = [], []
    for date, sample in zip(vector.dates, vector.values):
        is_missing = sample is None or str(sample).lower() == 'nan'
        if not is_missing:
            kept = sample
        elif value is not None:
            kept = value
        else:
            # Missing sample and no replacement supplied: drop the pair.
            continue
        kept_dates.append(date)
        kept_values.append(kept)
    return Vector(kept_dates, kept_values, vector.first_x)
def apply_factor(vector: Vector, value: float) -> Vector:
    """
    Multiply every value of a vector by a factor.

    Parameters
    ----------
    vector : Vector
        The vector to scale; it is not modified.
    value : float
        The factor each entry of ``vector.values`` is multiplied by.

    Returns
    -------
    Vector
        A new vector built from a copy of the original dates, the scaled
        values and the original ``first_x``.
    """
    return Vector(
        vector.dates.copy(),
        [sample * value for sample in vector.values],
        vector.first_x,
    )
def trunc_vector(vector: Vector, min_elapsed_time: float, max_elapsed_time: Optional[float] = None) -> Vector:
    """
    Truncate a vector to a window of elapsed times.

    The result is one contiguous slice of the input. It starts at the first
    sample whose elapsed time is strictly greater than ``min_elapsed_time``
    and stops just before the first sample whose elapsed time is strictly
    greater than ``max_elapsed_time``. Samples inside the slice are not
    re-checked individually.

    Parameters
    ----------
    vector : Vector
        The input vector; ``elapsed_times``, ``dates``, ``values`` and
        ``first_x`` are read.
    min_elapsed_time : float
        Lower bound (exclusive) of the window.
    max_elapsed_time : float, optional
        Upper bound (inclusive) of the window. When None, the slice runs to
        the end of the vector.

    Returns
    -------
    Vector
        A new vector built from the sliced dates and values and the original
        ``first_x``. It is empty when no elapsed time exceeds
        ``min_elapsed_time``.
    """
    elapsed_times = vector.elapsed_times
    sample_count = len(elapsed_times)

    def first_index_above(threshold: float) -> int:
        # Position of the first elapsed time > threshold, or sample_count if none.
        for position, elapsed_time in enumerate(elapsed_times):
            if elapsed_time > threshold:
                return position
        return sample_count

    start = first_index_above(min_elapsed_time)
    stop = sample_count if max_elapsed_time is None else first_index_above(max_elapsed_time)

    # Slicing already produces new lists, so the input vector is left untouched.
    return Vector(vector.dates[start:stop], vector.values[start:stop], vector.first_x)
def shift_vector(vector: Vector, value: float) -> Vector:
    """
    Scale both the values and the elapsed times of a vector by a factor.

    The values are scaled first through ``apply_factor``, which returns a new
    vector. The elapsed times of that new vector are then multiplied by the
    same factor and written back with ``set_elapsed_times``.

    Parameters
    ----------
    vector : Vector
        The input vector.
    value : float
        The factor applied to the values and to the elapsed times.

    Returns
    -------
    Vector
        The new vector with scaled values and scaled elapsed times.
    """
    shifted = apply_factor(vector, value)
    scaled_elapsed_times = [elapsed_time * value for elapsed_time in shifted.elapsed_times]
    shifted.set_elapsed_times(scaled_elapsed_times, shifted.first_x)
    return shifted
def interpolate_vectors(vectors: List[Vector], reference_vector: Optional[Vector] = None, data_enumerator: Optional[DataEnumerator] = None, interpolation_method: Optional[InterpolationMethodEnum] = None) -> List[Vector]:
    """
    Interpolate a list of vectors through a data enumerator.

    When a reference vector is given, it is enumerated together with the input
    vectors, the total time range is used and its id is passed to the settings
    as ``reference_vector_id``. Without a reference vector, the common time
    range is used. The caller's ``vectors`` list is never modified.

    Parameters
    ----------
    vectors : List[Vector]
        The vectors to interpolate.
    reference_vector : Optional[Vector], optional
        Optional reference vector, enumerated after the input vectors.
    data_enumerator : Optional[DataEnumerator], optional
        Enumerator to use. When None, a new ``DataEnumerator`` is created from
        the settings built here. In both cases those settings are passed to
        ``to_enumerable_from_vectors``.
    interpolation_method : Optional[InterpolationMethodEnum], optional
        The interpolation method. Defaults to
        ``InterpolationMethodEnum.linear_interpolation`` when None.

    Returns
    -------
    List[Vector]
        The ``vectors`` attribute of the enumeration produced by the enumerator.
    """
    if interpolation_method is None:
        interpolation_method = InterpolationMethodEnum.linear_interpolation

    if reference_vector is None:
        vectors_to_enumerate = vectors
        time_range_value = TimeRangeEnum.common
        reference_vector_id = None
    else:
        # Build a new list rather than appending in place: appending to the
        # argument would grow the caller's list on every call.
        vectors_to_enumerate = [*vectors, reference_vector]
        time_range_value = TimeRangeEnum.total
        reference_vector_id = reference_vector.id

    settings = DataEnumeratorSettings(
        time_range=time_range_value,
        interpolation_method=interpolation_method,
        reference_vector_id=reference_vector_id,
    )
    if data_enumerator is None:
        data_enumerator = DataEnumerator(settings)
    enumeration = data_enumerator.to_enumerable_from_vectors(vectors_to_enumerate, settings)
    return enumeration.vectors
def remove_trailing_zeros(data: Vector) -> Vector:
    """
    Remove trailing zero values, and their dates, from a vector.

    Parameters
    ----------
    data : Vector
        The input vector; it is not modified (copies of ``dates`` and
        ``values`` are trimmed).

    Returns
    -------
    Vector
        A new vector without the run of values equal to zero at the end, and
        with the same ``first_x`` as the input. It is empty when every value
        is zero.
    """
    dates = data.dates.copy()
    values = data.values.copy()

    # Popping from the end keeps dates and values aligned.
    while len(values) > 0 and values[-1] == 0:
        values.pop()
        dates.pop()

    return Vector(dates, values, first_x=data.first_x)


def remove_leading_zeros(data: Vector) -> Vector:
    """
    Remove leading zero values, and their dates, from a vector.

    When at least one sample is removed and the input has a ``first_x``, the
    date of the last removed sample becomes the ``first_x`` of the result.
    Otherwise the input ``first_x`` is kept as is.

    Parameters
    ----------
    data : Vector
        The input vector; it is not modified.

    Returns
    -------
    Vector
        A new vector without the run of values equal to zero at the start.
        It is empty when every value is zero.
    """
    values = data.values

    # Count the leading zeros once and slice, instead of repeatedly popping the
    # head of the list (each pop(0) shifts every remaining element).
    leading_zero_count = 0
    for candidate in values:
        if candidate != 0:
            break
        leading_zero_count += 1

    first_x = data.first_x
    if leading_zero_count > 0 and first_x is not None:
        # The date of the last dropped sample becomes the new first_x.
        first_x = data.dates[leading_zero_count - 1]

    return Vector(data.dates[leading_zero_count:], values[leading_zero_count:], first_x)


def find_vector_origins(vector: Vector, well: Well) -> List[Union[Document, UserTask]]:
    """
    Find the document or user task associated with each origin of a vector.

    Each origin id is first compared with the ``file_id`` of the well's
    documents, then, if no document matches, with the ``id`` of the well's
    user tasks. The first match is kept.

    Parameters
    ----------
    vector : Vector
        The vector containing the origins to match.
    well : Well
        The well whose ``documents`` and ``user_tasks`` are searched.

    Returns
    -------
    List[Union[Document, UserTask]]
        One Document or UserTask per origin, in the same order as
        ``vector.origins``.

    Raises
    ------
    ValueError
        If the vector has no origins, or if an origin matches neither a
        document nor a user task.

    Example
    -------
    >>> vector = some_data.read()
    >>> sources = find_vector_origins(vector, well)
    >>> for i, source in enumerate(sources):
    ...     if isinstance(source, Document):
    ...         print(f"Origin {i}: Document '{source.name}' (ID: {source.file_id})")
    ...     elif isinstance(source, UserTask):
    ...         print(f"Origin {i}: UserTask '{source.name}' (ID: {source.id})")
    """
    if vector.origins is None or len(vector.origins) == 0:
        raise ValueError("The vector has no origins")

    results: List[Union[Document, UserTask]] = []
    for i, origin in enumerate(vector.origins):
        source = next((document for document in well.documents if document.file_id == origin), None)
        if source is None:
            # User tasks are only consulted when no document matches.
            source = next((user_task for user_task in well.user_tasks if user_task.id == origin), None)
        if source is None:
            raise ValueError(f"Origin at index {i} with ID '{origin}' cannot be found in well documents or user tasks")
        results.append(source)

    return results