from datetime import datetime
from typing import Optional, List
from .vector import Vector
from .enumerator import DataEnumerator, DataEnumeratorSettings, TimeRangeEnum, InterpolationMethodEnum
import bisect
[docs]
def get_value_at_date_from_vector(vector: Vector, targeted_date: datetime) -> float:
    """
    Return the value stored at an exact date of a vector.

    The lookup is an exact match: the first entry of ``vector.dates`` that
    compares equal to ``targeted_date`` is located and the element at the same
    position in ``vector.values`` is returned. No interpolation and no
    nearest-date search is performed.

    Parameters
    ----------
    vector : Vector
        Object exposing parallel ``dates`` and ``values`` sequences.
    targeted_date : datetime
        The date whose value is requested.

    Returns
    -------
    float
        The value paired with the first date equal to ``targeted_date``.

    Raises
    ------
    ValueError
        If the vector has no dates, or if ``targeted_date`` is not one of them.
    """
    dates = vector.dates
    if len(dates) == 0:
        raise ValueError("You provided an empty vector")

    # Position of the first exact match, or None when the date is absent.
    match_index = next(
        (position for position, candidate in enumerate(dates) if candidate == targeted_date),
        None,
    )
    if match_index is None:
        raise ValueError(f"Targeted date {targeted_date} is not in the vector")
    return vector.values[match_index]
[docs]
def get_closest_value_at_date_from_vector(vector: Vector, targeted_date: datetime) -> float:
    """
    Return the value in effect at a date, using a binary search over the dates.

    The target is clamped to the vector's range: a date at or before the first
    date yields the first value, and a date at or after the last date yields
    the last value. For a date strictly inside the range, the value returned is
    the one paired with the latest date that is not after ``targeted_date``
    (found with ``bisect.bisect_right``), i.e. a "previous value" lookup rather
    than a nearest-neighbour one.

    Parameters
    ----------
    vector : Vector
        Object exposing parallel ``dates`` and ``values`` sequences.
        The binary search presumes ``dates`` is sorted in ascending order.
    targeted_date : datetime
        The date whose value is requested.

    Returns
    -------
    float
        The selected value as described above.

    Raises
    ------
    ValueError
        If the vector has no dates.
    """
    dates = vector.dates
    if len(dates) == 0:
        raise ValueError("You provided an empty vector")

    # Clamp to the boundaries before searching.
    if targeted_date <= dates[0]:
        return vector.values[0]
    if targeted_date >= dates[-1]:
        return vector.values[-1]

    # bisect_right gives the slot just after the last date <= targeted_date.
    slot_after = bisect.bisect_right(dates, targeted_date)
    return vector.values[slot_after - 1]
[docs]
def calculate_cumulative_production_rate_from_vector(vector: Vector) -> float:
    """
    Integrate a rate vector over time and return the accumulated total.

    Each rate is multiplied by the number of seconds elapsed since the
    previous date (``vector.first_x`` for the first sample) and the products
    are summed. The rates are therefore treated as per-second quantities.

    Parameters
    ----------
    vector : Vector
        Object exposing ``first_x`` (the starting date) and parallel ``dates``
        and ``values`` sequences.

    Returns
    -------
    float
        Sum of ``rate * elapsed_seconds`` over all samples; ``0.0`` when the
        vector has no samples.

    Raises
    ------
    ValueError
        If ``vector.first_x`` is ``None``.
    """
    previous_date = vector.first_x
    if previous_date is None:
        raise ValueError("Missing rate first x")

    total = 0.0
    for current_date, rate in zip(vector.dates, vector.values):
        elapsed_seconds = (current_date - previous_date).total_seconds()
        total += rate * elapsed_seconds
        previous_date = current_date
    return total
[docs]
def remove_nan_from_vector(vector: Vector, value: Optional[float] = None) -> Vector:
    """
    Build a new vector without NaN / None values.

    Every ``(date, value)`` pair of the input is inspected in order. A value is
    considered missing when it is ``None`` or when its string form, lowercased,
    equals ``'nan'``. Valid pairs are always kept. Missing pairs are kept with
    ``value`` substituted when a replacement is given, and dropped otherwise.

    Parameters
    ----------
    vector : Vector
        Input vector; its ``dates`` and ``values`` are read pairwise and its
        ``first_x`` is forwarded to the result.
    value : float, optional
        Replacement for missing entries. When ``None`` (the default), missing
        entries are removed together with their dates.

    Returns
    -------
    Vector
        A new vector holding the retained dates and values in their original
        order, constructed with the input's ``first_x``.
    """
    kept_dates, kept_values = [], []
    for date, raw in zip(vector.dates, vector.values):
        # NaN is detected through its string form so any NaN-like object
        # whose str() is "nan" (in any letter case) is recognised.
        is_missing = raw is None or str(raw).lower() == 'nan'
        if not is_missing:
            kept_dates.append(date)
            kept_values.append(raw)
        elif value is not None:
            kept_dates.append(date)
            kept_values.append(value)
    return Vector(kept_dates, kept_values, vector.first_x)
[docs]
def apply_factor(vector: Vector, value: float) -> Vector:
    """
    Return a copy of a vector with every value multiplied by a factor.

    The input vector is left untouched: its dates are copied, each of its
    values is multiplied by ``value``, and a new vector is created from the
    results.

    Parameters
    ----------
    vector : Vector
        Source vector providing ``dates``, ``values`` and ``first_x``.
    value : float
        Multiplier applied to each element of ``vector.values``.

    Returns
    -------
    Vector
        A new vector with the copied dates, the scaled values and the same
        ``first_x`` as the input.
    """
    scaled_values = [item * value for item in vector.values]
    copied_dates = vector.dates.copy()
    return Vector(copied_dates, scaled_values, vector.first_x)
[docs]
def trunc_vector(vector: Vector, min_elapsed_time: float, max_elapsed_time: Optional[float] = None) -> Vector:
    """
    Keep only the part of a vector lying inside an elapsed-time window.

    The start of the kept slice is the first position whose elapsed time is
    strictly greater than ``min_elapsed_time``. The end of the slice
    (exclusive) is the first position whose elapsed time is strictly greater
    than ``max_elapsed_time``, or the end of the vector when no maximum is
    given. If no elapsed time exceeds a bound, the corresponding index falls
    back to the length of the vector.

    Parameters
    ----------
    vector : Vector
        Input vector providing ``dates``, ``values``, ``elapsed_times`` and
        ``first_x``. The positional search presumes ``elapsed_times`` is in
        ascending order.
    min_elapsed_time : float
        Exclusive lower bound on elapsed time.
    max_elapsed_time : float, optional
        Inclusive upper bound on elapsed time. When ``None``, everything after
        the lower bound is kept.

    Returns
    -------
    Vector
        A new vector built from the selected slice of dates and values and the
        input's ``first_x``.
    """
    elapsed_times = vector.elapsed_times
    sample_count = len(elapsed_times)

    def first_index_exceeding(threshold):
        # First position whose elapsed time is > threshold, else the length.
        for position, elapsed_time in enumerate(elapsed_times):
            if elapsed_time > threshold:
                return position
        return sample_count

    start = first_index_exceeding(min_elapsed_time)
    if max_elapsed_time is None:
        stop = sample_count
    else:
        stop = first_index_exceeding(max_elapsed_time)

    kept_dates = vector.dates.copy()[start:stop]
    kept_values = vector.values.copy()[start:stop]
    return Vector(kept_dates, kept_values, vector.first_x)
[docs]
def shift_vector(vector: Vector, value: float) -> Vector:
    """
    Scale a vector's values and its elapsed times by the same factor.

    A scaled copy of the input is first obtained through ``apply_factor``.
    The elapsed times of that copy are then each multiplied by ``value`` and
    written back with ``set_elapsed_times``, together with the copy's
    ``first_x``.

    Parameters
    ----------
    vector : Vector
        The input vector.
    value : float
        Factor applied both to the values and to the elapsed times.

    Returns
    -------
    Vector
        The scaled copy with its elapsed times updated.
    """
    scaled = apply_factor(vector, value)
    scaled_elapsed_times = [elapsed_time * value for elapsed_time in scaled.elapsed_times]
    scaled.set_elapsed_times(scaled_elapsed_times, scaled.first_x)
    return scaled
[docs]
def interpolate_vectors(vectors: List[Vector], reference_vector: Optional[Vector] = None, data_enumerator: Optional[DataEnumerator] = None,
                        interpolation_method: Optional[InterpolationMethodEnum] = None) -> List[Vector]:
    """
    Interpolate a list of vectors through a data enumerator.

    Enumerator settings are built from the arguments and passed, with the
    vectors, to ``to_enumerable_from_vectors``; the ``vectors`` attribute of
    the resulting enumeration is returned.

    When a reference vector is supplied it is enumerated after the input
    vectors, its ``id`` is recorded in the settings, and the total time range
    is used. Without a reference vector the common time range is used.

    The caller's ``vectors`` list is never modified: the reference vector is
    added to a new list rather than appended in place.

    Parameters
    ----------
    vectors : List[Vector]
        The vectors to interpolate.
    reference_vector : Optional[Vector], optional
        Extra vector enumerated together with ``vectors``; selects the total
        time range and is identified in the settings by its ``id``.
    data_enumerator : Optional[DataEnumerator], optional
        Enumerator to use. When omitted, a new ``DataEnumerator`` is created
        from the settings built here.
    interpolation_method : Optional[InterpolationMethodEnum], optional
        Interpolation method; defaults to
        ``InterpolationMethodEnum.linear_interpolation``.

    Returns
    -------
    List[Vector]
        The ``vectors`` attribute of the enumeration produced by the enumerator.
    """
    if interpolation_method is None:
        interpolation_method = InterpolationMethodEnum.linear_interpolation

    has_reference = reference_vector is not None
    # Build a fresh list instead of appending so repeated calls do not keep
    # growing the list owned by the caller.
    vectors_to_enumerate = [*vectors, reference_vector] if has_reference else vectors

    settings = DataEnumeratorSettings(
        time_range=TimeRangeEnum.total if has_reference else TimeRangeEnum.common,
        interpolation_method=interpolation_method,
        reference_vector_id=reference_vector.id if has_reference else None,
    )
    if data_enumerator is None:
        data_enumerator = DataEnumerator(settings)

    enumeration = data_enumerator.to_enumerable_from_vectors(vectors_to_enumerate, settings)
    return enumeration.vectors
def remove_trailing_zeros(data: Vector) -> Vector:
    """
    Return a new vector without the zero values at its end.

    Copies of the input's ``dates`` and ``values`` are trimmed from the tail
    for as long as the last value equals zero; each removed value takes its
    date with it. The input vector itself is not modified.

    Parameters
    ----------
    data : Vector
        Input vector providing ``dates`` and ``values``.

    Returns
    -------
    Vector
        A new vector built from the trimmed dates and values. Only those two
        arguments are passed to the constructor, so the input's ``first_x``
        is not forwarded.
        NOTE(review): other helpers in this module forward ``first_x`` --
        confirm that dropping it here is intended.
    """
    trimmed_values = data.values.copy()
    trimmed_dates = data.dates.copy()

    # Popping from the end keeps both lists aligned, one sample at a time.
    while trimmed_values and trimmed_values[-1] == 0:
        trimmed_values.pop()
        trimmed_dates.pop()

    return Vector(trimmed_dates, trimmed_values)
def remove_leading_zeros(data: Vector) -> Vector:
    """
    Return a new vector without the zero values at its start.

    The number of leading values equal to zero is counted, then that many
    entries are dropped from the front of copies of ``dates`` and ``values``
    in a single slice deletion. This avoids repeated ``pop(0)`` calls, each
    of which shifts the whole remaining list. The input vector itself is not
    modified.

    Parameters
    ----------
    data : Vector
        Input vector providing ``dates`` and ``values``.

    Returns
    -------
    Vector
        A new vector built from the remaining dates and values. Only those
        two arguments are passed to the constructor, so the input's
        ``first_x`` is not forwarded.
        NOTE(review): other helpers in this module forward ``first_x`` --
        confirm that dropping it here is intended.
    """
    dates = data.dates.copy()
    values = data.values.copy()

    # Count the leading zeros first so the removal is one O(n) operation.
    leading_zeros = 0
    value_count = len(values)
    while leading_zeros < value_count and values[leading_zeros] == 0:
        leading_zeros += 1

    if leading_zeros:
        del values[:leading_zeros]
        del dates[:leading_zeros]

    return Vector(dates, values)