Source code for kappa_sdk.document

from typing import List, Optional, Dict, Any, cast
from datetime import datetime
from ._private._cluster_apis import ClusterAPIS
from ._private.dto_converters._document_dto_converter import DocumentDtoConverter
from ._private._decorators import retry
from ._private._kw_dto import AutomaticRtmParametersDto
from .analysis import Analysis
from .analysis_results import AnalysisResults
from .kw_module_enum import KWModuleEnum
from .model_action_enum import ModelActionEnum
from .datetime_utils import str_to_datetime
from .vector import Vector
from .script_result import ScriptResult
from .kw.model.model_parameters import ModelParameters
from .kw.model.model_parser import ModelParser
from .kw.model.model_parameter import ModelParameter
from .kw.well_intake.well_intake_type import WellIntakeType
from .kw.model_book.model_book_parameter import ModelBookParameter
from collections import Counter


class Document:
    """
    KW document object.

    Presents a KW document that can be queried for contained analyses and results.

    .. note:: Should not be instantiated directly.

    .. note::
        :py:obj:`Document.analyses` property is populated on-demand and is cached
        for the duration of the :class:`Connection`. If you need to get actual
        values, use the :py:meth:`Document.refresh_analyses` method.
    """

    def __init__(self, field_id: str, well_group_id: Optional[str], well_id: Optional[str],
                 custom_workflow_id: Optional[str], file_id: str, name: str,
                 module_type: KWModuleEnum, parent_ipta: Optional[str], labels: List[str],
                 cluster_apis: ClusterAPIS, document_dto_converter: DocumentDtoConverter):
        self.__field_id: str = field_id
        self.__well_group_id: Optional[str] = well_group_id
        self.__well_id: Optional[str] = well_id
        self.__custom_workflow_id: Optional[str] = custom_workflow_id
        self.__file_id: str = file_id
        self.__name: str = name
        self.__type: KWModuleEnum = module_type
        # Lazily populated caches; see `analyses` and `analysis_results`.
        self.__analyses: List[Analysis] = []
        self.__analysis_results: Optional[AnalysisResults] = None
        self.__parent_ipta: Optional[str] = parent_ipta
        self.__labels: List[str] = labels
        self.__cluster_apis: ClusterAPIS = cluster_apis
        self.__dto_converter: DocumentDtoConverter = document_dto_converter

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def __fetch_model_xml(self, action_name: str) -> str:
        """
        Fetch an XML payload for this document from the API matching its module type.

        Saphir documents are served by ``pta_api`` and Topaze documents by ``rta_api``.

        Parameters
        ----------
        action_name:
            Third argument forwarded to ``get_model_xml_for_document``
            (e.g. ``"ResultsSI"`` or a :class:`ModelActionEnum` value).

        Raises
        ------
        Exception
            If the document is neither a Saphir nor a Topaze document.
        """
        if self.__type == KWModuleEnum.saphir:
            module_api = self.__cluster_apis.pta_api
        elif self.__type == KWModuleEnum.topaze:
            module_api = self.__cluster_apis.rta_api
        else:
            raise Exception("Document is not a saphir or topaze document")
        return module_api.get_model_xml_for_document(self.__field_id, self.__file_id, action_name)

    def __execute_script(self, script: str, **dto_options: Any) -> None:
        """
        Run a background KW script for this document and raise if it fails.

        Parameters
        ----------
        script:
            The script text to execute.
        dto_options:
            Extra keyword arguments forwarded to ``get_background_script_input_dto``.

        Raises
        ------
        Exception
            With the script result message when the script did not succeed.
        """
        dto = self.__dto_converter.script_dto_converter.get_background_script_input_dto(
            self.__field_id, self.__well_id, self.__type, script, **dto_options)
        script_result = self.__cluster_apis.automation_api.execute_background_script(dto)
        if not script_result.is_success:
            raise Exception(script_result.message)

    @staticmethod
    def __find_analysis(analyses: List[Analysis], analysis_name: str) -> Analysis:
        """
        Return the first analysis in ``analyses`` whose name equals ``analysis_name``.

        Raises
        ------
        ValueError
            If no analysis carries that name.
        """
        for analysis in analyses:
            if analysis.name == analysis_name:
                return analysis
        raise ValueError(f"No analysis named '{analysis_name}' was found in this document")

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def parent_ipta(self) -> Optional[str]:
        """
        Gets the parent ipta of the KW :class:`Document` file.
        """
        return self.__parent_ipta

    @property
    def file_id(self) -> str:
        """
        Gets the id of the KW :class:`Document` file.
        """
        return self.__file_id

    @property
    def well_group_id(self) -> Optional[str]:
        """
        Gets the well group id of the KW :class:`Document`, if any.
        """
        return self.__well_group_id

    @property
    def name(self) -> str:
        """
        Gets the name of the KW :class:`Document`.
        """
        return self.__name

    @property
    def type(self) -> KWModuleEnum:
        """
        Gets the type of the KW module for this :class:`Document`.
        """
        return self.__type

    @property
    def analyses(self) -> List[Analysis]:
        """
        Gets the list of analyses contained in this document that has log-log data inside.

        .. note::
            This property is populated on-demand and is cached for the duration of
            the :class:`Connection` unless updated by calling the
            :py:meth:`Document.refresh_analyses` method.

        Returns
        -------
        List [:class:`Analysis`]:
            A list of Analysis.
        """
        # An empty cache is treated as "not loaded yet".
        if not self.__analyses:
            self.refresh_analyses()
        return self.__analyses

    @property
    def analysis_results(self) -> AnalysisResults:
        """
        Returns a wrapper for the "resultsSI.xml" in this document.

        The wrapper is built on first access and cached until
        :py:meth:`Document.refresh_analyses` is called.

        Returns
        -------
        :class:`AnalysisResults`:
            AnalysisResults wrapper

        Raises
        ------
        Exception
            If the document is neither a Saphir nor a Topaze document.
        """
        if self.__analysis_results is None:
            # Same retrieval path as get_results_xml(), so Topaze documents
            # are read through rta_api here as well.
            self.__analysis_results = AnalysisResults(self.get_results_xml())
        return self.__analysis_results

    @property
    def labels(self) -> List[str]:
        """
        Gets the list of labels of the KW :class:`Document`.

        Returns
        -------
        List [str]:
            List of labels.
        """
        return self.__labels

    # ------------------------------------------------------------------
    # Results and model XML
    # ------------------------------------------------------------------

    def get_results_xml(self) -> str:
        """
        Returns the content of the "resultsSI.xml" in this document.

        Waits for the document processing to finish before fetching.

        Returns
        -------
        str:
            resultsSI xml

        Raises
        ------
        Exception
            If the document is neither a Saphir nor a Topaze document.
        """
        self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id)
        return self.__fetch_model_xml("ResultsSI")

    def get_results_xml_legacy(self) -> str:
        """
        Returns the content of the "results.xml" in this document.

        Returns
        -------
        str:
            results xml legacy
        """
        return self.__cluster_apis.field_api.get_standard_results(self.__field_id, self.__file_id)

    def refresh_analyses(self) -> List[Analysis]:
        """
        Refresh the cached value of the :py:obj:`Document.analyses` property.

        Also discards the cached :py:obj:`Document.analysis_results` so that it
        is rebuilt on next access.

        Returns
        -------
        List [:class:`Analysis`]:
            The freshly loaded list of analyses.
        """
        self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id)
        analyses_dto = self.__cluster_apis.kw_api.get_analyses(self.__field_id, self.__file_id)
        analyses = self.__dto_converter.get_analyses_from_dto(
            self.__field_id, self.__well_group_id, self.__well_id, self.__custom_workflow_id,
            self.__file_id, self.__type, analyses_dto)
        self.__analyses = analyses
        self.__analysis_results = None
        return analyses

    def get_model_xml(self, action: ModelActionEnum = ModelActionEnum.model_generation) -> str:
        """
        Returns analysis model definition(s) in KW-KA exchange XML format for all
        analyses in the document.

        Parameters
        ----------
        action:
            Type of Model Action

        Returns
        -------
        str:
            model xml

        Raises
        ------
        Exception
            If the document is neither a Saphir nor a Topaze document.
        """
        return self.__fetch_model_xml(str(action.value))

    def set_model_xml(self, model_xml: str, action: ModelActionEnum = ModelActionEnum.model_generation) -> None:
        """
        Applies given analysis model definition(s) in KW-KA exchange XML format to
        the document, then refreshes the cached analyses.

        Parameters
        ----------
        model_xml:
            Model xml as a string
        action:
            Type of Model Action.

        Raises
        ------
        Exception
            If the background script fails.
        """
        # Gherkin script: one keyword per line.
        script = """Feature: Generate xml model
    Scenario: Generate xml model or improve
        Given a field <{}> and a well <{}>
        And I load a document <{}>
        Then I update parameters from xml file and do <{}>
        And I reupload the updated document""".format(
            self.__field_id, self.__well_id, self.__file_id, str(action.value))
        self.__execute_script(script, additional_content=model_xml, name="Set model XML")
        self.refresh_analyses()

    # ------------------------------------------------------------------
    # Pressures and rates
    # ------------------------------------------------------------------

    def get_pseudo_pressures(self, pressure_id: str, start_date: datetime = datetime(1900, 1, 1)) -> Vector:
        """
        Calculate pseudo-pressures from pressures

        Parameters
        ----------
        pressure_id:
            Vector Id of the original pressures
        start_date:
            Start date of the pseudo pressures

        Returns
        -------
        :class:`Vector`:
            A Vector object which contains a list of pseudo-pressures dates and
            list of pseudo-pressures values
        """
        dto = self.__dto_converter.get_pseudo_pressures_dto(self.__field_id, self.__well_id, pressure_id, start_date)
        vector_dto = self.__cluster_apis.rta_api.get_pseudo_pressures(self.__file_id, dto)
        dates = [cast(datetime, str_to_datetime(x)) for x in vector_dto.dates]
        return Vector(dates, vector_dto.values)

    def reset_pressure(self, pressure_id: str) -> None:
        """
        Reset pressures values

        Parameters
        ----------
        pressure_id:
            Vector Id of the pressures

        Raises
        ------
        Exception
            If the background script fails.
        """
        # Gherkin script: one keyword per line.
        script = """Feature: Generate model with new pressures
    Scenario: Generate xml model with new pressures
        Given a field <{}> and a well <{}>
        And I load a document <{}>
        And I load or update a pressure <{}>
        Then I reupload the updated document""".format(
            self.__field_id, self.__well_id, self.__file_id, pressure_id)
        self.__execute_script(script)

    def reset_rate(self, rates_id: str, rate_type: str) -> None:
        """
        Reset rates values

        Parameters
        ----------
        rates_id:
            Vector Id of the rate
        rate_type:
            Type of the rate to reset

        Raises
        ------
        ValueError
            If the document is not attached to a well.
        Exception
            If the background script fails.
        """
        if self.__well_id is None:
            raise ValueError("You cannot run this scenario under a field, upload the document under a well")
        script = self.__dto_converter.script_dto_converter.get_reset_rate_script(
            self.__field_id, self.__well_id, self.__file_id, rates_id, rate_type)
        self.__execute_script(script)

    # ------------------------------------------------------------------
    # Scripts and file operations
    # ------------------------------------------------------------------

    def run_kw_script(self, script: str, additional_content: Optional[str] = None,
                      name: Optional[str] = "Custom KW script", time_to_live: float = 0.0001) -> ScriptResult:
        """
        Runs a KW script.

        Unlike the other script-based methods, a failed script does not raise:
        the outcome is reported through the returned :class:`ScriptResult`.

        Parameters
        ----------
        script:
            The KW script to execute.
        time_to_live:
            Life duration of the script in days
        additional_content:
            Additional content that is supplied with the script (optional).
        name:
            The name of the script (optional).

        Returns
        -------
        :class:`ScriptResult`:
            tells whether the KW script was successfully run, and contains error
            information (if any).
        """
        dto = self.__dto_converter.script_dto_converter.get_background_script_input_dto(
            self.__field_id, self.__well_id, self.__type, script, time_to_live, additional_content, name)
        return self.__cluster_apis.automation_api.execute_background_script(dto)

    def rename(self, name: str) -> None:
        """
        Rename this document

        Parameters
        ----------
        name:
            new name of the document
        """
        dto = {'name': f'{name}'}
        # NOTE(review): the stored name is whatever rename_file returns;
        # presumably the new document name - confirm against field_api.
        self.__name = self.__cluster_apis.field_api.rename_file(
            self.__field_id, self.__well_id, self.__file_id, dto)

    def copy(self, new_name: Optional[str] = None, field_id: Optional[str] = None,
             well_id: Optional[str] = None, well_group_id: Optional[str] = None,
             user_task_id: Optional[str] = None) -> str:
        """
        Copy this document to the current file folder or to another field/well file folder

        Parameters
        ----------
        new_name:
            Use this parameter if you want to rename the copied document
        field_id:
            Specify the field id to copy the file under a different field folder
            (defaults to the field of this document)
        well_id:
            Specify the well id to copy the file under a different well folder
            (defaults to the well of this document)
        well_group_id:
            Specify the well group id to copy the file under a well group
        user_task_id:
            Specify the user task id to copy the file under a user task

        Returns
        -------
        str:
            id of the new copied file
        """
        field_id = self.__field_id if field_id is None else field_id
        well_id = self.__well_id if well_id is None else well_id
        field_api = self.__cluster_apis.field_api
        if new_name is None:
            copied_file = field_api.copy_file(field_id, well_id, well_group_id, user_task_id, self.__file_id)
        else:
            copied_file = field_api.copy_and_rename_file(
                field_id, well_id, well_group_id, user_task_id, self.__file_id, new_name)
        return copied_file.id

    def add_labels(self, labels: List[str]) -> None:
        """
        Add labels to this document

        Parameters
        ----------
        labels:
            List of labels to add to the document
        """
        dto = {"userDefinedLabels": labels}
        self.__cluster_apis.field_api.add_labels(self.__field_id, self.__well_id, self.__file_id, "file", dto)
        # Mirror the change locally only once the API call has returned.
        self.__labels.extend(labels)

    def get_improve_parameters(self) -> List[Dict[str, Any]]:
        """
        Get improve parameters to run an IPTA or IRTA

        Returns
        -------
        List[Dict[str, Any]]:
            List of improve parameters
        """
        return self.__cluster_apis.kw_api.get_improve_parameters(self.__field_id, self.__file_id, self.__type)

    # ------------------------------------------------------------------
    # Analyses management
    # ------------------------------------------------------------------

    def delete_analyses(self, analysis_names: List[str]) -> None:
        """
        Delete analyses from the document

        The cached :py:obj:`Document.analyses` list is updated only after the
        delete script has succeeded.

        Parameters
        ----------
        analysis_names:
            Name of analyses to delete

        Raises
        ------
        ValueError
            If one of the names does not match any (remaining) analysis.
        Exception
            If the background script fails.
        """
        # Resolve names against a working copy so the cache stays untouched
        # until the script has actually succeeded. Each match is consumed, so
        # a name listed twice targets two distinct analyses sharing that name.
        remaining = list(self.analyses)
        analyses_id_to_remove = []
        for analysis_name in analysis_names:
            analysis = self.__find_analysis(remaining, analysis_name)
            analyses_id_to_remove.append(analysis.id)
            remaining.remove(analysis)

        script = self.__dto_converter.script_dto_converter.get_delete_analyses_script(
            self.__field_id, self.__well_id, self.__file_id, analyses_id_to_remove)
        self.__execute_script(script)

        # In-place update: callers holding the list returned by `analyses`
        # observe the removal, as they did with the previous list.remove().
        self.__analyses[:] = remaining

    def copy_analysis(self, reference_analysis_name: str, new_names: List[str]) -> None:
        """
        Copy an Analysis of the document, then refresh the cached analyses.

        Parameters
        ----------
        reference_analysis_name:
            Name of the reference analysis
        new_names:
            List of copied analysis names

        Raises
        ------
        ValueError
            If no analysis is named ``reference_analysis_name``.
        Exception
            If the background script fails.
        """
        # Go through the lazy property so the lookup also works when the
        # analyses have not been loaded yet.
        reference_analysis = self.__find_analysis(self.analyses, reference_analysis_name)
        script = self.__dto_converter.script_dto_converter.get_copy_analyses_script(
            self.__field_id, self.__well_id, self.__file_id, reference_analysis.id, new_names)
        self.__execute_script(script)
        self.refresh_analyses()

    def get_log_log_data(self) -> List[Analysis]:
        """
        Get all loglog plots from all analyses in the document

        The analyses built from the plots replace the cached
        :py:obj:`Document.analyses` list.

        Returns
        -------
        List [:class:`Analysis`]:
            The analyses built from the log-log plots, or an empty list when the
            API returns no plot data (the cache is left untouched in that case).
        """
        self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id)
        log_log_plots_dto = self.__cluster_apis.kw_api.get_all_log_log_plots_data(self.__field_id, self.__file_id)
        if log_log_plots_dto is None:
            return []
        self.__analyses = self.__dto_converter.get_analyses_from_plot_dto(
            self.__field_id, self.__well_group_id, self.__well_id, self.__custom_workflow_id,
            self.__file_id, self.__type, log_log_plots_dto)
        return self.__analyses

    @retry(tries=3, delay=60)
    def update_models_parameters(self, models_parameters: List[ModelParameters],
                                 model_xml: Optional[str] = None) -> None:
        """
        Update all xml model in the document

        Parameters
        ----------
        models_parameters:
            Parameter values to apply, grouped per analysis
        model_xml:
            Model xml to start from; when omitted the current model xml of the
            document is fetched.
        """
        model_xml = self.__update_xml_model(models_parameters, model_xml)
        self.set_model_xml(model_xml, ModelActionEnum.model_generation)

    def __update_xml_model(self, models_parameters: List[ModelParameters],
                           model_xml: Optional[str] = None) -> str:
        """
        Build the model xml carrying the given parameter values.

        Returns
        -------
        str:
            The exported model xml.
        """
        parsed_model_xml = ModelParser(self.get_model_xml() if model_xml is None else model_xml)

        # Ids counted exactly once appear in only one of the two lists
        # (presumably ids are unique within each list - confirm): those
        # analyses are stripped from the xml before the values are applied.
        analyses = [x.id for x in self.analyses]
        counter = Counter(analyses + [x.analysis_id for x in models_parameters])
        analyses_to_remove = [x for x, count in counter.items() if count == 1]
        for analysis in analyses_to_remove:
            parsed_model_xml.remove_analysis(analysis)

        for model in models_parameters:
            for parameter in model.parameters:
                parsed_model_xml.set_parameter_value(
                    model.analysis_id, cast(ModelParameter, parameter["Parameter"]), str(parameter["Value"]))
        return parsed_model_xml.export()

    def delete(self) -> None:
        """
        Delete the current document
        """
        self.__cluster_apis.field_api.delete_file(self.__field_id, self.__well_id, self.__file_id)

    # ------------------------------------------------------------------
    # Topaze / Saphir specific queries
    # ------------------------------------------------------------------

    def get_well_intake_template_xml(self, intake_type: WellIntakeType) -> str:
        """
        Get a well intake template xml

        Parameters
        ----------
        intake_type:
            Well intake type

        Returns
        -------
        str:
            well intake template xml as string

        Raises
        ------
        Exception
            If the document is not a Topaze document.
        """
        if self.__type != KWModuleEnum.topaze:
            raise Exception("You can only work with well intakes in topaze document but this document is not a topaze document")
        return self.__cluster_apis.rta_api.get_well_intake_template_xml(str(intake_type.value))

    def get_model_book_parameters(self) -> List[ModelBookParameter]:
        """
        Get the model book parameters of this document

        Returns
        -------
        List [:class:`ModelBookParameter`]:
            Model book parameters built from the API response.

        Raises
        ------
        Exception
            If the document is neither a Saphir nor a Topaze document.
        """
        if self.__type == KWModuleEnum.topaze:
            module_api = self.__cluster_apis.rta_api
        elif self.__type == KWModuleEnum.saphir:
            module_api = self.__cluster_apis.pta_api
        else:
            raise Exception("The seed document mush be a Saphir or Topaze document")
        model_parameters_dto = module_api.get_model_book_parameters(self.__field_id, self.__file_id).parameters
        return self.__dto_converter.build_model_book_parameters_from_model_book_parameters_dto(model_parameters_dto)

    def get_artm_parameters(self) -> AutomaticRtmParametersDto:
        """
        Get the automatic RTM parameters of this document

        Returns
        -------
        AutomaticRtmParametersDto:
            The parameters as returned by the KW API.

        Raises
        ------
        Exception
            If the document is not a Topaze document.
        """
        if self.__type != KWModuleEnum.topaze:
            raise Exception("The seed document mush be a Topaze document")
        return self.__cluster_apis.kw_api.get_artm_parameters(self.__field_id, self.__file_id)