Source code for kappa_sdk.enumerator.data_enumerator

import copy
from datetime import datetime, timezone
import uuid
from typing import List, Dict, Optional
from ..rest_api import RestAPI
from .._private._data_api import DataAPI
from ..metadata import Metadata
from .data_enumerator_settings import DataEnumeratorSettings
from .interpolation_method_enum import InterpolationMethodEnum
from .extrapolation_method_enum import ExtrapolationMethodEnum
from .time_range_enum import TimeRangeEnum
from .data_point import DataPoint
from .data_enumeration import DataEnumeration
from ..vector import Vector


[docs] class DataEnumerator: """Python wrapper for a Dataset Enumerator. .. note:: Should not be instantiated directly. """ def __init__(self, settings: DataEnumeratorSettings, rest_api: Optional[RestAPI] = None): self.__settings: DataEnumeratorSettings = settings self.__data_api: Optional[DataAPI] = DataAPI(rest_api) if rest_api is not None else None def get_default_settings(self) -> DataEnumeratorSettings: return copy.deepcopy(self.__settings) def __get_enumeration(self, x_inputs: Dict[str, List[datetime]], y_inputs: Dict[str, List[float]], vector_metadata: Dict[str, Metadata], last_points: List[DataPoint]) -> DataEnumeration: x_outputs = self.get_time_range(x_inputs, vector_metadata) y_outputs = self.get_outputs_values(x_inputs, y_inputs, x_outputs, vector_metadata) vectors = [Vector(x_outputs, outputs, vector_id=key) for key, outputs in zip(y_inputs, y_outputs)] return DataEnumeration(vectors, last_points) def to_enumerable_from_vectors(self, params: List[Vector], settings: Optional[DataEnumeratorSettings] = None, metadata_params: Optional[List[Metadata]] = None) -> DataEnumeration: """Gets a data enumerator for multiple datasets from class:`Vector`. Generates an enumerator that allows to iterate through the list of points of given KAPPA Automate datasets using a common time index. Parameters ---------- params: The list of class:`Vector`. settings: The data enumeration service. metadata_params: List of metadata for each class:`Vector` """ if settings is not None: self.__settings = settings if len(params) < 1: return DataEnumeration([Vector([], [])], []) ids = list(set([p.id for p in params])) if len(ids) < len(params): raise ValueError("Duplicate vectors specified in input parameters") if not all(isinstance(element, Vector) for element in params): raise ValueError("Parameters input are not Vector objects") x_inputs: Dict[str, List[datetime]] = dict() y_inputs: Dict[str, List[float]] = dict() vector_metadata: Dict[str, Metadata] = dict() last_points: List[DataPoint] = [] for i, vector in enumerate(params): index = str(vector.id) default_metadata = Metadata("", False, vector.first_x, vector.values[0], vector.values[-1], False, "", len(vector.dates)) vector_metadata[index] = default_metadata if metadata_params is None else metadata_params[i] if i < len(metadata_params) else default_metadata x_inputs[index] = vector.dates y_inputs[index] = vector.values if len(vector.dates) > 0: last_points.append(DataPoint(vector.dates[-1], vector.values[-1])) return self.__get_enumeration(x_inputs, y_inputs, vector_metadata, last_points) def to_enumerable(self, params: List[uuid.UUID], settings: Optional[DataEnumeratorSettings] = None) -> DataEnumeration: """Gets a data enumerator for multiple datasets. Generates an enumerator that allows to iterate through the list of points of given KAPPA Automate datasets using a common time index. Parameters ---------- params: The list of KAPPA Automate vector identifiers. settings: The data enumeration service. """ if settings is not None: self.__settings = settings if len(params) < 1: return DataEnumeration([Vector([], [])], []) if self.__data_api is None: raise ValueError("You must provide a rest API object when you initialize the data enumerator") x_inputs: Dict[str, List[datetime]] = dict() y_inputs: Dict[str, List[float]] = dict() vector_metadata: Dict[str, Metadata] = dict() last_points: List[DataPoint] = [] ids = list(set([str(p) for p in params])) if len(ids) < len(params): raise ValueError("Duplicate vectors specified in input parameters") # Python sets are not keeping order ids = [str(p) for p in params] max_count_dict = dict() for p in ids: if isinstance(self.__settings.max_count, dict): max_count = self.__settings.max_count[p] if p in self.__settings.max_count.keys() else 100000 elif isinstance(self.__settings.max_count, int): max_count = self.__settings.max_count else: max_count = 100000 if max_count > 100000: raise ValueError("Max count is 100000 points") max_count_dict[p] = max_count read_vectors = self.__data_api.read_vectors(ids, max_count_dict, self.__settings.exclusive_start, self.__settings.inclusive_end) for i in range(0, len(params)): x_inputs[ids[i]] = read_vectors[ids[i]][0] y_inputs[ids[i]] = read_vectors[ids[i]][1] if len(read_vectors[ids[i]][0]) > 0: last_points.append(DataPoint(read_vectors[ids[i]][0][-1], read_vectors[ids[i]][1][-1])) vector_metadata[ids[i]] = self.__data_api.get_metadata(ids[i]) return self.__get_enumeration(x_inputs, y_inputs, vector_metadata, last_points) def get_time_range(self, x_inputs: Dict[str, List[datetime]], vector_metadata: Dict[str, Metadata]) -> List[datetime]: """ Calculate the time_range of the interpolation Parameters ---------- x_inputs: list of the input times vector_metadata: metadata of the inputs Returns ------- list of the output times with in the specified time range """ x_outputs = list() x_input_values = list(x_inputs.values()) if self.__settings.reference_vector_id is not None: ref_id = str(self.__settings.reference_vector_id) x_outputs = x_inputs[ref_id].copy() vector_metadata_ref_id = vector_metadata[ref_id] if vector_metadata_ref_id.is_by_step and vector_metadata_ref_id.first_x is not None and not vector_metadata_ref_id.is_step_at_start: first_x = vector_metadata_ref_id.first_x if first_x is not None: x_outputs.insert(0, first_x) else: for key, x_value in x_inputs.items(): res_set = set(x_outputs + x_value) x_outputs = list(res_set) vector_metadata_key = vector_metadata[key] if vector_metadata_key.is_by_step and vector_metadata_key.first_x is not None and not vector_metadata_key.is_step_at_start: x_outputs.insert(0, vector_metadata_key.first_x) x_outputs = sorted(set(x_outputs)) if self.__settings.time_range == TimeRangeEnum.common: minimum = datetime.min.replace(tzinfo=timezone.utc) maximum = datetime.max.replace(tzinfo=timezone.utc) for inputs in x_input_values: if len(inputs) == 0: return [] min_input = min(inputs) max_input = max(inputs) if min_input > minimum: minimum = min_input if max_input < maximum: maximum = max_input res = list() for output in x_outputs: if minimum <= output <= maximum: res.append(output) x_outputs = res try: if self.__settings.exclusive_start is not None: while x_outputs[0] <= self.__settings.exclusive_start: x_outputs.pop(0) if self.__settings.inclusive_end is not None: while x_outputs[len(x_outputs) - 1] > self.__settings.inclusive_end: x_outputs.pop() except IndexError: return x_outputs return x_outputs def get_outputs_values(self, x_inputs: Dict[str, List[datetime]], y_inputs: Dict[str, List[float]], x_outputs: List[datetime], vector_metadata: Dict[str, Metadata]) -> List[List[float]]: """ Get the values after interpolation Parameters ---------- x_inputs: list of the input times y_inputs: list of the input values x_outputs: list of the output times that will be use for the interpolation and extrapolation vector_metadata: List with the info about the vectors Returns ------- Lists of the interpolated values """ y_outputs: List[List[float]] = [[] for _ in range(len(x_inputs))] for k in range(0, len(x_inputs)): xid = list(x_inputs.keys())[k] x_input = x_inputs[xid] y_input = y_inputs[xid] len_vect = len(x_input) if len_vect > 0: vector_metadata_xid = vector_metadata[xid] by_step = vector_metadata_xid.is_by_step first_x = vector_metadata_xid.first_x if by_step and vector_metadata_xid.first_x is not None else \ x_input[0] step_at_start = vector_metadata_xid.is_step_at_start extrapolation_method = self.__settings.extrapolation_method \ if not (by_step and self.__settings.extrapolation_method == ExtrapolationMethodEnum.use_slope) \ else ExtrapolationMethodEnum.boundary_value j = 0 for ref_date in x_outputs: if ref_date < first_x or ref_date > x_input[len_vect - 1]: if by_step and step_at_start and ref_date > x_input[len_vect - 1]: y = self.extrapolation(x_input, y_input, first_x, ref_date, ExtrapolationMethodEnum.boundary_value) else: y = self.extrapolation(x_input, y_input, first_x, ref_date, extrapolation_method) else: while ref_date > x_input[j] and j < len_vect - 1: j += 1 if j == 0: if by_step or ref_date == x_input[0]: y = y_input[0] else: y = self.interpolation(1, by_step, step_at_start, x_input, y_input, ref_date) else: y = self.interpolation(j, by_step, step_at_start, x_input, y_input, ref_date) y_outputs[k].append(float(y)) else: if self.__settings.extrapolation_method == ExtrapolationMethodEnum.zero: for i in range(0, len(x_outputs)): y_outputs[k].append(0) else: for i in range(0, len(x_outputs)): y_outputs[k].append(float("NaN")) return y_outputs def interpolation(self, next_index: int, by_step: bool, step_at_start: bool, ref_vect_x: List[datetime], ref_vect_y: List[float], ref_date: datetime) -> float: next_x = ref_vect_x[next_index] next_y = ref_vect_y[next_index] previous_x = ref_vect_x[next_index - 1] previous_y = ref_vect_y[next_index - 1] if ref_date == next_x: return next_y if self.__settings.interpolation_method == InterpolationMethodEnum.linear_interpolation or by_step: if len(ref_vect_x) <= 1: return ref_vect_y[0] else: if by_step: if step_at_start: return previous_y else: return next_y else: return self.linear_interpolation(ref_date, previous_x, next_x, previous_y, next_y) elif self.__settings.interpolation_method == InterpolationMethodEnum.absent_value: return float("NaN") elif self.__settings.interpolation_method == InterpolationMethodEnum.zero: return 0 elif self.__settings.interpolation_method == InterpolationMethodEnum.previous_value: return previous_y elif self.__settings.interpolation_method == InterpolationMethodEnum.next_value: return next_y else: return round((next_y + previous_y) / 2.0, 5) @staticmethod def extrapolation(ref_vect_x: List[datetime], ref_vect_y: List[float], first_x: datetime, ref_date: datetime, extrapolation_method: ExtrapolationMethodEnum) -> float: len_x = len(ref_vect_x) if extrapolation_method == ExtrapolationMethodEnum.use_slope: if ref_vect_x is None: return float("NaN") if len(ref_vect_x) == 1: return ref_vect_y[0] else: if ref_date < ref_vect_x[0]: return ((ref_vect_x[1] - ref_date).total_seconds() * ref_vect_y[0] - (ref_vect_x[0] - ref_date).total_seconds() * ref_vect_y[1]) \ / (ref_vect_x[1] - ref_vect_x[0]).total_seconds() else: return ((ref_date - ref_vect_x[len_x - 2]).total_seconds() * ref_vect_y[len_x - 1] - (ref_date - ref_vect_x[len_x - 1]).total_seconds() * ref_vect_y[len_x - 2]) \ / (ref_vect_x[len_x - 1] - ref_vect_x[len_x - 2]).total_seconds() elif extrapolation_method == ExtrapolationMethodEnum.zero: return 0 elif extrapolation_method == ExtrapolationMethodEnum.absent_value: return float("NaN") else: if ref_date < first_x: return ref_vect_y[0] else: return ref_vect_y[len(ref_vect_y) - 1] @staticmethod def linear_interpolation(reference_x: datetime, previous_x: datetime, next_x: datetime, previous_y: float, next_y: float) -> float: if next_x == previous_x: return 0.5 * (previous_y + next_y) return previous_y + ((next_y - previous_y) / (next_x - previous_x).total_seconds()) * (reference_x - previous_x).total_seconds()