from typing import List, Optional, Dict, Any, cast
from datetime import datetime
from ._private._cluster_apis import ClusterAPIS
from ._private.dto_converters._document_dto_converter import DocumentDtoConverter
from ._private._decorators import retry
from ._private._kw_dto import AutomaticRtmParametersDto
from .analysis import Analysis
from .analysis_results import AnalysisResults
from .kw_module_enum import KWModuleEnum
from .model_action_enum import ModelActionEnum
from .datetime_utils import str_to_datetime
from .vector import Vector
from .script_result import ScriptResult
from .kw.model.model_parameters import ModelParameters
from .kw.model.model_parser import ModelParser
from .kw.model.model_parameter import ModelParameter
from .kw.well_intake.well_intake_type import WellIntakeType
from .kw.model_book.model_book_parameter import ModelBookParameter
from collections import Counter
class Document:
    """
    KW document object.

    Presents a KW document that can be queried for contained analyses and results.

    .. note::
        Should not be instantiated directly.

    .. note::
        :py:obj:`Document.analyses` property is populated on-demand and is cached for the duration of the :class:`Connection`.
        If you need to get actual values, use the :py:meth:`Document.refresh_analyses` method.
    """

    def __init__(self,
                 field_id: str,
                 well_group_id: Optional[str],
                 well_id: Optional[str],
                 custom_workflow_id: Optional[str],
                 file_id: str,
                 name: str,
                 module_type: KWModuleEnum,
                 parent_ipta: Optional[str],
                 labels: List[str],
                 cluster_apis: ClusterAPIS,
                 document_dto_converter: DocumentDtoConverter):
        self.__field_id: str = field_id
        self.__well_group_id: Optional[str] = well_group_id
        self.__well_id: Optional[str] = well_id
        self.__custom_workflow_id: Optional[str] = custom_workflow_id
        self.__file_id: str = file_id
        self.__name: str = name
        self.__type: KWModuleEnum = module_type
        # Lazily populated caches; see the ``analyses`` and ``analysis_results`` properties.
        self.__analyses: List[Analysis] = []
        self.__analysis_results: Optional[AnalysisResults] = None
        self.__parent_ipta: Optional[str] = parent_ipta
        self.__labels: List[str] = labels
        self.__cluster_apis: ClusterAPIS = cluster_apis
        self.__dto_converter: DocumentDtoConverter = document_dto_converter

    @property
    def parent_ipta(self) -> Optional[str]:
        """ Gets the parent ipta of the KW :class:`Document` file.
        """
        return self.__parent_ipta

    @property
    def file_id(self) -> str:
        """ Gets the id of the KW :class:`Document` file.
        """
        return self.__file_id

    @property
    def well_group_id(self) -> Optional[str]:
        """ Gets the well group id of the KW :class:`Document` (``None`` when it was created without one).
        """
        return self.__well_group_id

    @property
    def name(self) -> str:
        """ Gets the name of the KW :class:`Document`.
        """
        return self.__name

    @property
    def type(self) -> KWModuleEnum:
        """ Gets the type of the KW module for this :class:`Document`.
        """
        return self.__type

    @property
    def analyses(self) -> List[Analysis]:
        """ Gets the list of analyses contained in this document that has log-log data inside.

        .. note::
            This property is populated on-demand and is cached for the duration of the :class:`Connection`
            unless updated by calling the :py:meth:`Document.refresh_analyses` method.

        Returns
        -------
        List [:class:`Analysis`]:
            A list of Analysis.
        """
        # An empty cache is treated as "not loaded yet", so a document without
        # analyses is queried again on every access.
        if not self.__analyses:
            self.refresh_analyses()
        return self.__analyses

    @property
    def analysis_results(self) -> AnalysisResults:
        """ Returns a wrapper for the "resultsSI.xml" in this document.

        The wrapper is built on first access and cached until
        :py:meth:`Document.refresh_analyses` is called.

        Returns
        -------
        :class:`AnalysisResults`:
            AnalysisResults wrapper

        Raises
        ------
        Exception:
            If the document is neither a saphir nor a topaze document.
        """
        if self.__analysis_results is None:
            # Delegating to get_results_xml() guarantees that topaze documents
            # are read through the RTA API, exactly like the raw XML accessor.
            self.__analysis_results = AnalysisResults(self.get_results_xml())
        return self.__analysis_results

    @property
    def labels(self) -> List[str]:
        """ Gets the list of labels of the KA :class:`Document`.

        Returns
        -------
        List [str]:
            List of labels.
        """
        return self.__labels

    def get_results_xml(self) -> str:
        """ Returns the content of the "resultsSI.xml" in this document.

        Calls the automation API's ``wait_for_document_processing`` before reading the results.

        Returns
        -------
        str:
            resultsSI xml

        Raises
        ------
        Exception:
            If the document is neither a saphir nor a topaze document.
        """
        self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id)
        return self.__model_api().get_model_xml_for_document(self.__field_id, self.__file_id, 'ResultsSI')

    def get_results_xml_legacy(self) -> str:
        """ Returns the content of the "results.xml" in this document.

        Returns
        -------
        str:
            results xml legacy
        """
        return self.__cluster_apis.field_api.get_standard_results(self.__field_id, self.__file_id)

    def refresh_analyses(self) -> List[Analysis]:
        """ Refresh the cached value of the :py:obj:`Document.analyses` property.

        The cached :py:obj:`Document.analysis_results` wrapper is discarded as well,
        so it is rebuilt on its next access.

        Returns
        -------
        List [:class:`Analysis`]:
            The freshly loaded list of analyses.
        """
        self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id)
        analyses_dto = self.__cluster_apis.kw_api.get_analyses(self.__field_id, self.__file_id)
        analyses = self.__dto_converter.get_analyses_from_dto(
            self.__field_id,
            self.__well_group_id,
            self.__well_id,
            self.__custom_workflow_id,
            self.__file_id,
            self.__type,
            analyses_dto,
        )
        self.__analyses = analyses
        self.__analysis_results = None
        return analyses

    def get_model_xml(self, action: ModelActionEnum = ModelActionEnum.model_generation) -> str:
        """ Returns analysis model definition(s) in KW-KA exchange XML format for all analyses in the document.

        Parameters
        ----------
        action:
            Type of Model Action

        Returns
        -------
        str:
            model xml

        Raises
        ------
        Exception:
            If the document is neither a saphir nor a topaze document.
        """
        return self.__model_api().get_model_xml_for_document(self.__field_id, self.__file_id, str(action.value))

    def set_model_xml(self, model_xml: str, action: ModelActionEnum = ModelActionEnum.model_generation) -> None:
        """ Applies given analysis model definition(s) in KW-KA exchange XML format to the document.

        The cached analyses are refreshed once the script has succeeded.

        Parameters
        ----------
        model_xml:
            Model xml as a string
        action:
            Type of Model Action.

        Raises
        ------
        Exception:
            If the background script reports a failure.
        """
        script = (
            "Feature: Generate xml model\n"
            "Scenario: Generate xml model or improve\n"
            "Given a field <{}> and a well <{}>\n"
            "And I load a document <{}>\n"
            "Then I update parameters from xml file and do <{}>\n"
            "And I reupload the updated document"
        ).format(self.__field_id, self.__well_id, self.__file_id, str(action.value))
        self.__execute_script(script, additional_content=model_xml, name="Set model XML")
        self.refresh_analyses()

    def get_pseudo_pressures(self, pressure_id: str, start_date: datetime = datetime(1900, 1, 1)) -> Vector:
        """Calculate pseudo-pressures from pressures

        Parameters
        ----------
        pressure_id:
            Vector Id of the original pressures
        start_date:
            Start date of the pseudo pressures

        Returns
        -------
        :class:`Vector`:
            A Vector object which contains a list of pseudo-pressures dates and list of pseudo-pressures values
        """
        dto = self.__dto_converter.get_pseudo_pressures_dto(self.__field_id, self.__well_id, pressure_id, start_date)
        vector_dto = self.__cluster_apis.rta_api.get_pseudo_pressures(self.__file_id, dto)
        dates = [cast(datetime, str_to_datetime(x)) for x in vector_dto.dates]
        return Vector(dates, vector_dto.values)

    def reset_pressure(self, pressure_id: str) -> None:
        """ Reset pressures values

        Parameters
        ----------
        pressure_id:
            Vector Id of the pressures

        Raises
        ------
        Exception:
            If the background script reports a failure.
        """
        script = (
            "Feature: Generate model with new pressures\n"
            "Scenario: Generate xml model with new pressures\n"
            "Given a field <{}> and a well <{}>\n"
            "And I load a document <{}>\n"
            "And I load or update a pressure <{}>\n"
            "Then I reupload the updated document"
        ).format(self.__field_id, self.__well_id, self.__file_id, pressure_id)
        self.__execute_script(script)

    def reset_rate(self, rates_id: str, rate_type: str) -> None:
        """ Reset rates values

        Parameters
        ----------
        rates_id:
            Vector Id of the rate
        rate_type:
            Type of the rate to reset

        Raises
        ------
        ValueError:
            If the document is not located under a well.
        Exception:
            If the background script reports a failure.
        """
        if self.__well_id is None:
            raise ValueError("You cannot run this scenario under a field, upload the document under a well")
        script = self.__dto_converter.script_dto_converter.get_reset_rate_script(
            self.__field_id, self.__well_id, self.__file_id, rates_id, rate_type)
        self.__execute_script(script)

    def run_kw_script(self,
                      script: str,
                      additional_content: Optional[str] = None,
                      name: Optional[str] = "Custom KW script",
                      time_to_live: float = 0.0001) -> ScriptResult:
        """ Runs a KW script.

        Unlike the other script-based methods of this class, a failed script does
        not raise: the outcome is reported through the returned object.

        Parameters
        ----------
        script:
            The KW script to execute.
        additional_content:
            Additional content that is supplied with the script (optional).
        name:
            The name of the script (optional).
        time_to_live:
            Life duration of the script in days

        Returns
        -------
        :class:`ScriptResult`:
            tells whether the KW script was successfully run, and contains error information (if any).
        """
        dto = self.__dto_converter.script_dto_converter.get_background_script_input_dto(
            self.__field_id, self.__well_id, self.__type, script, time_to_live, additional_content, name)
        return self.__cluster_apis.automation_api.execute_background_script(dto)

    def rename(self, name: str) -> None:
        """ Rename this document

        The local name is replaced by the value returned by the field API.

        Parameters
        ----------
        name:
            new name of the document
        """
        dto = {'name': f'{name}'}
        self.__name = self.__cluster_apis.field_api.rename_file(self.__field_id, self.__well_id, self.__file_id, dto)

    def copy(self, new_name: Optional[str] = None, field_id: Optional[str] = None, well_id: Optional[str] = None, well_group_id: Optional[str] = None, user_task_id: Optional[str] = None) -> str:
        """ Copy this document to the current file folder or to another field/well file folder

        Parameters
        ----------
        new_name:
            Use this parameter if you want to rename the copied document
        field_id:
            Specify the field id to copy the file under a different field folder
            (defaults to the field of this document)
        well_id:
            Specify the well id to copy the file under a different well folder
            (defaults to the well of this document)
        well_group_id:
            Specify the well group id to copy the file under a well group
        user_task_id:
            Specify the user task id to copy the file under a user task

        Returns
        -------
        str:
            id of the new copied file
        """
        field_id = self.__field_id if field_id is None else field_id
        well_id = self.__well_id if well_id is None else well_id
        field_api = self.__cluster_apis.field_api
        if new_name is None:
            return field_api.copy_file(field_id, well_id, well_group_id, user_task_id, self.__file_id).id
        return field_api.copy_and_rename_file(field_id, well_id, well_group_id, user_task_id, self.__file_id, new_name).id

    def add_labels(self, labels: List[str]) -> None:
        """Add labels to this document

        Parameters
        ----------
        labels:
            List of labels to add to the document
        """
        dto = {"userDefinedLabels": labels}
        self.__cluster_apis.field_api.add_labels(self.__field_id, self.__well_id, self.__file_id, "file", dto)
        # Mirror the change locally only once the API call has returned.
        self.__labels.extend(labels)

    def get_improve_parameters(self) -> List[Dict[str, Any]]:
        """ Get improve parameters to run an IPTA or IRTA

        Returns
        -------
        List[Dict[str, Any]]:
            List of improve parameters
        """
        return self.__cluster_apis.kw_api.get_improve_parameters(self.__field_id, self.__file_id, self.__type)

    def delete_analyses(self, analysis_names: List[str]) -> None:
        """ Delete analyses from the document

        The cached list of analyses is only updated after the delete script
        has succeeded, so a failure leaves the cache untouched.

        Parameters
        ----------
        analysis_names:
            Name of analyses to delete

        Raises
        ------
        ValueError:
            If one of the names does not match a (remaining) analysis of this document.
        Exception:
            If the background script reports a failure.
        """
        # Work on a copy so nothing is dropped from the cache before the server confirms.
        remaining = list(self.analyses)
        analyses_id_to_remove = []
        for analysis_name in analysis_names:
            analysis = next((x for x in remaining if x.name == analysis_name), None)
            if analysis is None:
                raise ValueError("No analysis named '{}' was found in this document".format(analysis_name))
            remaining.remove(analysis)
            analyses_id_to_remove.append(analysis.id)
        script = self.__dto_converter.script_dto_converter.get_delete_analyses_script(
            self.__field_id, self.__well_id, self.__file_id, analyses_id_to_remove)
        self.__execute_script(script)
        # In-place update keeps list objects previously handed out by ``analyses`` in sync.
        self.__analyses[:] = remaining

    def copy_analysis(self, reference_analysis_name: str, new_names: List[str]) -> None:
        """ Copy an analysis of the document

        The cached analyses are refreshed once the script has succeeded;
        read :py:obj:`Document.analyses` afterwards to get the updated list.

        Parameters
        ----------
        reference_analysis_name:
            Name of the reference analysis
        new_names:
            List of copied analysis names

        Raises
        ------
        ValueError:
            If no analysis of this document is named ``reference_analysis_name``.
        Exception:
            If the background script reports a failure.
        """
        # Go through the property so the lookup also works before the cache was populated.
        reference_analysis = next((x for x in self.analyses if x.name == reference_analysis_name), None)
        if reference_analysis is None:
            raise ValueError("No analysis named '{}' was found in this document".format(reference_analysis_name))
        script = self.__dto_converter.script_dto_converter.get_copy_analyses_script(
            self.__field_id, self.__well_id, self.__file_id, reference_analysis.id, new_names)
        self.__execute_script(script)
        self.refresh_analyses()

    def get_log_log_data(self) -> List[Analysis]:
        """ Get all loglog plots from all analyses in the document

        When plot data is available, the cached :py:obj:`Document.analyses` list is
        replaced by the analyses built from it.

        Returns
        -------
        List [:class:`Analysis`]:
            The analyses built from the log-log plot data, or an empty list when
            the API returns no data.
        """
        self.__cluster_apis.automation_api.wait_for_document_processing(self.__field_id, self.__file_id)
        log_log_plots_dto = self.__cluster_apis.kw_api.get_all_log_log_plots_data(self.__field_id, self.__file_id)
        if log_log_plots_dto is None:
            return []
        self.__analyses = self.__dto_converter.get_analyses_from_plot_dto(
            self.__field_id,
            self.__well_group_id,
            self.__well_id,
            self.__custom_workflow_id,
            self.__file_id,
            self.__type,
            log_log_plots_dto,
        )
        return self.__analyses

    @retry(tries=3, delay=60)
    def update_models_parameters(self, models_parameters: List[ModelParameters], model_xml: Optional[str] = None) -> None:
        """ Update all xml model in the document

        Parameters
        ----------
        models_parameters:
            Parameter values to apply, grouped by analysis
        model_xml:
            Model xml to start from; when omitted the current model xml of the document is used
        """
        model_xml = self.__update_xml_model(models_parameters, model_xml)
        self.set_model_xml(model_xml, ModelActionEnum.model_generation)

    def delete(self) -> None:
        """ Delete the current document"""
        self.__cluster_apis.field_api.delete_file(self.__field_id, self.__well_id, self.__file_id)

    def get_well_intake_template_xml(self, intake_type: WellIntakeType) -> str:
        """Get a well intake template xml

        Parameters
        ----------
        intake_type:
            Well intake type

        Returns
        -------
        str:
            well intake template xml as string

        Raises
        ------
        Exception:
            If the document is not a topaze document.
        """
        if self.__type != KWModuleEnum.topaze:
            raise Exception("You can only work with well intakes in topaze document but this document is not a topaze document")
        return self.__cluster_apis.rta_api.get_well_intake_template_xml(str(intake_type.value))

    def get_model_book_parameters(self) -> List[ModelBookParameter]:
        """ Get the model book parameters of this document

        Returns
        -------
        List [:class:`ModelBookParameter`]:
            Model book parameters built from the API response

        Raises
        ------
        Exception:
            If the document is neither a saphir nor a topaze document.
        """
        api = self.__model_api("The seed document must be a Saphir or Topaze document")
        model_parameters_dto = api.get_model_book_parameters(self.__field_id, self.__file_id).parameters
        return self.__dto_converter.build_model_book_parameters_from_model_book_parameters_dto(model_parameters_dto)

    def get_artm_parameters(self) -> AutomaticRtmParametersDto:
        """ Get the automatic RTM parameters of this document

        Returns
        -------
        AutomaticRtmParametersDto:
            The parameters as returned by the KW API

        Raises
        ------
        Exception:
            If the document is not a topaze document.
        """
        if self.__type != KWModuleEnum.topaze:
            raise Exception("The seed document must be a Topaze document")
        return self.__cluster_apis.kw_api.get_artm_parameters(self.__field_id, self.__file_id)

    def __model_api(self, error_message: str = "Document is not a saphir or topaze document") -> Any:
        """ Select the API matching the module type: PTA for saphir, RTA for topaze.

        Raises an ``Exception`` carrying ``error_message`` for any other module type.
        """
        if self.__type == KWModuleEnum.saphir:
            return self.__cluster_apis.pta_api
        if self.__type == KWModuleEnum.topaze:
            return self.__cluster_apis.rta_api
        raise Exception(error_message)

    def __execute_script(self, script: str, **dto_options: Any) -> None:
        """ Run ``script`` as a background script and raise if it does not succeed.

        ``dto_options`` are forwarded as keyword arguments to the script input dto builder.
        """
        dto = self.__dto_converter.script_dto_converter.get_background_script_input_dto(
            self.__field_id, self.__well_id, self.__type, script, **dto_options)
        script_result = self.__cluster_apis.automation_api.execute_background_script(dto)
        if not script_result.is_success:
            raise Exception(script_result.message)

    def __update_xml_model(self, models_parameters: List[ModelParameters], model_xml: Optional[str] = None) -> str:
        """ Build the model xml restricted to, and updated with, ``models_parameters``.

        Analyses of the document that have no entry in ``models_parameters`` are
        removed from the xml; the remaining ones get the requested parameter values.
        """
        parsed_model_xml = ModelParser(self.get_model_xml() if model_xml is None else model_xml)
        requested_ids = {x.analysis_id for x in models_parameters}
        # Only drop analyses that were NOT requested: an id that appears solely in
        # ``models_parameters`` must stay in the xml since its parameters are set below.
        for analysis in self.analyses:
            if analysis.id not in requested_ids:
                parsed_model_xml.remove_analysis(analysis.id)
        for model in models_parameters:
            for parameter in model.parameters:
                parsed_model_xml.set_parameter_value(
                    model.analysis_id,
                    cast(ModelParameter, parameter["Parameter"]),
                    str(parameter["Value"]),
                )
        return parsed_model_xml.export()