# Source code for kappa_sdk.vector_extensions

from datetime import datetime
from typing import Optional, List
from .vector import Vector
from .enumerator import DataEnumerator, DataEnumeratorSettings, TimeRangeEnum, InterpolationMethodEnum
import bisect


def get_value_at_date_from_vector(vector: Vector, targeted_date: datetime) -> float:
    """
    Return the value stored at an exact date of a vector.

    Parameters
    ----------
    vector : Vector
        Vector whose ``dates`` and ``values`` are read; both are indexed with
        the same position.
    targeted_date : datetime
        Date to look up. It must be equal to one of ``vector.dates``.

    Returns
    -------
    float
        The value located at the first position whose date equals
        ``targeted_date``.

    Raises
    ------
    ValueError
        If the vector has no dates, or if ``targeted_date`` is not one of them.
    """
    if len(vector.dates) == 0:
        raise ValueError("You provided an empty vector")

    # Exact match only: the first equal date wins and there is no
    # nearest-date fallback in this function.
    match_index = next(
        (position for position, current in enumerate(vector.dates) if current == targeted_date),
        None,
    )
    if match_index is None:
        raise ValueError(f"Targeted date {targeted_date} is not in the vector")
    return vector.values[match_index]
def get_closest_value_at_date_from_vector(vector: Vector, targeted_date: datetime) -> float:
    """
    Return the value in effect at a date, clamped to the vector's range.

    The lookup is a binary search (``bisect.bisect_right``) over
    ``vector.dates``, which therefore has to be sorted in ascending order;
    this is not checked here.

    Parameters
    ----------
    vector : Vector
        Vector whose ``dates`` and ``values`` are read.
    targeted_date : datetime
        Date for which a value is requested.

    Returns
    -------
    float
        * the first value when ``targeted_date`` is on or before the first date,
        * the last value when ``targeted_date`` is on or after the last date,
        * otherwise the value at the latest date that is not later than
          ``targeted_date`` (so not necessarily the nearest neighbour in time).

    Raises
    ------
    ValueError
        If the vector has no dates.
    """
    dates = vector.dates
    if len(dates) == 0:
        raise ValueError("You provided an empty vector")

    # Clamp requests that fall outside the covered range.
    if targeted_date <= dates[0]:
        return vector.values[0]
    if targeted_date >= dates[-1]:
        return vector.values[-1]

    # bisect_right returns the position just after the last date <= target,
    # hence the "- 1" to land on that date.
    upper_bound = bisect.bisect_right(dates, targeted_date)
    return vector.values[upper_bound - 1]
def calculate_cumulative_production_rate_from_vector(vector: Vector) -> float:
    """
    Integrate a rate vector over time and return the cumulative amount.

    Each value is multiplied by the time elapsed, in seconds, since the
    previous date (``vector.first_x`` for the first sample) and the products
    are added up.

    Parameters
    ----------
    vector : Vector
        Rate vector; ``first_x``, ``dates`` and ``values`` are read.

    Returns
    -------
    float
        Sum of ``rate * elapsed_seconds`` over all samples (``0.0`` when the
        vector has no samples).
        NOTE(review): the multiplication by seconds implies the rates are
        expressed per second -- confirm the unit convention with callers.

    Raises
    ------
    ValueError
        If ``vector.first_x`` is ``None``.
    """
    if vector.first_x is None:
        raise ValueError("Missing rate first x")

    total = 0.0
    previous_date = vector.first_x
    for current_date, rate in zip(vector.dates, vector.values):
        # Each rate applies to the interval that ends at its own date.
        elapsed_seconds = (current_date - previous_date).total_seconds()
        total += rate * elapsed_seconds
        previous_date = current_date
    return total
def remove_nan_from_vector(vector: Vector, value: Optional[float] = None) -> Vector:
    """
    Build a new vector without NaN / None values.

    A value counts as missing when it is ``None`` or when its string form,
    lower-cased, is ``'nan'``. Missing values are replaced by ``value`` when
    one is given; otherwise the whole date/value pair is dropped. Valid pairs
    are kept unchanged and in their original order.

    Parameters
    ----------
    vector : Vector
        Input vector; ``dates``, ``values`` and ``first_x`` are read.
    value : float, optional
        Replacement for missing values. When ``None`` (the default), missing
        entries are removed instead of replaced.

    Returns
    -------
    Vector
        A new vector built from the cleaned dates and values and the
        ``first_x`` of the input vector.
    """
    kept_dates = []
    kept_values = []
    for date, sample in zip(vector.dates, vector.values):
        is_missing = sample is None or str(sample).lower() == 'nan'
        if not is_missing:
            kept_dates.append(date)
            kept_values.append(sample)
        elif value is not None:
            # Missing sample, but a replacement was supplied: keep the date.
            kept_dates.append(date)
            kept_values.append(value)
    return Vector(kept_dates, kept_values, vector.first_x)
def apply_factor(vector: Vector, value: float) -> Vector:
    """
    Multiply every value of a vector by a constant factor.

    Parameters
    ----------
    vector : Vector
        Input vector; it is not modified.
    value : float
        Factor applied to each element of ``vector.values``.

    Returns
    -------
    Vector
        A new vector holding a copy of the input dates, the scaled values and
        the input ``first_x``.
    """
    return Vector(
        vector.dates.copy(),
        [sample * value for sample in vector.values],
        vector.first_x,
    )
def trunc_vector(vector: Vector, min_elapsed_time: float, max_elapsed_time: Optional[float] = None) -> Vector:
    """
    Keep only the part of a vector located in an elapsed-time window.

    The window is turned into a contiguous index range: it starts at the first
    position whose elapsed time is strictly greater than ``min_elapsed_time``
    and stops just before the first position whose elapsed time is strictly
    greater than ``max_elapsed_time``.
    NOTE(review): this matches ``min < t <= max`` only if
    ``vector.elapsed_times`` is increasing -- confirm that invariant.

    Parameters
    ----------
    vector : Vector
        Input vector; ``elapsed_times``, ``dates``, ``values`` and ``first_x``
        are read.
    min_elapsed_time : float
        Exclusive lower bound of the window.
    max_elapsed_time : float, optional
        Upper bound of the window. When ``None`` (the default) the window
        extends to the end of the vector.

    Returns
    -------
    Vector
        A new vector built from the selected dates and values and the
        ``first_x`` of the input vector. The result is empty when no elapsed
        time exceeds ``min_elapsed_time``.
    """

    def first_index_above(threshold: float) -> int:
        # Position of the first elapsed time > threshold, or the vector
        # length when there is none.
        for position, elapsed in enumerate(vector.elapsed_times):
            if elapsed > threshold:
                return position
        return len(vector.elapsed_times)

    start = first_index_above(min_elapsed_time)
    if max_elapsed_time is None:
        stop = len(vector.elapsed_times)
    else:
        stop = first_index_above(max_elapsed_time)

    truncated_dates = vector.dates.copy()[start:stop]
    truncated_values = vector.values.copy()[start:stop]
    return Vector(truncated_dates, truncated_values, vector.first_x)
def shift_vector(vector: Vector, value: float) -> Vector:
    """
    Scale both the values and the elapsed times of a vector by one factor.

    The values are scaled through ``apply_factor``; the elapsed times of the
    resulting vector are then multiplied by the same factor and written back
    with ``set_elapsed_times``.
    NOTE(review): despite the name, the code multiplies rather than adds an
    offset -- confirm this is the intended meaning of "shift".

    Parameters
    ----------
    vector : Vector
        Input vector.
    value : float
        Factor applied to the values and to the elapsed times.

    Returns
    -------
    Vector
        The vector returned by ``apply_factor``, after its elapsed times have
        been replaced by the scaled ones.
    """
    scaled_vector = apply_factor(vector, value)
    scaled_elapsed_times = [elapsed * value for elapsed in scaled_vector.elapsed_times]
    scaled_vector.set_elapsed_times(scaled_elapsed_times, scaled_vector.first_x)
    return scaled_vector
def interpolate_vectors(vectors: List[Vector], reference_vector: Optional[Vector] = None, data_enumerator: Optional[DataEnumerator] = None,
                        interpolation_method: Optional[InterpolationMethodEnum] = None) -> List[Vector]:
    """
    Interpolate a list of vectors through a data enumerator.

    When a reference vector is given it is enumerated together with the other
    vectors (placed after them), its id is passed to the enumerator settings
    and the ``total`` time range is used. Without a reference vector the
    ``common`` time range is used.

    The ``vectors`` list passed by the caller is never modified: the reference
    vector is added to a new list, so calling this function repeatedly with
    the same list does not make it grow.

    Parameters
    ----------
    vectors : List[Vector]
        Vectors to interpolate.
    reference_vector : Optional[Vector], optional
        Reference vector to enumerate along with ``vectors``.
    data_enumerator : Optional[DataEnumerator], optional
        Enumerator to use. When omitted, a new ``DataEnumerator`` is created
        from the settings built by this function. The settings are passed to
        ``to_enumerable_from_vectors`` in both cases.
    interpolation_method : Optional[InterpolationMethodEnum], optional
        Interpolation method; defaults to
        ``InterpolationMethodEnum.linear_interpolation``.

    Returns
    -------
    List[Vector]
        The ``vectors`` attribute of the enumeration produced by the
        enumerator.
    """
    if interpolation_method is None:
        interpolation_method = InterpolationMethodEnum.linear_interpolation

    if reference_vector is None:
        vectors_to_enumerate = vectors
        time_range_value = TimeRangeEnum.common
        reference_vector_id = None
    else:
        # Build a new list instead of appending in place, so the caller's
        # list is left untouched.
        vectors_to_enumerate = [*vectors, reference_vector]
        time_range_value = TimeRangeEnum.total
        reference_vector_id = reference_vector.id

    settings = DataEnumeratorSettings(time_range=time_range_value,
                                      interpolation_method=interpolation_method,
                                      reference_vector_id=reference_vector_id)
    if data_enumerator is None:
        data_enumerator = DataEnumerator(settings)

    enumeration = data_enumerator.to_enumerable_from_vectors(vectors_to_enumerate, settings)
    return enumeration.vectors
def remove_trailing_zeros(data: Vector) -> Vector:
    """
    Return a copy of a vector without its trailing zero values.

    Values are dropped from the end for as long as the last one equals zero;
    the date at the same position is dropped with each of them. The input
    vector is not modified.

    Parameters
    ----------
    data : Vector
        Input vector; ``dates`` and ``values`` are read.

    Returns
    -------
    Vector
        A new vector built from the remaining dates and values only.
        NOTE(review): ``first_x`` of the input is not forwarded to the new
        vector -- confirm this is intended.
    """
    dates = data.dates.copy()
    values = data.values.copy()

    # Popping from the end of a list is cheap, so trimming in place is fine.
    while len(values) > 0 and values[-1] == 0:
        values.pop()
        dates.pop()

    return Vector(dates, values)


def remove_leading_zeros(data: Vector) -> Vector:
    """
    Return a copy of a vector without its leading zero values.

    Values are dropped from the start for as long as the first one equals
    zero; the date at the same position is dropped with each of them. The
    input vector is not modified.

    Parameters
    ----------
    data : Vector
        Input vector; ``dates`` and ``values`` are read.

    Returns
    -------
    Vector
        A new vector built from the remaining dates and values only.
        NOTE(review): ``first_x`` of the input is not forwarded to the new
        vector -- confirm this is intended.
    """
    values = data.values

    # Locate the first non-zero value once and slice, instead of calling
    # pop(0) repeatedly (each pop(0) shifts the whole list).
    first_kept = next(
        (position for position, sample in enumerate(values) if not (sample == 0)),
        len(values),
    )

    # Slicing creates new lists, so the input vector stays untouched.
    return Vector(data.dates[first_kept:], values[first_kept:])