from __future__ import annotations
from typing import List, Optional, Dict
from datetime import datetime
from .model_action_enum import ModelActionEnum
from .kw_module_enum import KWModuleEnum
from .document_vector import DocumentVector
from ._private._cluster_apis import ClusterAPIS
from ._private.dto_converters._analysis_dto_converter import AnalysisDtoConverter
from .datetime_utils import datetime_to_str
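# [docs]  -- presumably a Sphinx "view source" link marker left over from HTML extraction; not part of the module code.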
class Analysis:
    """ KW analysis object.

    Presents an analysis from the KW document that can be queried for a contained log-log data.

    .. note:: Should not be instantiated directly.
    """

    # Plot type key under which the log-log vectors are cached and requested.
    _LOG_LOG_PLOT_TYPE = "LogLog"

    def __init__(
        self,
        field_id: str,
        well_group_id: Optional[str],
        well_id: Optional[str],
        custom_workflow_id: Optional[str],
        file_id: str,
        analysis_id: str,
        name: str,
        gof: Optional[float],
        file_type: KWModuleEnum,
        cluster_apis: ClusterAPIS,
        analysis_dto_converter: AnalysisDtoConverter,
        log_log_data: Optional[List[DocumentVector]] = None,
    ) -> None:
        """ Stores the identifiers, metadata and API handles of the analysis.

        :param field_id: Id of the field, forwarded to every API call made by this object.
        :param well_group_id: Optional id of the well group (only stored).
        :param well_id: Optional id of the well, used when building the rename script.
        :param custom_workflow_id: Optional id of the custom workflow (only stored).
        :param file_id: Id of the file (document) that contains this analysis.
        :param analysis_id: Id of this analysis, exposed through :py:attr:`id`.
        :param name: Name of this analysis, exposed through :py:attr:`name`.
        :param gof: Optional goodness of fit, exposed through :py:attr:`gof`.
        :param file_type: Module type of the containing document; checked by the
            model and well intake methods.
        :param cluster_apis: API facade used for all remote calls.
        :param analysis_dto_converter: Converter used to build request DTOs and to
            turn plot DTOs into document vectors.
        :param log_log_data: Optional pre-loaded log-log vectors; when given they
            seed the plot data cache so no request is needed to read them.
        """
        self.__field_id: str = field_id
        self.__well_group_id: Optional[str] = well_group_id
        self.__well_id: Optional[str] = well_id
        self.__custom_workflow_id: Optional[str] = custom_workflow_id
        self.__file_id: str = file_id
        self.__id: str = analysis_id
        self.__name: str = name
        self.__gof: Optional[float] = gof
        self.__file_type: KWModuleEnum = file_type
        self.__cluster_apis: ClusterAPIS = cluster_apis
        self.__dto_converter: AnalysisDtoConverter = analysis_dto_converter
        # Plot data already obtained, keyed by plot type.
        self.__plot_data_cache: Dict[str, List[DocumentVector]] = {}
        if log_log_data is not None:
            self.__plot_data_cache[self._LOG_LOG_PLOT_TYPE] = log_log_data

    def __ensure_topaze(self) -> None:
        """ Raises an exception unless the containing document is a topaze document.

        Shared guard for all the well intake methods.
        """
        if self.__file_type != KWModuleEnum.topaze:
            raise Exception("You can only work with well intakes in topaze document but this document is not a topaze document")

    @property
    def id(self) -> str:
        """ Gets the id of the :class:`Analysis` object."""
        return self.__id

    @property
    def name(self) -> str:
        """ Gets the name of the :class:`Analysis`."""
        return self.__name

    @property
    def gof(self) -> Optional[float]:
        """ Gets the goodness of fit of the :class:`Analysis` if it exists."""
        return self.__gof

    @property
    def log_log_data(self) -> List[DocumentVector]:
        """ Returns a list of document vectors contained in this :class:`Analysis`.

        Equivalent to ``get_plot_data("LogLog")``: the cached value is returned when
        there is one, otherwise the data is requested.
        """
        return self.get_plot_data(self._LOG_LOG_PLOT_TYPE)

    def refresh_log_log_data(self) -> List[DocumentVector]:
        """ Refresh the cached value of the :py:obj:`Analysis.log_log_data` property.

        :return: The log-log vectors obtained by bypassing the cache.
        """
        return self.get_plot_data(self._LOG_LOG_PLOT_TYPE, use_cache=False)

    def rename(self, new_name: str) -> None:
        """ Rename the Analysis

        Builds a rename script, executes it as a background script and updates the
        local name only once the script has succeeded.

        :param new_name: The new name of the analysis.
        :raises Exception: With the script result message when the script did not succeed.
        """
        script_converter = self.__dto_converter.script_dto_converter
        script = script_converter.get_rename_analysis_script(new_name, self.__field_id, self.__well_id, self.__file_id, self.__id)
        dto = script_converter.get_background_script_input_dto(self.__field_id, self.__well_id, self.__file_type, script)
        script_result = self.__cluster_apis.automation_api.execute_background_script(dto)
        if not script_result.is_success:
            raise Exception(script_result.message)
        self.__name = new_name

    def get_plot_data(self, plot_type: str, use_cache: bool = True) -> List[DocumentVector]:
        """ Returns a list of document vectors contained in a given plot type of this :class:`Analysis`.

        :param plot_type: Name of the plot type to read (for instance ``"LogLog"``).
        :param use_cache: When ``True`` (default) a previously obtained value for
            this plot type is returned without any request.
        :return: The document vectors of the plot, or an empty list when the API
            returns no plot data.
        """
        if use_cache and plot_type in self.__plot_data_cache:
            return self.__plot_data_cache[plot_type]
        self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id)
        plot_data_dto = self.__cluster_apis.kw_api.get_plot_data(self.__field_id, self.__file_id, self.id, plot_type)
        if plot_data_dto is None:
            # Nothing to convert; an existing cache entry (if any) is left untouched.
            return []
        plot_data = self.__dto_converter.get_plot_data_from_plot_dto(plot_data_dto.dataXyDtos)
        self.__plot_data_cache[plot_type] = plot_data
        return plot_data

    def get_model_xml(self, action: ModelActionEnum) -> str:
        """ Returns model definition in KW-KA exchange XML format for this analysis.

        :param action: Model action; its ``value`` is sent to the API as a string.
        :return: The model XML returned by the PTA API (saphir document) or the
            RTA API (topaze document).
        :raises Exception: When the document is neither a saphir nor a topaze document.
        """
        # The wait happens before the document type check, as in the original flow.
        self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id)
        if self.__file_type == KWModuleEnum.saphir:
            model_api = self.__cluster_apis.pta_api
        elif self.__file_type == KWModuleEnum.topaze:
            model_api = self.__cluster_apis.rta_api
        else:
            raise Exception("Document is not a saphir or topaze document")
        return model_api.get_model_xml_for_analysis(self.__field_id, self.__file_id, self.__id, str(action.value))

    def delete_well_intakes(self, dates: List[datetime]) -> None:
        """ Deletes well intakes of this analysis for the given dates.

        :param dates: Dates passed to the DTO converter to build the delete request.
        :raises Exception: When the document is not a topaze document.
        """
        self.__ensure_topaze()
        dto = self.__dto_converter.get_delete_well_intake_dto(dates)
        self.__cluster_apis.rta_api.delete_well_intakes_in_analysis(self.__field_id, self.__file_id, self.__id, dto)

    def get_last_well_intake_xml(self) -> str:
        """ Returns the last well intake defined for the analysis in XML format

        :raises Exception: When the document is not a topaze document.
        """
        self.__ensure_topaze()
        self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id)
        return self.__cluster_apis.rta_api.get_last_well_intake_xml_for_analysis(self.__field_id, self.__file_id, self.__id)

    def get_well_intake_xml_for_analysis(self, start_date: Optional[datetime] = None, intake_name: Optional[str] = None) -> str:
        """ Returns the last well intake that corresponds to the start_date and intake_name. At least one of these two fields is mandatory.

        :param start_date: Start date of the well intake; converted with ``datetime_to_str``.
        :param intake_name: Name of the well intake.
        :return: The well intake in XML format.
        :raises Exception: When the document is not a topaze document.

        .. note:: The "at least one field" requirement is not checked locally.
        """
        self.__ensure_topaze()
        dto = {"wellIntakeStartDate": datetime_to_str(start_date), "wellIntakeName": intake_name}
        self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id)
        return self.__cluster_apis.rta_api.get_well_intake_xml_for_analysis(self.__field_id, self.__file_id, self.__id, dto)

    def set_well_intake_xml_for_analysis(self, well_intake_xml: str) -> None:
        """ Adds or updates a well intake with a given start date in the specified file and project.

        If a well intake with the given start date already exists it is updated; the
        default well intake is updated if the start date is not defined; otherwise a
        new well intake is added.

        :param well_intake_xml: The well intake definition in XML format.
        :raises Exception: When the document is not a topaze document.
        """
        self.__ensure_topaze()
        dto = self.__dto_converter.get_well_intake_dto(well_intake_xml)
        self.__cluster_apis.rta_api.put_well_intake_xml_for_analysis(self.__field_id, self.__file_id, self.__id, dto)
        # Here the wait is issued after the update request.
        self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id)

    def get_keg5_from_project(
        self,
        with_wells: bool = True,
        with_faults: bool = True,
        with_pvt: bool = True,
        with_properties: bool = True,
        with_layers_regions: bool = True,
        with_contours: bool = True,
    ) -> str:
        """ Returns a representation of the run of a Rubis document in keg5 format

        Each ``with_*`` flag is forwarded as-is in the request payload.
        """
        dto = {
            "withWells": with_wells,
            "withFaults": with_faults,
            "withPvt": with_pvt,
            "withProperties": with_properties,
            "withLayersRegions": with_layers_regions,
            "withContours": with_contours,
        }
        return self.__cluster_apis.num_api.get_keg5_from_project(self.__field_id, self.__file_id, self.__id, dto)

    def update_project_from_keg5(
        self,
        keg5_xml: str,
        with_pvt: bool = True,
        with_properties: bool = True,
        with_layers_regions: bool = True,
        with_contours: bool = True,
    ) -> None:
        """ Updates a Rubis run with a document in keg5 format

        :param keg5_xml: The keg5 document.
        :param with_pvt: Forwarded to the DTO converter.
        :param with_properties: Forwarded to the DTO converter.
        :param with_layers_regions: Forwarded to the DTO converter.
        :param with_contours: Forwarded to the DTO converter.
        """
        dto = self.__dto_converter.get_keg5_dto(keg5_xml, with_pvt, with_properties, with_layers_regions, with_contours)
        self.__cluster_apis.num_api.update_project_from_keg5(self.__field_id, self.__file_id, self.__id, dto)