from __future__ import annotations
import warnings
from datetime import datetime
from typing import Iterator, Optional, List, Any, Generator, cast, Dict
from .vector import Vector
from .vector_iterator import VectorIterator
from ._private._cluster_apis import ClusterAPIS
from ._private._well_dto import ShutInDataDto
from ._private.dto_converters._data_dto_converter import DataDtoConverter
from ._private._external_data_dto import GaugeQueryDto
from .data_kind_enum import DataKindEnum
from .automation_status_enum import AutomationStatus
from .datetime_utils import datetime_to_str
from .metadata import Metadata
from .unit_enum import UnitEnum
class Data:
""" Data object.
Represents a KAPPA Automate data object used to read and write the vector values.
.. note:: Should not be instantiated directly.
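Examples
--------
A minimal sketch, assuming a :class:`Data` instance named ``data`` has already been obtained from its parent well elsewhere in the SDK:
>>> print(data.name, data.kind, data.size)
>>> last_points = data.read(count=1000, last=True)
>>> print(last_points.dates[0], last_points.values[0])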
"""
def __init__(self,
field_id: str,
well_id: str,
data_id: str,
vector_id: str,
automation_id: Optional[str],
name: str,
kind: DataKindEnum,
data_type: str,
is_by_step: bool,
is_reference: bool,
is_high_frequency: bool,
labels: List[str],
filters: List[Data],
raw_data: Optional[Data],
measure_depth: Optional[float],
true_vertical_depth: Optional[float],
true_vertical_depth_subsea: Optional[float],
cluster_apis: ClusterAPIS,
data_dto_converter: DataDtoConverter):
self.__field_id: str = field_id
self.__well_id: str = well_id
self.__id: str = data_id
self.__vector_id: str = vector_id
self.__automation_id: Optional[str] = automation_id
self.__name: str = name
self.__kind: DataKindEnum = kind
self.__data_type: str = data_type
self.__is_by_step: bool = is_by_step
self.__is_reference: bool = is_reference
self.__is_high_frequency: bool = is_high_frequency
self.__labels: List[str] = labels
self.__filters: List[Data] = filters
self.__raw_data: Optional[Data] = raw_data
self.__write_chunk_size: int = 30000
self.__max_read_size: int = 100000
self.__cluster_apis: ClusterAPIS = cluster_apis
self.__gauge_info_dto: Optional[GaugeQueryDto] = None
self.__data_dto_converter: DataDtoConverter = data_dto_converter
self.__properties: Dict[str, Optional[float]] = {"md": measure_depth, "tvd": true_vertical_depth, "tvdss": true_vertical_depth_subsea}
self.__metadata: Optional[Metadata] = None
def __get_property_value(self, property_name: str) -> float:
""" Get the property value of the :class:`Data`."""
if self.__kind not in [DataKindEnum.gauge, DataKindEnum.production, DataKindEnum.basic_data]:
raise ValueError(f"The data object is a {self.__kind.value} so it does not contain any depth property, only gauges or productions have properties")
return cast(float, self.__properties.get(property_name))
@property
def md(self) -> float:
"""Get the measure depth property value of the :class:`Data`."""
return self.__get_property_value("md")
@property
def tvd(self) -> float:
"""Get the True vertical depth property value of the :class:`Data`."""
return self.__get_property_value("tvd")
@property
def tvdss(self) -> float:
"""Get the True vertical depth sub sea property value of the :class:`Data`."""
return self.__get_property_value("tvdss")
@property
def id(self) -> str:
""" Gets the id of the :class:`Data` object.
"""
return self.__id
@property
def name(self) -> str:
""" Gets the name of the :class:`Data`.
"""
return self.__name
@property
def vector_id(self) -> str:
""" Gets the internal id of the :class:`Data`'s XY vector.
"""
return self.__vector_id
@property
def cumulative_vector_id(self) -> Optional[str]:
""" Gets the internal id of the :class:`Data`'s XY vector.
"""
return self.metadata.cumulative_vector_id
@property
def automation_id(self) -> Optional[str]:
""" Gets the automation_id of the :class:`Data`.
If the :class:`Data` is not linked to an Automation job, it will be None.
If the :class:`Data` is a filter, it will be the filterId.
If the :class:`Data` is a shut-in, it will be the usertaskId.
If the :class:`Data` is a function, it will be the functionId.
If the :class:`Data` is a gauge, it will be the mirrorId.
"""
return self.__automation_id
@property
def kind(self) -> DataKindEnum:
""" Gets the kind of the :class:`Data` (gauge or basic data).
"""
return self.__kind
@property
def data_type(self) -> str:
""" Gets the alias of the associated data-type for this :class:`Data`.
This is the alias of one of the data-types defined on the ``Data Types`` tab
of the ``Automation`` page at the time the field was created.
"""
return self.__data_type
@property
def is_by_step(self) -> bool:
""" Gets a value indicating whether this :class:`Data` is defined as steps or points.
"""
return self.__is_by_step
@property
def is_reference(self) -> bool:
""" Gets a value indicating whether this :class:`Data` is a reference data of its data-type in the well.
"""
return self.__is_reference
@property
def is_high_frequency(self) -> bool:
""" Gets a value indicating whether this :class:`Data` has high frequency data.
"""
return self.__is_high_frequency
@property
def labels(self) -> List[str]:
""" Gets a list of labels associated with this :class:`Data`.
"""
return self.__labels
@property
def size(self) -> int:
""" Gets the size of the :class:`Data` (the number of points).
"""
return self.metadata.count
@property
def first_x(self) -> Optional[datetime]:
""" Gets a reference date, or the first X (date) in the :class:`Data`'s vector if a reference date is not defined.
"""
metadata = self.metadata
return metadata.first_x if metadata.first_x is not None else metadata.min_x
@property
def last_x(self) -> Optional[datetime]:
""" Gets the last X (date) in the :class:`Data`'s vector.
"""
return self.metadata.max_x
@property
def filters(self) -> List[Data]:
""" Gets a list of filters of this :class:`Data`.
"""
return self.__filters
@property
def raw_data(self) -> Optional[Data]:
""" Gets the associated raw data (raw filter, raw shut-in, etc.)
"""
return self.__raw_data
@property
def datasource_name(self) -> Optional[str]:
""" Gets the name of the data source behind this gauge :class:`Data`, or None if it cannot be resolved."""
if self.__kind != DataKindEnum.gauge:
raise ValueError("You can only get the datasource name for a gauge")
gauge_info_dto = self.__get_gauge_info_dto()
if gauge_info_dto is not None:
return gauge_info_dto.dataSourceName
return None
def update_first_x(self, first_x: datetime) -> None:
""" Updates a value of the first X (reference date).
Parameters
----------
first_x:
New value of the first X (reference date).
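Examples
--------
A minimal sketch, assuming ``data`` is an existing :class:`Data` instance:
>>> from datetime import datetime
>>> data.update_first_x(datetime(2023, 1, 1))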
"""
dto = {"firstX": datetime_to_str(first_x)}
self.__cluster_apis.data_api.update_first_x(self.__vector_id, dto)
def read(self, from_time: Optional[datetime] = None, to_time: Optional[datetime] = None, count: int = -1, last: bool = False, unit: Optional[UnitEnum] = None) -> Vector:
""" Reads the data values as :class:`Vector`.
All parameters are optional; if none are given, all values will be read.
A warning will be issued when trying to read more than 500000 points at once.
Parameters
----------
from_time:
Date to start reading from.
to_time:
Date to read the data up to.
count:
Maximum count of points to return, regardless of from/to settings.
last:
If set to True, the last ``count`` points will be returned.
unit:
Convert values from internal units to a specific unit.
Returns
-------
:class:`Vector`:
Vector that contains the requested data values.
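Examples
--------
A minimal sketch, assuming ``data`` is an existing :class:`Data` instance:
>>> from datetime import datetime
>>> vector = data.read(from_time=datetime(2023, 1, 1), count=5000)
>>> print(len(vector.dates), len(vector.values))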
"""
return self.__read(self.vector_id, from_time, to_time, count, last, unit)
def read_cumulative(self, from_time: Optional[datetime] = None, to_time: Optional[datetime] = None, count: int = -1, last: bool = False, unit: Optional[UnitEnum] = None) -> Vector:
""" Reads the cumulative values associated to this data object as :class:`Vector`.
All parameters are optional; if none are given, all values will be read.
A warning will be issued when trying to read more than 500000 points at once.
Parameters
----------
from_time:
Date to start reading from.
to_time:
Date to read the data up to.
count:
Maximum count of points to return, regardless of from/to settings.
last:
If set to True, the last ``count`` points will be returned.
unit:
Convert values from internal units to a specific unit.
Returns
-------
:class:`Vector`:
Vector that contains the requested data values.
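Examples
--------
A minimal sketch, assuming ``data`` is a production :class:`Data` whose cumulative vector has already been generated:
>>> cumulative = data.read_cumulative(count=1, last=True)
>>> print(cumulative.values)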
"""
if self.cumulative_vector_id is None:
raise ValueError("This data does not have cumulative values, it has to be a production data type or you should run POST /v1/data/cumulativeProduction to generate the vectors")
return self.__read(self.cumulative_vector_id, from_time, to_time, count, last, unit)
def __read(self, vector_id: str, from_time: Optional[datetime] = None, to_time: Optional[datetime] = None, count: int = -1, last: bool = False, unit: Optional[UnitEnum] = None) -> Vector:
if count > 500000 or (count == -1 and self.size > 500000):
warnings.warn("You are trying to read a large data-set of more than 500000 points at once. "
"It is recommended to use read_by_chunks method to process this data-set by chunks.")
# direct read if number of requested points <= 100K
if 0 <= count <= self.__max_read_size:
dates, values = self.__cluster_apis.data_api.read_vector(vector_id, from_time, to_time, count, last)
if unit is not None:
values = [self.__data_dto_converter.unit_converter.convert_from_internal(unit, value) for value in values]
if self.is_by_step:
return Vector(dates, values, self.first_x, vector_id)
else:
return Vector(dates, values, vector_id=vector_id)
# transparently read by chunks if number of requested points > 100K
chunk_dates: List[datetime] = list()
chunk_values: List[float] = list()
for chunk in self.__read_by_chunks(vector_id, chunk_size=self.__max_read_size,
from_time=from_time,
to_time=to_time,
count=count):
chunk_dates.extend(chunk.dates)
chunk_values.extend(chunk.values)
if unit is not None:
chunk_values = [self.__data_dto_converter.unit_converter.convert_from_internal(unit, value) for value in chunk_values]
if self.is_by_step:
return Vector(chunk_dates, chunk_values, self.first_x, vector_id)
else:
return Vector(chunk_dates, chunk_values, vector_id=vector_id)
def read_by_chunks(self, chunk_size: int, from_time: Optional[datetime] = None, to_time: Optional[datetime] = None,
count: int = -1, unit: Optional[UnitEnum] = None) -> Iterator[Vector]:
""" Reads the data values by chunks of a given size.
This is the preferred way to read and process large sets of data.
Parameters
----------
chunk_size:
Number of points to return in each chunk (100000 points max).
from_time:
Date to start reading from.
to_time:
Date to read the data up to.
count:
Maximum count of points to return, regardless of from/to settings.
unit:
Convert values from internal units to a specific unit.
Returns
-------
:class:`Iterator[Vector]`:
Iterator that gives access to the requested data values that are split into chunks (each presented as :class:`Vector`) of a given size.
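Examples
--------
A minimal sketch, assuming ``data`` is an existing :class:`Data` instance:
>>> total = 0
>>> for chunk in data.read_by_chunks(chunk_size=50000):
...     total += len(chunk.values)
>>> print(total)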
"""
return self.__read_by_chunks(self.vector_id, chunk_size, from_time, to_time, count, unit)
def read_cumulative_by_chunks(self, chunk_size: int, from_time: Optional[datetime] = None, to_time: Optional[datetime] = None,
count: int = -1, unit: Optional[UnitEnum] = None) -> Iterator[Vector]:
""" Reads the cumulative values associated to this data by chunks of a given size.
This is the preferred way to read and process large sets of data.
Parameters
----------
chunk_size:
Number of points to return in each chunk (100000 points max).
from_time:
Date to start reading from.
to_time:
Date to read the data up to.
count:
Maximum count of points to return, regardless of from/to settings.
unit:
Convert values from internal units to a specific unit.
Returns
-------
:class:`Iterator[Vector]`:
Iterator that gives access to the requested data values that are split into chunks (each presented as :class:`Vector`) of a given size.
"""
if self.cumulative_vector_id is None:
raise ValueError("This data does not have cumulative values, it has to be a production data type or you should run POST /v1/data/cumulativeProduction to generate the vectors")
return self.__read_by_chunks(self.cumulative_vector_id, chunk_size, from_time, to_time, count, unit)
def __read_by_chunks(self, vector_id: str, chunk_size: int, from_time: Optional[datetime] = None, to_time: Optional[datetime] = None,
count: int = -1, unit: Optional[UnitEnum] = None) -> Iterator[Vector]:
""" Reads the data values by chunks of a given size.
This is the preferred way to read and process large sets of data.
Parameters
----------
chunk_size:
Number of points to return in each chunk (100000 points max).
from_time:
Date to start reading from.
to_time:
Date to read the data up to.
count:
Maximum count of points to return, regardless of from/to settings.
unit:
Convert values from internal units to a specific unit.
Returns
-------
:class:`Iterator[Vector]`:
Iterator that gives access to the requested data values that are split into chunks (each presented as :class:`Vector`) of a given size.
"""
if chunk_size > 100000:
raise ValueError("Chunk size cannot be more than 100000 points")
return VectorIterator(self.__cluster_apis.data_api, self.__data_dto_converter.unit_converter, chunk_size, vector_id, from_time, to_time, count, unit)
def append(self, vector: Vector, unit: Optional[UnitEnum] = None) -> None:
""" Appends given values to the data.
Parameters
----------
vector:
Vector that contains data points to append.
unit:
Convert values to internal units from a specific unit.
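Examples
--------
A minimal sketch, assuming ``data`` is an existing :class:`Data` instance and that :class:`Vector` accepts parallel lists of dates and values:
>>> from datetime import datetime
>>> new_points = Vector([datetime(2023, 1, 1), datetime(2023, 1, 2)], [10.5, 11.2])
>>> data.append(new_points)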
"""
x_list = vector.dates
y_list = vector.values
if unit is not None:
y_list = [cast(float, self.__data_dto_converter.unit_converter.convert_to_internal(unit, value)) for value in y_list]
x_chunks = self.__split(x_list, self.__write_chunk_size)
y_chunks = self.__split(y_list, self.__write_chunk_size)
metadata = self.__cluster_apis.data_api.get_metadata(self.vector_id)
for x_chunk, y_chunk in zip(x_chunks, y_chunks):
dto = self.__data_dto_converter.get_append_dto(metadata, x_chunk, y_chunk)
self.__cluster_apis.data_api.append_vector(self.vector_id, dto)
self.refresh_metadata()
def clean(self) -> None:
""" Removes all values of the data.
"""
self.__cluster_apis.data_api.delete_vector_data(self.vector_id)
def overwrite(self, vector: Vector) -> None:
""" Overwrites the content of the data with given values.
Parameters
----------
vector:
Vector with new data points that will completely overwrite existing ones.
"""
self.clean()
self.append(vector)
def add_labels(self, labels: List[str]) -> None:
"""
Adds a list of user-defined labels to the current object.
The method first checks that the instance has an id assigned, then resolves the
appropriate service name depending on the kind of data (filter or regular data).
The labels are added via the cluster API and appended to the instance's internal
labels list.
Parameters
----------
labels : List[str]
A list of user-defined labels to be added to the current object. Each
label is expected to be a string.
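Examples
--------
A minimal sketch, assuming ``data`` is an existing :class:`Data` instance:
>>> data.add_labels(["downhole", "validated"])
>>> print(data.labels)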
"""
if self.__id is None:
raise ValueError("Cannot add labels: this Data object has no id assigned")
service_name = "filter" if self.__kind == DataKindEnum.filter else "data"
self.__cluster_apis.field_api.add_labels(self.__field_id, self.__well_id, self.__id, service_name, {"userDefinedLabels": labels})
self.__labels.extend(labels)
def get_automation_status(self) -> Optional[AutomationStatus]:
"""Get automation status"""
if self.__automation_id is not None:
job_properties_dto = self.__cluster_apis.automation_api.get_object_properties(self.__automation_id)
status_dto = next((x for x in job_properties_dto if x.name == "status"), None)
if status_dto is None:
raise ValueError("There is an issue with the Automation Service")
return AutomationStatus(status_dto.value)
else:
return None
def reload_gauge_from_date(self, date: datetime) -> None:
""" Reloads the gauge values from the external data source starting from the given date."""
if self.__kind != DataKindEnum.gauge:
raise ValueError("Only gauge data can be reloaded from a date")
if self.__automation_id is not None:
gauge_info_dto = cast(GaugeQueryDto, self.__get_gauge_info_dto())
gauge_update_dto = self.__data_dto_converter.get_gauge_update_dto(gauge_info_dto.tag.tagId, gauge_info_dto.tag.dataSourceId, gauge_info_dto.dataUnit, date)
self.__cluster_apis.external_data_api.update_gauge(self.__automation_id, gauge_update_dto)
@property
def metadata(self) -> Metadata:
"""
Get the associated metadata of this data object
Returns
-------
:class:`Metadata`:
"""
if self.__metadata is None:
self.__metadata = self.__cluster_apis.data_api.get_metadata(self.vector_id)
return self.__metadata
def refresh_metadata(self) -> None:
""" Refresh cache to be able to grab the updated metadata"""
self.__metadata = None
def __get_gauge_info_dto(self) -> Optional[GaugeQueryDto]:
# Cache the gauge info so repeated accesses do not hit the external data API.
if self.__gauge_info_dto is None and self.__automation_id is not None:
self.__gauge_info_dto = self.__cluster_apis.external_data_api.get_gauge_info(self.__automation_id)
return self.__gauge_info_dto
def get_shutin_data_dto(self) -> ShutInDataDto:
"""
Gets the shut-in data DTO (:class:`ShutInDataDto`) of this object.
"""
if self.__kind == DataKindEnum.shutin:
return self.__cluster_apis.field_api.get_shutin_data_dto(self.__field_id, self.__well_id, self.__id)
else:
raise ValueError("You can only query this function if your object is a Shut-In object under your well")
@staticmethod
def __split(list_to_split: List[Any], size: int) -> Generator[List[Any], None, None]:
for i in range(0, len(list_to_split), size):
yield list_to_split[i:i + size]