Source code for kappa_sdk.analysis

from __future__ import annotations
from typing import List, Optional, Dict
from datetime import datetime
from .model_action_enum import ModelActionEnum
from .kw_module_enum import KWModuleEnum
from .document_vector import DocumentVector
from ._private._cluster_apis import ClusterAPIS
from ._private.dto_converters._analysis_dto_converter import AnalysisDtoConverter
from .datetime_utils import datetime_to_str


[docs] class Analysis: """ KW analysis object. Presents an analysis from the KW document that can be queried for a contained log-log data. .. note:: Should not be instantiated directly. """ def __init__(self, field_id: str, well_group_id: Optional[str], well_id: Optional[str], custom_workflow_id: Optional[str], file_id: str, analysis_id: str, name: str, gof: Optional[float], file_type: KWModuleEnum, cluster_apis: ClusterAPIS, analysis_dto_converter: AnalysisDtoConverter, log_log_data: Optional[List[DocumentVector]] = None) -> None: self.__field_id: str = field_id self.__well_group_id: Optional[str] = well_group_id self.__well_id: Optional[str] = well_id self.__custom_workflow_id: Optional[str] = custom_workflow_id self.__file_id: str = file_id self.__id: str = analysis_id self.__name: str = name self.__gof: Optional[float] = gof self.__file_type: KWModuleEnum = file_type self.__cluster_apis: ClusterAPIS = cluster_apis self.__dto_converter: AnalysisDtoConverter = analysis_dto_converter self.__plot_data_cache: Dict[str, List[DocumentVector]] = {} if log_log_data is not None: self.__plot_data_cache["LogLog"] = log_log_data @property def id(self) -> str: """ Gets the id of the :class:`Analysis` object.""" return self.__id @property def name(self) -> str: """ Gets the name of the :class:`Analysis`.""" return self.__name @property def gof(self) -> Optional[float]: """ Gets the goodness of fit of the :class:`Analysis` if it exists.""" return self.__gof @property def log_log_data(self) -> List[DocumentVector]: """ Returns a list of document vectors contained in this :class:`Analysis`. """ return self.get_plot_data("LogLog") def refresh_log_log_data(self) -> List[DocumentVector]: """ Refresh the cached value of the :py:obj:`Analysis.log_log_data` property. """ return self.get_plot_data("LogLog", use_cache=False) def rename(self, new_name: str) -> None: """ Rename the Analysis""" script = self.__dto_converter.script_dto_converter.get_rename_analysis_script(new_name, self.__field_id, self.__well_id, self.__file_id, self.__id) dto = self.__dto_converter.script_dto_converter.get_background_script_input_dto(self.__field_id, self.__well_id, self.__file_type, script) script_result = self.__cluster_apis.automation_api.execute_background_script(dto) if not script_result.is_success: raise Exception(script_result.message) self.__name = new_name def get_plot_data(self, plot_type: str, use_cache: bool = True) -> List[DocumentVector]: """ Returns a list of document vectors contained in a given plot type of this :class:`Analysis`. """ if use_cache and plot_type in self.__plot_data_cache: return self.__plot_data_cache[plot_type] self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id) plot_data_dto = self.__cluster_apis.kw_api.get_plot_data(self.__field_id, self.__file_id, self.id, plot_type) if plot_data_dto is None: return [] plot_data = self.__dto_converter.get_plot_data_from_plot_dto(plot_data_dto.dataXyDtos) self.__plot_data_cache[plot_type] = plot_data return plot_data def get_model_xml(self, action: ModelActionEnum) -> str: """ Returns model definition in KW-KA exchange XML format for this analysis. """ self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id) if self.__file_type == KWModuleEnum.saphir: return self.__cluster_apis.pta_api.get_model_xml_for_analysis(self.__field_id, self.__file_id, self.__id, str(action.value)) elif self.__file_type == KWModuleEnum.topaze: return self.__cluster_apis.rta_api.get_model_xml_for_analysis(self.__field_id, self.__file_id, self.__id, str(action.value)) else: raise Exception("Document is not a saphir or topaze document") def delete_well_intakes(self, dates: List[datetime]) -> None: if self.__file_type != KWModuleEnum.topaze: raise Exception("You can only work with well intakes in topaze document but this document is not a topaze document") dto = self.__dto_converter.get_delete_well_intake_dto(dates) self.__cluster_apis.rta_api.delete_well_intakes_in_analysis(self.__field_id, self.__file_id, self.__id, dto) def get_last_well_intake_xml(self) -> str: """ Returns the last well intake defined for the analysis in XML format """ if self.__file_type != KWModuleEnum.topaze: raise Exception("You can only work with well intakes in topaze document but this document is not a topaze document") self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id) return self.__cluster_apis.rta_api.get_last_well_intake_xml_for_analysis(self.__field_id, self.__file_id, self.__id) def get_well_intake_xml_for_analysis(self, start_date: Optional[datetime] = None, intake_name: Optional[str] = None) -> str: """ Returns the last well intake that corresponds to the start_date and intake_name. At least one of these two fields is mandatory. """ if self.__file_type != KWModuleEnum.topaze: raise Exception("You can only work with well intakes in topaze document but this document is not a topaze document") dto = {"wellIntakeStartDate": datetime_to_str(start_date), "wellIntakeName": intake_name} self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id) return self.__cluster_apis.rta_api.get_well_intake_xml_for_analysis(self.__field_id, self.__file_id, self.__id, dto) def set_well_intake_xml_for_analysis(self, well_intake_xml: str) -> None: """ Adds or updates a well intake with a given start date in the specified file and project, if a well intake with given start date already exists it updates it, updates the default well intake if start date is not defined, otherwise adds a new well intake """ if self.__file_type != KWModuleEnum.topaze: raise Exception("You can only work with well intakes in topaze document but this document is not a topaze document") dto = self.__dto_converter.get_well_intake_dto(well_intake_xml) self.__cluster_apis.rta_api.put_well_intake_xml_for_analysis(self.__field_id, self.__file_id, self.__id, dto) self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id) def get_keg5_from_project(self, with_wells: bool = True, with_faults: bool = True, with_pvt: bool = True, with_properties: bool = True, with_layers_regions: bool = True, with_contours: bool = True) -> str: """ Returns a representation of the run of a Rubis document in keg5 format""" dto = {"withWells": with_wells, "withFaults": with_faults, "withPvt": with_pvt, "withProperties": with_properties, "withLayersRegions": with_layers_regions, "withContours": with_contours} return self.__cluster_apis.num_api.get_keg5_from_project(self.__field_id, self.__file_id, self.__id, dto) def update_project_from_keg5(self, keg5_xml: str, with_pvt: bool = True, with_properties: bool = True, with_layers_regions: bool = True, with_contours: bool = True) -> None: """ Updates a Rubis run with a document in keg5 format""" dto = self.__dto_converter.get_keg5_dto(keg5_xml, with_pvt, with_properties, with_layers_regions, with_contours) self.__cluster_apis.num_api.update_project_from_keg5(self.__field_id, self.__file_id, self.__id, dto)