Source code for kappa_sdk.well

from ._private._cluster_apis import ClusterAPIS
from ._private._well_dto import WellDtoWithHierarchy, PlotPropertiesDto, WellParamDto
from ._private.dto_converters._well_dto_converter import WellDtoConverter
from datetime import datetime
from typing import List, Optional, cast, Union
from .production_folder_kind_enum import ProductionFolderKindEnum
from .custom_workflow import CustomWorkflow
from .field_data_types_catalog import FieldDataTypesCatalog
from .field_well_properties_catalog import FieldWellPropertiesCatalog
from .data import Data
from .file import File
from .document import Document
from .kw_module_enum import KWModuleEnum
from .analysis import Analysis
from .kw.model_book.model_book import ModelBook
from .user_task import UserTask
from .incremental_pta import IncrementalPTA
from .save_strategy_enum import SaveStrategyEnum
from .well_production_enum import WellProductionTypeEnum
from .workflow_type import IncrementalPTAType, IncrementalRTAType
from .incremental_rta import IncrementalRTA
from .well_property_container import WellPropertyContainer
from .plot import Plot
from .gauge_loading_strategy import GaugeLoadingStrategy
from .workflow_settings import WorkflowImproveSettings
from .measure_enum import MeasureEnum
from .unit_enum import UnitEnum
from .time_format_enum import TimeFormatEnum
from .pta_extraction import PTAExtraction
from .shutin import ShutIn
from .wellbore import Wellbore
from .production_folder import ProductionFolder
import uuid
from .look_for_enum import LookForEnum
from .input_type_enum import InputTypeEnum
from .shut_in_types_enum import ShutInTypesEnum
from .shut_in_categories_enum import ShutInCategoriesEnum
from .replace_by_enum import ReplaceByEnum
from .rate_correction_option_enum import RateCorrectionOptionEnum
from .simplify_method_enum import SimplifyMethodEnum
from .filter_type_enum import FilterTypeEnum
from .denoising_pre_sampling_type_enum import DenoisingPreSamplingTypeEnum
from .denoising_threshold_type_enum import DenoisingThresholdTypeEnum
from .pvt.pvt import PVT
from .file_folder import FileFolder
from .file_folders_extensions import find_file_folders_recursively_by_name, find_file_folder_recursively_by_id, find_file_folder_recursively_by_file
from .survey import Survey
from .well_logs import WellLogs
from .vs_depth_vector import VsDepthVector
from .forward_rates_outputs import ForwardRatesOutputs
from .gas_oil_type_enum import GasOilTypeEnum
from .unit_system_pvt_enum import UnitSystemPvtEnum
from .wellbore.wellbore_perforation import WellborePerforation
from .wellbore.wellbore_geometry import WellboreGeometry
from .data_folder import DataFolder
from .data_folder_extensions import find_data_folder_recursively_by_id, find_data_folders_recursively_by_name


[docs] class Well: """ Well object. Presents a KAPPA Automate well object that can be queried for contained data, documents and every existing object under the well. Returned as a result of the :py:obj:`Field.wells` query. .. note:: Should not be instantiated directly. .. note:: :py:obj:`Well.data`, :py:obj:`Well.shutin`, and :py:obj:`Well.documents` properties are populated on-demand and are cached for the duration of the :class:`Connection`. """ def __init__(self, field_id: str, well_group_id: Optional[str], well_id: str, name: str, uwi: Optional[str], data_types_catalog: FieldDataTypesCatalog, well_properties_catalog: FieldWellPropertiesCatalog, cluster_apis: ClusterAPIS, dto_converter: WellDtoConverter) -> None: self.__field_id: str = field_id self.__id: str = well_id self.__well_group_id: Optional[str] = well_group_id self.__name: str = name self.__uwi: Optional[str] = uwi self.__dto_converter: WellDtoConverter = dto_converter self.__cluster_apis: ClusterAPIS = cluster_apis self.__data_types_catalog: FieldDataTypesCatalog = data_types_catalog self.__well_properties_catalog: FieldWellPropertiesCatalog = well_properties_catalog self.__labels: Optional[List[str]] = None self.__production_folders: Optional[List[ProductionFolder]] = None self.__corrected_production_folders: Optional[List[ProductionFolder]] = None self.__data: Optional[List[Data]] = None self.__files: Optional[List[File]] = None self.__user_tasks: Optional[List[UserTask]] = None self.__functions: Optional[List[Data]] = None self.__incremental_pta_workflows: Optional[List[IncrementalPTA]] = None self.__incremental_rta_workflows: Optional[List[IncrementalRTA]] = None self.__well_dto: Optional[WellDtoWithHierarchy] = None self.__shut_in: Optional[Data] = None self.__well_property_containers: Optional[List[WellPropertyContainer]] = None self.__well_plots: Optional[List[Plot]] = None self.__gauges: Optional[List[Data]] = None self.__filters: Optional[List[Data]] = None self.__productions: Optional[List[Data]] = None self.__corrected_productions: Optional[List[Data]] = None self.__file_folders: Optional[List[FileFolder]] = None self.__well_info_dto: Optional[List[WellParamDto]] = None self.__production_type: Optional[WellProductionTypeEnum] = None self.__wellbore: Optional[Wellbore] = None self.__custom_workflows: Optional[List[CustomWorkflow]] = None self.__pvts: Optional[List[PVT]] = None self.__surveys: Optional[List[Survey]] = None self.__model_books: Optional[List[ModelBook]] = None self.__well_logs: Optional[List[WellLogs]] = None self.__data_folders: Optional[List[DataFolder]] = None def __get_well_dto(self) -> WellDtoWithHierarchy: if self.__well_dto is None: self.__well_dto = self.__cluster_apis.field_api.get_well_dto(self.__field_id, self.__id) return self.__well_dto @property def field_id(self) -> str: """ Gets the id of the field that contains this :class:`Well`. """ return self.__field_id @property def id(self) -> str: """ Gets the id of the :class:`Well` object. """ return self.__id @property def name(self) -> str: """ Gets the name of the :class:`Well`. """ return self.__name @property def uwi(self) -> Optional[str]: """ Gets the UWI of the :class:`Well` object. """ return self.__uwi @property def labels(self) -> List[str]: """ Gets the labels of the :class:`Well` object. """ if self.__labels is None: self.__labels = self.__dto_converter.file_dto_converter.get_labels_from_labels_dto(self.__get_well_dto().labels) return self.__labels @property def well_group_id(self) -> Optional[str]: """ Gets the well group id that contains this :class:`Well`. """ return self.__well_group_id @property def production_folders(self) -> List[ProductionFolder]: """ Gets the raw production folders that contains this :class:`Well`. """ if self.__production_folders is None: self.__production_folders = self.__dto_converter.get_production_folders_from_production_folders_dto(self.__field_id, self.__id, self.__get_well_dto().productions) return self.__production_folders @property def corrected_production_folders(self) -> List[ProductionFolder]: """ Gets the corrected production folders that contains this :class:`Well`. """ if self.__corrected_production_folders is None: self.__corrected_production_folders = self.__dto_converter.get_production_folders_from_production_folders_dto(self.__field_id, self.__id, self.__get_well_dto().productions, ProductionFolderKindEnum.corrected_production) return self.__corrected_production_folders @property def shut_in(self) -> Optional[Data]: """ Gets the shut-in data for this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__shut_in is None: well_dto_shut_in = self.__get_well_dto().shutin if well_dto_shut_in is not None: self.__shut_in = self.__dto_converter.production_folder_dto_converter.build_data(self.__field_id, self.__id, well_dto_shut_in) return self.__shut_in @property def gauges(self) -> List[Data]: """ Gets the list of gauges contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.""" if self.__gauges is None: self.__gauges, self.__filters = self.__dto_converter.get_gauges_and_filters_from_data_folders_dto(self.__field_id, self.__id, self.__get_well_dto().dataFolders) return self.__gauges @property def data(self) -> List[Data]: """ Gets the list of data contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ return list(self.__get_data()) def __get_data(self) -> List[Data]: if self.__data is None: self.__data = [item for sublist in [self.gauges, self.filters, self.productions, self.corrected_productions, self.functions, [y for x in self.user_tasks for y in x.outputs], [y for x in self.incremental_rta_workflows for y in x.outputs], [self.shut_in] if self.shut_in is not None else [], [item for x in self.custom_workflows for item in x.data]] for item in sublist] return self.__data @property def filters(self) -> List[Data]: """ Gets the list of filters contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__filters is None: self.__gauges, self.__filters = self.__dto_converter.get_gauges_and_filters_from_data_folders_dto(self.__field_id, self.__id, self.__get_well_dto().dataFolders) return self.__filters @property def productions(self) -> List[Data]: """ Gets the list of production phases contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__productions is None: self.__productions = list() for production_folder in self.production_folders: self.__productions.extend(production_folder.data) return self.__productions @property def corrected_productions(self) -> List[Data]: """ Gets the list of corrected production phases contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__corrected_productions is None: self.__corrected_productions = list() for corrected_production_folder in self.corrected_production_folders: self.__corrected_productions.extend(corrected_production_folder.data) return self.__corrected_productions @property def files(self) -> List[File]: """ Gets the list of files contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ return list(self.__get_files()) def __get_files(self) -> List[File]: if self.__files is None: self.__files = self.__dto_converter.get_files_recursively(self.field_id, self.id, self.__get_well_dto()) return self.__files @property def documents(self) -> List[Document]: """ Gets the list of KW documents contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ document_list = list() for file in self.__get_files(): try: document = file.as_kw_document() except ValueError: document = None if document is not None: document_list.append(document) for ipta in self.incremental_pta_workflows: document_list.extend(ipta.output_documents) return document_list @property def user_tasks(self) -> List[UserTask]: """ Gets the list of user tasks contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__user_tasks is None: self.__user_tasks = self.__dto_converter.get_user_tasks_from_user_task_dto(self.field_id, self.id, None, self.__data_types_catalog, self.__get_well_dto().userTasks) return self.__user_tasks @property def functions(self) -> List[Data]: """ Gets the list of functions contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__functions is None: self.__functions = self.__dto_converter.get_functions_from_functions_dto(self.field_id, self.id, self.__get_well_dto().functions) return self.__functions @property def incremental_pta_workflows(self) -> List[IncrementalPTA]: """ Gets the list of incremental PTA workflows contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__incremental_pta_workflows is None: self.__incremental_pta_workflows = self.__dto_converter.ipta_dto_converter.get_incremental_pta_workflows_from_dto(self.field_id, self.__well_group_id, self.id, self.__get_well_dto().incrementalPtas) return self.__incremental_pta_workflows @property def incremental_rta_workflows(self) -> List[IncrementalRTA]: """ Gets the list of incremental RTA workflows contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__incremental_rta_workflows is None: self.__incremental_rta_workflows = self.__dto_converter.get_incremental_rta_workflows_from_dto(self.field_id, self.__well_group_id, self.id, self.__get_well_dto().incrementalRtas) return self.__incremental_rta_workflows @property def well_property_containers(self) -> List[WellPropertyContainer]: """ Gets the list of well property containers contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__well_property_containers is None: self.__well_property_containers = self.__dto_converter.get_well_property_containers_from_well_property_container_dto(self.field_id, self.id, self.__well_properties_catalog, self.__get_well_dto().wellPropertyContainers) return self.__well_property_containers @property def plots(self) -> List[Plot]: """ Gets the list of plots contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__well_plots is None: self.__well_plots = self.__dto_converter.get_plots_from_well_dto(self.field_id, self.id, self.__data_types_catalog, self.__get_well_dto()) return self.__well_plots @property def production_type(self) -> WellProductionTypeEnum: """ Returns the production type of the well .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__production_type is None: self.__production_type = self.__get_well_dto().productionType return self.__production_type @property def wellbore(self) -> Optional[Wellbore]: """ Gets the wellbore of the :class:`Well` object. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__wellbore is None: self.__wellbore = self.__dto_converter.get_wellbore_from_wellbore_dto(self.__get_well_dto().wellbore) return self.__wellbore @property def custom_workflows(self) -> List[CustomWorkflow]: """ Gets the list of custom workflows contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__custom_workflows is None: self.__custom_workflows = self.__dto_converter.get_custom_workflows_from_custom_workflows_dto(self.__field_id, self.__id, self.__data_types_catalog, self.__get_well_dto().customWorkflows) return self.__custom_workflows @property def pvts(self) -> List[PVT]: """ Gets the list of PVTs contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__pvts is None: self.__pvts = self.__dto_converter.get_pvts_from_pvts_dto(self.__field_id, self.__id, self.__get_well_dto().pvts) return self.__pvts @property def file_folders(self) -> List[FileFolder]: """ Gets the list of file folders contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__file_folders is None: self.__file_folders = self.__dto_converter.file_dto_converter.get_file_folders_from_file_folder_dto_recursively(self.__field_id, None, self.__id, None, None, self.__get_well_dto().fileFolders) return self.__file_folders @property def data_folders(self) -> List[DataFolder]: """ Gets the list of data folders contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`. """ if self.__data_folders is None: self.__data_folders = self.__dto_converter.get_data_folders_from_data_folder_dto_recursively(self.__field_id, self.__id, None, None, self.__get_well_dto().dataFolders) return self.__data_folders @property def surveys(self) -> List[Survey]: """ Gets the list of surveys contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.""" if self.__surveys is None: self.__surveys = self.__dto_converter.get_surveys_from_surveys_dto(self.__field_id, self.__id, self.__get_well_dto().surveyContainer.surveys) return self.__surveys @property def model_books(self) -> List[ModelBook]: """ Gets the list of model book contained in this :class:`Well`. .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.""" if self.__model_books is None: self.__model_books = self.__dto_converter.get_model_books_from_field_model_books_dto(self.__field_id, self.__id, self.__get_well_dto().modelBooks) return self.__model_books @property def well_logs(self) -> List[WellLogs]: """ Gets the list of well log contained in this :class:`Well`. Returns: List[WellLogs]: A list of WellLogs objects containing well logging data. """ if self.__well_logs is None: self.__well_logs = self.__dto_converter.get_well_logs_from_well_logs_dto(self.__field_id, self.__id, self.__get_well_dto().staticLogs) return self.__well_logs def create_data(self, name: str, data_type: str, labels: Optional[List[str]] = None, folder_id: Optional[str] = None, measure_depth: float = 0, true_vertical_depth: float = 0, true_vertical_depth_sub_sea: float = 0) -> Data: """ Creates and returns a new data object with specified parameters. This method serves as an interface to initialize a data entity with the provided attributes and configuration settings. Parameters ---------- name : str The name of the data entity to create. data_type : str The type of data. labels : list of str, optional A list of labels associated with the data. folder_id : str, optional The identifier of the production folder where the data is stored. measure_depth : float The measured depth of the data. true_vertical_depth : float The true vertical depth of the data. true_vertical_depth_sub_sea : float The true vertical depth of the data below sea level. Returns ------- Data The newly created data object. """ return self.__create_data(name, data_type, False, labels, folder_id, measure_depth, true_vertical_depth, true_vertical_depth_sub_sea) def create_step_data(self, name: str, data_type: str, first_x: datetime, labels: Optional[List[str]] = None, folder_id: Optional[str] = None, measure_depth: float = 0, true_vertical_depth: float = 0, true_vertical_depth_sub_sea: float = 0) -> Data: """ Creates and initializes a step data object with the provided parameters. The function generates a step data object by initializing it with essential details like name, data type and associated parameters. It requires a ’first_x’ value to be explicitly specified and uses it to update the step data object after creation. The function optionally accepts additional metadata such as labels or production folder identifier. Parameters ---------- name : str The name of the step data to be created. data_type : str The type specifying the data format or classification. first_x : datetime The datetime object representing the first x-coordinate for the step data. This parameter must not be None. labels : list of str, optional A list of labels to associate with the step data. Defaults to None. folder_id : str, optional Identifier for the production folder to associate the step data with. Defaults to None. measure_depth : float The measured depth of the data. true_vertical_depth : float The true vertical depth of the data. true_vertical_depth_sub_sea : float The true vertical depth of the data below sea level. Returns ------- Data The created step data object initialized with the given parameters. """ if first_x is None: raise ValueError("Argument error: 'first_x' must be defined for step data.") data = self.__create_data(name, data_type, True, labels, folder_id, measure_depth, true_vertical_depth, true_vertical_depth_sub_sea) data.update_first_x(first_x) return data def upload_file(self, file_path: str, file_folder_id: Optional[str] = None, overwrite: bool = False, automatic_extraction: bool = False) -> File: """ Uploads a file to this :class:`Well`. Parameters ---------- file_path: Full path and name of the file to upload. overwrite: A value indicating whether to overwrite a file with the same name if it already exists in the well. automatic_extraction: A value indicating whether to perform automatic extraction of well properties if uploading a KW file. file_folder_id: id of the file folder to upload the file Returns ------- :class:`File`: An uploaded file object. """ if file_folder_id is not None: file_folder = find_file_folder_recursively_by_id(file_folder_id, self.file_folders) if file_folder is None: raise ValueError(f"Missing File folder {file_folder_id} in well {self.__name}") file_dto = self.__cluster_apis.field_api.upload_file_to_file_folder_in_well(self.__field_id, self.__id, file_folder.id, file_path, overwrite, automatic_extraction) else: file_dto = self.__cluster_apis.field_api.upload_file_to_well(self.__field_id, self.__id, file_path, overwrite, automatic_extraction) file = self.__dto_converter.file_dto_converter.build_file_from_file_dto(None, file_dto) self.__get_files().append(file) return file def create_plot(self, plot_name: str, pane_name: Optional[str] = None, square_log_cycles: bool = False, stacked_bars: bool = False, x_label: str = "", is_x_log: bool = False, labels: Optional[List[str]] = None) -> Plot: """ Create a Kappa Automate Plot instance under the well Parameters ---------- plot_name: Name of plot pane_name: Name of the pane square_log_cycles: Whether or not use Square log cycles stacked_bars: Whether or not use stacked bars x_label: x label is_x_log: Whether or not use logarithmic scale for x values labels: Add some labels to the plot Returns ------- :class:`Plot`: The new Plot instance created under the well. """ if labels is None: labels = list() plot_properties = PlotPropertiesDto(name=plot_name, pane_name=pane_name, square_log_cycles=square_log_cycles, stacked_bars=stacked_bars, x_label=x_label, is_x_log=is_x_log, labels=labels) plot_instance_dto = self.__dto_converter.plot_dto_converter.get_plot_instance_dto(plot_properties) plot_id, plot_name = self.__cluster_apis.field_api.create_plot(self.__field_id, self.__id, plot_instance_dto) new_plot = Plot(self.__field_id, self.__id, plot_id, plot_name, self.__cluster_apis.field_api, self.__data_types_catalog, self.__dto_converter.plot_dto_converter) self.plots.append(new_plot) return new_plot def __create_data(self, name: str, data_type: str, is_by_step: bool = False, labels: Optional[List[str]] = None, folder_id: Optional[str] = None, measure_depth: float = 0, true_vertical_depth: float = 0, true_vertical_depth_sub_sea: float = 0) -> Data: unique_data_name = name i = 1 production_folder_id = next((x for x in self.production_folders if x.id == folder_id), None) is_production_folder = True if production_folder_id is not None else False data = self.gauges if is_production_folder else self.productions while True: if next((x for x in data if x.name == unique_data_name), None) is None: break unique_data_name = name + " #" + str(i) i += 1 input_data_dto = self.__dto_converter.data_dto_converter.get_basic_data_dto(name, data_type, is_by_step, uuid.uuid4(), labels, measure_depth, true_vertical_depth, true_vertical_depth_sub_sea) basic_data_dto = self.__cluster_apis.field_api.create_basic_data(self.__field_id, self.__id, folder_id, input_data_dto, is_production_folder) new_data = self.__dto_converter.production_folder_dto_converter.build_data(self.field_id, self.id, basic_data_dto) data.append(new_data) return new_data def find_gauge_by_id(self, vector_id: str) -> Data: """ Finds a gauge by id. Parameters ---------- vector_id: The id of a vector to search for. """ try: return next(x for x in self.data if x.vector_id == vector_id) except StopIteration: raise ValueError(f"Data with vector_id {vector_id} cannot be find inside the well {self.name}") def find_gauge_by_type(self, data_type: Optional[str], label: Optional[str] = None, is_reference: bool = True, default_value: Optional[Data] = None) -> Data: """ Finds the first matching gauge by data-type and/or label. Parameters ---------- data_type : Optional[str] The type of the gauge to find. label : Optional[str], optional The label of the gauge, if any (default is None). is_reference : bool, optional Whether the gauge is a reference gauge (default is True). default_value : Optional[Data], optional The default value to return if the gauge is not found (default is None). Returns ------- Data The found gauge. """ data = self.__match_gauge(self.data, data_type, label, is_reference) if data is not None: return data # Recursive search if data_type is provided if data_type is not None: return self.find_gauge_by_type(None, data_type, is_reference, default_value) if default_value is not None: return default_value raise ValueError(f"Data with data type {data_type} and label {label} cannot be found inside the well {self.name}") def refresh_data(self) -> None: """ Clean all the well attributes and dto from the cache, to grab updated attributes. """ self.__data = None self.__gauges = None self.__productions = None self.__corrected_productions = None self.__production_folders = None self.__filters = None self.__files = None self.__file_folders = None self.__user_tasks = None self.__functions = None self.__incremental_pta_workflows = None self.__incremental_rta_workflows = None self.__well_dto = None self.__shut_in = None self.__well_property_containers = None self.__well_plots = None self.__well_info_dto = None self.__production_type = None self.__wellbore = None self.__model_books = None self.__pvts = None self.__file_folders = None self.__data_folders = None self.__surveys = None self.__well_logs = None self.__custom_workflows = None def __match_gauge(self, data_list: List[Data], data_type: Optional[str], label: Optional[str] = None, is_reference: bool = True) -> Optional[Data]: """ Finds a first matching gauge by data-type and/or label. Parameters ---------- data_list: A list of data to search for matching data. data_type: A data-type to search for. label: A label to search for. """ try: if data_type is not None and label is not None: try: return next(x for x in data_list if x.data_type == data_type and x.is_reference == is_reference and label in x.labels) except StopIteration: return next(x for x in data_list if x.data_type == data_type and label in x.labels) elif data_type is not None and label is None: return next(x for x in data_list if x.data_type == data_type and x.is_reference == is_reference) elif label is not None: return next(x for x in data_list if data_type in x.labels) except StopIteration: pass return None def create_incremental_pta(self, name: str, workflow_type: IncrementalPTAType, shutin_id: str, pressure_id: str, pta_document_id: str, improve_settings: Optional[WorkflowImproveSettings] = None, min_shutin_duration: float = 10, replication_of_results: bool = False, save_strategy: Optional[SaveStrategyEnum] = None, labels: Optional[List[str]] = None, auto_compute_semilog_pstar: bool = False, shut_in_types: Optional[List[ShutInTypesEnum]] = None, shut_in_categories: Optional[List[ShutInCategoriesEnum]] = None) -> IncrementalPTA: """ Create an incremental pta workflow Parameters ---------- name: Name of the ipta. workflow_type: A workflow-type to use. shutin_id: Vector id of the shutin input pressure_id: Vector id of the pressure pta_document_id: Id of the reference pta document improve_settings: Improve Settings parameter when using an improve ipta min_shutin_duration: Minimum shutin duration replication_of_results: If we replicate the results in the master container or not save_strategy: save strategy of the output documents labels: a list of labels to add to the ipta, the associated documents and the associated well properties container auto_compute_semilog_pstar: Auto compute the semilog pstar shut_in_types: List of shut in types to use shut_in_categories: List of shut in categories to use """ if labels is None: labels = [] oil_rate_data = next((x for x in self.corrected_productions if x.data_type == "qo"), None) water_rate_data = next((x for x in self.corrected_productions if x.data_type == "qw"), None) payload = self.__dto_converter.ipta_dto_converter.build_ipta_dto(self.__field_id, self.__id, name, workflow_type, shutin_id, pressure_id, pta_document_id, oil_rate_data, water_rate_data, labels, improve_settings, min_shutin_duration, replication_of_results, save_strategy, auto_compute_semilog_pstar, shut_in_types, shut_in_categories) incremental_pta_dto = self.__cluster_apis.automation_api.create_incremental_pta(self.__id, payload) new_ipta = self.__dto_converter.ipta_dto_converter.get_incremental_pta_workflows_from_dto(self.field_id, self.__well_group_id, self.id, [incremental_pta_dto])[0] self.incremental_pta_workflows.append(new_ipta) return new_ipta def create_incremental_rta(self, name: str, workflow_type: IncrementalRTAType, pressure_id: str, rta_document: Document, data_hours_to_analyze: float = 2160, improve_settings: Optional[WorkflowImproveSettings] = None, oil_rate_vector_id: Optional[str] = None, gas_rate_vector_id: Optional[str] = None, water_rate_vector_id: Optional[str] = None, gauge_loading_strategy: Optional[GaugeLoadingStrategy] = None, is_improved_only_on_new_data: Optional[bool] = None, replication_of_results: bool = False) -> IncrementalRTA: """ Create an incremental rta workflow Parameters ---------- name: Name of the ipta. workflow_type: A workflow-type to use. pressure_id: Vector id of the pressure rta_document: reference rta document data_hours_to_analyze: number of hours to analyze improve_settings: Improve Settings parameter when using an improve ipta oil_rate_vector_id: Vector id of the oil rate data gas_rate_vector_id: Vector id of the gas rate data water_rate_vector_id: Vector id of the water rate data gauge_loading_strategy: gauge loading strategy of the output documents is_improved_only_on_new_data: If the improve irta is only on new data replication_of_results: If we replicate the results in the master container or not """ payload = self.__dto_converter.irta_dto_converter.build_irta_payload(self.__field_id, self.__id, name, workflow_type, pressure_id, rta_document, data_hours_to_analyze, improve_settings, oil_rate_vector_id, gas_rate_vector_id, water_rate_vector_id, gauge_loading_strategy, is_improved_only_on_new_data, replication_of_results) incremental_rta_dto = self.__cluster_apis.automation_api.create_incremental_rta(self.__id, payload) new_irta = self.__dto_converter.get_incremental_rta_workflows_from_dto(self.field_id, self.__well_group_id, self.id, [incremental_rta_dto])[0] self.incremental_rta_workflows.append(new_irta) return new_irta def load_gauge(self, datasource_name: str, datasource_gauge_name: str, dimension: MeasureEnum, data_type: str, unit: UnitEnum, time_format: TimeFormatEnum, gauge_name: Optional[str] = None, children_datasource_names: Optional[List[str]] = None, last_step_duration_hours: Optional[float] = None, is_high_frequency: bool = True, gauge_model: str = "", measure_depth: float = 0, true_vertical_depth: float = 0, true_vertical_depth_sub_sea: float = 0, serial_number: str = "", labels: Optional[List[str]] = None, comment: str = "", read_from: Optional[datetime] = None, read_to: Optional[datetime] = None, folder_id: Optional[str] = None) -> Data: """ Loads a gauge with the provided attributes into the system. This method integrates with external data APIs to fetch required data sources and constructs a payload with given attributes for the automation API to register a new gauge. The method ensures the new gauge is properly loaded by continuously checking the system until it appears in the available gauges list. Parameters ---------- datasource_name : str Name of the data source where the gauge data is stored. datasource_gauge_name : str Name of the specific gauge within the data source. dimension : MeasureEnum The dimension of the data measured by the gauge. data_type : str The type of the data being processed (e.g., float, int). unit : UnitEnum The unit of measurement for the gauge. time_format : TimeFormatEnum The time format associated with the gauge data. gauge_name : str, optional The name assigned to the gauge. Defaults to `datasource_gauge_name`. children_datasource_names : list of str, optional Names of the child data sources if applicable. last_step_duration_hours : float, optional Duration of the last recorded step in hours. Default is None. is_high_frequency : bool, default=True Boolean indicating if the gauge produces high frequency data. gauge_model : str, default="" Model of the gauge being registered. measure_depth : float, default=0 The depth at which the gauge measurements are taken, in meters. true_vertical_depth : float, default=0 The true vertical depth of the gauge. true_vertical_depth_sub_sea : float, default=0 The true vertical depth below sea level for the gauge. serial_number : str, default="" Serial number of the gauge, if available. labels : list of str, optional Labels associated with the gauge, useful for categorization. comment : str, default="" An optional comment or description for the gauge. read_from : datetime, optional Start datetime for reading gauge data. read_to : datetime, optional End datetime for reading gauge data. folder_id : str, optional ID of the folder to associate the gauge with. Returns ------- Data Returns the newly created gauge object once it is successfully registered. """ if gauge_name is None: gauge_name = datasource_gauge_name datasource_id, tag_id = self.__cluster_apis.external_data_api.get_datasource_id_and_tag_id(datasource_name, datasource_gauge_name, children_datasource_names) payload = self.__dto_converter.data_dto_converter.get_gauge_dto(self.__field_id, self.__id, datasource_id, tag_id, gauge_name, dimension, data_type, unit, time_format, last_step_duration_hours, is_high_frequency, gauge_model, measure_depth, true_vertical_depth, true_vertical_depth_sub_sea, serial_number, labels, comment, False, read_from, read_to) if folder_id is None: data_dto = self.__cluster_apis.automation_api.load_gauge(self.__id, payload) else: data_dto = self.__cluster_apis.automation_api.load_gauge_under_folder(folder_id, payload) new_gauge = self.__dto_converter.production_folder_dto_converter.build_data(self.__field_id, self.__id, data_dto) if folder_id is not None: folder = find_data_folder_recursively_by_id(folder_id, self.data_folders) folder.data.append(new_gauge) return new_gauge def get_shut_in_extractions(self, rates_data_list: List[Data], pressure_id_list: List[str], reference_shut_in: ShutIn, compared_shut_in_list: Optional[List[ShutIn]] = None, document_id: Optional[str] = None) -> List[PTAExtraction]: """ Extract a loglog given the shut-in dates, pressure gauge and a corrected rate gauge. Parameters ---------- rates_data_list: List of the corrected rate gauges pressure_id_list: List of the vector id of the pressure gauges reference_shut_in: Reference shut-in compared_shut_in_list: List of the compared shut in, this list must contain the start and end dates of each shutin document_id: Document id of the reference document """ if compared_shut_in_list is None: compared_shut_in_list = list() dto = self.__dto_converter.shut_in_extraction_converter.get_shut_in_extraction_dto(self.__field_id, self.__id, rates_data_list, pressure_id_list, reference_shut_in, compared_shut_in_list, document_id) shut_in_extraction = self.__cluster_apis.pta_api.get_shut_in_extractions(dto) return self.__dto_converter.shut_in_extraction_converter.get_pta_extraction_dtos_from_shut_in_extraction_dto(shut_in_extraction.extractionOutputDtos) def rename(self, new_well_name: str) -> None: """ Rename the current well object. Parameters ---------- new_well_name: New name of the well """ self.__name = self.__cluster_apis.field_api.rename_well(self.__field_id, self.__id, {"name": new_well_name}) def create_file_folder(self, name: str, folder_parent_id: Optional[str] = None) -> FileFolder: """ Creates a new file folder within the specified parent folder or within the default parent folder if none is specified. This method searches for an existing folder with the specified name in the given parent folder. If such a folder already exists, it will return the existing folder. Otherwise, it creates a new folder under the parent folder, updates the folder hierarchy and appends the new folder to the parent's list of child folders. Parameters ---------- name : str The name of the new file folder to be created. folder_parent_id : Optional[str], optional The ID of the parent folder under which the new folder should be created. If not provided, the new folder will be created under the default parent folder. Returns ------- FileFolder The created or pre-existing file folder object. """ if folder_parent_id is not None: parent_file_folder = find_file_folder_recursively_by_id(folder_parent_id, self.file_folders) if parent_file_folder is None: raise ValueError("File folder parent {} does not exist".format(folder_parent_id)) else: folder_parent_name = "Files" parent_file_folder = find_file_folders_recursively_by_name(folder_parent_name, self.file_folders)[0] file_folder = next((x for x in parent_file_folder.file_folders if x.name == name), None) if file_folder is None: file_folder_dto = self.__cluster_apis.field_api.create_file_folder(self.__field_id, self.__id, parent_file_folder.id, {'name': name}) file_folder = self.__dto_converter.file_dto_converter.get_file_folders_from_file_folder_dto_recursively(self.__field_id, None, self.__id, parent_file_folder.id, parent_file_folder.name, [file_folder_dto])[0] parent_file_folder.file_folders.append(file_folder) else: print(f"File Folder with name {name} already exists in well {self.__name}") return file_folder def create_wellbore(self, geometries: List[WellboreGeometry], perforations: Optional[List[WellborePerforation]] = None, delete_existing_wellbore: bool = False) -> None: """ Creates or updates a wellbore with the specified geometries and perforations. If the `delete_existing_wellbore` flag is set to True, the existing wellbore will be deleted before creating a new one. Parameters ---------- geometries : List[WellboreGeometry] A list of wellbore geometries to define the structure of the wellbore. perforations : Optional[List[WellborePerforation]], optional A list of perforations to specify the locations where fluid can flow between the wellbore and the reservoir, by default None. delete_existing_wellbore : bool, optional A flag indicating whether to delete the existing wellbore before creating a new one, by default True. """ if delete_existing_wellbore: self.delete_wellbore() wellbore_command_dto = self.__dto_converter.wellbore_dto_converter.get_wellbore_command_dto(self.__field_id, self.__id, self.__uwi, geometries, perforations) wellbore_query_dto = self.__cluster_apis.automation_api.create_wellbore(self.__id, wellbore_command_dto) self.__wellbore = self.__dto_converter.get_wellbore_from_wellbore_dto(wellbore_query_dto) def delete_wellbore(self) -> None: """ Deletes the current wellbore if the well has one. """ if self.wellbore is not None: self.__cluster_apis.field_api.delete_wellbore(self.field_id, self.id, self.wellbore.id) self.__wellbore = None def create_production_folder(self, name: str) -> ProductionFolder: """ Creates a new production folder in the well Parameters ---------- name: The name of the production folder Returns ------- :class:`ProductionFolder`. """ production_folder = self.__dto_converter.get_production_folders_from_production_folders_dto(self.__field_id, self.__id, [self.__cluster_apis.field_api.create_production_folder(self.__field_id, self.__id, {"name": name})])[0] if self.__production_folders is None: self.__production_folders = [] self.__production_folders.append(production_folder) return production_folder def create_shut_in(self, input_type: InputTypeEnum, input_gauge: Data, look_for: LookForEnum, minimum_shut_in_duration_hr: float = 10, minimum_value_change: Optional[float] = None, requires_validation: bool = False) -> Data: """ Creates a shut-in event based on the given parameters. Parameters ---------- input_type : InputTypeEnum The type of input data. input_gauge : Data The gauge data to be used for identifying shut-ins. look_for : LookForEnum The criteria to look for in the data. minimum_shut_in_duration_hr : float The minimum duration of the shut-in in hours. minimum_value_change : float The minimum value change to identify a shut-in. requires_validation : bool, optional Whether the Shut-in requires validation (default is False). Returns ------- Data Shut-in. """ if self.__shut_in is not None: raise ValueError(f"There is already a shut-in object in this well {self.__name}") if minimum_value_change is None: minimum_value_change = cast(float, self.__dto_converter.unit_converter.convert_to_internal(UnitEnum.pressure_psia, 50)) shut_in_creation_dto = self.__dto_converter.get_shut_in_command_dto(self.__field_id, self.__id, input_type, input_gauge, look_for, minimum_shut_in_duration_hr, minimum_value_change, requires_validation) shut_in_dto = self.__cluster_apis.automation_api.create_shut_in(self.__id, shut_in_creation_dto) shut_in = self.__dto_converter.production_folder_dto_converter.build_data(self.__field_id, self.__id, shut_in_dto) self.__shut_in = shut_in return shut_in def create_corrected_production(self, production_folder: ProductionFolder, shut_in: Data, name: str = "Corrected production #1", use_uptime_correction: bool = False, set_to_0_in_shut_ins: bool = True, replace_by: Optional[ReplaceByEnum] = None, constant_value_for_liquid_rate: Optional[float] = None, constant_value_for_gas_rate: Optional[float] = 1, rate_correction_option: Optional[RateCorrectionOptionEnum] = None, difference: float = 0.1, delta_p_difference_multiplier: float = 0.8, pressure: Optional[Data] = None, iqr_multiplier: Optional[bool] = None, simplify_method: Optional[SimplifyMethodEnum] = None, delta_y_for_liquid_rate_simplification: float = 0.1, delta_y_for_gas_rate_simplification: float = 0.1, keep_original_rates_before_shut_ins: bool = False, set_rates_to_negative_values: bool = False) -> None: """ Creates a corrected production dataset based on the given parameters. Parameters ---------- production_folder : ProductionFolder The production folder to be corrected. shut_in : Data The shut-in data to be used for corrections. name : str, optional The name of the corrected production dataset (default is "Corrected production #1"). use_uptime_correction : bool, optional Whether to use uptime correction (default is False). set_to_0_in_shut_ins : bool, optional Whether to set rates to 0 during shut-ins (default is True). replace_by : Optional[ReplaceByEnum], optional The method to replace values (default is None). constant_value_for_liquid_rate : Optional[float], optional A constant value for liquid rate (default is None). constant_value_for_gas_rate : Optional[float], optional A constant value for gas rate (default is 1). rate_correction_option : Optional[RateCorrectionOptionEnum], optional The option for rate correction (default is None). difference : float, optional The difference value for corrections (default is 0.1). delta_p_difference_multiplier : float, optional The multiplier for delta pressure difference (default is 0.8). pressure : Optional[Data], optional The pressure data to be used (default is None). iqr_multiplier : Optional[bool], optional Whether to use IQR multiplier (default is None). simplify_method : Optional[SimplifyMethodEnum], optional The method for simplification (default is None). delta_y_for_liquid_rate_simplification : float, optional The delta Y value for liquid rate simplification (default is 0.1). delta_y_for_gas_rate_simplification : float, optional The delta Y value for gas rate simplification (default is 0.1). keep_original_rates_before_shut_ins : bool, optional Whether to keep original rates before shut-ins (default is False). set_rates_to_negative_values : bool, optional Whether to set rates to negative values (default is False). Returns ------- None """ if len(production_folder.data) == 0: raise ValueError("The production folder does not contain any gauge, you must at least load one to be able to create a corrected production." "If you loaded a gauge with the production_folder.load_production_gauge() method, you need to refresh the cache in your well with well.refresh_data() and re-grab the production folder") oil = next((x for x in production_folder.data if x.data_type == "qo"), None) gas = next((x for x in production_folder.data if x.data_type == "qg"), None) water = next((x for x in production_folder.data if x.data_type == "qw"), None) if rate_correction_option is not None and keep_original_rates_before_shut_ins is True: raise ValueError("You must choose between correcting the last rate and keeping the original rates before shut-ins. You cannot use both.") constant_value_for_liquid_rate = cast(float, self.__dto_converter.unit_converter.convert_to_internal(UnitEnum.rate_standard_baril_per_day, constant_value_for_liquid_rate)) if constant_value_for_liquid_rate is None else constant_value_for_liquid_rate constant_value_for_gas_rate = cast(float, self.__dto_converter.unit_converter.convert_to_internal(UnitEnum.rate_thousandcf_per_day, constant_value_for_gas_rate)) if constant_value_for_gas_rate is None else constant_value_for_gas_rate corrected_production_command_dto = self.__dto_converter.production_folder_dto_converter.get_corrected_production_command_dto(self.__field_id, self.__id, production_folder.id, shut_in, name, use_uptime_correction, set_to_0_in_shut_ins, replace_by, constant_value_for_liquid_rate, constant_value_for_gas_rate, rate_correction_option, difference, delta_p_difference_multiplier, pressure, iqr_multiplier, simplify_method, delta_y_for_liquid_rate_simplification, delta_y_for_gas_rate_simplification, keep_original_rates_before_shut_ins, set_rates_to_negative_values, oil, gas, water) corrected_production_query_dto = self.__cluster_apis.automation_api.create_corrected_production(self.__id, corrected_production_command_dto) self.__corrected_productions = [self.__dto_converter.production_folder_dto_converter.build_data(self.field_id, self.__id, y.data) for y in corrected_production_query_dto.phases if y.data is not None] def create_filter(self, data: Data, filter_type: FilterTypeEnum, filter_name: str = 'New Filter', labels: Optional[List[str]] = None, denoising_threshold_level: Optional[float] = None, denoising_threshold_value: float = 2340.80561921557, denoising_threshold_type: DenoisingThresholdTypeEnum = DenoisingThresholdTypeEnum.adaptative, delta_t: float = 2.0, delta_p: float = 13789.5, size_of_slice: int = 100000, min_window_x: Optional[datetime] = None, max_window_x: Optional[datetime] = None, min_window_y: Optional[float] = None, max_window_y: Optional[float] = None, denoising_pre_sampling_type: DenoisingPreSamplingTypeEnum = DenoisingPreSamplingTypeEnum.intermediate, denoising_pre_sampling_duration: float = 0.004166666666666667, denoising_minimum_gap_duration: Optional[float] = None, output_decimated_raw_data: bool = False) -> Data: """ Create a wavelet filter under the gauge associated to this :class:`Data` object. Parameters ---------- data: Source gauge to create the filter filter_type : Type of the wavelet filter to use filter_name : Name of the new filter labels: Labels of the filter denoising_threshold_level: Level of denoising denoising_threshold_type: Type of denoising threshold denoising_threshold_value: Threshold value (default value is in Pascal) delta_t: Delta T value delta_p: Delta P value (default value is in Pascal) size_of_slice: Size of the slice, minimum is 10000 and maximum is 100000 min_window_x: Minimum window x value, must be a datetime max_window_x: Maximum window x value, must be a datetime min_window_y: Minimum window y value max_window_y: Maximum window y value denoising_pre_sampling_type: Only works if filter_type == FilterTypeEnum.wavelet_1 and must one of the following value ["Intermediate", "PTA", "User", "PA"] denoising_pre_sampling_duration: Only works if filter_type == FilterTypeEnum.wavelet_1, the pre sampling interval duration in seconds denoising_minimum_gap_duration: Only works if filter_type == FilterTypeEnum.wavelet_1, denoising minimum gap duration output_decimated_raw_data: To output the raw filter Returns ------- Data: New filter """ if denoising_threshold_type == DenoisingThresholdTypeEnum.adaptative and denoising_threshold_level is not None: denoising_threshold_value = 2340.80561921557 * (10 ** (denoising_threshold_level / 50)) labels = labels if labels is not None else [] size_of_slice = 10000 if size_of_slice < 10000 else 100000 if size_of_slice > 100000 else size_of_slice payload = self.__dto_converter.data_dto_converter.get_filter_dto(self.__field_id, self.__id, data.vector_id, filter_name, labels, filter_type, denoising_threshold_level, denoising_threshold_value, denoising_threshold_type, delta_t, delta_p, denoising_pre_sampling_type, denoising_pre_sampling_duration, denoising_minimum_gap_duration, size_of_slice, min_window_x, max_window_x, min_window_y, max_window_y, output_decimated_raw_data) new_filter = self.__dto_converter.production_folder_dto_converter.build_data(self.__field_id, self.__id, self.__cluster_apis.automation_api.create_filter(data.id, payload)) data.filters.append(new_filter) return new_filter def copy_file(self, file: Union[Document, File], new_name: Optional[str] = None, field_id: Optional[str] = None, well_id: Optional[str] = None, well_group_id: Optional[str] = None, user_task_id: Optional[str] = None) -> File: """ Copy this document to the current file folder or to another field/well file folder Parameters ---------- file: File to copy new_name: Use this parameter if you want to rename the copied document field_id: Specify the field id to copy the file under a different field folder well_id: Specify the well id to copy the file under a different field folder well_group_id: Specify the well group id to copy the file under a well group user_task_id: Specify the user task id to copy the file under a user task Returns ------- str: id of the new copied file """ field_id = self.__field_id if field_id is None else field_id well_id = self.__id if well_id is None and well_group_id is None else well_id if new_name is None: file_dto = self.__cluster_apis.field_api.copy_file(field_id, well_id, well_group_id, user_task_id, file.file_id) else: file_dto = self.__cluster_apis.field_api.copy_and_rename_file(field_id, well_id, well_group_id, user_task_id, file.file_id, new_name) file = self.__dto_converter.file_dto_converter.build_file_from_file_dto(well_group_id, file_dto) if self.__files is None: self.__files = self.__dto_converter.get_files_recursively(self.field_id, self.id, self.__get_well_dto()) if user_task_id is not None: user_task = next(x for x in self.user_tasks if x.id == user_task_id) user_task.files.append(file) else: self.__files.append(file) return file def extract_well_properties_from_document(self, document: Document, analyses_to_extract: Optional[List[Analysis]] = None, extract_in_master_container: bool = False) -> WellPropertyContainer: """ Extracts well properties from a given document. Parameters ---------- document : Document The document from which to extract well properties. analyses_to_extract : Optional[List[Analysis]], optional List of analyses to extract. If None, all analyses from the document are extracted. Default is None. extract_in_master_container : bool, optional Flag indicating whether to extract properties into the master container. Default is False. Returns ------- WellPropertyContainer The container holding the extracted well properties. If `extract_in_master_container` is True, returns the master container; otherwise, returns the container matching the document name.""" analyses_to_extract = document.analyses if analyses_to_extract is None else analyses_to_extract dto = self.__dto_converter.get_extract_properties_command_dto([x.id for x in analyses_to_extract], extract_in_master_container) self.__cluster_apis.field_api.extract_well_properties_from_document(self.__field_id, self.__id, document.file_id, dto) self.refresh_data() return next(x for x in self.well_property_containers if x.is_master) if extract_in_master_container else next(x for x in self.well_property_containers if x.name == document.name) def create_pvt_from_kw_document(self, pvt_name: str, document_id: str, analysis_id: str) -> PVT: """ Create a pvt object in the well Parameters ---------- pvt_name: str Name of the PVT object to create document_id: str Id of the document to use analysis_id: Id of the analysis to use Returns ------- PVT The PVT object created. """ dto = self.__dto_converter.get_command_pvt_from_kw_document_dto(pvt_name, self.__field_id, self.__id, document_id, analysis_id) pvt = self.__dto_converter.build_pvt(self.__field_id, self.__id, self.__cluster_apis.automation_api.create_pvt_from_kw_document_well(self.id, dto)) self.pvts.append(pvt) return pvt def create_pvt_from_file(self, pvt_name: str, file_id: str, start_date: Optional[datetime] = None, reservoir_pressure: Optional[float] = None, reservoir_temperature: Optional[float] = None, gas_oil_type: Optional[GasOilTypeEnum] = None, unit_system: Optional[UnitSystemPvtEnum] = None) -> PVT: """ Creates a PVT (Pressure-Volume-Temperature) object from a file. You can define fallback parameters when the gas oil type is undetermined. Parameters ---------- pvt_name : str The name of the PVT object to be created. file_id : str The identifier of the file from which the PVT object will be created. start_date : datetime, optional The start date for the PVT data coverage. Defaults to None. reservoir_pressure : float, optional The pressure of the reservoir associated with the PVT object. Defaults to None. reservoir_temperature : float, optional The temperature of the reservoir associated with the PVT object. Defaults to None. gas_oil_type : GasOilTypeEnum, optional The type of gas or oil associated with the PVT object, as per the enumerated GasOilTypeEnum. Defaults to None. unit_system : UnitSystemPvtEnum, optional The unit system used for the PVT object, as per the enumerated UnitSystemPvtEnum. Defaults to None. Returns ------- PVT An instance of the PVT object created using the provided parameters and data from the specified text file. """ dto = self.__dto_converter.get_command_pvt_from_text_file_dto(pvt_name, self.__field_id, self.__id, file_id, start_date, reservoir_pressure, reservoir_temperature, gas_oil_type, unit_system) pvt = self.__dto_converter.build_pvt(self.__field_id, self.__id, self.__cluster_apis.automation_api.create_pvt_from_text_file_well(self.id, dto)) self.pvts.append(pvt) return pvt def create_well_property_container(self, name: str) -> WellPropertyContainer: """ Creates a new well property container for this :class:`Well`. Parameters ---------- name: Name of the new well property container. Returns ------- :class:`WellPropertyContainer`: The newly created well property container. """ dto = self.__cluster_apis.field_api.create_well_property_container(self.__field_id, self.__id, {"name": name}) return self.__dto_converter.get_well_property_containers_from_well_property_container_dto(self.__field_id, self.__id, self.__well_properties_catalog, [dto])[0] def __str__(self) -> str: """String representation of the Well""" return f"Well(name='{self.name}', id='{self.id}', uwi={self.uwi})" def __repr__(self) -> str: """Detailed representation of the Well""" return f"Well(field_id='{self.field_id}', id='{self.id}', name='{self.name}', uwi={self.uwi}, well_group_id={self.well_group_id})" def create_survey(self, name: str, date: datetime) -> Survey: """ Create a new survey in the well. This method creates a new survey to store well logs at a specific date.. Parameters ---------- name : str The name of the survey container. date: Date of the survey Returns ------- Survey The newly created survey container object. """ dto = self.__dto_converter.get_survey_command_dto(name, date) survey = self.__dto_converter.build_survey(self.__field_id, self.__id, self.__cluster_apis.field_api.create_survey(self.__field_id, self.__id, dto)) return survey def delete_survey(self, survey: Survey) -> None: self.__cluster_apis.field_api.delete_survey(self.__field_id, self.__id, survey.id) self.surveys.remove(survey) def create_model_book(self, document: Document, name: str) -> ModelBook: """ Creates a new model book associated with this well. Parameters ---------- document : Document The document to use as the basis for creating the model book. name : str The name to assign to the new model book. Returns ------- ModelBook The newly created model book instance. """ dto = self.__dto_converter.get_create_model_book_dto(document, name, field_id=self.__field_id, well_id=self.__id) model_book_query_dto = self.__cluster_apis.automation_api.create_model_book(self.__id, dto) model_book = self.__dto_converter.build_model_book(self.__field_id, self.__id, model_book_query_dto) self.model_books.append(model_book) return model_book def delete_model_book(self, model_book: ModelBook) -> None: """ Deletes a model book from this well. Parameters ---------- model_book : ModelBook The model book instance to delete. Returns ------- None This method doesn't return any value. """ self.__cluster_apis.field_api.delete_model_book(self.field_id, self.id, model_book.id) self.model_books.remove(model_book) def delete_file(self, file: Union[File, Document]) -> None: """ Deletes a specified file from the cluster and updates the local file list. This method removes a file either provided directly as a `File` object or indirectly as a `Document` object. In the case of a `Document`, it searches for and locates the equivalent `File` object within the local file list. Once found, the file is deleted from both the remote cluster via the underlying API and the local files list, keeping the data in sync. Parameters ---------- file : Union[File, Document] The file object to be deleted. It can be provided either directly as a `File` object or as a `Document` object, which is then matched to a `File` for deletion. Warnings -------- Calling this method while iterating over a collection containing the file (e.g., `for file in self.files:`) can lead to unexpected behavior, as it modifies the collection during iteration. To avoid this issue, create a copy of the collection before iteration (e.g., `for file in list(well.files):`) or collect the files to delete first, then delete them in a separate step. """ if type(file) is Document: file = next(x for x in self.__get_files() if x.file_id == file.file_id) file = cast(File, file) self.__cluster_apis.field_api.delete_file(self.__field_id, self.__id, file.file_id) file = next(x for x in self.__get_files() if x.file_id == file.file_id) self.__get_files().remove(file) file_folder = find_file_folder_recursively_by_file(file, self.file_folders) file = next(x for x in file_folder.files if x.file_id == file.file_id) file_folder.files.remove(file) def compute_forward_rates(self, pvt_id: str, wellbore_id: str, computation_date: datetime, pressure_vs_depth_vector: VsDepthVector, temperature_vs_depth_vector: VsDepthVector, velocity_vs_depth_vector: VsDepthVector, heat_transfer_coefficient: Optional[float] = None, reservoir_temperature: Optional[float] = None, reservoir_temperature_gradient: Optional[float] = None, reservoir_depth: Optional[float] = None) -> ForwardRatesOutputs: """ Computes forward rates based on well and reservoir parameters, including pressure, temperature, velocity vs depth, and optional reservoir properties. Ensures consistency in providing all optional reservoir parameters as a single block or none. Converts the input data into specific data transfer objects (DTOs), makes a remote API request for computation, and processes the results into output DTO. Parameters ---------- pvt_id : str The PVT (Pressure-Volume-Temperature) identifier associated with the computation. wellbore_id : str The wellbore identifier for the computation. computation_date : datetime The date on which the computation is being carried out. pressure_vs_depth_vector : VsDepthVector A vector representing pressure values vs depth in the wellbore. temperature_vs_depth_vector : VsDepthVector A vector representing temperature values vs depth in the wellbore. velocity_vs_depth_vector : VsDepthVector A vector representing velocity values vs depth in the wellbore. heat_transfer_coefficient : Optional[float] The heat transfer coefficient for reservoir interaction, if provided. reservoir_temperature : Optional[float] The temperature of the reservoir, if provided. reservoir_temperature_gradient : Optional[float] The gradient of temperature across the reservoir, if provided. reservoir_depth : Optional[float] The depth of the reservoir, if provided. Returns ------- ForwardRatesOutputs The computed forward rates encapsulated in an output DTO. Raises ------ ValueError If any but not all the optional parameters (heat_transfer_coefficient, reservoir_temperature, reservoir_temperature_gradient, reservoir_depth) are provided. """ optional_params = [heat_transfer_coefficient, reservoir_temperature, reservoir_temperature_gradient, reservoir_depth] if not (all(param is None for param in optional_params) or all(param is not None for param in optional_params)): raise ValueError("Either all optional parameters (heat_transfer_coefficient, reservoir_temperature, reservoir_temperature_gradient, reservoir_depth) must be provided or none of them.") compute_forward_rates_command_dto = self.__dto_converter.get_compute_forward_rates_command_dto(self.__field_id, self.__id, pvt_id, wellbore_id, computation_date, pressure_vs_depth_vector, temperature_vs_depth_vector, velocity_vs_depth_vector, heat_transfer_coefficient, reservoir_temperature, reservoir_temperature_gradient, reservoir_depth) compute_forward_rates_query_dto = self.__cluster_apis.tech_objects_api.compute_forward_rates(compute_forward_rates_command_dto) return self.__dto_converter.build_forward_rates_outputs(compute_forward_rates_query_dto) def create_document(self, document_type: KWModuleEnum, target_folder_id: str, name: str, well_radius: float, porosity: float, pay_zone: float, rock_compressibility: float, top_reservoir_depth: float, pvt_id: Optional[str] = None, pressure_id: Optional[str] = None, oil_rate_id: Optional[str] = None, gas_rate_id: Optional[str] = None, water_rate_id: Optional[str] = None) -> Document: dto = self.__dto_converter.get_create_document_dto(self.__field_id, self.__id, target_folder_id, name, well_radius, porosity, pay_zone, rock_compressibility, top_reservoir_depth, pvt_id, pressure_id, oil_rate_id, gas_rate_id, water_rate_id) if document_type == KWModuleEnum.saphir: document_id = self.__cluster_apis.pta_api.create_document(self.__field_id, dto).documentId elif document_type == KWModuleEnum.topaze: document_id = self.__cluster_apis.rta_api.create_document(self.__field_id, dto).documentId else: raise ValueError(f"Unsupported document type: {document_type}, only Saphir and Topaze documents are supported.") self.__files = None document = next(x for x in self.files if x.file_id == document_id).as_kw_document() return document def delete_data(self, data: Data) -> None: """ Deletes a specified data object from the current field and updates the internal data list by removing the corresponding object. Parameters ---------- data : Data The data object to be deleted, defined by its unique identifier (vector_id). The data object must already exist in the cluster, and it should match an object retrieved by the current field's internal data list. Returns ------- None This method does not return a value. """ self.__cluster_apis.field_api.delete_data(self.__field_id, self.__id, data.vector_id) data = next(x for x in self.__get_data() if x.id == data.id) self.__get_data().remove(data) def create_data_folder(self, name: str, folder_parent_id: Optional[str] = None, is_subfolder: bool = True) -> DataFolder: """ Creates a data folder within the current context, either as a direct child of a parent folder or at the well level, depending on the specified parameters. This method allows hierarchical organization of data folders. If `is_subfolder` is set to False, the data folder is created at the well level irrespective of `folder_parent_id`. Otherwise, the folder is created under the specified parent folder, or under a default folder with the name "Gauges" if no parent folder ID is provided. Parameters ---------- name : str The name of the data folder to be created. folder_parent_id : Optional[str], optional The ID of the parent folder under which the new folder is created, by default None. If None, the folder is created under the "Gauges" folder, unless `is_subfolder` is set to False, in which case the folder is created at the well level. is_subfolder : bool, optional A flag indicating whether the folder should be created as a subfolder (default: True). If False, the folder is created at the well level regardless of `folder_parent_id`. Returns ------- DataFolder The created `DataFolder` object. Raises ------ ValueError If `folder_parent_id` is provided but does not correspond to an existing folder in the current structure. """ if is_subfolder is False: data_folder_dto = self.__cluster_apis.field_api.create_data_folder_at_well_level(self.__field_id, self.__id, {'name': name}) new_data_folder = self.__dto_converter.get_data_folders_from_data_folder_dto_recursively(self.__field_id, self.__id, None, None, [data_folder_dto])[0] self.data_folders.append(new_data_folder) return new_data_folder if folder_parent_id is not None: parent_data_folder = find_data_folder_recursively_by_id(folder_parent_id, self.data_folders) if parent_data_folder is None: raise ValueError("data folder parent {} does not exist".format(folder_parent_id)) else: folder_parent_name = "Gauges" parent_data_folder = find_data_folders_recursively_by_name(folder_parent_name, self.data_folders)[0] data_folder = next((x for x in parent_data_folder.data_folders if x.name == name), None) if data_folder is None: data_folder_dto = self.__cluster_apis.field_api.create_data_folder(self.__field_id, self.__id, parent_data_folder.id, {'name': name}) data_folder = self.__dto_converter.get_data_folders_from_data_folder_dto_recursively(self.__field_id, self.__id, parent_data_folder.id, parent_data_folder.name, [data_folder_dto])[0] parent_data_folder.data_folders.append(data_folder) else: print(f"Data Folder with name {name} already exists in well {self.__name}") return data_folder