from datetime import datetime
from typing import Optional, List, Union
from .vector import Vector
from .enumerator import DataEnumerator, DataEnumeratorSettings, TimeRangeEnum, InterpolationMethodEnum
import bisect
from .well import Well
from .document import Document
from .user_task import UserTask
[docs]
def get_value_at_date_from_vector(vector: Vector, targeted_date: datetime) -> float:
    """
    Get the value stored at an exact date in a vector

    Parameters
    ----------
    vector:
        class:'Vector' object
    targeted_date:
        the date to look up; it has to be equal to one of the vector dates

    Returns
    -------
    float:
        the value paired with the first date equal to ``targeted_date``

    Raises
    ------
    ValueError
        If the vector has no dates, or if ``targeted_date`` is not one of them.
    """
    if len(vector.dates) == 0:
        raise ValueError("You provided an empty vector")

    # Front-to-back scan: the first date equal to the target wins.
    match_index = next(
        (position for position, candidate in enumerate(vector.dates) if candidate == targeted_date),
        None,
    )
    if match_index is None:
        raise ValueError(f"Targeted date {targeted_date} is not in the vector")
    return vector.values[match_index]
[docs]
def get_closest_value_at_date_from_vector(vector: Vector, targeted_date: datetime) -> float:
    """
    Get the value in effect at a date, using a binary search over the vector dates

    The binary search presumes that ``vector.dates`` is sorted in ascending order.

    Parameters
    ----------
    vector:
        class:'Vector' object
    targeted_date:
        the targeted date

    Returns
    -------
    float:
        the first value when ``targeted_date`` is on or before the first date,
        the last value when it is on or after the last date, otherwise the value
        of the latest sample dated on or before ``targeted_date``

    Raises
    ------
    ValueError
        If the vector has no dates.
    """
    dates, values = vector.dates, vector.values
    if len(dates) == 0:
        raise ValueError("You provided an empty vector")

    # Clamp to the ends of the vector before searching.
    if targeted_date <= dates[0]:
        return values[0]
    if targeted_date >= dates[-1]:
        return values[-1]

    # bisect_right gives the position just past the last date <= targeted_date,
    # so the sample right before that position is the one to return.
    position_after = bisect.bisect_right(dates, targeted_date)
    return values[position_after - 1]
def calculate_cumulative_from_step_data_vector(vector: Vector) -> float:
    """
    Calculate the cumulative from a step data vector

    Each value is multiplied by the number of seconds between the previous date
    (``vector.first_x`` for the very first value) and its own date, and the
    products are summed.

    Parameters
    ----------
    vector:
        class:'Vector' object

    Returns
    -------
    float:
        cumulative (sum of value * elapsed seconds); 0.0 when the vector has no dates

    Raises
    ------
    ValueError
        If ``vector.first_x`` is None.
    """
    if vector.first_x is None:
        raise ValueError("Missing rate first x")

    # Start of every step: first_x for the first one, then the preceding date.
    step_starts = [vector.first_x, *vector.dates]

    cumulative = 0.0
    for step_start, step_end, rate in zip(step_starts, vector.dates, vector.values):
        cumulative += rate * (step_end - step_start).total_seconds()
    return cumulative
def calculate_cumulative_from_point_vector(vector: Vector) -> float:
    """
    Calculate the cumulative from a point vector

    For every pair of consecutive samples, the average of the two values is
    multiplied by the number of seconds separating them (trapezoidal rule), and
    the products are summed.

    Parameters
    ----------
    vector:
        class:'Vector' object

    Returns
    -------
    float:
        cumulative; 0.0 when the vector holds a single sample

    Raises
    ------
    ValueError
        If the vector has no dates.
    """
    sample_count = len(vector.dates)
    if sample_count == 0:
        raise ValueError("You provided an empty vector")
    if sample_count == 1:
        return 0.0

    cumulative = 0.0
    # Carry the previous sample along instead of re-indexing it on each step.
    previous_date, previous_rate = vector.dates[0], vector.values[0]
    for position in range(1, sample_count):
        current_date, current_rate = vector.dates[position], vector.values[position]
        elapsed_seconds = (current_date - previous_date).total_seconds()
        cumulative += (previous_rate + current_rate) / 2.0 * elapsed_seconds
        previous_date, previous_rate = current_date, current_rate
    return cumulative
[docs]
def remove_nan_from_vector(vector: Vector, value: Optional[float] = None) -> Vector:
    """
    Build a new vector without NaN or None entries.

    An entry counts as missing when it is None or when its string form, lowercased,
    is ``'nan'``. Missing entries are replaced by ``value`` when one is supplied;
    otherwise they are dropped together with their date. All other entries are
    kept unchanged and in their original order.

    Parameters
    ----------
    vector : Vector
        The input vector; its `dates` and `values` attributes are read pairwise.
    value : float, optional
        Replacement for missing entries. When None (the default), missing entries
        are omitted from the result.

    Returns
    -------
    Vector
        A new vector built from the retained dates and values, with the
        `first_x` of the input vector.
    """
    kept_dates, kept_values = [], []
    for date, sample in zip(vector.dates, vector.values):
        is_missing = sample is None or str(sample).lower() == 'nan'
        if not is_missing:
            kept_dates.append(date)
            kept_values.append(sample)
        elif value is not None:
            # Missing entry with a replacement available: keep the date.
            kept_dates.append(date)
            kept_values.append(value)
    return Vector(kept_dates, kept_values, vector.first_x)
[docs]
def apply_factor(vector: Vector, value: float) -> Vector:
    """
    Multiply every value of a vector by a factor.

    The input vector is left untouched: the result is a new `Vector` built from a
    copy of the dates, the scaled values and the same `first_x`.

    Parameters
    ----------
    vector : Vector
        The vector whose values are to be scaled.
    value : float
        The factor each value is multiplied by.

    Returns
    -------
    Vector
        A new `Vector` holding the scaled values, with the original dates and
        `first_x`.
    """
    copied_dates = vector.dates.copy()
    scaled_values = [sample * value for sample in vector.values]
    return Vector(copied_dates, scaled_values, vector.first_x)
[docs]
def trunc_vector(vector: Vector, min_elapsed_time: float, max_elapsed_time: Optional[float] = None) -> Vector:
    """
    Truncate a vector to a window of elapsed times.

    The slice starts at the first entry whose elapsed time is strictly greater
    than `min_elapsed_time` and stops just before the first entry whose elapsed
    time is strictly greater than `max_elapsed_time` (or at the end of the vector
    when `max_elapsed_time` is None). The positions are found by scanning
    `vector.elapsed_times` from the start, so the result is a contiguous window
    only if the elapsed times are in ascending order (presumably the case; verify
    against `Vector`).

    Parameters
    ----------
    vector : Vector
        The input vector; its `dates`, `values`, `elapsed_times` and `first_x`
        attributes are read.
    min_elapsed_time : float
        Lower bound (exclusive) of the elapsed-time window.
    max_elapsed_time : float, optional
        Upper bound of the elapsed-time window. When None, everything after the
        lower bound is kept.

    Returns
    -------
    Vector
        A new `Vector` built from the sliced copies of `dates` and `values` and
        the `first_x` of the input vector.
    """
    elapsed_times = vector.elapsed_times
    sample_count = len(elapsed_times)

    def first_index_beyond(threshold: float) -> int:
        # Position of the first elapsed time strictly above the threshold,
        # or the vector length when there is none.
        for position, elapsed in enumerate(elapsed_times):
            if elapsed > threshold:
                return position
        return sample_count

    start = first_index_beyond(min_elapsed_time)
    stop = sample_count if max_elapsed_time is None else first_index_beyond(max_elapsed_time)

    window_dates = vector.dates.copy()[start:stop]
    window_values = vector.values.copy()[start:stop]
    return Vector(window_dates, window_values, vector.first_x)
[docs]
def shift_vector(vector: Vector, value: float) -> Vector:
    """
    Scale both the values and the elapsed times of a vector by the same factor.

    A scaled copy of the vector is first obtained through `apply_factor`; the
    elapsed times of that copy are then multiplied by `value` and written back
    with `set_elapsed_times`, using the copy's own `first_x`.

    Parameters
    ----------
    vector : Vector
        The input vector. It is not modified by this function itself; the work is
        done on the vector returned by `apply_factor`.
    value : float
        The factor applied to the values and to the elapsed times.

    Returns
    -------
    Vector
        The scaled vector with its elapsed times multiplied by `value`.
    """
    result = apply_factor(vector, value)
    scaled_elapsed_times = [elapsed * value for elapsed in result.elapsed_times]
    result.set_elapsed_times(scaled_elapsed_times, result.first_x)
    return result
[docs]
def interpolate_vectors(vectors: List[Vector], reference_vector: Optional[Vector] = None, data_enumerator: Optional[DataEnumerator] = None,
                        interpolation_method: Optional[InterpolationMethodEnum] = None) -> List[Vector]:
    """
    Interpolate a list of vectors through a data enumerator.

    When a reference vector is given, it is enumerated together with `vectors`
    (placed after them), its id is passed to the enumerator settings and the total
    time range is used; otherwise the common time range is used. The list passed
    in as `vectors` is never modified.

    Parameters
    ----------
    vectors : List[Vector]
        The vectors to be interpolated.
    reference_vector : Optional[Vector], optional
        Optional reference vector. When provided, it is added to the enumerated
        vectors and selects the total time range.
    data_enumerator : Optional[DataEnumerator], optional
        Enumerator to use. When not provided, a new `DataEnumerator` is created
        from the settings built by this function.
    interpolation_method : Optional[InterpolationMethodEnum], optional
        The interpolation method. Defaults to linear interpolation when None.

    Returns
    -------
    List[Vector]
        The `vectors` attribute of the enumeration produced by the enumerator.
    """
    if interpolation_method is None:
        interpolation_method = InterpolationMethodEnum.linear_interpolation

    if reference_vector is None:
        vectors_to_enumerate = vectors
        time_range_value = TimeRangeEnum.common
        reference_vector_id = None
    else:
        # Build a new list rather than appending in place: mutating the caller's
        # list would make the reference vector pile up across repeated calls.
        vectors_to_enumerate = [*vectors, reference_vector]
        time_range_value = TimeRangeEnum.total
        reference_vector_id = reference_vector.id

    settings = DataEnumeratorSettings(time_range=time_range_value, interpolation_method=interpolation_method,
                                      reference_vector_id=reference_vector_id)
    if data_enumerator is None:
        data_enumerator = DataEnumerator(settings)
    enumeration = data_enumerator.to_enumerable_from_vectors(vectors_to_enumerate, settings)
    return enumeration.vectors
def remove_trailing_zeros(data: Vector) -> Vector:
    """
    Drop the zero values at the end of a Vector, together with their dates.

    The function works on copies of `data.dates` and `data.values`, so the input
    Vector is not modified. Values are examined from the last one backwards; every
    value equal to zero is discarded along with the date at the same end of the
    dates list, until a non-zero value (or the start of the list) is reached.

    Parameters
    ----------
    data : Vector
        The input Vector; its `dates`, `values` and `first_x` attributes are read.

    Returns
    -------
    Vector
        A new Vector without the trailing zeros, carrying the same `first_x` as
        the input.
    """
    values = data.values.copy()
    dates = data.dates.copy()

    # Count how many values at the tail compare equal to zero.
    trailing_zero_count = 0
    for sample in reversed(values):
        if not (sample == 0):
            break
        trailing_zero_count += 1

    if trailing_zero_count:
        del values[-trailing_zero_count:]
        del dates[-trailing_zero_count:]
    return Vector(dates, values, first_x=data.first_x)
def remove_leading_zeros(data: Vector) -> Vector:
    """
    Drop the zero values at the start of a Vector, together with their dates.

    The input Vector is not modified: the result is built from slices of
    `data.dates` and `data.values`. When at least one leading zero is removed and
    the input has a `first_x`, the `first_x` of the result becomes the date of the
    last removed entry; otherwise the input `first_x` is kept as is.

    Parameters
    ----------
    data : Vector
        The input Vector; its `dates`, `values` and `first_x` attributes are read.

    Returns
    -------
    Vector
        A new Vector without the leading zeros.
    """
    # Count the leading zeros in one pass and slice once, instead of popping from
    # the front of the lists repeatedly (each pop(0) shifts every remaining item).
    leading_zero_count = 0
    for sample in data.values:
        if not (sample == 0):
            break
        leading_zero_count += 1

    first_x = data.first_x
    if leading_zero_count > 0 and first_x is not None:
        # The date of the last dropped entry becomes the new first_x.
        first_x = data.dates[leading_zero_count - 1]

    return Vector(data.dates[leading_zero_count:], data.values[leading_zero_count:], first_x)
def find_vector_origins(vector: Vector, well: Well) -> List[Union[Document, UserTask]]:
    """
    Finds the document or user task associated with each origin in a vector.

    Each origin is first compared with the `file_id` of the well documents; if no
    document matches, it is compared with the `id` of the well user tasks. The
    first match found is used.

    Parameters
    ----------
    vector : Vector
        The vector containing origins to match
    well : Well
        The well object containing documents and user tasks to search through

    Returns
    -------
    List[Union[Document, UserTask]]
        A list of Document or UserTask objects corresponding to each origin in the vector.
        The list follows the same order as vector.origins.

    Raises
    ------
    ValueError
        If the vector has no origins or if an origin cannot be matched to a document or user task.

    Example
    -------
    >>> vector = some_data.read()
    >>> sources = find_vector_origins(vector, well)
    >>> for i, source in enumerate(sources):
    ...     if isinstance(source, Document):
    ...         print(f"Origin {i}: Document '{source.name}' (ID: {source.file_id})")
    ...     elif isinstance(source, UserTask):
    ...         print(f"Origin {i}: UserTask '{source.name}' (ID: {source.id})")
    """
    if vector.origins is None or len(vector.origins) == 0:
        raise ValueError("The vector has no origins")

    results: List[Union[Document, UserTask]] = []
    for i, origin in enumerate(vector.origins):
        # Documents take precedence; user tasks are only searched when no document matches.
        source = next((document for document in well.documents if document.file_id == origin), None)
        if source is None:
            source = next((user_task for user_task in well.user_tasks if user_task.id == origin), None)
        if source is None:
            raise ValueError(f"Origin at index {i} with ID '{origin}' cannot be found in well documents or user tasks")
        results.append(source)
    return results