from ._private._cluster_apis import ClusterAPIS
from ._private._well_dto import WellDtoWithHierarchy, PlotPropertiesDto, WellParamDto
from ._private.dto_converters._well_dto_converter import WellDtoConverter
from datetime import datetime
from typing import List, Optional, cast, Union
from .production_folder_kind_enum import ProductionFolderKindEnum
from .custom_workflow import CustomWorkflow
from .field_data_types_catalog import FieldDataTypesCatalog
from .field_well_properties_catalog import FieldWellPropertiesCatalog
from .data import Data
from .file import File
from .document import Document
from .kw_module_enum import KWModuleEnum
from .analysis import Analysis
from .kw.model_book.model_book import ModelBook
from .user_task import UserTask
from .incremental_pta import IncrementalPTA
from .save_strategy_enum import SaveStrategyEnum
from .well_production_enum import WellProductionTypeEnum
from .workflow_type import IncrementalPTAType, IncrementalRTAType
from .incremental_rta import IncrementalRTA
from .well_property_container import WellPropertyContainer
from .plot import Plot
from .gauge_loading_strategy import GaugeLoadingStrategy
from .workflow_settings import WorkflowImproveSettings
from .measure_enum import MeasureEnum
from .unit_enum import UnitEnum
from .time_format_enum import TimeFormatEnum
from .pta_extraction import PTAExtraction
from .shutin import ShutIn
from .wellbore import Wellbore
from .production_folder import ProductionFolder
import uuid
from .look_for_enum import LookForEnum
from .input_type_enum import InputTypeEnum
from .shut_in_types_enum import ShutInTypesEnum
from .shut_in_categories_enum import ShutInCategoriesEnum
from .replace_by_enum import ReplaceByEnum
from .rate_correction_option_enum import RateCorrectionOptionEnum
from .simplify_method_enum import SimplifyMethodEnum
from .filter_type_enum import FilterTypeEnum
from .denoising_pre_sampling_type_enum import DenoisingPreSamplingTypeEnum
from .denoising_threshold_type_enum import DenoisingThresholdTypeEnum
from .pvt.pvt import PVT
from .file_folder import FileFolder
from .file_folders_extensions import find_file_folders_recursively_by_name, find_file_folder_recursively_by_id, find_file_folder_recursively_by_file
from .survey import Survey
from .well_logs import WellLogs
from .vs_depth_vector import VsDepthVector
from .forward_rates_outputs import ForwardRatesOutputs
from .gas_oil_type_enum import GasOilTypeEnum
from .unit_system_pvt_enum import UnitSystemPvtEnum
from .wellbore.wellbore_perforation import WellborePerforation
from .wellbore.wellbore_geometry import WellboreGeometry
from .data_folder import DataFolder
from .data_folder_extensions import find_data_folder_recursively_by_id, find_data_folders_recursively_by_name
class Well:
    """ Well object.
    Presents a KAPPA Automate well object that can be queried for contained data, documents and every existing object under the well.
    Returned as a result of the :py:obj:`Field.wells` query.
    .. note:: Should not be instantiated directly.
    .. note::
        :py:obj:`Well.data`, :py:obj:`Well.shutin`, and :py:obj:`Well.documents` properties are populated on-demand
        and are cached for the duration of the :class:`Connection`.
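    Examples
    --------
    A minimal usage sketch; ``field`` stands for an already retrieved field object and the well name is hypothetical:
    >>> well = next(w for w in field.wells if w.name == "Well-1")
    >>> gauges = well.gauges          # populated on first access, then cached
    >>> documents = well.documents    # KW documents found under the well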
    """
    def __init__(self,
                 field_id: str,
                 well_group_id: Optional[str],
                 well_id: str,
                 name: str,
                 uwi: Optional[str],
                 data_types_catalog: FieldDataTypesCatalog,
                 well_properties_catalog: FieldWellPropertiesCatalog,
                 cluster_apis: ClusterAPIS,
                 dto_converter: WellDtoConverter) -> None:
        self.__field_id: str = field_id
        self.__id: str = well_id
        self.__well_group_id: Optional[str] = well_group_id
        self.__name: str = name
        self.__uwi: Optional[str] = uwi
        self.__dto_converter: WellDtoConverter = dto_converter
        self.__cluster_apis: ClusterAPIS = cluster_apis
        self.__data_types_catalog: FieldDataTypesCatalog = data_types_catalog
        self.__well_properties_catalog: FieldWellPropertiesCatalog = well_properties_catalog
        self.__labels: Optional[List[str]] = None
        self.__production_folders: Optional[List[ProductionFolder]] = None
        self.__corrected_production_folders: Optional[List[ProductionFolder]] = None
        self.__data: Optional[List[Data]] = None
        self.__files: Optional[List[File]] = None
        self.__user_tasks: Optional[List[UserTask]] = None
        self.__functions: Optional[List[Data]] = None
        self.__incremental_pta_workflows: Optional[List[IncrementalPTA]] = None
        self.__incremental_rta_workflows: Optional[List[IncrementalRTA]] = None
        self.__well_dto: Optional[WellDtoWithHierarchy] = None
        self.__shut_in: Optional[Data] = None
        self.__well_property_containers: Optional[List[WellPropertyContainer]] = None
        self.__well_plots: Optional[List[Plot]] = None
        self.__gauges: Optional[List[Data]] = None
        self.__filters: Optional[List[Data]] = None
        self.__productions: Optional[List[Data]] = None
        self.__corrected_productions: Optional[List[Data]] = None
        self.__file_folders: Optional[List[FileFolder]] = None
        self.__well_info_dto: Optional[List[WellParamDto]] = None
        self.__production_type: Optional[WellProductionTypeEnum] = None
        self.__wellbore: Optional[Wellbore] = None
        self.__custom_workflows: Optional[List[CustomWorkflow]] = None
        self.__pvts: Optional[List[PVT]] = None
        self.__surveys: Optional[List[Survey]] = None
        self.__model_books: Optional[List[ModelBook]] = None
        self.__well_logs: Optional[List[WellLogs]] = None
        self.__data_folders: Optional[List[DataFolder]] = None 
    def __get_well_dto(self) -> WellDtoWithHierarchy:
        if self.__well_dto is None:
            self.__well_dto = self.__cluster_apis.field_api.get_well_dto(self.__field_id, self.__id)
        return self.__well_dto
    @property
    def field_id(self) -> str:
        """ Gets the id of the field that contains this :class:`Well`.
        """
        return self.__field_id
    @property
    def id(self) -> str:
        """ Gets the id of the :class:`Well` object.
        """
        return self.__id
    @property
    def name(self) -> str:
        """ Gets the name of the :class:`Well`.
        """
        return self.__name
    @property
    def uwi(self) -> Optional[str]:
        """ Gets the UWI of the :class:`Well` object.
        """
        return self.__uwi
    @property
    def labels(self) -> List[str]:
        """ Gets the labels of the :class:`Well` object.
        """
        if self.__labels is None:
            self.__labels = self.__dto_converter.file_dto_converter.get_labels_from_labels_dto(self.__get_well_dto().labels)
        return self.__labels
    @property
    def well_group_id(self) -> Optional[str]:
        """ Gets the well group id that contains this :class:`Well`.
        """
        return self.__well_group_id
    @property
    def production_folders(self) -> List[ProductionFolder]:
        """ Gets the raw production folders that contains this :class:`Well`.
        """
        if self.__production_folders is None:
            self.__production_folders = self.__dto_converter.get_production_folders_from_production_folders_dto(self.__field_id, self.__id, self.__get_well_dto().productions)
        return self.__production_folders
    @property
    def corrected_production_folders(self) -> List[ProductionFolder]:
        """ Gets the corrected production folders that contains this :class:`Well`.
        """
        if self.__corrected_production_folders is None:
            self.__corrected_production_folders = self.__dto_converter.get_production_folders_from_production_folders_dto(self.__field_id, self.__id, self.__get_well_dto().productions, ProductionFolderKindEnum.corrected_production)
        return self.__corrected_production_folders
    @property
    def shut_in(self) -> Optional[Data]:
        """ Gets the shut-in data for this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__shut_in is None:
            well_dto_shut_in = self.__get_well_dto().shutin
            if well_dto_shut_in is not None:
                self.__shut_in = self.__dto_converter.production_folder_dto_converter.build_data(self.__field_id, self.__id, well_dto_shut_in)
        return self.__shut_in
    @property
    def gauges(self) -> List[Data]:
        """ Gets the list of gauges contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`."""
        if self.__gauges is None:
            self.__gauges, self.__filters = self.__dto_converter.get_gauges_and_filters_from_data_folders_dto(self.__field_id, self.__id, self.__get_well_dto().dataFolders)
        return self.__gauges
    @property
    def data(self) -> List[Data]:
        """ Gets the list of data contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        return list(self.__get_data())
    def __get_data(self) -> List[Data]:
        if self.__data is None:
            outputs = [y for x in self.user_tasks for y in x.outputs] + [y for x in self.incremental_rta_workflows for y in x.outputs]
            workflow_data = [item for x in self.custom_workflows for item in x.data]
            shut_ins = [self.shut_in] if self.shut_in is not None else []
            self.__data = (self.gauges + self.filters + self.productions + self.corrected_productions
                           + self.functions + outputs + shut_ins + workflow_data)
        return self.__data
    @property
    def filters(self) -> List[Data]:
        """ Gets the list of filters contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__filters is None:
            self.__gauges, self.__filters = self.__dto_converter.get_gauges_and_filters_from_data_folders_dto(self.__field_id, self.__id, self.__get_well_dto().dataFolders)
        return self.__filters
    @property
    def productions(self) -> List[Data]:
        """ Gets the list of production phases contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__productions is None:
            self.__productions = list()
            for production_folder in self.production_folders:
                self.__productions.extend(production_folder.data)
        return self.__productions
    @property
    def corrected_productions(self) -> List[Data]:
        """ Gets the list of corrected production phases contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__corrected_productions is None:
            self.__corrected_productions = list()
            for corrected_production_folder in self.corrected_production_folders:
                self.__corrected_productions.extend(corrected_production_folder.data)
        return self.__corrected_productions
    @property
    def files(self) -> List[File]:
        """ Gets the list of files contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        return list(self.__get_files())
    def __get_files(self) -> List[File]:
        if self.__files is None:
            self.__files = self.__dto_converter.get_files_recursively(self.field_id, self.id, self.__get_well_dto())
        return self.__files
    @property
    def documents(self) -> List[Document]:
        """ Gets the list of KW documents contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        document_list = list()
        for file in self.__get_files():
            try:
                document = file.as_kw_document()
            except ValueError:
                document = None
            if document is not None:
                document_list.append(document)
        for ipta in self.incremental_pta_workflows:
            document_list.extend(ipta.output_documents)
        return document_list
    @property
    def user_tasks(self) -> List[UserTask]:
        """ Gets the list of user tasks contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__user_tasks is None:
            self.__user_tasks = self.__dto_converter.get_user_tasks_from_user_task_dto(self.field_id, self.id, None, self.__data_types_catalog, self.__get_well_dto().userTasks)
        return self.__user_tasks
    @property
    def functions(self) -> List[Data]:
        """ Gets the list of functions contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__functions is None:
            self.__functions = self.__dto_converter.get_functions_from_functions_dto(self.field_id, self.id, self.__get_well_dto().functions)
        return self.__functions
    @property
    def incremental_pta_workflows(self) -> List[IncrementalPTA]:
        """ Gets the list of incremental PTA workflows contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__incremental_pta_workflows is None:
            self.__incremental_pta_workflows = self.__dto_converter.ipta_dto_converter.get_incremental_pta_workflows_from_dto(self.field_id, self.__well_group_id, self.id, self.__get_well_dto().incrementalPtas)
        return self.__incremental_pta_workflows
    @property
    def incremental_rta_workflows(self) -> List[IncrementalRTA]:
        """ Gets the list of incremental RTA workflows contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__incremental_rta_workflows is None:
            self.__incremental_rta_workflows = self.__dto_converter.get_incremental_rta_workflows_from_dto(self.field_id, self.__well_group_id, self.id, self.__get_well_dto().incrementalRtas)
        return self.__incremental_rta_workflows
    @property
    def well_property_containers(self) -> List[WellPropertyContainer]:
        """ Gets the list of well property containers contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__well_property_containers is None:
            self.__well_property_containers = self.__dto_converter.get_well_property_containers_from_well_property_container_dto(self.field_id, self.id, self.__well_properties_catalog, self.__get_well_dto().wellPropertyContainers)
        return self.__well_property_containers
    @property
    def plots(self) -> List[Plot]:
        """ Gets the list of plots contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__well_plots is None:
            self.__well_plots = self.__dto_converter.get_plots_from_well_dto(self.field_id, self.id, self.__data_types_catalog, self.__get_well_dto())
        return self.__well_plots
    @property
    def production_type(self) -> WellProductionTypeEnum:
        """ Returns the production type of the well
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__production_type is None:
            self.__production_type = self.__get_well_dto().productionType
        return self.__production_type
    @property
    def wellbore(self) -> Optional[Wellbore]:
        """ Gets the wellbore of the :class:`Well` object.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__wellbore is None:
            self.__wellbore = self.__dto_converter.get_wellbore_from_wellbore_dto(self.__get_well_dto().wellbore)
        return self.__wellbore
    @property
    def custom_workflows(self) -> List[CustomWorkflow]:
        """ Gets the list of custom workflows contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__custom_workflows is None:
            self.__custom_workflows = self.__dto_converter.get_custom_workflows_from_custom_workflows_dto(self.__field_id, self.__id, self.__data_types_catalog, self.__get_well_dto().customWorkflows)
        return self.__custom_workflows
    @property
    def pvts(self) -> List[PVT]:
        """ Gets the list of PVTs contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__pvts is None:
            self.__pvts = self.__dto_converter.get_pvts_from_pvts_dto(self.__field_id, self.__id, self.__get_well_dto().pvts)
        return self.__pvts
    @property
    def file_folders(self) -> List[FileFolder]:
        """ Gets the list of file folders contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__file_folders is None:
            self.__file_folders = self.__dto_converter.file_dto_converter.get_file_folders_from_file_folder_dto_recursively(self.__field_id, None, self.__id, None, None, self.__get_well_dto().fileFolders)
        return self.__file_folders
    @property
    def data_folders(self) -> List[DataFolder]:
        """ Gets the list of data folders contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`.
        """
        if self.__data_folders is None:
            self.__data_folders = self.__dto_converter.get_data_folders_from_data_folder_dto_recursively(self.__field_id, self.__id, None, None, self.__get_well_dto().dataFolders)
        return self.__data_folders
    @property
    def surveys(self) -> List[Survey]:
        """ Gets the list of surveys contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`."""
        if self.__surveys is None:
            self.__surveys = self.__dto_converter.get_surveys_from_surveys_dto(self.__field_id, self.__id, self.__get_well_dto().surveyContainer.surveys)
        return self.__surveys
    @property
    def model_books(self) -> List[ModelBook]:
        """ Gets the list of model book contained in this :class:`Well`.
        .. note:: This property is populated on-demand and is cached for the duration of the :class:`Connection`."""
        if self.__model_books is None:
            self.__model_books = self.__dto_converter.get_model_books_from_field_model_books_dto(self.__field_id, self.__id, self.__get_well_dto().modelBooks)
        return self.__model_books
    @property
    def well_logs(self) -> List[WellLogs]:
        """
        Gets the list of well logs contained in this :class:`Well`.
        Returns:
            List[WellLogs]: A list of WellLogs objects containing well logging data.
        """
        if self.__well_logs is None:
            self.__well_logs = self.__dto_converter.get_well_logs_from_well_logs_dto(self.__field_id, self.__id, self.__get_well_dto().staticLogs)
        return self.__well_logs
    def create_data(self, name: str, data_type: str, labels: Optional[List[str]] = None, folder_id: Optional[str] = None, measure_depth: float = 0, true_vertical_depth: float = 0, true_vertical_depth_sub_sea: float = 0) -> Data:
        """
        Creates and returns a new data object with specified parameters. This method serves as an interface
        to initialize a data entity with the provided attributes and configuration settings.
        Parameters
        ----------
        name : str
            The name of the data entity to create.
        data_type : str
            The type of data.
        labels : list of str, optional
            A list of labels associated with the data.
        folder_id : str, optional
            The identifier of the production folder where the data is stored.
        measure_depth : float
            The measured depth of the data.
        true_vertical_depth : float
            The true vertical depth of the data.
        true_vertical_depth_sub_sea : float
            The true vertical depth of the data below sea level.
        Returns
        -------
        Data
            The newly created data object.
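        Examples
        --------
        A minimal usage sketch; the name, data type, and labels are hypothetical values:
        >>> pressure = well.create_data("Manual pressure", "p", labels=["manual"], measure_depth=2500.0)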
        """
        return self.__create_data(name, data_type, False, labels, folder_id, measure_depth, true_vertical_depth, true_vertical_depth_sub_sea) 
    def create_step_data(self, name: str, data_type: str, first_x: datetime, labels: Optional[List[str]] = None, folder_id: Optional[str] = None, measure_depth: float = 0, true_vertical_depth: float = 0, true_vertical_depth_sub_sea: float = 0) -> Data:
        """
        Creates and initializes a step data object with the provided parameters.
        The function generates a step data object by initializing it with essential details such as the name, data type, and associated
        parameters. It requires a 'first_x' value to be explicitly specified and uses it to update the step data object after
        creation. The function optionally accepts additional metadata such as labels or a production folder identifier.
        Parameters
        ----------
        name : str
            The name of the step data to be created.
        data_type : str
            The type specifying the data format or classification.
        first_x : datetime
            The datetime object representing the first x-coordinate for the step data. This parameter must not be None.
        labels : list of str, optional
            A list of labels to associate with the step data. Defaults to None.
        folder_id : str, optional
            Identifier for the production folder to associate the step data with. Defaults to None.
        measure_depth : float
            The measured depth of the data.
        true_vertical_depth : float
            The true vertical depth of the data.
        true_vertical_depth_sub_sea : float
            The true vertical depth of the data below sea level.
        Returns
        -------
        Data
            The created step data object initialized with the given parameters.
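        Examples
        --------
        A minimal usage sketch; ``"qo"`` denotes an oil-rate data type, the other values are hypothetical:
        >>> from datetime import datetime
        >>> oil_rate = well.create_step_data("Manual oil rate", "qo", first_x=datetime(2023, 1, 1))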
        """
        if first_x is None:
            raise ValueError("Argument error: 'first_x' must be defined for step data.")
        data = self.__create_data(name, data_type, True, labels, folder_id, measure_depth, true_vertical_depth, true_vertical_depth_sub_sea)
        data.update_first_x(first_x)
        return data 
    def upload_file(self, file_path: str, file_folder_id: Optional[str] = None, overwrite: bool = False, automatic_extraction: bool = False) -> File:
        """ Uploads a file to this :class:`Well`.
        Parameters
        ----------
        file_path:
            Full path and name of the file to upload.
        file_folder_id:
            Id of the file folder to upload the file to. If None, the file is uploaded at the well level.
        overwrite:
            A value indicating whether to overwrite a file with the same name if it already exists in the well.
        automatic_extraction:
            A value indicating whether to perform automatic extraction of well properties if uploading a KW file.
        Returns
        -------
        :class:`File`:
            An uploaded file object.
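        Examples
        --------
        A minimal usage sketch; the file path is hypothetical:
        >>> file = well.upload_file(r"C:\data\analysis.kw2", overwrite=True, automatic_extraction=True)
        >>> document = file.as_kw_document()  # raises ValueError if the file is not a KW document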
        """
        if file_folder_id is not None:
            file_folder = find_file_folder_recursively_by_id(file_folder_id, self.file_folders)
            if file_folder is None:
                raise ValueError(f"Missing File folder {file_folder_id} in well {self.__name}")
            file_dto = self.__cluster_apis.field_api.upload_file_to_file_folder_in_well(self.__field_id, self.__id, file_folder.id, file_path, overwrite, automatic_extraction)
        else:
            file_dto = self.__cluster_apis.field_api.upload_file_to_well(self.__field_id, self.__id, file_path, overwrite, automatic_extraction)
        file = self.__dto_converter.file_dto_converter.build_file_from_file_dto(None, file_dto)
        self.__get_files().append(file)
        return file 
    def create_plot(self, plot_name: str, pane_name: Optional[str] = None, square_log_cycles: bool = False, stacked_bars: bool = False, x_label: str = "", is_x_log: bool = False, labels: Optional[List[str]] = None) -> Plot:
        """
        Creates a KAPPA Automate plot instance under the well.
        Parameters
        ----------
        plot_name:
            Name of the plot.
        pane_name:
            Name of the pane.
        square_log_cycles:
            Whether or not to use square log cycles.
        stacked_bars:
            Whether or not to use stacked bars.
        x_label:
            Label of the x-axis.
        is_x_log:
            Whether or not to use a logarithmic scale for x values.
        labels:
            Labels to add to the plot.
        Returns
        -------
        :class:`Plot`:
            The new Plot instance created under the well.
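        Examples
        --------
        A minimal usage sketch; all names are hypothetical:
        >>> plot = well.create_plot("Pressure overview", pane_name="Main", x_label="Time", is_x_log=False)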
        """
        if labels is None:
            labels = list()
        plot_properties = PlotPropertiesDto(name=plot_name, pane_name=pane_name, square_log_cycles=square_log_cycles, stacked_bars=stacked_bars,
                                            x_label=x_label, is_x_log=is_x_log, labels=labels)
        plot_instance_dto = self.__dto_converter.plot_dto_converter.get_plot_instance_dto(plot_properties)
        plot_id, plot_name = self.__cluster_apis.field_api.create_plot(self.__field_id, self.__id, plot_instance_dto)
        new_plot = Plot(self.__field_id, self.__id, plot_id, plot_name, self.__cluster_apis.field_api, self.__data_types_catalog, self.__dto_converter.plot_dto_converter)
        self.plots.append(new_plot)
        return new_plot 
    def __create_data(self, name: str, data_type: str, is_by_step: bool = False, labels: Optional[List[str]] = None, folder_id: Optional[str] = None, measure_depth: float = 0, true_vertical_depth: float = 0, true_vertical_depth_sub_sea: float = 0) -> Data:
        production_folder = next((x for x in self.production_folders if x.id == folder_id), None)
        is_production_folder = production_folder is not None
        # Data created in a production folder is cached as a production phase; otherwise it is cached as a gauge.
        data = self.productions if is_production_folder else self.gauges
        # Make the name unique among the data already cached in the target collection.
        unique_data_name = name
        i = 1
        while next((x for x in data if x.name == unique_data_name), None) is not None:
            unique_data_name = f"{name} #{i}"
            i += 1
        input_data_dto = self.__dto_converter.data_dto_converter.get_basic_data_dto(unique_data_name, data_type, is_by_step, uuid.uuid4(), labels, measure_depth, true_vertical_depth, true_vertical_depth_sub_sea)
        basic_data_dto = self.__cluster_apis.field_api.create_basic_data(self.__field_id, self.__id, folder_id, input_data_dto, is_production_folder)
        new_data = self.__dto_converter.production_folder_dto_converter.build_data(self.field_id, self.id, basic_data_dto)
        data.append(new_data)
        return new_data
    def find_gauge_by_id(self, vector_id: str) -> Data:
        """
        Finds a gauge by its vector id.
        Parameters
        ----------
        vector_id:
            The id of the vector to search for.
        Returns
        -------
        Data
            The matching data object.
        """
        try:
            return next(x for x in self.data if x.vector_id == vector_id)
        except StopIteration:
            raise ValueError(f"Data with vector_id {vector_id} cannot be find inside the well {self.name}") 
    def find_gauge_by_type(self, data_type: Optional[str], label: Optional[str] = None, is_reference: bool = True, default_value: Optional[Data] = None) -> Data:
        """
        Finds the first matching gauge by data-type and/or label.
        Parameters
        ----------
        data_type : Optional[str]
            The type of the gauge to find.
        label : Optional[str], optional
            The label of the gauge, if any (default is None).
        is_reference : bool, optional
            Whether the gauge is a reference gauge (default is True).
        default_value : Optional[Data], optional
            The default value to return if the gauge is not found (default is None).
        Returns
        -------
        Data
            The found gauge.
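        Examples
        --------
        A minimal usage sketch; the data type and label are hypothetical:
        >>> pressure = well.find_gauge_by_type("p", label="downhole")
        >>> same_gauge = well.find_gauge_by_id(pressure.vector_id)  # look the same object up by vector id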
        """
        data = self.__match_gauge(self.data, data_type, label, is_reference)
        if data is not None:
            return data
        # Fall back to searching for the data type as a label.
        if data_type is not None:
            return self.find_gauge_by_type(None, data_type, is_reference, default_value)
        if default_value is not None:
            return default_value
        raise ValueError(f"Data with data type {data_type} and label {label} cannot be found inside the well {self.name}") 
    def refresh_data(self) -> None:
        """
        Clears all cached well attributes and DTOs so that subsequent accesses fetch up-to-date values.
        """
        self.__data = None
        self.__gauges = None
        self.__productions = None
        self.__corrected_productions = None
        self.__production_folders = None
        self.__filters = None
        self.__files = None
        self.__file_folders = None
        self.__user_tasks = None
        self.__functions = None
        self.__incremental_pta_workflows = None
        self.__incremental_rta_workflows = None
        self.__well_dto = None
        self.__shut_in = None
        self.__well_property_containers = None
        self.__well_plots = None
        self.__well_info_dto = None
        self.__production_type = None
        self.__wellbore = None
        self.__model_books = None
        self.__pvts = None
        self.__file_folders = None
        self.__data_folders = None
        self.__surveys = None
        self.__well_logs = None
        self.__custom_workflows = None 
    def __match_gauge(self, data_list: List[Data], data_type: Optional[str], label: Optional[str] = None, is_reference: bool = True) -> Optional[Data]:
        """
        Finds the first matching gauge by data-type and/or label.
        Parameters
        ----------
        data_list:
            A list of data to search for matching data.
        data_type:
            A data-type to search for.
        label:
            A label to search for.
        is_reference:
            Whether the matched gauge should be a reference gauge.
        """
        try:
            if data_type is not None and label is not None:
                try:
                    return next(x for x in data_list if x.data_type == data_type and x.is_reference == is_reference and label in x.labels)
                except StopIteration:
                    return next(x for x in data_list if x.data_type == data_type and label in x.labels)
            elif data_type is not None and label is None:
                return next(x for x in data_list if x.data_type == data_type and x.is_reference == is_reference)
            elif label is not None:
                return next(x for x in data_list if label in x.labels)
        except StopIteration:
            pass
        return None
    def create_incremental_pta(self, name: str, workflow_type: IncrementalPTAType, shutin_id: str, pressure_id: str, pta_document_id: str,
                               improve_settings: Optional[WorkflowImproveSettings] = None, min_shutin_duration: float = 10, replication_of_results: bool = False,
                               save_strategy: Optional[SaveStrategyEnum] = None, labels: Optional[List[str]] = None, auto_compute_semilog_pstar: bool = False,
                               shut_in_types: Optional[List[ShutInTypesEnum]] = None, shut_in_categories: Optional[List[ShutInCategoriesEnum]] = None) -> IncrementalPTA:
        """
        Creates an incremental PTA workflow.
        Parameters
        ----------
        name:
            Name of the IPTA.
        workflow_type:
            The workflow type to use.
        shutin_id:
            Vector id of the shut-in input.
        pressure_id:
            Vector id of the pressure.
        pta_document_id:
            Id of the reference PTA document.
        improve_settings:
            Improve settings parameter when using an improve IPTA.
        min_shutin_duration:
            Minimum shut-in duration.
        replication_of_results:
            Whether to replicate the results in the master container.
        save_strategy:
            Save strategy of the output documents.
        labels:
            A list of labels to add to the IPTA, the associated documents, and the associated well properties container.
        auto_compute_semilog_pstar:
            Whether to auto-compute the semilog p*.
        shut_in_types:
            List of shut-in types to use.
        shut_in_categories:
            List of shut-in categories to use.
        Returns
        -------
        :class:`IncrementalPTA`:
            The newly created incremental PTA workflow.
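        Examples
        --------
        A minimal usage sketch; assumes the well already has a shut-in, and ``workflow_type``, ``pressure`` and ``pta_document`` are hypothetical placeholders:
        >>> ipta = well.create_incremental_pta(
        ...     name="IPTA #1",
        ...     workflow_type=workflow_type,          # any IncrementalPTAType value
        ...     shutin_id=well.shut_in.vector_id,
        ...     pressure_id=pressure.vector_id,
        ...     pta_document_id=pta_document.id)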
        """
        if labels is None:
            labels = []
        oil_rate_data = next((x for x in self.corrected_productions if x.data_type == "qo"), None)
        water_rate_data = next((x for x in self.corrected_productions if x.data_type == "qw"), None)
        payload = self.__dto_converter.ipta_dto_converter.build_ipta_dto(self.__field_id, self.__id, name, workflow_type, shutin_id, pressure_id, pta_document_id,
                                                                         oil_rate_data, water_rate_data, labels, improve_settings, min_shutin_duration, replication_of_results,
                                                                         save_strategy, auto_compute_semilog_pstar, shut_in_types, shut_in_categories)
        incremental_pta_dto = self.__cluster_apis.automation_api.create_incremental_pta(self.__id, payload)
        new_ipta = self.__dto_converter.ipta_dto_converter.get_incremental_pta_workflows_from_dto(self.field_id, self.__well_group_id, self.id, [incremental_pta_dto])[0]
        self.incremental_pta_workflows.append(new_ipta)
        return new_ipta 
    def create_incremental_rta(self, name: str, workflow_type: IncrementalRTAType, pressure_id: str, rta_document: Document, data_hours_to_analyze: float = 2160,
                               improve_settings: Optional[WorkflowImproveSettings] = None, oil_rate_vector_id: Optional[str] = None, gas_rate_vector_id: Optional[str] = None,
                               water_rate_vector_id: Optional[str] = None, gauge_loading_strategy: Optional[GaugeLoadingStrategy] = None, is_improved_only_on_new_data: Optional[bool] = None, replication_of_results: bool = False) -> IncrementalRTA:
        """
        Creates an incremental RTA workflow.
        Parameters
        ----------
        name:
            Name of the IRTA.
        workflow_type:
            The workflow type to use.
        pressure_id:
            Vector id of the pressure.
        rta_document:
            Reference RTA document.
        data_hours_to_analyze:
            Number of hours of data to analyze.
        improve_settings:
            Improve settings parameter when using an improve IRTA.
        oil_rate_vector_id:
            Vector id of the oil rate data.
        gas_rate_vector_id:
            Vector id of the gas rate data.
        water_rate_vector_id:
            Vector id of the water rate data.
        gauge_loading_strategy:
            Gauge loading strategy of the output documents.
        is_improved_only_on_new_data:
            Whether the improve IRTA runs only on new data.
        replication_of_results:
            Whether to replicate the results in the master container.
        Returns
        -------
        :class:`IncrementalRTA`:
            The newly created incremental RTA workflow.
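        Examples
        --------
        A minimal usage sketch; ``workflow_type``, ``pressure`` and ``rta_document`` are hypothetical placeholders:
        >>> irta = well.create_incremental_rta(
        ...     name="IRTA #1",
        ...     workflow_type=workflow_type,          # any IncrementalRTAType value
        ...     pressure_id=pressure.vector_id,
        ...     rta_document=rta_document,            # a Document, e.g. taken from well.documents
        ...     data_hours_to_analyze=720)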
        """
        payload = self.__dto_converter.irta_dto_converter.build_irta_payload(self.__field_id, self.__id, name, workflow_type, pressure_id, rta_document,
                                                                             data_hours_to_analyze, improve_settings, oil_rate_vector_id, gas_rate_vector_id, water_rate_vector_id, gauge_loading_strategy, is_improved_only_on_new_data, replication_of_results)
        incremental_rta_dto = self.__cluster_apis.automation_api.create_incremental_rta(self.__id, payload)
        new_irta = self.__dto_converter.get_incremental_rta_workflows_from_dto(self.field_id, self.__well_group_id, self.id, [incremental_rta_dto])[0]
        self.incremental_rta_workflows.append(new_irta)
        return new_irta 
    def load_gauge(self, datasource_name: str, datasource_gauge_name: str, dimension: MeasureEnum, data_type: str,
                   unit: UnitEnum, time_format: TimeFormatEnum, gauge_name: Optional[str] = None, children_datasource_names: Optional[List[str]] = None,
                   last_step_duration_hours: Optional[float] = None,
                   is_high_frequency: bool = True, gauge_model: str = "", measure_depth: float = 0, true_vertical_depth: float = 0, true_vertical_depth_sub_sea: float = 0, serial_number: str = "", labels: Optional[List[str]] = None,
                   comment: str = "", read_from: Optional[datetime] = None, read_to: Optional[datetime] = None, folder_id: Optional[str] = None) -> Data:
        """
        Loads a gauge with the provided attributes into the system.
        This method resolves the data source and tag through the external data API,
        then constructs a payload with the given attributes for the automation API to
        register a new gauge under this well or under the given data folder.
        Parameters
        ----------
        datasource_name : str
            Name of the data source where the gauge data is stored.
        datasource_gauge_name : str
            Name of the specific gauge within the data source.
        dimension : MeasureEnum
            The dimension of the data measured by the gauge.
        data_type : str
            The data type of the gauge (e.g. ``"qo"`` for an oil rate).
        unit : UnitEnum
            The unit of measurement for the gauge.
        time_format : TimeFormatEnum
            The time format associated with the gauge data.
        gauge_name : str, optional
            The name assigned to the gauge. Defaults to `datasource_gauge_name`.
        children_datasource_names : list of str, optional
            Names of the child data sources if applicable.
        last_step_duration_hours : float, optional
            Duration of the last recorded step in hours. Default is None.
        is_high_frequency : bool, default=True
            Boolean indicating if the gauge produces high frequency data.
        gauge_model : str, default=""
            Model of the gauge being registered.
        measure_depth : float, default=0
            The depth at which the gauge measurements are taken, in meters.
        true_vertical_depth : float, default=0
            The true vertical depth of the gauge.
        true_vertical_depth_sub_sea : float, default=0
            The true vertical depth below sea level for the gauge.
        serial_number : str, default=""
            Serial number of the gauge, if available.
        labels : list of str, optional
            Labels associated with the gauge, useful for categorization.
        comment : str, default=""
            An optional comment or description for the gauge.
        read_from : datetime, optional
            Start datetime for reading gauge data.
        read_to : datetime, optional
            End datetime for reading gauge data.
        folder_id : str, optional
            ID of the folder to associate the gauge with.
        Returns
        -------
        Data
            The newly created gauge object.
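        Examples
        --------
        A minimal usage sketch; the data source, tag name, and enum values are hypothetical:
        >>> gauge = well.load_gauge(
        ...     datasource_name="Historian",
        ...     datasource_gauge_name="WELL1.PDG.PRESSURE",
        ...     dimension=dimension,              # a MeasureEnum value
        ...     data_type="p",
        ...     unit=unit,                        # a UnitEnum value matching the dimension
        ...     time_format=time_format)          # a TimeFormatEnum value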
        """
        if gauge_name is None:
            gauge_name = datasource_gauge_name
        datasource_id, tag_id = self.__cluster_apis.external_data_api.get_datasource_id_and_tag_id(datasource_name, datasource_gauge_name, children_datasource_names)
        payload = self.__dto_converter.data_dto_converter.get_gauge_dto(self.__field_id, self.__id, datasource_id, tag_id, gauge_name, dimension, data_type,
                                                                        unit,
                                                                        time_format, last_step_duration_hours, is_high_frequency, gauge_model, measure_depth, true_vertical_depth, true_vertical_depth_sub_sea,
                                                                        serial_number, labels, comment, False, read_from, read_to)
        if folder_id is None:
            data_dto = self.__cluster_apis.automation_api.load_gauge(self.__id, payload)
        else:
            data_dto = self.__cluster_apis.automation_api.load_gauge_under_folder(folder_id, payload)
        new_gauge = self.__dto_converter.production_folder_dto_converter.build_data(self.__field_id, self.__id, data_dto)
        if folder_id is not None:
            folder = find_data_folder_recursively_by_id(folder_id, self.data_folders)
            if folder is not None:
                folder.data.append(new_gauge)
        self.gauges.append(new_gauge)
        return new_gauge 
    def rename(self, new_well_name: str) -> None:
        """
        Renames the current well object.
        Parameters
        ----------
        new_well_name:
            The new name of the well.
        """
        self.__name = self.__cluster_apis.field_api.rename_well(self.__field_id, self.__id, {"name": new_well_name}) 
    def create_file_folder(self, name: str, folder_parent_id: Optional[str] = None) -> FileFolder:
        """
        Creates a new file folder within the specified parent folder or within the default
        parent folder if none is specified.
        This method searches for an existing folder with the specified name in the
        given parent folder. If such a folder already exists, it will return the existing
        folder. Otherwise, it creates a new folder under the parent folder, updates the
        folder hierarchy and appends the new folder to the parent's list of child folders.
        Parameters
        ----------
        name : str
            The name of the new file folder to be created.
        folder_parent_id : Optional[str], optional
            The ID of the parent folder under which the new folder should be created. If
            not provided, the new folder will be created under the default parent folder.
        Returns
        -------
        FileFolder
            The created or pre-existing file folder object.
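        Examples
        --------
        A minimal usage sketch; folder names are hypothetical:
        >>> reports = well.create_file_folder("Reports")
        >>> nested = well.create_file_folder("2024", folder_parent_id=reports.id)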
        """
        if folder_parent_id is not None:
            parent_file_folder = find_file_folder_recursively_by_id(folder_parent_id, self.file_folders)
            if parent_file_folder is None:
                raise ValueError("File folder parent {} does not exist".format(folder_parent_id))
        else:
            folder_parent_name = "Files"
            parent_file_folder = find_file_folders_recursively_by_name(folder_parent_name, self.file_folders)[0]
        file_folder = next((x for x in parent_file_folder.file_folders if x.name == name), None)
        if file_folder is None:
            file_folder_dto = self.__cluster_apis.field_api.create_file_folder(self.__field_id, self.__id, parent_file_folder.id, {'name': name})
            file_folder = self.__dto_converter.file_dto_converter.get_file_folders_from_file_folder_dto_recursively(self.__field_id, None, self.__id, parent_file_folder.id, parent_file_folder.name, [file_folder_dto])[0]
            parent_file_folder.file_folders.append(file_folder)
        else:
            print(f"File Folder with name {name} already exists in well {self.__name}")
        return file_folder 
    def create_wellbore(self, geometries: List[WellboreGeometry], perforations: Optional[List[WellborePerforation]] = None, delete_existing_wellbore: bool = False) -> None:
        """
        Creates or updates a wellbore with the specified geometries and perforations. If the `delete_existing_wellbore`
        flag is set to True, the existing wellbore will be deleted before creating a new one.
        Parameters
        ----------
        geometries : List[WellboreGeometry]
            A list of wellbore geometries to define the structure of the wellbore.
        perforations : Optional[List[WellborePerforation]], optional
            A list of perforations to specify the locations where fluid can flow
            between the wellbore and the reservoir, by default None.
        delete_existing_wellbore : bool, optional
            A flag indicating whether to delete the existing wellbore before creating
            a new one, by default False.
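        Examples
        --------
        A minimal usage sketch; assumes ``geometries`` and ``perforations`` were built beforehand from :class:`WellboreGeometry` and :class:`WellborePerforation` objects:
        >>> well.create_wellbore(geometries, perforations=perforations, delete_existing_wellbore=True)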
        """
        if delete_existing_wellbore:
            self.delete_wellbore()
        wellbore_command_dto = self.__dto_converter.wellbore_dto_converter.get_wellbore_command_dto(self.__field_id, self.__id, self.__uwi, geometries, perforations)
        wellbore_query_dto = self.__cluster_apis.automation_api.create_wellbore(self.__id, wellbore_command_dto)
        self.__wellbore = self.__dto_converter.get_wellbore_from_wellbore_dto(wellbore_query_dto) 
    def delete_wellbore(self) -> None:
        """
        Deletes the current wellbore if the well has one.
        """
        if self.wellbore is not None:
            self.__cluster_apis.field_api.delete_wellbore(self.field_id, self.id, self.wellbore.id)
            self.__wellbore = None 
    def create_production_folder(self, name: str) -> ProductionFolder:
        """
        Creates a new production folder in the well.
        Parameters
        ----------
        name:
            The name of the production folder.
        Returns
        -------
        :class:`ProductionFolder`:
            The newly created production folder.
        """
        production_folder = self.__dto_converter.get_production_folders_from_production_folders_dto(self.__field_id, self.__id, [self.__cluster_apis.field_api.create_production_folder(self.__field_id, self.__id, {"name": name})])[0]
        if self.__production_folders is None:
            self.__production_folders = []
        self.__production_folders.append(production_folder)
        return production_folder 
    def create_shut_in(self, input_type: InputTypeEnum, input_gauge: Data, look_for: LookForEnum, minimum_shut_in_duration_hr: float = 10, minimum_value_change: Optional[float] = None, requires_validation: bool = False) -> Data:
        """
        Creates a shut-in event based on the given parameters.
        Parameters
        ----------
        input_type : InputTypeEnum
            The type of input data.
        input_gauge : Data
            The gauge data to be used for identifying shut-ins.
        look_for : LookForEnum
            The criteria to look for in the data.
        minimum_shut_in_duration_hr : float, optional
            The minimum duration of the shut-in in hours (default is 10).
        minimum_value_change : Optional[float], optional
            The minimum value change to identify a shut-in (default is 50 psia, converted to internal units).
        requires_validation : bool, optional
            Whether the shut-in requires validation (default is False).
        Returns
        -------
        Data
            The created shut-in data object.
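        Examples
        --------
        A minimal usage sketch; ``input_type``, ``pressure`` and ``look_for`` are hypothetical placeholders:
        >>> shut_in = well.create_shut_in(
        ...     input_type=input_type,            # an InputTypeEnum value
        ...     input_gauge=pressure,             # the gauge to scan for shut-ins
        ...     look_for=look_for,                # a LookForEnum value
        ...     minimum_shut_in_duration_hr=24)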
        """
        if self.__shut_in is not None:
            raise ValueError(f"There is already a shut-in object in this well {self.__name}")
        if minimum_value_change is None:
            minimum_value_change = cast(float, self.__dto_converter.unit_converter.convert_to_internal(UnitEnum.pressure_psia, 50))
        shut_in_creation_dto = self.__dto_converter.get_shut_in_command_dto(self.__field_id, self.__id, input_type, input_gauge, look_for, minimum_shut_in_duration_hr, minimum_value_change, requires_validation)
        shut_in_dto = self.__cluster_apis.automation_api.create_shut_in(self.__id, shut_in_creation_dto)
        shut_in = self.__dto_converter.production_folder_dto_converter.build_data(self.__field_id, self.__id, shut_in_dto)
        self.__shut_in = shut_in
        return shut_in 
    def create_corrected_production(self, production_folder: ProductionFolder, shut_in: Data, name: str = "Corrected production #1", use_uptime_correction: bool = False, set_to_0_in_shut_ins: bool = True,
                                    replace_by: Optional[ReplaceByEnum] = None, constant_value_for_liquid_rate: Optional[float] = None, constant_value_for_gas_rate: Optional[float] = 1,
                                    rate_correction_option: Optional[RateCorrectionOptionEnum] = None, difference: float = 0.1, delta_p_difference_multiplier: float = 0.8,
                                    pressure: Optional[Data] = None, iqr_multiplier: Optional[bool] = None, simplify_method: Optional[SimplifyMethodEnum] = None,
                                    delta_y_for_liquid_rate_simplification: float = 0.1, delta_y_for_gas_rate_simplification: float = 0.1, keep_original_rates_before_shut_ins: bool = False,
                                    set_rates_to_negative_values: bool = False) -> None:
        """
        Creates a corrected production dataset based on the given parameters.
        Parameters
        ----------
        production_folder : ProductionFolder
            The production folder to be corrected.
        shut_in : Data
            The shut-in data to be used for corrections.
        name : str, optional
            The name of the corrected production dataset (default is "Corrected production #1").
        use_uptime_correction : bool, optional
            Whether to use uptime correction (default is False).
        set_to_0_in_shut_ins : bool, optional
            Whether to set rates to 0 during shut-ins (default is True).
        replace_by : Optional[ReplaceByEnum], optional
            The method to replace values (default is None).
        constant_value_for_liquid_rate : Optional[float], optional
            A constant value for liquid rate (default is None).
        constant_value_for_gas_rate : Optional[float], optional
            A constant value for gas rate (default is 1).
        rate_correction_option : Optional[RateCorrectionOptionEnum], optional
            The option for rate correction (default is None).
        difference : float, optional
            The difference value for corrections (default is 0.1).
        delta_p_difference_multiplier : float, optional
            The multiplier for delta pressure difference (default is 0.8).
        pressure : Optional[Data], optional
            The pressure data to be used (default is None).
        iqr_multiplier : Optional[bool], optional
            Whether to use IQR multiplier (default is None).
        simplify_method : Optional[SimplifyMethodEnum], optional
            The method for simplification (default is None).
        delta_y_for_liquid_rate_simplification : float, optional
            The delta Y value for liquid rate simplification (default is 0.1).
        delta_y_for_gas_rate_simplification : float, optional
            The delta Y value for gas rate simplification (default is 0.1).
        keep_original_rates_before_shut_ins : bool, optional
            Whether to keep original rates before shut-ins (default is False).
        set_rates_to_negative_values : bool, optional
            Whether to set rates to negative values (default is False).
        Returns
        -------
        None
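        Examples
        --------
        A minimal usage sketch; assumes the first production folder already contains loaded phases and the well has a shut-in:
        >>> folder = well.production_folders[0]
        >>> well.create_corrected_production(folder, well.shut_in, name="Corrected production #1")
        >>> corrected = well.corrected_productions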
        """
        if len(production_folder.data) == 0:
            raise ValueError("The production folder does not contain any gauge, you must at least load one to be able to create a corrected production."
                             "If you loaded a gauge with the production_folder.load_production_gauge() method, you need to refresh the cache in your well with well.refresh_data() and re-grab the production folder")
        oil = next((x for x in production_folder.data if x.data_type == "qo"), None)
        gas = next((x for x in production_folder.data if x.data_type == "qg"), None)
        water = next((x for x in production_folder.data if x.data_type == "qw"), None)
        if rate_correction_option is not None and keep_original_rates_before_shut_ins is True:
            raise ValueError("You must choose between correcting the last rate and keeping the original rates before shut-ins. You cannot use both.")
        # Convert the user-supplied constant rates to internal units; None means the constant is not used.
        constant_value_for_liquid_rate = cast(float, self.__dto_converter.unit_converter.convert_to_internal(UnitEnum.rate_standard_baril_per_day, constant_value_for_liquid_rate)) if constant_value_for_liquid_rate is not None else constant_value_for_liquid_rate
        constant_value_for_gas_rate = cast(float, self.__dto_converter.unit_converter.convert_to_internal(UnitEnum.rate_thousandcf_per_day, constant_value_for_gas_rate)) if constant_value_for_gas_rate is not None else constant_value_for_gas_rate
        corrected_production_command_dto = self.__dto_converter.production_folder_dto_converter.get_corrected_production_command_dto(self.__field_id, self.__id, production_folder.id, shut_in, name, use_uptime_correction, set_to_0_in_shut_ins, replace_by,
                                                                                                                                     constant_value_for_liquid_rate, constant_value_for_gas_rate, rate_correction_option, difference, delta_p_difference_multiplier,
                                                                                                                                     pressure, iqr_multiplier, simplify_method, delta_y_for_liquid_rate_simplification, delta_y_for_gas_rate_simplification,
                                                                                                                                     keep_original_rates_before_shut_ins, set_rates_to_negative_values, oil, gas, water)
        corrected_production_query_dto = self.__cluster_apis.automation_api.create_corrected_production(self.__id, corrected_production_command_dto)
        self.__corrected_productions = [self.__dto_converter.production_folder_dto_converter.build_data(self.field_id, self.__id, y.data) for y in corrected_production_query_dto.phases if y.data is not None] 
    def create_filter(self, data: Data, filter_type: FilterTypeEnum, filter_name: str = 'New Filter', labels: Optional[List[str]] = None,
                      denoising_threshold_level: Optional[float] = None, denoising_threshold_value: float = 2340.80561921557, denoising_threshold_type: DenoisingThresholdTypeEnum = DenoisingThresholdTypeEnum.adaptative, delta_t: float = 2.0, delta_p: float = 13789.5, size_of_slice: int = 100000, min_window_x: Optional[datetime] = None,
                      max_window_x: Optional[datetime] = None, min_window_y: Optional[float] = None, max_window_y: Optional[float] = None, denoising_pre_sampling_type: DenoisingPreSamplingTypeEnum = DenoisingPreSamplingTypeEnum.intermediate,
                      denoising_pre_sampling_duration: float = 0.004166666666666667, denoising_minimum_gap_duration: Optional[float] = None, output_decimated_raw_data: bool = False) -> Data:
        """
        Creates a wavelet filter under the given source gauge (:class:`Data`).
        Parameters
        ----------
        data:
            Source gauge under which to create the filter.
        filter_type :
            Type of the wavelet filter to use
        filter_name :
            Name of the new filter
        labels:
            Labels of the filter
        denoising_threshold_level:
            Level of denoising
        denoising_threshold_type:
            Type of denoising threshold
        denoising_threshold_value:
            Threshold value (default value is in Pascal)
        delta_t:
            Delta T value
        delta_p:
            Delta P value (default value is in Pascal)
        size_of_slice:
            Size of the slice, minimum is 10000 and maximum is 100000
        min_window_x:
            Minimum window x value, must be a datetime
        max_window_x:
            Maximum window x value, must be a datetime
        min_window_y:
            Minimum window y value
        max_window_y:
            Maximum window y value
        denoising_pre_sampling_type:
            Only works if filter_type == FilterTypeEnum.wavelet_1 and must be one of the following values: ["Intermediate", "PTA", "User", "PA"]
        denoising_pre_sampling_duration:
            Only works if filter_type == FilterTypeEnum.wavelet_1, the pre sampling interval duration in seconds
        denoising_minimum_gap_duration:
            Only works if filter_type == FilterTypeEnum.wavelet_1, denoising minimum gap duration
        output_decimated_raw_data:
            Whether to also output the decimated raw data
        Returns
        -------
        Data:
            New filter
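        Examples
        --------
        A minimal sketch, assuming ``well`` is a connected :class:`Well` and
        ``gauge`` is a pressure :class:`Data` object retrieved from it:

        >>> filtered = well.create_filter(
        ...     data=gauge,
        ...     filter_type=FilterTypeEnum.wavelet_1,
        ...     filter_name='Denoised pressure')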
        """
        if denoising_threshold_type == DenoisingThresholdTypeEnum.adaptative and denoising_threshold_level is not None:
            denoising_threshold_value = 2340.80561921557 * (10 ** (denoising_threshold_level / 50))
        labels = labels if labels is not None else []
        size_of_slice = max(10000, min(size_of_slice, 100000))  # clamp to the allowed range [10000, 100000]
        payload = self.__dto_converter.data_dto_converter.get_filter_dto(self.__field_id, self.__id, data.vector_id, filter_name, labels, filter_type, denoising_threshold_level, denoising_threshold_value, denoising_threshold_type,
                                                                         delta_t, delta_p, denoising_pre_sampling_type, denoising_pre_sampling_duration, denoising_minimum_gap_duration, size_of_slice, min_window_x, max_window_x, min_window_y, max_window_y, output_decimated_raw_data)
        new_filter = self.__dto_converter.production_folder_dto_converter.build_data(self.__field_id, self.__id, self.__cluster_apis.automation_api.create_filter(data.id, payload))
        data.filters.append(new_filter)
        return new_filter 
[docs]
    def copy_file(self, file: Union[Document, File], new_name: Optional[str] = None, field_id: Optional[str] = None, well_id: Optional[str] = None,
                  well_group_id: Optional[str] = None, user_task_id: Optional[str] = None) -> File:
        """ Copy this document to the current file folder or to another field/well file folder
        Parameters
        ----------
        file:
            File to copy
        new_name:
            Use this parameter if you want to rename the copied document
        field_id:
            Specify the field id to copy the file under a different field folder
        well_id:
            Specify the well id to copy the file under a different well folder
        well_group_id:
            Specify the well group id to copy the file under a well group
        user_task_id:
            Specify the user task id to copy the file under a user task
        Returns
        -------
        File:
            The newly copied file
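        Examples
        --------
        A minimal sketch, assuming ``well`` is a connected :class:`Well` with at
        least one document loaded:

        >>> doc = well.documents[0]
        >>> copy = well.copy_file(doc, new_name='Copy of analysis')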
        """
        field_id = self.__field_id if field_id is None else field_id
        well_id = self.__id if well_id is None and well_group_id is None else well_id
        if new_name is None:
            file_dto = self.__cluster_apis.field_api.copy_file(field_id, well_id, well_group_id, user_task_id, file.file_id)
        else:
            file_dto = self.__cluster_apis.field_api.copy_and_rename_file(field_id, well_id, well_group_id, user_task_id, file.file_id, new_name)
        file = self.__dto_converter.file_dto_converter.build_file_from_file_dto(well_group_id, file_dto)
        if self.__files is None:
            self.__files = self.__dto_converter.get_files_recursively(self.field_id, self.id, self.__get_well_dto())
        if user_task_id is not None:
            user_task = next(x for x in self.user_tasks if x.id == user_task_id)
            user_task.files.append(file)
        else:
            self.__files.append(file)
        return file 
[docs]
    def create_pvt_from_kw_document(self, pvt_name: str, document_id: str, analysis_id: str) -> PVT:
        """
        Create a PVT object in the well
        Parameters
        ----------
        pvt_name: str
            Name of the PVT object to create
        document_id: str
            Id of the document to use
        analysis_id: str
            Id of the analysis to use
        Returns
        -------
        PVT
            The PVT object created.
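        Examples
        --------
        A minimal sketch; the ids shown are placeholders for those of an existing
        Saphir or Topaze document and one of its analyses:

        >>> pvt = well.create_pvt_from_kw_document(
        ...     pvt_name='PVT from analysis',
        ...     document_id='<document-id>',
        ...     analysis_id='<analysis-id>')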
        """
        dto = self.__dto_converter.get_command_pvt_from_kw_document_dto(pvt_name, self.__field_id, self.__id, document_id, analysis_id)
        pvt = self.__dto_converter.build_pvt(self.__field_id, self.__id, self.__cluster_apis.automation_api.create_pvt_from_kw_document_well(self.id, dto))
        self.pvts.append(pvt)
        return pvt 
[docs]
    def create_pvt_from_file(self, pvt_name: str, file_id: str, start_date: Optional[datetime] = None,
                             reservoir_pressure: Optional[float] = None, reservoir_temperature: Optional[float] = None,
                             gas_oil_type: Optional[GasOilTypeEnum] = None, unit_system: Optional[UnitSystemPvtEnum] = None) -> PVT:
        """
        Creates a PVT (Pressure-Volume-Temperature) object from a file. Fallback parameters can be supplied for cases where the gas-oil type cannot be determined from the file.
        Parameters
        ----------
        pvt_name : str
            The name of the PVT object to be created.
        file_id : str
            The identifier of the file from which the PVT object will be created.
        start_date : datetime, optional
            The start date for the PVT data coverage. Defaults to None.
        reservoir_pressure : float, optional
            The pressure of the reservoir associated with the PVT object. Defaults to None.
        reservoir_temperature : float, optional
            The temperature of the reservoir associated with the PVT object. Defaults to None.
        gas_oil_type : GasOilTypeEnum, optional
            The type of gas or oil associated with the PVT object, as per the enumerated
            GasOilTypeEnum. Defaults to None.
        unit_system : UnitSystemPvtEnum, optional
            The unit system used for the PVT object, as per the enumerated UnitSystemPvtEnum.
            Defaults to None.
        Returns
        -------
        PVT
            An instance of the PVT object created using the provided parameters and data from
            the specified text file.
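        Examples
        --------
        A minimal sketch, assuming ``f`` is a PVT text :class:`File` already
        uploaded under this well:

        >>> pvt = well.create_pvt_from_file('PVT from file', f.file_id)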
        """
        dto = self.__dto_converter.get_command_pvt_from_text_file_dto(pvt_name, self.__field_id, self.__id, file_id, start_date, reservoir_pressure, reservoir_temperature, gas_oil_type, unit_system)
        pvt = self.__dto_converter.build_pvt(self.__field_id, self.__id, self.__cluster_apis.automation_api.create_pvt_from_text_file_well(self.id, dto))
        self.pvts.append(pvt)
        return pvt 
[docs]
    def create_well_property_container(self, name: str) -> WellPropertyContainer:
        """ Creates a new well property container for this :class:`Well`.
        Parameters
        ----------
        name:
            Name of the new well property container.
        Returns
        -------
        :class:`WellPropertyContainer`:
            The newly created well property container.
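        Examples
        --------
        A minimal sketch, assuming ``well`` is a connected :class:`Well`:

        >>> container = well.create_well_property_container('Reservoir properties')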
        """
        dto = self.__cluster_apis.field_api.create_well_property_container(self.__field_id, self.__id, {"name": name})
        return self.__dto_converter.get_well_property_containers_from_well_property_container_dto(self.__field_id, self.__id, self.__well_properties_catalog, [dto])[0] 
[docs]
    def __str__(self) -> str:
        """String representation of the Well"""
        return f"Well(name='{self.name}', id='{self.id}', uwi={self.uwi})" 
[docs]
    def __repr__(self) -> str:
        """Detailed representation of the Well"""
        return f"Well(field_id='{self.field_id}', id='{self.id}', name='{self.name}', uwi={self.uwi}, well_group_id={self.well_group_id})" 
[docs]
    def create_survey(self, name: str, date: datetime) -> Survey:
        """
        Create a new survey in the well.
        This method creates a new survey to store well logs at a specific date.
        Parameters
        ----------
        name : str
            The name of the survey container.
        date:
            Date of the survey
        Returns
        -------
        Survey
            The newly created survey container object.
        """
        dto = self.__dto_converter.get_survey_command_dto(name, date)
        survey = self.__dto_converter.build_survey(self.__field_id, self.__id, self.__cluster_apis.field_api.create_survey(self.__field_id, self.__id, dto))
        self.surveys.append(survey)
        return survey 
[docs]
    def delete_survey(self, survey: Survey) -> None:
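        """
        Deletes a survey from this well and removes it from the local surveys list.
        Parameters
        ----------
        survey : Survey
            The survey instance to delete.
        """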
        self.__cluster_apis.field_api.delete_survey(self.__field_id, self.__id, survey.id)
        self.surveys.remove(survey) 
[docs]
    def create_model_book(self, document: Document, name: str) -> ModelBook:
        """
        Creates a new model book associated with this well.
        Parameters
        ----------
        document : Document
            The document to use as the basis for creating the model book.
        name : str
            The name to assign to the new model book.
        Returns
        -------
        ModelBook
            The newly created model book instance.
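        Examples
        --------
        A minimal sketch, assuming ``doc`` is a Saphir or Topaze :class:`Document`
        retrieved from this well:

        >>> book = well.create_model_book(doc, 'History match candidates')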
        """
        dto = self.__dto_converter.get_create_model_book_dto(document, name, field_id=self.__field_id, well_id=self.__id)
        model_book_query_dto = self.__cluster_apis.automation_api.create_model_book(self.__id, dto)
        model_book = self.__dto_converter.build_model_book(self.__field_id, self.__id, model_book_query_dto)
        self.model_books.append(model_book)
        return model_book 
[docs]
    def delete_model_book(self, model_book: ModelBook) -> None:
        """
        Deletes a model book from this well.
        Parameters
        ----------
        model_book : ModelBook
            The model book instance to delete.
        Returns
        -------
        None
            This method does not return a value.
        """
        self.__cluster_apis.field_api.delete_model_book(self.field_id, self.id, model_book.id)
        self.model_books.remove(model_book) 
[docs]
    def delete_file(self, file: Union[File, Document]) -> None:
        """
        Deletes a specified file from the cluster and updates the local file list.
        This method removes a file either provided directly as a `File` object or
        indirectly as a `Document` object. In the case of a `Document`, it searches
        for and locates the equivalent `File` object within the local file list. Once
        found, the file is deleted from both the remote cluster via the underlying
        API and the local files list, keeping the data in sync.
        Parameters
        ----------
        file : Union[File, Document]
            The file object to be deleted. It can be provided either directly as a
            `File` object or as a `Document` object, which is then matched to a
            `File` for deletion.
        Warnings
        --------
        Calling this method while iterating over a collection containing the file
        (e.g., `for file in self.files:`) can lead to unexpected behavior, as it
        modifies the collection during iteration. To avoid this issue, create a copy
        of the collection before iteration (e.g., `for file in list(well.files):`)
        or collect the files to delete first, then delete them in a separate step.
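        Examples
        --------
        A safe bulk-deletion sketch (the ``name`` filter is illustrative); the files
        are collected first so the collection is not modified while iterating:

        >>> to_delete = [f for f in list(well.files) if f.name.startswith('tmp_')]
        >>> for f in to_delete:
        ...     well.delete_file(f)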
        """
        if type(file) is Document:
            file = next(x for x in self.__get_files() if x.file_id == file.file_id)
        file = cast(File, file)
        self.__cluster_apis.field_api.delete_file(self.__field_id, self.__id, file.file_id)
        file = next(x for x in self.__get_files() if x.file_id == file.file_id)
        self.__get_files().remove(file)
        file_folder = find_file_folder_recursively_by_file(file, self.file_folders)
        file = next(x for x in file_folder.files if x.file_id == file.file_id)
        file_folder.files.remove(file) 
[docs]
    def compute_forward_rates(self, pvt_id: str, wellbore_id: str, computation_date: datetime, pressure_vs_depth_vector: VsDepthVector, temperature_vs_depth_vector: VsDepthVector, velocity_vs_depth_vector: VsDepthVector, heat_transfer_coefficient: Optional[float] = None, reservoir_temperature: Optional[float] = None,
                              reservoir_temperature_gradient: Optional[float] = None,
                              reservoir_depth: Optional[float] = None) -> ForwardRatesOutputs:
        """
        Computes forward rates based on well and reservoir parameters, including pressure,
        temperature, velocity vs depth, and optional reservoir properties. Ensures consistency in
        providing all optional reservoir parameters as a single block or none. Converts the input
        data into specific data transfer objects (DTOs), makes a remote API request for computation,
        and processes the results into an output DTO.
        Parameters
        ----------
        pvt_id : str
            The PVT (Pressure-Volume-Temperature) identifier associated with the computation.
        wellbore_id : str
            The wellbore identifier for the computation.
        computation_date : datetime
            The date on which the computation is being carried out.
        pressure_vs_depth_vector : VsDepthVector
            A vector representing pressure values vs depth in the wellbore.
        temperature_vs_depth_vector : VsDepthVector
            A vector representing temperature values vs depth in the wellbore.
        velocity_vs_depth_vector : VsDepthVector
            A vector representing velocity values vs depth in the wellbore.
        heat_transfer_coefficient : Optional[float]
            The heat transfer coefficient for reservoir interaction, if provided.
        reservoir_temperature : Optional[float]
            The temperature of the reservoir, if provided.
        reservoir_temperature_gradient : Optional[float]
            The gradient of temperature across the reservoir, if provided.
        reservoir_depth : Optional[float]
            The depth of the reservoir, if provided.
        Returns
        -------
        ForwardRatesOutputs
            The computed forward rates encapsulated in an output DTO.
        Raises
        ------
        ValueError
            If some but not all of the optional parameters (heat_transfer_coefficient,
            reservoir_temperature, reservoir_temperature_gradient, reservoir_depth) are provided.
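        Examples
        --------
        A minimal sketch; ``my_pvt``, ``my_wellbore`` and the three
        :class:`VsDepthVector` objects are assumed to exist already. The four
        reservoir parameters must be supplied together or omitted together:

        >>> outputs = well.compute_forward_rates(
        ...     pvt_id=my_pvt.id,
        ...     wellbore_id=my_wellbore.id,
        ...     computation_date=datetime(2024, 1, 1),
        ...     pressure_vs_depth_vector=p_vector,
        ...     temperature_vs_depth_vector=t_vector,
        ...     velocity_vs_depth_vector=v_vector)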
        """
        optional_params = [heat_transfer_coefficient, reservoir_temperature, reservoir_temperature_gradient, reservoir_depth]
        if not (all(param is None for param in optional_params) or all(param is not None for param in optional_params)):
            raise ValueError("Either all optional parameters (heat_transfer_coefficient, reservoir_temperature, reservoir_temperature_gradient, reservoir_depth) must be provided or none of them.")
        compute_forward_rates_command_dto = self.__dto_converter.get_compute_forward_rates_command_dto(self.__field_id, self.__id, pvt_id, wellbore_id, computation_date, pressure_vs_depth_vector, temperature_vs_depth_vector, velocity_vs_depth_vector, heat_transfer_coefficient, reservoir_temperature, reservoir_temperature_gradient, reservoir_depth)
        compute_forward_rates_query_dto = self.__cluster_apis.tech_objects_api.compute_forward_rates(compute_forward_rates_command_dto)
        return self.__dto_converter.build_forward_rates_outputs(compute_forward_rates_query_dto) 
[docs]
    def create_document(self, document_type: KWModuleEnum, target_folder_id: str, name: str, well_radius: float, porosity: float, pay_zone: float, rock_compressibility: float, top_reservoir_depth: float, pvt_id: Optional[str] = None, pressure_id: Optional[str] = None,
                        oil_rate_id: Optional[str] = None, gas_rate_id: Optional[str] = None, water_rate_id: Optional[str] = None) -> Document:
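        """
        Creates a new Saphir or Topaze document under this well.
        Parameters
        ----------
        document_type:
            Module of the document to create; only KWModuleEnum.saphir and KWModuleEnum.topaze are supported
        target_folder_id:
            Id of the file folder in which the document is created
        name:
            Name of the new document
        well_radius:
            Well radius
        porosity:
            Porosity
        pay_zone:
            Pay zone
        rock_compressibility:
            Rock compressibility
        top_reservoir_depth:
            Top reservoir depth
        pvt_id:
            Optional id of the PVT object to use
        pressure_id:
            Optional id of the pressure data to use
        oil_rate_id:
            Optional id of the oil rate data to use
        gas_rate_id:
            Optional id of the gas rate data to use
        water_rate_id:
            Optional id of the water rate data to use
        Returns
        -------
        Document:
            The newly created document
        Raises
        ------
        ValueError
            If document_type is neither KWModuleEnum.saphir nor KWModuleEnum.topaze.
        """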
        dto = self.__dto_converter.get_create_document_dto(self.__field_id, self.__id, target_folder_id, name, well_radius, porosity, pay_zone, rock_compressibility, top_reservoir_depth, pvt_id, pressure_id, oil_rate_id, gas_rate_id, water_rate_id)
        if document_type == KWModuleEnum.saphir:
            document_id = self.__cluster_apis.pta_api.create_document(self.__field_id, dto).documentId
        elif document_type == KWModuleEnum.topaze:
            document_id = self.__cluster_apis.rta_api.create_document(self.__field_id, dto).documentId
        else:
            raise ValueError(f"Unsupported document type: {document_type}, only Saphir and Topaze documents are supported.")
        self.__files = None
        document = next(x for x in self.files if x.file_id == document_id).as_kw_document()
        return document 
[docs]
    def delete_data(self, data: Data) -> None:
        """
        Deletes a specified data object from the current well and updates the internal
        data list by removing the corresponding object.
        Parameters
        ----------
        data : Data
            The data object to be deleted, identified by its unique identifier (vector_id).
            The data object must already exist in the cluster and should match an
            object in the well's internal data list.
        Returns
        -------
        None
            This method does not return a value.
        """
        self.__cluster_apis.field_api.delete_data(self.__field_id, self.__id, data.vector_id)
        data = next(x for x in self.__get_data() if x.id == data.id)
        self.__get_data().remove(data) 
[docs]
    def create_data_folder(self, name: str, folder_parent_id: Optional[str] = None, is_subfolder: bool = True) -> DataFolder:
        """
        Creates a data folder within the current context, either as a direct child
        of a parent folder or at the well level, depending on the specified parameters.
        This method allows hierarchical organization of data folders. If `is_subfolder`
        is set to False, the data folder is created at the well level irrespective of
        `folder_parent_id`. Otherwise, the folder is created under the specified parent
        folder, or under a default folder with the name "Gauges" if no parent folder ID
        is provided.
        Parameters
        ----------
        name : str
            The name of the data folder to be created.
        folder_parent_id : Optional[str], optional
            The ID of the parent folder under which the new folder is created, by
            default None. If None, the folder is created under the "Gauges" folder,
            unless `is_subfolder` is set to False, in which case the folder is created
            at the well level.
        is_subfolder : bool, optional
            A flag indicating whether the folder should be created as a subfolder
            (default: True). If False, the folder is created at the well level
            regardless of `folder_parent_id`.
        Returns
        -------
        DataFolder
            The created `DataFolder` object. If a folder with the same name already
            exists under the parent folder, the existing folder is returned instead.
        Raises
        ------
        ValueError
            If `folder_parent_id` is provided but does not correspond to an existing
            folder in the current structure.
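        Examples
        --------
        A minimal sketch, assuming ``well`` is a connected :class:`Well`:

        >>> parent = well.create_data_folder('Downhole')  # created under "Gauges"
        >>> child = well.create_data_folder('Raw', folder_parent_id=parent.id)
        >>> top_level = well.create_data_folder('Exports', is_subfolder=False)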
        """
        if is_subfolder is False:
            data_folder_dto = self.__cluster_apis.field_api.create_data_folder_at_well_level(self.__field_id, self.__id, {'name': name})
            new_data_folder = self.__dto_converter.get_data_folders_from_data_folder_dto_recursively(self.__field_id, self.__id, None, None, [data_folder_dto])[0]
            self.data_folders.append(new_data_folder)
            return new_data_folder
        if folder_parent_id is not None:
            parent_data_folder = find_data_folder_recursively_by_id(folder_parent_id, self.data_folders)
            if parent_data_folder is None:
                raise ValueError("data folder parent {} does not exist".format(folder_parent_id))
        else:
            folder_parent_name = "Gauges"
            parent_data_folder = find_data_folders_recursively_by_name(folder_parent_name, self.data_folders)[0]
        data_folder = next((x for x in parent_data_folder.data_folders if x.name == name), None)
        if data_folder is None:
            data_folder_dto = self.__cluster_apis.field_api.create_data_folder(self.__field_id, self.__id, parent_data_folder.id, {'name': name})
            data_folder = self.__dto_converter.get_data_folders_from_data_folder_dto_recursively(self.__field_id, self.__id, parent_data_folder.id, parent_data_folder.name, [data_folder_dto])[0]
            parent_data_folder.data_folders.append(data_folder)
        else:
            print(f"Data Folder with name {name} already exists in well {self.__name}")
        return data_folder