from ._private._cluster_apis import ClusterAPIS
from ._private._well_dto import WellDtoWithHierarchy, PlotPropertiesDto, WellParamDto
from ._private.dto_converters._well_dto_converter import WellDtoConverter
from datetime import datetime
from typing import List, Optional, cast, Union
from .production_folder_kind_enum import ProductionFolderKindEnum
from .custom_workflow import CustomWorkflow
from .field_data_types_catalog import FieldDataTypesCatalog
from .field_well_properties_catalog import FieldWellPropertiesCatalog
from .data import Data
from .file import File
from .document import Document
from .kw_module_enum import KWModuleEnum
from .analysis import Analysis
from .kw.model_book.model_book import ModelBook
from .user_task import UserTask
from .incremental_pta import IncrementalPTA
from .save_strategy_enum import SaveStrategyEnum
from .well_production_enum import WellProductionTypeEnum
from .workflow_type import IncrementalPTAType, IncrementalRTAType
from .incremental_rta import IncrementalRTA
from .well_property_container import WellPropertyContainer
from .plot import Plot
from .gauge_loading_strategy import GaugeLoadingStrategy
from .workflow_settings import WorkflowImproveSettings
from .measure_enum import MeasureEnum
from .unit_enum import UnitEnum
from .time_format_enum import TimeFormatEnum
from .pta_extraction import PTAExtraction
from .shutin import ShutIn
from .wellbore import Wellbore
from .production_folder import ProductionFolder
import uuid
from .look_for_enum import LookForEnum
from .input_type_enum import InputTypeEnum
from .shut_in_types_enum import ShutInTypesEnum
from .shut_in_categories_enum import ShutInCategoriesEnum
from .replace_by_enum import ReplaceByEnum
from .rate_correction_option_enum import RateCorrectionOptionEnum
from .simplify_method_enum import SimplifyMethodEnum
from .filter_type_enum import FilterTypeEnum
from .denoising_pre_sampling_type_enum import DenoisingPreSamplingTypeEnum
from .denoising_threshold_type_enum import DenoisingThresholdTypeEnum
from .pvt.pvt import PVT
from .file_folder import FileFolder
from .file_folders_extensions import find_file_folders_recursively_by_name, find_file_folder_recursively_by_id, find_file_folder_recursively_by_file
from .survey import Survey
from .well_logs import WellLogs
from .vs_depth_vector import VsDepthVector
from .forward_rates_outputs import ForwardRatesOutputs
from .gas_oil_type_enum import GasOilTypeEnum
from .unit_system_pvt_enum import UnitSystemPvtEnum
from .wellbore.wellbore_perforation import WellborePerforation
from .wellbore.wellbore_geometry import WellboreGeometry
from .data_folder import DataFolder
from .data_folder_extensions import find_data_folder_recursively_by_id, find_data_folders_recursively_by_name
[docs]
class Well:
""" Well object.
Presents a KAPPA Automate well object that can be queried for contained data, documents and every existing object under the well.
Returned as a result of the :py:obj:`Field.wells` query.
.. note:: Should not be instantiated directly.
.. note::
:py:obj:`Well.data`, :py:obj:`Well.shut_in`, and :py:obj:`Well.documents` properties are populated on-demand
and are cached for the duration of the :class:`Connection`.
"""
def __init__(self,
field_id: str,
well_group_id: Optional[str],
well_id: str,
name: str,
uwi: Optional[str],
data_types_catalog: FieldDataTypesCatalog,
well_properties_catalog: FieldWellPropertiesCatalog,
cluster_apis: ClusterAPIS,
dto_converter: WellDtoConverter) -> None:
self.__field_id: str = field_id
self.__id: str = well_id
self.__well_group_id: Optional[str] = well_group_id
self.__name: str = name
self.__uwi: Optional[str] = uwi
self.__dto_converter: WellDtoConverter = dto_converter
self.__cluster_apis: ClusterAPIS = cluster_apis
self.__data_types_catalog: FieldDataTypesCatalog = data_types_catalog
self.__well_properties_catalog: FieldWellPropertiesCatalog = well_properties_catalog
self.__labels: Optional[List[str]] = None
self.__production_folders: Optional[List[ProductionFolder]] = None
self.__corrected_production_folders: Optional[List[ProductionFolder]] = None
self.__data: Optional[List[Data]] = None
self.__files: Optional[List[File]] = None
self.__user_tasks: Optional[List[UserTask]] = None
self.__functions: Optional[List[Data]] = None
self.__incremental_pta_workflows: Optional[List[IncrementalPTA]] = None
self.__incremental_rta_workflows: Optional[List[IncrementalRTA]] = None
self.__well_dto: Optional[WellDtoWithHierarchy] = None
self.__shut_in: Optional[Data] = None
self.__well_property_containers: Optional[List[WellPropertyContainer]] = None
self.__well_plots: Optional[List[Plot]] = None
self.__gauges: Optional[List[Data]] = None
self.__filters: Optional[List[Data]] = None
self.__productions: Optional[List[Data]] = None
self.__corrected_productions: Optional[List[Data]] = None
self.__file_folders: Optional[List[FileFolder]] = None
self.__well_info_dto: Optional[List[WellParamDto]] = None
self.__production_type: Optional[WellProductionTypeEnum] = None
self.__wellbore: Optional[Wellbore] = None
self.__custom_workflows: Optional[List[CustomWorkflow]] = None
self.__pvts: Optional[List[PVT]] = None
self.__surveys: Optional[List[Survey]] = None
self.__model_books: Optional[List[ModelBook]] = None
self.__well_logs: Optional[List[WellLogs]] = None
self.__data_folders: Optional[List[DataFolder]] = None
def __get_well_dto(self) -> WellDtoWithHierarchy:
if self.__well_dto is None:
self.__well_dto = self.__cluster_apis.field_api.get_well_dto(self.__field_id, self.__id)
return self.__well_dto
@property
def field_id(self) -> str:
""" Gets the id of the field that contains this :class:`Well`.
"""
return self.__field_id
@property
def id(self) -> str:
""" Gets the id of the :class:`Well` object.
"""
return self.__id
@property
def name(self) -> str:
""" Gets the name of the :class:`Well`.
"""
return self.__name
@property
def uwi(self) -> Optional[str]:
""" Gets the UWI of the :class:`Well` object.
"""
return self.__uwi
@property
def labels(self) -> List[str]:
""" Gets the labels of the :class:`Well` object.
"""
if self.__labels is None:
self.__labels = self.__dto_converter.file_dto_converter.get_labels_from_labels_dto(self.__get_well_dto().labels)
return self.__labels
@property
def well_group_id(self) -> Optional[str]:
""" Gets the well group id that contains this :class:`Well`.
"""
return self.__well_group_id
@property
def production_folders(self) -> List[ProductionFolder]:
""" Gets the raw production folders that contains this :class:`Well`.
"""
if self.__production_folders is None:
self.__production_folders = self.__dto_converter.get_production_folders_from_production_folders_dto(self.__field_id, self.__id, self.__get_well_dto().productions)
return self.__production_folders
@property
def corrected_production_folders(self) -> List[ProductionFolder]:
""" Gets the corrected production folders that contains this :class:`Well`.
"""
if self.__corrected_production_folders is None:
self.__corrected_production_folders = self.__dto_converter.get_production_folders_from_production_folders_dto(self.__field_id, self.__id, self.__get_well_dto().productions, ProductionFolderKindEnum.corrected_production)
return self.__corrected_production_folders
@property
def shut_in(self) -> Optional[Data]:
""" Gets the shut-in data for this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__shut_in is None:
well_dto_shut_in = self.__get_well_dto().shutin
if well_dto_shut_in is not None:
self.__shut_in = self.__dto_converter.production_folder_dto_converter.build_data(self.__field_id, self.__id, well_dto_shut_in)
return self.__shut_in
@property
def gauges(self) -> List[Data]:
""" Gets the list of gauges contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`."""
if self.__gauges is None:
self.__gauges, self.__filters = self.__dto_converter.get_gauges_and_filters_from_data_folders_dto(self.__field_id, self.__id, self.__get_well_dto().dataFolders)
return self.__gauges
@property
def data(self) -> List[Data]:
""" Gets the list of data contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
return list(self.__get_data())
def __get_data(self) -> List[Data]:
if self.__data is None:
self.__data = [item for sublist in [self.gauges, self.filters, self.productions, self.corrected_productions, self.functions, [y for x in self.user_tasks for y in x.outputs], [y for x in self.incremental_rta_workflows for y in x.outputs],
[self.shut_in] if self.shut_in is not None else [], [item for x in self.custom_workflows for item in x.data]] for item in sublist]
return self.__data
@property
def filters(self) -> List[Data]:
""" Gets the list of filters contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__filters is None:
self.__gauges, self.__filters = self.__dto_converter.get_gauges_and_filters_from_data_folders_dto(self.__field_id, self.__id, self.__get_well_dto().dataFolders)
return self.__filters
@property
def productions(self) -> List[Data]:
""" Gets the list of production phases contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__productions is None:
self.__productions = list()
for production_folder in self.production_folders:
self.__productions.extend(production_folder.data)
return self.__productions
@property
def corrected_productions(self) -> List[Data]:
""" Gets the list of corrected production phases contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__corrected_productions is None:
self.__corrected_productions = list()
for corrected_production_folder in self.corrected_production_folders:
self.__corrected_productions.extend(corrected_production_folder.data)
return self.__corrected_productions
@property
def files(self) -> List[File]:
""" Gets the list of files contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
return list(self.__get_files())
def __get_files(self) -> List[File]:
if self.__files is None:
self.__files = self.__dto_converter.get_files_recursively(self.field_id, self.id, self.__get_well_dto())
return self.__files
@property
def documents(self) -> List[Document]:
""" Gets the list of KW documents contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
document_list = list()
for file in self.__get_files():
try:
document = file.as_kw_document()
except ValueError:
document = None
if document is not None:
document_list.append(document)
for ipta in self.incremental_pta_workflows:
document_list.extend(ipta.output_documents)
return document_list
@property
def user_tasks(self) -> List[UserTask]:
""" Gets the list of user tasks contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__user_tasks is None:
self.__user_tasks = self.__dto_converter.get_user_tasks_from_user_task_dto(self.field_id, self.id, None, self.__data_types_catalog, self.__get_well_dto().userTasks)
return self.__user_tasks
@property
def functions(self) -> List[Data]:
""" Gets the list of functions contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__functions is None:
self.__functions = self.__dto_converter.get_functions_from_functions_dto(self.field_id, self.id, self.__get_well_dto().functions)
return self.__functions
@property
def incremental_pta_workflows(self) -> List[IncrementalPTA]:
""" Gets the list of incremental PTA workflows contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__incremental_pta_workflows is None:
self.__incremental_pta_workflows = self.__dto_converter.ipta_dto_converter.get_incremental_pta_workflows_from_dto(self.field_id, self.__well_group_id, self.id, self.__get_well_dto().incrementalPtas)
return self.__incremental_pta_workflows
@property
def incremental_rta_workflows(self) -> List[IncrementalRTA]:
""" Gets the list of incremental RTA workflows contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__incremental_rta_workflows is None:
self.__incremental_rta_workflows = self.__dto_converter.get_incremental_rta_workflows_from_dto(self.field_id, self.__well_group_id, self.id, self.__get_well_dto().incrementalRtas)
return self.__incremental_rta_workflows
@property
def well_property_containers(self) -> List[WellPropertyContainer]:
""" Gets the list of well property containers contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__well_property_containers is None:
self.__well_property_containers = self.__dto_converter.get_well_property_containers_from_well_property_container_dto(self.field_id, self.id, self.__well_properties_catalog, self.__get_well_dto().wellPropertyContainers)
return self.__well_property_containers
@property
def plots(self) -> List[Plot]:
""" Gets the list of plots contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__well_plots is None:
self.__well_plots = self.__dto_converter.get_plots_from_well_dto(self.field_id, self.id, self.__data_types_catalog, self.__get_well_dto())
return self.__well_plots
@property
def production_type(self) -> WellProductionTypeEnum:
""" Returns the production type of the well
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__production_type is None:
self.__production_type = self.__get_well_dto().productionType
return self.__production_type
@property
def wellbore(self) -> Optional[Wellbore]:
""" Gets the wellbore of the :class:`Well` object.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__wellbore is None:
self.__wellbore = self.__dto_converter.get_wellbore_from_wellbore_dto(self.__get_well_dto().wellbore)
return self.__wellbore
@property
def custom_workflows(self) -> List[CustomWorkflow]:
""" Gets the list of custom workflows contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__custom_workflows is None:
self.__custom_workflows = self.__dto_converter.get_custom_workflows_from_custom_workflows_dto(self.__field_id, self.__id, self.__data_types_catalog, self.__get_well_dto().customWorkflows)
return self.__custom_workflows
@property
def pvts(self) -> List[PVT]:
""" Gets the list of PVTs contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__pvts is None:
self.__pvts = self.__dto_converter.get_pvts_from_pvts_dto(self.__field_id, self.__id, self.__get_well_dto().pvts)
return self.__pvts
@property
def file_folders(self) -> List[FileFolder]:
""" Gets the list of file folders contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__file_folders is None:
self.__file_folders = self.__dto_converter.file_dto_converter.get_file_folders_from_file_folder_dto_recursively(self.__field_id, None, self.__id, None, None, self.__get_well_dto().fileFolders)
return self.__file_folders
@property
def data_folders(self) -> List[DataFolder]:
""" Gets the list of data folders contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
"""
if self.__data_folders is None:
self.__data_folders = self.__dto_converter.get_data_folders_from_data_folder_dto_recursively(self.__field_id, self.__id, None, None, self.__get_well_dto().dataFolders)
return self.__data_folders
@property
def surveys(self) -> List[Survey]:
""" Gets the list of surveys contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`."""
if self.__surveys is None:
self.__surveys = self.__dto_converter.get_surveys_from_surveys_dto(self.__field_id, self.__id, self.__get_well_dto().surveyContainer.surveys)
return self.__surveys
@property
def model_books(self) -> List[ModelBook]:
""" Gets the list of model book contained in this :class:`Well`.
.. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`."""
if self.__model_books is None:
self.__model_books = self.__dto_converter.get_model_books_from_field_model_books_dto(self.__field_id, self.__id, self.__get_well_dto().modelBooks)
return self.__model_books
@property
def well_logs(self) -> List[WellLogs]:
"""
Gets the list of well log contained in this :class:`Well`.
Returns:
List[WellLogs]: A list of WellLogs objects containing well logging data.
"""
if self.__well_logs is None:
self.__well_logs = self.__dto_converter.get_well_logs_from_well_logs_dto(self.__field_id, self.__id, self.__get_well_dto().staticLogs)
return self.__well_logs
def create_data(self, name: str, data_type: str, labels: Optional[List[str]] = None, folder_id: Optional[str] = None, measure_depth: float = 0, true_vertical_depth: float = 0, true_vertical_depth_sub_sea: float = 0) -> Data:
"""
Creates and returns a new data object with specified parameters. This method serves as an interface
to initialize a data entity with the provided attributes and configuration settings.
Parameters
----------
name : str
The name of the data entity to create.
data_type : str
The type of data.
labels : list of str, optional
A list of labels associated with the data.
folder_id : str, optional
The identifier of the production folder where the data is stored.
measure_depth : float
The measured depth of the data.
true_vertical_depth : float
The true vertical depth of the data.
true_vertical_depth_sub_sea : float
The true vertical depth of the data below sea level.
Returns
-------
Data
The newly created data object.
"""
return self.__create_data(name, data_type, False, labels, folder_id, measure_depth, true_vertical_depth, true_vertical_depth_sub_sea)
def create_step_data(self, name: str, data_type: str, first_x: datetime, labels: Optional[List[str]] = None, folder_id: Optional[str] = None, measure_depth: float = 0, true_vertical_depth: float = 0, true_vertical_depth_sub_sea: float = 0) -> Data:
"""
Creates and initializes a step data object with the provided parameters.
The function generates a step data object by initializing it with essential details like name, data type and associated
parameters. It requires a ’first_x’ value to be explicitly specified and uses it to update the step data object after
creation. The function optionally accepts additional metadata such as labels or production folder identifier.
Parameters
----------
name : str
The name of the step data to be created.
data_type : str
The type specifying the data format or classification.
first_x : datetime
The datetime object representing the first x-coordinate for the step data. This parameter must not be None.
labels : list of str, optional
A list of labels to associate with the step data. Defaults to None.
folder_id : str, optional
Identifier for the production folder to associate the step data with. Defaults to None.
measure_depth : float
The measured depth of the data.
true_vertical_depth : float
The true vertical depth of the data.
true_vertical_depth_sub_sea : float
The true vertical depth of the data below sea level.
Returns
-------
Data
The created step data object initialized with the given parameters.
"""
if first_x is None:
raise ValueError("Argument error: 'first_x' must be defined for step data.")
data = self.__create_data(name, data_type, True, labels, folder_id, measure_depth, true_vertical_depth, true_vertical_depth_sub_sea)
data.update_first_x(first_x)
return data
def upload_file(self, file_path: str, file_folder_id: Optional[str] = None, overwrite: bool = False, automatic_extraction: bool = False) -> File:
""" Uploads a file to this :class:`Well`.
Parameters
----------
file_path:
Full path and name of the file to upload.
overwrite:
A value indicating whether to overwrite a file with the same name if it already exists in the well.
automatic_extraction:
A value indicating whether to perform automatic extraction of well properties if uploading a KW file.
file_folder_id:
id of the file folder to upload the file
Returns
-------
:class:`File`:
An uploaded file object.
"""
if file_folder_id is not None:
file_folder = find_file_folder_recursively_by_id(file_folder_id, self.file_folders)
if file_folder is None:
raise ValueError(f"Missing File folder {file_folder_id} in well {self.__name}")
file_dto = self.__cluster_apis.field_api.upload_file_to_file_folder_in_well(self.__field_id, self.__id, file_folder.id, file_path, overwrite, automatic_extraction)
else:
file_dto = self.__cluster_apis.field_api.upload_file_to_well(self.__field_id, self.__id, file_path, overwrite, automatic_extraction)
file = self.__dto_converter.file_dto_converter.build_file_from_file_dto(None, file_dto)
self.__get_files().append(file)
return file
def create_plot(self, plot_name: str, pane_name: Optional[str] = None, square_log_cycles: bool = False, stacked_bars: bool = False, x_label: str = "", is_x_log: bool = False, labels: Optional[List[str]] = None) -> Plot:
    """
    Creates a plot under the well and adds it to the plots of the well.

    Parameters
    ----------
    plot_name:
        Name of the plot.
    pane_name:
        Name of the pane.
    square_log_cycles:
        Whether or not to use square log cycles.
    stacked_bars:
        Whether or not to use stacked bars.
    x_label:
        Label of the x axis.
    is_x_log:
        Whether or not to use a logarithmic scale for x values.
    labels:
        Labels to add to the plot. Defaults to no label.

    Returns
    -------
    :class:`Plot`:
        The new Plot instance created under the well.
    """
    if labels is None:
        labels = list()
    plot_properties = PlotPropertiesDto(
        name=plot_name,
        pane_name=pane_name,
        square_log_cycles=square_log_cycles,
        stacked_bars=stacked_bars,
        x_label=x_label,
        is_x_log=is_x_log,
        labels=labels,
    )
    plot_converter = self.__dto_converter.plot_dto_converter
    plot_instance_dto = plot_converter.get_plot_instance_dto(plot_properties)
    # The API returns the id and the name of the created plot; that name is the one given to the Plot.
    plot_id, plot_name = self.__cluster_apis.field_api.create_plot(self.__field_id, self.__id, plot_instance_dto)
    created_plot = Plot(self.__field_id, self.__id, plot_id, plot_name, self.__cluster_apis.field_api, self.__data_types_catalog, self.__dto_converter.plot_dto_converter)
    self.plots.append(created_plot)
    return created_plot
def __create_data(self, name: str, data_type: str, is_by_step: bool = False, labels: Optional[List[str]] = None, folder_id: Optional[str] = None, measure_depth: float = 0, true_vertical_depth: float = 0, true_vertical_depth_sub_sea: float = 0) -> Data:
unique_data_name = name
i = 1
production_folder_id = next((x for x in self.production_folders if x.id == folder_id), None)
is_production_folder = True if production_folder_id is not None else False
data = self.gauges if is_production_folder else self.productions
while True:
if next((x for x in data if x.name == unique_data_name), None) is None:
break
unique_data_name = name + " #" + str(i)
i += 1
input_data_dto = self.__dto_converter.data_dto_converter.get_basic_data_dto(name, data_type, is_by_step, uuid.uuid4(), labels, measure_depth, true_vertical_depth, true_vertical_depth_sub_sea)
basic_data_dto = self.__cluster_apis.field_api.create_basic_data(self.__field_id, self.__id, folder_id, input_data_dto, is_production_folder)
new_data = self.__dto_converter.production_folder_dto_converter.build_data(self.field_id, self.id, basic_data_dto)
data.append(new_data)
return new_data
def find_gauge_by_id(self, vector_id: str) -> Data:
"""
Finds a gauge by id.
Parameters
----------
vector_id:
The id of a vector to search for.
"""
try:
return next(x for x in self.data if x.vector_id == vector_id)
except StopIteration:
raise ValueError(f"Data with vector_id {vector_id} cannot be find inside the well {self.name}")
def find_gauge_by_type(self, data_type: Optional[str], label: Optional[str] = None, is_reference: bool = True, default_value: Optional[Data] = None) -> Data:
"""
Finds the first matching gauge by data-type and/or label.
Parameters
----------
data_type : Optional[str]
The type of the gauge to find.
label : Optional[str], optional
The label of the gauge, if any (default is None).
is_reference : bool, optional
Whether the gauge is a reference gauge (default is True).
default_value : Optional[Data], optional
The default value to return if the gauge is not found (default is None).
Returns
-------
Data
The found gauge.
"""
data = self.__match_gauge(self.data, data_type, label, is_reference)
if data is not None:
return data
# Recursive search if data_type is provided
if data_type is not None:
return self.find_gauge_by_type(None, data_type, is_reference, default_value)
if default_value is not None:
return default_value
raise ValueError(f"Data with data type {data_type} and label {label} cannot be found inside the well {self.name}")
def refresh_data(self) -> None:
"""
Clean all the well attributes and dto from the cache, to grab updated attributes.
"""
self.__data = None
self.__gauges = None
self.__productions = None
self.__corrected_productions = None
self.__production_folders = None
self.__filters = None
self.__files = None
self.__file_folders = None
self.__user_tasks = None
self.__functions = None
self.__incremental_pta_workflows = None
self.__incremental_rta_workflows = None
self.__well_dto = None
self.__shut_in = None
self.__well_property_containers = None
self.__well_plots = None
self.__well_info_dto = None
self.__production_type = None
self.__wellbore = None
self.__model_books = None
self.__pvts = None
self.__file_folders = None
self.__data_folders = None
self.__surveys = None
self.__well_logs = None
self.__custom_workflows = None
def __match_gauge(self, data_list: List[Data], data_type: Optional[str], label: Optional[str] = None, is_reference: bool = True) -> Optional[Data]:
"""
Finds a first matching gauge by data-type and/or label.
Parameters
----------
data_list:
A list of data to search for matching data.
data_type:
A data-type to search for.
label:
A label to search for.
"""
try:
if data_type is not None and label is not None:
try:
return next(x for x in data_list if x.data_type == data_type and x.is_reference == is_reference and label in x.labels)
except StopIteration:
return next(x for x in data_list if x.data_type == data_type and label in x.labels)
elif data_type is not None and label is None:
return next(x for x in data_list if x.data_type == data_type and x.is_reference == is_reference)
elif label is not None:
return next(x for x in data_list if data_type in x.labels)
except StopIteration:
pass
return None
def create_incremental_pta(self, name: str, workflow_type: IncrementalPTAType, shutin_id: str, pressure_id: str, pta_document_id: str,
improve_settings: Optional[WorkflowImproveSettings] = None, min_shutin_duration: float = 10, replication_of_results: bool = False,
save_strategy: Optional[SaveStrategyEnum] = None, labels: Optional[List[str]] = None, auto_compute_semilog_pstar: bool = False,
shut_in_types: Optional[List[ShutInTypesEnum]] = None, shut_in_categories: Optional[List[ShutInCategoriesEnum]] = None) -> IncrementalPTA:
"""
Create an incremental pta workflow
Parameters
----------
name:
Name of the ipta.
workflow_type:
A workflow-type to use.
shutin_id:
Vector id of the shutin input
pressure_id:
Vector id of the pressure
pta_document_id:
Id of the reference pta document
improve_settings:
Improve Settings parameter when using an improve ipta
min_shutin_duration:
Minimum shutin duration
replication_of_results:
If we replicate the results in the master container or not
save_strategy:
save strategy of the output documents
labels:
a list of labels to add to the ipta, the associated documents and the associated well properties container
auto_compute_semilog_pstar:
Auto compute the semilog pstar
shut_in_types:
List of shut in types to use
shut_in_categories:
List of shut in categories to use
"""
if labels is None:
labels = []
oil_rate_data = next((x for x in self.corrected_productions if x.data_type == "qo"), None)
water_rate_data = next((x for x in self.corrected_productions if x.data_type == "qw"), None)
payload = self.__dto_converter.ipta_dto_converter.build_ipta_dto(self.__field_id, self.__id, name, workflow_type, shutin_id, pressure_id, pta_document_id,
oil_rate_data, water_rate_data, labels, improve_settings, min_shutin_duration, replication_of_results,
save_strategy, auto_compute_semilog_pstar, shut_in_types, shut_in_categories)
incremental_pta_dto = self.__cluster_apis.automation_api.create_incremental_pta(self.__id, payload)
new_ipta = self.__dto_converter.ipta_dto_converter.get_incremental_pta_workflows_from_dto(self.field_id, self.__well_group_id, self.id, [incremental_pta_dto])[0]
self.incremental_pta_workflows.append(new_ipta)
return new_ipta
def create_incremental_rta(self, name: str, workflow_type: IncrementalRTAType, pressure_id: str, rta_document: Document, data_hours_to_analyze: float = 2160,
improve_settings: Optional[WorkflowImproveSettings] = None, oil_rate_vector_id: Optional[str] = None, gas_rate_vector_id: Optional[str] = None,
water_rate_vector_id: Optional[str] = None, gauge_loading_strategy: Optional[GaugeLoadingStrategy] = None, is_improved_only_on_new_data: Optional[bool] = None, replication_of_results: bool = False) -> IncrementalRTA:
"""
Create an incremental rta workflow
Parameters
----------
name:
Name of the ipta.
workflow_type:
A workflow-type to use.
pressure_id:
Vector id of the pressure
rta_document:
reference rta document
data_hours_to_analyze:
number of hours to analyze
improve_settings:
Improve Settings parameter when using an improve ipta
oil_rate_vector_id:
Vector id of the oil rate data
gas_rate_vector_id:
Vector id of the gas rate data
water_rate_vector_id:
Vector id of the water rate data
gauge_loading_strategy:
gauge loading strategy of the output documents
is_improved_only_on_new_data:
If the improve irta is only on new data
replication_of_results:
If we replicate the results in the master container or not
"""
payload = self.__dto_converter.irta_dto_converter.build_irta_payload(self.__field_id, self.__id, name, workflow_type, pressure_id, rta_document,
data_hours_to_analyze, improve_settings, oil_rate_vector_id, gas_rate_vector_id, water_rate_vector_id, gauge_loading_strategy, is_improved_only_on_new_data, replication_of_results)
incremental_rta_dto = self.__cluster_apis.automation_api.create_incremental_rta(self.__id, payload)
new_irta = self.__dto_converter.get_incremental_rta_workflows_from_dto(self.field_id, self.__well_group_id, self.id, [incremental_rta_dto])[0]
self.incremental_rta_workflows.append(new_irta)
return new_irta
def load_gauge(
    self,
    datasource_name: str,
    datasource_gauge_name: str,
    dimension: MeasureEnum,
    data_type: str,
    unit: UnitEnum,
    time_format: TimeFormatEnum,
    gauge_name: Optional[str] = None,
    children_datasource_names: Optional[List[str]] = None,
    last_step_duration_hours: Optional[float] = None,
    is_high_frequency: bool = True,
    gauge_model: str = "",
    measure_depth: float = 0,
    true_vertical_depth: float = 0,
    true_vertical_depth_sub_sea: float = 0,
    serial_number: str = "",
    labels: Optional[List[str]] = None,
    comment: str = "",
    read_from: Optional[datetime] = None,
    read_to: Optional[datetime] = None,
    folder_id: Optional[str] = None,
) -> Data:
    """
    Load a gauge from an external data source into this well.

    The data source id and tag id are resolved through the external data API,
    a gauge payload is built from the given attributes, and the automation API
    is called once to register the gauge (at well level, or under ``folder_id``
    when it is given). The returned DTO is converted into a :class:`Data` object.

    Parameters
    ----------
    datasource_name : str
        Name of the data source where the gauge data is stored.
    datasource_gauge_name : str
        Name of the specific gauge within the data source.
    dimension : MeasureEnum
        The dimension of the data measured by the gauge.
    data_type : str
        Data type identifier of the gauge.
    unit : UnitEnum
        The unit of measurement for the gauge.
    time_format : TimeFormatEnum
        The time format associated with the gauge data.
    gauge_name : str, optional
        The name assigned to the gauge. Defaults to `datasource_gauge_name`.
    children_datasource_names : list of str, optional
        Names of the child data sources if applicable.
    last_step_duration_hours : float, optional
        Duration of the last recorded step in hours. Default is None.
    is_high_frequency : bool, default=True
        Boolean indicating if the gauge produces high frequency data.
    gauge_model : str, default=""
        Model of the gauge being registered.
    measure_depth : float, default=0
        The depth at which the gauge measurements are taken, in meters.
    true_vertical_depth : float, default=0
        The true vertical depth of the gauge.
    true_vertical_depth_sub_sea : float, default=0
        The true vertical depth below sea level for the gauge.
    serial_number : str, default=""
        Serial number of the gauge, if available.
    labels : list of str, optional
        Labels associated with the gauge, useful for categorization.
    comment : str, default=""
        An optional comment or description for the gauge.
    read_from : datetime, optional
        Start datetime for reading gauge data.
    read_to : datetime, optional
        End datetime for reading gauge data.
    folder_id : str, optional
        ID of the data folder to load the gauge under. When omitted the gauge
        is loaded at well level.

    Returns
    -------
    Data
        The newly created gauge object.
    """
    if gauge_name is None:
        gauge_name = datasource_gauge_name
    datasource_id, tag_id = self.__cluster_apis.external_data_api.get_datasource_id_and_tag_id(
        datasource_name, datasource_gauge_name, children_datasource_names
    )
    # NOTE(review): the literal False fills a positional flag of get_gauge_dto whose
    # meaning is not visible from this method.
    payload = self.__dto_converter.data_dto_converter.get_gauge_dto(
        self.__field_id,
        self.__id,
        datasource_id,
        tag_id,
        gauge_name,
        dimension,
        data_type,
        unit,
        time_format,
        last_step_duration_hours,
        is_high_frequency,
        gauge_model,
        measure_depth,
        true_vertical_depth,
        true_vertical_depth_sub_sea,
        serial_number,
        labels,
        comment,
        False,
        read_from,
        read_to,
    )
    automation_api = self.__cluster_apis.automation_api
    if folder_id is None:
        data_dto = automation_api.load_gauge(self.__id, payload)
    else:
        data_dto = automation_api.load_gauge_under_folder(folder_id, payload)
    new_gauge = self.__dto_converter.production_folder_dto_converter.build_data(self.__field_id, self.__id, data_dto)
    if folder_id is not None:
        folder = find_data_folder_recursively_by_id(folder_id, self.data_folders)
        # The gauge already exists remotely at this point; if the folder is not in
        # the local hierarchy, skip the cache update instead of failing with an
        # AttributeError on None.
        if folder is not None:
            folder.data.append(new_gauge)
    return new_gauge
def get_shut_in_extractions(
    self,
    rates_data_list: List[Data],
    pressure_id_list: List[str],
    reference_shut_in: ShutIn,
    compared_shut_in_list: Optional[List[ShutIn]] = None,
    document_id: Optional[str] = None,
) -> List[PTAExtraction]:
    """
    Extract loglog data for shut-ins from pressure gauges and corrected rate gauges.

    Parameters
    ----------
    rates_data_list:
        List of the corrected rate gauges.
    pressure_id_list:
        List of the vector ids of the pressure gauges.
    reference_shut_in:
        Reference shut-in.
    compared_shut_in_list:
        List of the compared shut-ins; each must carry its start and end dates.
        An empty list is used when omitted.
    document_id:
        Document id of the reference document.

    Returns
    -------
    List[PTAExtraction]
        Extractions converted from the ``extractionOutputDtos`` of the PTA API response.
    """
    shut_ins_to_compare = [] if compared_shut_in_list is None else compared_shut_in_list
    request_dto = self.__dto_converter.shut_in_extraction_converter.get_shut_in_extraction_dto(
        self.__field_id,
        self.__id,
        rates_data_list,
        pressure_id_list,
        reference_shut_in,
        shut_ins_to_compare,
        document_id,
    )
    response = self.__cluster_apis.pta_api.get_shut_in_extractions(request_dto)
    output_dtos = response.extractionOutputDtos
    return self.__dto_converter.shut_in_extraction_converter.get_pta_extraction_dtos_from_shut_in_extraction_dto(output_dtos)
def rename(self, new_well_name: str) -> None:
    """
    Rename the current well.

    The new name is sent to the field API and the value returned by that call
    is stored as the well's name.

    Parameters
    ----------
    new_well_name:
        New name of the well.
    """
    rename_payload = {"name": new_well_name}
    renamed = self.__cluster_apis.field_api.rename_well(self.__field_id, self.__id, rename_payload)
    self.__name = renamed
def create_file_folder(self, name: str, folder_parent_id: Optional[str] = None) -> FileFolder:
    """
    Create a file folder under a parent folder, or return the existing one.

    The parent is looked up by id when ``folder_parent_id`` is given; otherwise
    the first folder named "Files" is used. If the parent already has a child
    folder called ``name``, a message is printed and that folder is returned.
    Otherwise the folder is created through the field API and appended to the
    parent's ``file_folders``.

    Parameters
    ----------
    name : str
        The name of the file folder to create.
    folder_parent_id : Optional[str], optional
        The ID of the parent folder. When omitted, the "Files" folder is the parent.

    Returns
    -------
    FileFolder
        The created or pre-existing file folder.

    Raises
    ------
    ValueError
        If ``folder_parent_id`` is given but no folder with that id is found.
    """
    if folder_parent_id is None:
        parent = find_file_folders_recursively_by_name("Files", self.file_folders)[0]
    else:
        parent = find_file_folder_recursively_by_id(folder_parent_id, self.file_folders)
        if parent is None:
            raise ValueError("File folder parent {} does not exist".format(folder_parent_id))

    # Re-use a same-named child folder rather than creating a duplicate.
    for existing in parent.file_folders:
        if existing.name == name:
            print(f"File Folder with name {name} already exists in well {self.__name}")
            return existing

    created_dto = self.__cluster_apis.field_api.create_file_folder(self.__field_id, self.__id, parent.id, {'name': name})
    converted = self.__dto_converter.file_dto_converter.get_file_folders_from_file_folder_dto_recursively(
        self.__field_id, None, self.__id, parent.id, parent.name, [created_dto]
    )
    new_folder = converted[0]
    parent.file_folders.append(new_folder)
    return new_folder
def create_wellbore(
    self,
    geometries: List[WellboreGeometry],
    perforations: Optional[List[WellborePerforation]] = None,
    delete_existing_wellbore: bool = False,
) -> None:
    """
    Create the wellbore of this well from geometries and perforations.

    When ``delete_existing_wellbore`` is True, :meth:`delete_wellbore` is called
    first. The wellbore returned by the automation API is stored on the well.

    Parameters
    ----------
    geometries : List[WellboreGeometry]
        Wellbore geometries defining the structure of the wellbore.
    perforations : Optional[List[WellborePerforation]], optional
        Perforations of the wellbore, by default None.
    delete_existing_wellbore : bool, optional
        Whether to delete the existing wellbore before creating the new one,
        by default False.
    """
    if delete_existing_wellbore:
        self.delete_wellbore()
    command_dto = self.__dto_converter.wellbore_dto_converter.get_wellbore_command_dto(
        self.__field_id, self.__id, self.__uwi, geometries, perforations
    )
    created_dto = self.__cluster_apis.automation_api.create_wellbore(self.__id, command_dto)
    self.__wellbore = self.__dto_converter.get_wellbore_from_wellbore_dto(created_dto)
def delete_wellbore(self) -> None:
    """
    Delete the wellbore of this well, if it has one.

    Does nothing when ``self.wellbore`` is None; otherwise the wellbore is deleted
    through the field API and the local reference is cleared.
    """
    if self.wellbore is None:
        return
    wellbore_id = self.wellbore.id
    self.__cluster_apis.field_api.delete_wellbore(self.field_id, self.id, wellbore_id)
    self.__wellbore = None
def create_production_folder(self, name: str) -> ProductionFolder:
    """
    Create a new production folder in the well.

    Parameters
    ----------
    name:
        The name of the production folder.

    Returns
    -------
    :class:`ProductionFolder`
        The folder built from the DTO returned by the field API. It is also added
        to the locally held list of production folders.
    """
    created_dto = self.__cluster_apis.field_api.create_production_folder(self.__field_id, self.__id, {"name": name})
    converted = self.__dto_converter.get_production_folders_from_production_folders_dto(self.__field_id, self.__id, [created_dto])
    production_folder = converted[0]
    # NOTE(review): when the local list is still None it is initialised with only the
    # new folder - confirm this does not hide folders that were never loaded.
    if self.__production_folders is None:
        self.__production_folders = [production_folder]
    else:
        self.__production_folders.append(production_folder)
    return production_folder
def create_shut_in(
    self,
    input_type: InputTypeEnum,
    input_gauge: Data,
    look_for: LookForEnum,
    minimum_shut_in_duration_hr: float = 10,
    minimum_value_change: Optional[float] = None,
    requires_validation: bool = False,
) -> Data:
    """
    Create the shut-in object of this well.

    Parameters
    ----------
    input_type : InputTypeEnum
        The type of input data.
    input_gauge : Data
        The gauge data used to identify shut-ins.
    look_for : LookForEnum
        The criteria to look for in the data.
    minimum_shut_in_duration_hr : float
        The minimum duration of a shut-in in hours (default 10).
    minimum_value_change : float, optional
        The minimum value change to identify a shut-in. When omitted, 50 psia
        converted to the internal unit is used.
    requires_validation : bool, optional
        Whether the shut-in requires validation (default is False).

    Returns
    -------
    Data
        The created shut-in, which is also stored on the well.

    Raises
    ------
    ValueError
        If the well already holds a shut-in object.
    """
    if self.__shut_in is not None:
        raise ValueError(f"There is already a shut-in object in this well {self.__name}")

    converter = self.__dto_converter
    if minimum_value_change is None:
        default_change = converter.unit_converter.convert_to_internal(UnitEnum.pressure_psia, 50)
        minimum_value_change = cast(float, default_change)

    command_dto = converter.get_shut_in_command_dto(
        self.__field_id,
        self.__id,
        input_type,
        input_gauge,
        look_for,
        minimum_shut_in_duration_hr,
        minimum_value_change,
        requires_validation,
    )
    created_dto = self.__cluster_apis.automation_api.create_shut_in(self.__id, command_dto)
    created_shut_in = converter.production_folder_dto_converter.build_data(self.__field_id, self.__id, created_dto)
    self.__shut_in = created_shut_in
    return created_shut_in
def create_corrected_production(
    self,
    production_folder: ProductionFolder,
    shut_in: Data,
    name: str = "Corrected production #1",
    use_uptime_correction: bool = False,
    set_to_0_in_shut_ins: bool = True,
    replace_by: Optional[ReplaceByEnum] = None,
    constant_value_for_liquid_rate: Optional[float] = None,
    constant_value_for_gas_rate: Optional[float] = 1,
    rate_correction_option: Optional[RateCorrectionOptionEnum] = None,
    difference: float = 0.1,
    delta_p_difference_multiplier: float = 0.8,
    pressure: Optional[Data] = None,
    iqr_multiplier: Optional[bool] = None,
    simplify_method: Optional[SimplifyMethodEnum] = None,
    delta_y_for_liquid_rate_simplification: float = 0.1,
    delta_y_for_gas_rate_simplification: float = 0.1,
    keep_original_rates_before_shut_ins: bool = False,
    set_rates_to_negative_values: bool = False,
) -> None:
    """
    Create a corrected production from a production folder and a shut-in.

    The oil, gas and water gauges are picked from ``production_folder.data`` by
    their ``data_type`` ("qo", "qg", "qw"). The phases returned by the automation
    API that carry data are stored on the well as its corrected productions.

    Parameters
    ----------
    production_folder : ProductionFolder
        The production folder to be corrected; it must contain at least one gauge.
    shut_in : Data
        The shut-in data used for the corrections.
    name : str, optional
        Name of the corrected production (default is "Corrected production #1").
    use_uptime_correction : bool, optional
        Whether to use uptime correction (default is False).
    set_to_0_in_shut_ins : bool, optional
        Whether to set rates to 0 during shut-ins (default is True).
    replace_by : Optional[ReplaceByEnum], optional
        The method to replace values (default is None).
    constant_value_for_liquid_rate : Optional[float], optional
        A constant value for liquid rate (default is None).
    constant_value_for_gas_rate : Optional[float], optional
        A constant value for gas rate (default is 1).
    rate_correction_option : Optional[RateCorrectionOptionEnum], optional
        The option for rate correction (default is None).
    difference : float, optional
        The difference value for corrections (default is 0.1).
    delta_p_difference_multiplier : float, optional
        The multiplier for delta pressure difference (default is 0.8).
    pressure : Optional[Data], optional
        The pressure data to be used (default is None).
    iqr_multiplier : Optional[bool], optional
        IQR multiplier setting (default is None).
    simplify_method : Optional[SimplifyMethodEnum], optional
        The method for simplification (default is None).
    delta_y_for_liquid_rate_simplification : float, optional
        The delta Y value for liquid rate simplification (default is 0.1).
    delta_y_for_gas_rate_simplification : float, optional
        The delta Y value for gas rate simplification (default is 0.1).
    keep_original_rates_before_shut_ins : bool, optional
        Whether to keep original rates before shut-ins (default is False).
    set_rates_to_negative_values : bool, optional
        Whether to set rates to negative values (default is False).

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If the production folder holds no gauge, or if both ``rate_correction_option``
        and ``keep_original_rates_before_shut_ins`` are requested.
    """
    if len(production_folder.data) == 0:
        raise ValueError("The production folder does not contain any gauge, you must at least load one to be able to create a corrected production."
                         "If you loaded a gauge with the production_folder.load_production_gauge() method, you need to refresh the cache in your well with well.refresh_data() and re-grab the production folder")

    def first_gauge_of_type(type_code):
        # First gauge of the folder with the requested data_type, or None.
        for candidate in production_folder.data:
            if candidate.data_type == type_code:
                return candidate
        return None

    oil = first_gauge_of_type("qo")
    gas = first_gauge_of_type("qg")
    water = first_gauge_of_type("qw")

    if rate_correction_option is not None and keep_original_rates_before_shut_ins is True:
        raise ValueError("You must choose between correcting the last rate and keeping the original rates before shut-ins. You cannot use both.")

    # NOTE(review): the unit conversion only runs when the value is None (so None itself
    # is handed to convert_to_internal) and caller-supplied values pass through untouched.
    # The condition looks inverted - confirm the intended behaviour before changing it.
    if constant_value_for_liquid_rate is None:
        constant_value_for_liquid_rate = cast(
            float,
            self.__dto_converter.unit_converter.convert_to_internal(UnitEnum.rate_standard_baril_per_day, constant_value_for_liquid_rate),
        )
    if constant_value_for_gas_rate is None:
        constant_value_for_gas_rate = cast(
            float,
            self.__dto_converter.unit_converter.convert_to_internal(UnitEnum.rate_thousandcf_per_day, constant_value_for_gas_rate),
        )

    command_dto = self.__dto_converter.production_folder_dto_converter.get_corrected_production_command_dto(
        self.__field_id,
        self.__id,
        production_folder.id,
        shut_in,
        name,
        use_uptime_correction,
        set_to_0_in_shut_ins,
        replace_by,
        constant_value_for_liquid_rate,
        constant_value_for_gas_rate,
        rate_correction_option,
        difference,
        delta_p_difference_multiplier,
        pressure,
        iqr_multiplier,
        simplify_method,
        delta_y_for_liquid_rate_simplification,
        delta_y_for_gas_rate_simplification,
        keep_original_rates_before_shut_ins,
        set_rates_to_negative_values,
        oil,
        gas,
        water,
    )
    query_dto = self.__cluster_apis.automation_api.create_corrected_production(self.__id, command_dto)

    # Only phases that actually carry data become corrected-production objects.
    corrected = []
    for phase in query_dto.phases:
        if phase.data is not None:
            corrected.append(self.__dto_converter.production_folder_dto_converter.build_data(self.field_id, self.__id, phase.data))
    self.__corrected_productions = corrected
def create_filter(
    self,
    data: Data,
    filter_type: FilterTypeEnum,
    filter_name: str = 'New Filter',
    labels: Optional[List[str]] = None,
    denoising_threshold_level: Optional[float] = None,
    denoising_threshold_value: float = 2340.80561921557,
    denoising_threshold_type: DenoisingThresholdTypeEnum = DenoisingThresholdTypeEnum.adaptative,
    delta_t: float = 2.0,
    delta_p: float = 13789.5,
    size_of_slice: int = 100000,
    min_window_x: Optional[datetime] = None,
    max_window_x: Optional[datetime] = None,
    min_window_y: Optional[float] = None,
    max_window_y: Optional[float] = None,
    denoising_pre_sampling_type: DenoisingPreSamplingTypeEnum = DenoisingPreSamplingTypeEnum.intermediate,
    denoising_pre_sampling_duration: float = 0.004166666666666667,
    denoising_minimum_gap_duration: Optional[float] = None,
    output_decimated_raw_data: bool = False,
) -> Data:
    """
    Create a filter under the given gauge and attach it to ``data.filters``.

    Parameters
    ----------
    data:
        Source gauge to create the filter on.
    filter_type:
        Type of the wavelet filter to use.
    filter_name:
        Name of the new filter.
    labels:
        Labels of the filter; an empty list is used when omitted.
    denoising_threshold_level:
        Level of denoising. With an adaptative threshold type and a level given,
        the threshold value is recomputed as ``2340.80561921557 * 10 ** (level / 50)``
        and the ``denoising_threshold_value`` argument is ignored.
    denoising_threshold_value:
        Threshold value (default value is in Pascal).
    denoising_threshold_type:
        Type of denoising threshold.
    delta_t:
        Delta T value.
    delta_p:
        Delta P value (default value is in Pascal).
    size_of_slice:
        Size of the slice; values outside [10000, 100000] are clamped to that range.
    min_window_x:
        Minimum window x value, must be a datetime.
    max_window_x:
        Maximum window x value, must be a datetime.
    min_window_y:
        Minimum window y value.
    max_window_y:
        Maximum window y value.
    denoising_pre_sampling_type:
        Only works if filter_type == FilterTypeEnum.wavelet_1 and must be one of
        the following values ["Intermediate", "PTA", "User", "PA"].
    denoising_pre_sampling_duration:
        Only works if filter_type == FilterTypeEnum.wavelet_1, the pre sampling
        interval duration (documented as seconds - TODO confirm the unit against
        the 0.004166666666666667 default).
    denoising_minimum_gap_duration:
        Only works if filter_type == FilterTypeEnum.wavelet_1, denoising minimum gap duration.
    output_decimated_raw_data:
        To output the raw filter.

    Returns
    -------
    Data
        The new filter.
    """
    uses_adaptative_level = (
        denoising_threshold_type == DenoisingThresholdTypeEnum.adaptative
        and denoising_threshold_level is not None
    )
    if uses_adaptative_level:
        denoising_threshold_value = 2340.80561921557 * (10 ** (denoising_threshold_level / 50))

    if labels is None:
        labels = []

    # Clamp the slice size into the supported [10000, 100000] range.
    size_of_slice = min(max(size_of_slice, 10000), 100000)

    payload = self.__dto_converter.data_dto_converter.get_filter_dto(
        self.__field_id,
        self.__id,
        data.vector_id,
        filter_name,
        labels,
        filter_type,
        denoising_threshold_level,
        denoising_threshold_value,
        denoising_threshold_type,
        delta_t,
        delta_p,
        denoising_pre_sampling_type,
        denoising_pre_sampling_duration,
        denoising_minimum_gap_duration,
        size_of_slice,
        min_window_x,
        max_window_x,
        min_window_y,
        max_window_y,
        output_decimated_raw_data,
    )
    created_dto = self.__cluster_apis.automation_api.create_filter(data.id, payload)
    created_filter = self.__dto_converter.production_folder_dto_converter.build_data(self.__field_id, self.__id, created_dto)
    data.filters.append(created_filter)
    return created_filter
def copy_file(
    self,
    file: Union[Document, File],
    new_name: Optional[str] = None,
    field_id: Optional[str] = None,
    well_id: Optional[str] = None,
    well_group_id: Optional[str] = None,
    user_task_id: Optional[str] = None,
) -> File:
    """
    Copy a file to this well or to another field / well / well group / user task.

    Parameters
    ----------
    file:
        File to copy.
    new_name:
        Name for the copy; when omitted the copy is not renamed.
    field_id:
        Target field id; defaults to the field of this well.
    well_id:
        Target well id; defaults to this well when neither ``well_id`` nor
        ``well_group_id`` is given.
    well_group_id:
        Target well group id, to copy the file under a well group.
    user_task_id:
        Target user task id, to copy the file under a user task.

    Returns
    -------
    File
        The copied file, built from the DTO returned by the field API.
    """
    if field_id is None:
        field_id = self.__field_id
    if well_id is None and well_group_id is None:
        well_id = self.__id

    field_api = self.__cluster_apis.field_api
    if new_name is None:
        file_dto = field_api.copy_file(field_id, well_id, well_group_id, user_task_id, file.file_id)
    else:
        file_dto = field_api.copy_and_rename_file(field_id, well_id, well_group_id, user_task_id, file.file_id, new_name)
    copied_file = self.__dto_converter.file_dto_converter.build_file_from_file_dto(well_group_id, file_dto)

    # Make sure the local file list is loaded before registering the copy.
    if self.__files is None:
        self.__files = self.__dto_converter.get_files_recursively(self.field_id, self.id, self.__get_well_dto())

    # NOTE(review): the copy is registered on this well's caches even when the target
    # field / well / well group differs from this well - confirm that is intended.
    if user_task_id is None:
        self.__files.append(copied_file)
        return copied_file

    # Raises StopIteration when the user task is not one of this well's user tasks.
    user_task = next(x for x in self.user_tasks if x.id == user_task_id)
    user_task.files.append(copied_file)
    return copied_file
def extract_well_properties_from_document(
    self,
    document: Document,
    analyses_to_extract: Optional[List[Analysis]] = None,
    extract_in_master_container: bool = False,
) -> WellPropertyContainer:
    """
    Extract well properties from a document.

    The extraction is requested through the field API, the well data is refreshed,
    and the matching container is looked up among ``self.well_property_containers``.

    Parameters
    ----------
    document : Document
        The document from which to extract well properties.
    analyses_to_extract : Optional[List[Analysis]], optional
        Analyses to extract. If None, all analyses of the document are used.
    extract_in_master_container : bool, optional
        Whether to extract the properties into the master container. Default is False.

    Returns
    -------
    WellPropertyContainer
        The master container when ``extract_in_master_container`` is True; otherwise
        the container whose name equals the document name.
    """
    if analyses_to_extract is None:
        analyses_to_extract = document.analyses
    analysis_ids = [analysis.id for analysis in analyses_to_extract]
    command_dto = self.__dto_converter.get_extract_properties_command_dto(analysis_ids, extract_in_master_container)
    self.__cluster_apis.field_api.extract_well_properties_from_document(self.__field_id, self.__id, document.file_id, command_dto)
    # Reload the well so the containers reflect the extraction that just ran.
    self.refresh_data()
    if extract_in_master_container:
        return next(x for x in self.well_property_containers if x.is_master)
    return next(x for x in self.well_property_containers if x.name == document.name)
def create_pvt_from_kw_document(self, pvt_name: str, document_id: str, analysis_id: str) -> PVT:
    """
    Create a PVT object in the well from an analysis of a KW document.

    Parameters
    ----------
    pvt_name: str
        Name of the PVT object to create.
    document_id: str
        Id of the document to use.
    analysis_id: str
        Id of the analysis to use.

    Returns
    -------
    PVT
        The created PVT object, also appended to ``self.pvts``.
    """
    command_dto = self.__dto_converter.get_command_pvt_from_kw_document_dto(
        pvt_name, self.__field_id, self.__id, document_id, analysis_id
    )
    created_dto = self.__cluster_apis.automation_api.create_pvt_from_kw_document_well(self.id, command_dto)
    new_pvt = self.__dto_converter.build_pvt(self.__field_id, self.__id, created_dto)
    self.pvts.append(new_pvt)
    return new_pvt
def create_pvt_from_file(
    self,
    pvt_name: str,
    file_id: str,
    start_date: Optional[datetime] = None,
    reservoir_pressure: Optional[float] = None,
    reservoir_temperature: Optional[float] = None,
    gas_oil_type: Optional[GasOilTypeEnum] = None,
    unit_system: Optional[UnitSystemPvtEnum] = None,
) -> PVT:
    """
    Create a PVT (Pressure-Volume-Temperature) object in the well from a file.

    Fallback parameters can be supplied for the case where the gas oil type is
    undetermined.

    Parameters
    ----------
    pvt_name : str
        The name of the PVT object to create.
    file_id : str
        The identifier of the file from which the PVT object is created.
    start_date : datetime, optional
        The start date for the PVT data coverage. Defaults to None.
    reservoir_pressure : float, optional
        The reservoir pressure associated with the PVT object. Defaults to None.
    reservoir_temperature : float, optional
        The reservoir temperature associated with the PVT object. Defaults to None.
    gas_oil_type : GasOilTypeEnum, optional
        The gas/oil type associated with the PVT object. Defaults to None.
    unit_system : UnitSystemPvtEnum, optional
        The unit system used for the PVT object. Defaults to None.

    Returns
    -------
    PVT
        The created PVT object, also appended to ``self.pvts``.
    """
    command_dto = self.__dto_converter.get_command_pvt_from_text_file_dto(
        pvt_name,
        self.__field_id,
        self.__id,
        file_id,
        start_date,
        reservoir_pressure,
        reservoir_temperature,
        gas_oil_type,
        unit_system,
    )
    created_dto = self.__cluster_apis.automation_api.create_pvt_from_text_file_well(self.id, command_dto)
    new_pvt = self.__dto_converter.build_pvt(self.__field_id, self.__id, created_dto)
    self.pvts.append(new_pvt)
    return new_pvt
def create_well_property_container(self, name: str) -> WellPropertyContainer:
    """
    Create a new well property container for this :class:`Well`.

    Parameters
    ----------
    name:
        Name of the new well property container.

    Returns
    -------
    :class:`WellPropertyContainer`
        The container built from the DTO returned by the field API.
    """
    created_dto = self.__cluster_apis.field_api.create_well_property_container(self.__field_id, self.__id, {"name": name})
    # The converter works on lists; wrap the single DTO and take the only result.
    containers = self.__dto_converter.get_well_property_containers_from_well_property_container_dto(
        self.__field_id, self.__id, self.__well_properties_catalog, [created_dto]
    )
    return containers[0]
def __str__(self) -> str:
    """Return a short, human-readable description: name, id and uwi of the well."""
    return "Well(name='{}', id='{}', uwi={})".format(self.name, self.id, self.uwi)
def __repr__(self) -> str:
    """Return a detailed description: field id, id, name, uwi and well group id."""
    template = "Well(field_id='{}', id='{}', name='{}', uwi={}, well_group_id={})"
    return template.format(self.field_id, self.id, self.name, self.uwi, self.well_group_id)
def create_survey(self, name: str, date: datetime) -> Survey:
    """
    Create a new survey in the well.

    A survey stores well logs at a specific date.

    Parameters
    ----------
    name : str
        The name of the survey container.
    date : datetime
        Date of the survey.

    Returns
    -------
    Survey
        The newly created survey container object.
    """
    command_dto = self.__dto_converter.get_survey_command_dto(name, date)
    created_dto = self.__cluster_apis.field_api.create_survey(self.__field_id, self.__id, command_dto)
    # NOTE(review): unlike the model book / PVT creators, the new survey is not added
    # to self.surveys here - confirm whether that list is refreshed elsewhere.
    return self.__dto_converter.build_survey(self.__field_id, self.__id, created_dto)
def delete_survey(self, survey: Survey) -> None:
    """
    Delete a survey from this well.

    The survey is deleted through the field API and then removed from
    ``self.surveys``.

    Parameters
    ----------
    survey : Survey
        The survey to delete.
    """
    survey_id = survey.id
    self.__cluster_apis.field_api.delete_survey(self.__field_id, self.__id, survey_id)
    # list.remove raises ValueError if the survey is not in the local list.
    local_surveys = self.surveys
    local_surveys.remove(survey)
def create_model_book(self, document: Document, name: str) -> ModelBook:
    """
    Create a new model book associated with this well.

    Parameters
    ----------
    document : Document
        The document used as the basis for the model book.
    name : str
        The name to assign to the new model book.

    Returns
    -------
    ModelBook
        The newly created model book, also appended to ``self.model_books``.
    """
    converter = self.__dto_converter
    command_dto = converter.get_create_model_book_dto(document, name, field_id=self.__field_id, well_id=self.__id)
    created_dto = self.__cluster_apis.automation_api.create_model_book(self.__id, command_dto)
    new_model_book = converter.build_model_book(self.__field_id, self.__id, created_dto)
    self.model_books.append(new_model_book)
    return new_model_book
def delete_model_book(self, model_book: ModelBook) -> None:
    """
    Delete a model book from this well.

    The model book is deleted through the field API and then removed from
    ``self.model_books``.

    Parameters
    ----------
    model_book : ModelBook
        The model book instance to delete.

    Returns
    -------
    None
    """
    model_book_id = model_book.id
    self.__cluster_apis.field_api.delete_model_book(self.field_id, self.id, model_book_id)
    # list.remove raises ValueError if the model book is not in the local list.
    local_model_books = self.model_books
    local_model_books.remove(model_book)
def delete_file(self, file: Union[File, Document]) -> None:
    """
    Delete a file from the cluster and remove it from the local file caches.

    A `Document` (or a subclass of it) is first resolved to the `File` with the
    same ``file_id`` in the local file list. The file is then deleted through the
    field API, removed from the local file list, and removed from the file folder
    that contains it when that folder is found locally.

    Parameters
    ----------
    file : Union[File, Document]
        The file to delete, given either as a `File` or as a `Document` that is
        matched to a `File` by ``file_id``.

    Raises
    ------
    StopIteration
        If no file with the same ``file_id`` exists in the local file list.

    Warnings
    --------
    Calling this method while iterating over a collection containing the file
    (e.g., `for file in self.files:`) can lead to unexpected behavior, as it
    modifies the collection during iteration. To avoid this issue, create a copy
    of the collection before iteration (e.g., `for file in list(well.files):`)
    or collect the files to delete first, then delete them in a separate step.
    """
    # isinstance (rather than an exact type check) so Document subclasses are
    # resolved to their File as well.
    if isinstance(file, Document):
        file = next(x for x in self.__get_files() if x.file_id == file.file_id)
    file = cast(File, file)
    self.__cluster_apis.field_api.delete_file(self.__field_id, self.__id, file.file_id)

    # Remove the cached instance, which may differ from the object passed in.
    file = next(x for x in self.__get_files() if x.file_id == file.file_id)
    self.__get_files().remove(file)

    file_folder = find_file_folder_recursively_by_file(file, self.file_folders)
    # The remote delete already succeeded; if no local folder holds the file,
    # skip the folder update instead of failing with an AttributeError on None.
    if file_folder is not None:
        file = next(x for x in file_folder.files if x.file_id == file.file_id)
        file_folder.files.remove(file)
def compute_forward_rates(
    self,
    pvt_id: str,
    wellbore_id: str,
    computation_date: datetime,
    pressure_vs_depth_vector: VsDepthVector,
    temperature_vs_depth_vector: VsDepthVector,
    velocity_vs_depth_vector: VsDepthVector,
    heat_transfer_coefficient: Optional[float] = None,
    reservoir_temperature: Optional[float] = None,
    reservoir_temperature_gradient: Optional[float] = None,
    reservoir_depth: Optional[float] = None,
) -> ForwardRatesOutputs:
    """
    Compute forward rates from pressure, temperature and velocity vs depth vectors.

    The four optional reservoir parameters must be given together or not at all.
    The inputs are converted into a command DTO, sent to the tech objects API,
    and the response is converted into a :class:`ForwardRatesOutputs`.

    Parameters
    ----------
    pvt_id : str
        The PVT identifier used for the computation.
    wellbore_id : str
        The wellbore identifier used for the computation.
    computation_date : datetime
        The date of the computation.
    pressure_vs_depth_vector : VsDepthVector
        Pressure values vs depth in the wellbore.
    temperature_vs_depth_vector : VsDepthVector
        Temperature values vs depth in the wellbore.
    velocity_vs_depth_vector : VsDepthVector
        Velocity values vs depth in the wellbore.
    heat_transfer_coefficient : Optional[float]
        The heat transfer coefficient, if provided.
    reservoir_temperature : Optional[float]
        The temperature of the reservoir, if provided.
    reservoir_temperature_gradient : Optional[float]
        The temperature gradient of the reservoir, if provided.
    reservoir_depth : Optional[float]
        The depth of the reservoir, if provided.

    Returns
    -------
    ForwardRatesOutputs
        The computed forward rates.

    Raises
    ------
    ValueError
        If some but not all of the optional parameters (heat_transfer_coefficient,
        reservoir_temperature, reservoir_temperature_gradient, reservoir_depth) are provided.
    """
    optional_params = (heat_transfer_coefficient, reservoir_temperature, reservoir_temperature_gradient, reservoir_depth)
    provided_count = sum(1 for param in optional_params if param is not None)
    # Valid only when none or all of the optional parameters are given.
    if provided_count not in (0, len(optional_params)):
        raise ValueError("Either all optional parameters (heat_transfer_coefficient, reservoir_temperature, reservoir_temperature_gradient, reservoir_depth) must be provided or none of them.")

    command_dto = self.__dto_converter.get_compute_forward_rates_command_dto(
        self.__field_id,
        self.__id,
        pvt_id,
        wellbore_id,
        computation_date,
        pressure_vs_depth_vector,
        temperature_vs_depth_vector,
        velocity_vs_depth_vector,
        heat_transfer_coefficient,
        reservoir_temperature,
        reservoir_temperature_gradient,
        reservoir_depth,
    )
    query_dto = self.__cluster_apis.tech_objects_api.compute_forward_rates(command_dto)
    return self.__dto_converter.build_forward_rates_outputs(query_dto)
def create_document(
    self,
    document_type: KWModuleEnum,
    target_folder_id: str,
    name: str,
    well_radius: float,
    porosity: float,
    pay_zone: float,
    rock_compressibility: float,
    top_reservoir_depth: float,
    pvt_id: Optional[str] = None,
    pressure_id: Optional[str] = None,
    oil_rate_id: Optional[str] = None,
    gas_rate_id: Optional[str] = None,
    water_rate_id: Optional[str] = None,
) -> Document:
    """
    Creates a Saphir or Topaze document and returns it as a KW document.

    The creation payload is built first, then sent to the PTA API (Saphir) or to the
    RTA API (Topaze). The cached file list is then reset to None before ``files`` is
    read again (presumably so the list is reloaded and contains the new document), and
    the file whose ``file_id`` equals the returned document id is converted with
    ``as_kw_document()``.

    Parameters
    ----------
    document_type : KWModuleEnum
        Selects the target API: ``KWModuleEnum.saphir`` uses the PTA API,
        ``KWModuleEnum.topaze`` uses the RTA API. Any other value is rejected.
    target_folder_id : str
        Identifier of the target folder, forwarded to the creation payload.
    name : str
        Name forwarded to the creation payload.
    well_radius : float
        Forwarded unchanged to the creation payload.
    porosity : float
        Forwarded unchanged to the creation payload.
    pay_zone : float
        Forwarded unchanged to the creation payload.
    rock_compressibility : float
        Forwarded unchanged to the creation payload.
    top_reservoir_depth : float
        Forwarded unchanged to the creation payload.
    pvt_id : Optional[str], optional
        Optional identifier forwarded to the creation payload, by default None.
    pressure_id : Optional[str], optional
        Optional identifier forwarded to the creation payload, by default None.
    oil_rate_id : Optional[str], optional
        Optional identifier forwarded to the creation payload, by default None.
    gas_rate_id : Optional[str], optional
        Optional identifier forwarded to the creation payload, by default None.
    water_rate_id : Optional[str], optional
        Optional identifier forwarded to the creation payload, by default None.

    Returns
    -------
    Document
        The result of ``as_kw_document()`` on the file matching the created document id.

    Raises
    ------
    ValueError
        If `document_type` is neither Saphir nor Topaze. The payload has already been
        built at that point, but no creation request has been sent.
    StopIteration
        If no entry of ``files`` has a ``file_id`` equal to the created document id.
    """
    creation_payload = self.__dto_converter.get_create_document_dto(
        self.__field_id,
        self.__id,
        target_folder_id,
        name,
        well_radius,
        porosity,
        pay_zone,
        rock_compressibility,
        top_reservoir_depth,
        pvt_id,
        pressure_id,
        oil_rate_id,
        gas_rate_id,
        water_rate_id,
    )

    # Pick the API that owns the requested document kind.
    if document_type == KWModuleEnum.saphir:
        document_api = self.__cluster_apis.pta_api
    elif document_type == KWModuleEnum.topaze:
        document_api = self.__cluster_apis.rta_api
    else:
        raise ValueError(f"Unsupported document type: {document_type}, only Saphir and Topaze documents are supported.")
    created_id = document_api.create_document(self.__field_id, creation_payload).documentId

    # Drop the cached file list before reading `files` again.
    self.__files = None
    matching_files = (candidate for candidate in self.files if candidate.file_id == created_id)
    return next(matching_files).as_kw_document()
def delete_data(self, data: Data) -> None:
    """
    Deletes a data object from the cluster and drops it from the locally held data list.

    The remote deletion is requested first, using the data's ``vector_id``. The local
    data list is then searched for an entry with the same ``id`` and that entry is
    removed. If no entry matches, the local list is left untouched: the remote deletion
    has already succeeded at that point, so the method returns normally instead of
    letting a bare ``StopIteration`` escape.

    Parameters
    ----------
    data : Data
        The data object to delete. Its ``vector_id`` is sent to the deletion endpoint
        and its ``id`` is used to locate the matching entry in the local data list.

    Returns
    -------
    None
        This method does not return a value.
    """
    self.__cluster_apis.field_api.delete_data(self.__field_id, self.__id, data.vector_id)

    # Synchronise the local list with the cluster; a missing entry is not an error
    # because the deletion itself has already been performed.
    local_data = self.__get_data()
    stale_entry = next((entry for entry in local_data if entry.id == data.id), None)
    if stale_entry is not None:
        local_data.remove(stale_entry)
def create_data_folder(self, name: str, folder_parent_id: Optional[str] = None, is_subfolder: bool = True) -> DataFolder:
    """
    Creates a data folder, either directly at the well level or inside a parent folder.

    When `is_subfolder` is exactly ``False`` the folder is created at the well level and
    `folder_parent_id` is ignored. Otherwise the parent is the folder identified by
    `folder_parent_id`, or, when no id is given, the first folder found under the name
    "Gauges". If the parent already holds a folder called `name`, nothing is created:
    a message is printed and the existing folder is returned.

    Parameters
    ----------
    name : str
        The name of the data folder to create.
    folder_parent_id : Optional[str], optional
        The ID of the folder that will contain the new one, by default None. With
        None, the "Gauges" folder is used as parent. Ignored when `is_subfolder`
        is ``False``.
    is_subfolder : bool, optional
        Whether the folder is created inside a parent folder (default: True). Only
        the literal ``False`` selects creation at the well level.

    Returns
    -------
    DataFolder
        The newly created folder, or the already existing folder with the same name
        in the parent.

    Raises
    ------
    ValueError
        If `folder_parent_id` is given but no folder with that ID is found.
    IndexError
        If `folder_parent_id` is None and the "Gauges" lookup yields no folder
        (presumably an empty list - the helper is defined outside this method).
    """
    # Well-level creation: no parent lookup, no duplicate-name check.
    if is_subfolder is False:
        well_level_dto = self.__cluster_apis.field_api.create_data_folder_at_well_level(self.__field_id, self.__id, {'name': name})
        converted_folders = self.__dto_converter.get_data_folders_from_data_folder_dto_recursively(self.__field_id, self.__id, None, None, [well_level_dto])
        well_level_folder = converted_folders[0]
        self.data_folders.append(well_level_folder)
        return well_level_folder

    # Resolve the parent: default "Gauges" folder, or the explicitly requested one.
    if folder_parent_id is None:
        gauges_candidates = find_data_folders_recursively_by_name("Gauges", self.data_folders)
        parent_folder = gauges_candidates[0]
    else:
        parent_folder = find_data_folder_recursively_by_id(folder_parent_id, self.data_folders)
        if parent_folder is None:
            raise ValueError("data folder parent {} does not exist".format(folder_parent_id))

    # Reuse a same-named child instead of creating a duplicate.
    for existing_folder in parent_folder.data_folders:
        if existing_folder.name == name:
            print(f"Data Folder with name {name} already exists in well {self.__name}")
            return existing_folder

    created_dto = self.__cluster_apis.field_api.create_data_folder(self.__field_id, self.__id, parent_folder.id, {'name': name})
    created_folder = self.__dto_converter.get_data_folders_from_data_folder_dto_recursively(self.__field_id, self.__id, parent_folder.id, parent_folder.name, [created_dto])[0]
    parent_folder.data_folders.append(created_folder)
    return created_folder