Source code for kappa_sdk.connection

import logging
from typing import List, Optional, Dict, Any
from ._private._cluster_apis import ClusterAPIS
from ._private.dto_converters._connection_dto_converter import ConnectionDtoConverter
from ._private.dto_converters._script_dto_converter import ScriptDtoConverter
from ._private._field_dto import GlobalDataTypeCatalogDto, WellPropertyCatalogDto, UnitSystemsDto
from ._private._user_task_dto import UserTaskDefinitionDto
from .field_data_types_catalog import FieldDataTypesCatalog
from .field_well_properties_catalog import FieldWellPropertiesCatalog
from .coordinate_system_enum import CoordinateSystemEnum
from .country_enum import CountryEnum
from .script_result import ScriptResult
from .field import Field
from .kw_module_enum import KWModuleEnum
from .event_bus import EventBus
from ._private._configuration import Configuration
from ._private._sdk_configuration import SdkConfiguration
from .unit_converter import UnitConverter
from .timezone_enum import TimezoneEnum
from .rest_api import RestAPI
from ._private._sdk_oauth_service import SDKOAuthService
from datetime import datetime
logger = logging.getLogger(__name__)


[docs] class Connection: """ Connection to the KAPPA Automate instance. Use this class to establish a connection to the KAPPA Automate instance. Parameters ---------- server_address: URL of the KAPPA Automate instance. This is the same URL that you use to access the web interface of your KAPPA Automate instance. verify_ssl: Set this to False if you need to disable SSL certificate verification (True by default). """ __event_bus: Optional[EventBus] def __init__(self, server_address: str, proxies: Optional[Dict[Any, Any]] = None, headers: Dict[str, str] = dict(), verify_ssl: bool = True, client_id: Optional[str] = None, client_secret: Optional[str] = None): if not isinstance(server_address, str): raise TypeError('"server_address" parameter of string type is expected, but an instance of a different type was given.') self._configuration: Configuration = SdkConfiguration(self.__get_corrected_server_address(server_address), proxies, headers, verify_ssl, client_id, client_secret) self.__event_bus: Optional[EventBus] = None self.__authentication_service: SDKOAuthService = SDKOAuthService(self._configuration) self.__initialize_api() self.__initialize_converters() self.__data_types_catalog: Optional[FieldDataTypesCatalog] = None self.__well_properties_catalog: Optional[FieldWellPropertiesCatalog] = None self.__user_tasks_definition: Optional[List[UserTaskDefinitionDto]] = None def __get_corrected_server_address(self, server_address: str) -> str: server_address = server_address.rstrip() server_address = server_address if "://" in server_address else "https://" + server_address return str(server_address.strip("/")) def __initialize_api(self) -> None: self.__rest_api = RestAPI(self._configuration, self.__authentication_service) self.__cluster_apis = ClusterAPIS(self.__rest_api) def __initialize_converters(self) -> None: try: self._unit_converter = UnitConverter(self.__cluster_apis.field_api.get_unit_systems_dto()) except ConnectionError as exception: logger.error(exception.strerror, exc_info=True) raise ConnectionError(exception.strerror) from exception self.__script_dto_converter = ScriptDtoConverter() self.__connection_dto_converter = ConnectionDtoConverter(self.__cluster_apis, self._unit_converter, self.__script_dto_converter) @property def rest_api(self) -> RestAPI: """ Gets the REST API gateway. """ return self.__rest_api @property def unit_converter(self) -> UnitConverter: """ Gets the unit converter API results. """ return self._unit_converter @property def event_bus(self) -> EventBus: """ Gets the event bus. Returns ------- EventBus The event bus """ if self.__event_bus is None: self.__event_bus = EventBus(self._configuration.server_address.strip("/") + "/signalr/main", self._get_access_token(), verify_ssl=self._configuration.verify_ssl) self.__event_bus.start() return self.__event_bus @property def data_types_catalog(self) -> FieldDataTypesCatalog: """ Get the Global Data Types catalog object of the :class:`Field`. Returns ------- FieldDataTypesCatalog The Global Data Types catalog object. """ if self.__data_types_catalog is None: self.__data_types_catalog = self.__connection_dto_converter.field_dto_converter.get_field_data_types_catalog_from_data_type_catalog_dto(None, self.__get_global_data_types_catalog_dto()) return self.__data_types_catalog @property def well_properties_catalog(self) -> FieldWellPropertiesCatalog: """ Get the Global Well Properties catalog object of the :class:`Field`. Returns ------- FieldWellPropertiesCatalog The Global Well Properties catalog object. """ if self.__well_properties_catalog is None: self.__well_properties_catalog = self.__connection_dto_converter.field_dto_converter.get_field_well_properties_catalog_from_well_property_catalog_dto(None, self.__get_global_well_properties_catalog_dto()) return self.__well_properties_catalog def get_fields(self) -> List[Field]: """ Lists all fields. Returns ------- List[Field] A list of all fields. """ return self.__connection_dto_converter.get_fields_from_field_descriptor_dto(self.__cluster_apis.field_api.get_fields_dto()) def get_field_by_id(self, field_id: str) -> Field: """ Gets a field by id. Parameters ---------- field_id : str The unique identifier of the field to retrieve. Returns ------- Field The field corresponding to the provided identifier. """ return self.__connection_dto_converter.get_field_from_field_dto_hierarchy(self.__cluster_apis.field_api.get_field_dto(field_id)) def run_kw_script(self, field_id: str, well_id: str, module: KWModuleEnum, script: str, additional_content: Optional[str] = None, name: Optional[str] = "Custom KW script", time_to_live: float = 0.0001) -> ScriptResult: """ Runs a KW script. Parameters ---------- field_id: The id of the field well_id: The id of the well module: Type of the KW module that should execute the script. script: The KW script to execute. time_to_live: Life duration of the script in days additional_content: Additional content that is supplied with the script (optional). name: The name of the script (optional). """ dto = self.__script_dto_converter.get_background_script_input_dto(field_id, well_id, module, script, time_to_live, additional_content, name) return self.__cluster_apis.automation_api.execute_background_script(dto) def _get_access_token(self) -> Optional[str]: return self.__authentication_service.get_oauth_token() if self.__authentication_service else None def __get_global_data_types_catalog_dto(self) -> GlobalDataTypeCatalogDto: return self.__cluster_apis.field_api.get_global_data_types_catalog_dto() def __get_global_well_properties_catalog_dto(self) -> WellPropertyCatalogDto: return self.__cluster_apis.field_api.get_global_well_properties_catalog_dto() def __get_unit_systems(self) -> UnitSystemsDto: return self.__cluster_apis.field_api.get_unit_systems_dto() def create_field(self, name: str, country: CountryEnum, field_timezone: TimezoneEnum, unit_system_name: str, coordinate_system: CoordinateSystemEnum = CoordinateSystemEnum.wsg_84, reference_date: Optional[datetime] = None, asset: str = "", latitude: Optional[float] = None, longitude: Optional[float] = None, datum: Optional[float] = None, comment: str = "") -> Field: """ Creates a new field with the specified parameters. Parameters ---------- name : str The name of the field. country : CountryEnum The country where the field is located. field_timezone : TimezoneEnum The timezone of the field. unit_system_name : str The name of the unit system. coordinate_system : CoordinateSystemEnum, optional The coordinate system used (default is CoordinateSystemEnum.wsg_84). reference_date : Optional[datetime], optional The reference date for the field (default is None). asset : str, optional The asset associated with the field (default is an empty string). latitude : Optional[float], optional The latitude of the field (default is None). longitude : Optional[float], optional The longitude of the field (default is None). datum : Optional[float], optional The datum of the field (default is None). comment : str, optional Additional comments about the field (default is an empty string). Returns ------- Field The created field. """ well_property_catalog_dto = self.__get_global_well_properties_catalog_dto() data_type_catalog_dto = self.__get_global_data_types_catalog_dto() unit_system_id = next(x for x in self.__get_unit_systems().unitSystems if x.name == unit_system_name).id payload = self.__connection_dto_converter.create_field_dto(name, country.value, coordinate_system.value, reference_date, field_timezone.value, unit_system_id, asset, latitude, longitude, datum, comment, data_type_catalog_dto, well_property_catalog_dto) response = self.__cluster_apis.field_api.create_field(payload) field = self.__connection_dto_converter.get_field_from_field_dto_hierarchy(response) return field def __get_user_tasks_definition(self) -> List[UserTaskDefinitionDto]: if self.__user_tasks_definition is None: self.__user_tasks_definition = self.__cluster_apis.processing_api.get_all_user_task_definition() return self.__user_tasks_definition def __get_user_task_definition(self, user_task_definition_name: str) -> UserTaskDefinitionDto: try: return next(x for x in self.__get_user_tasks_definition() if x.name == user_task_definition_name) except StopIteration: raise ValueError(f"There is no user definition with the name {user_task_definition_name} in the database") def migrate_user_tasks(self, initial_user_task_definition_name: str, new_user_task_definition_name: str, field_name: Optional[str], well_name: Optional[str]) -> None: """ Migrate user task items using a definition to another user task definition. Parameters ---------- initial_user_task_definition_name: The name of the initial user task definition new_user_task_definition_name: The name of the new user task definition field_name: (Optional) Specify this to apply the migration to a specific field well_name: (Optional) Specify this to apply the migration to a specific well. Note that the field name must be specified if this parameter is used. """ user_task_definition_id = self.__get_user_task_definition(initial_user_task_definition_name).id user_task_items = self.__cluster_apis.processing_api.get_user_task_items(user_task_definition_id) new_user_task_definition_id = self.__get_user_task_definition(new_user_task_definition_name).id for user_task_item in user_task_items: if field_name is not None and user_task_item.fieldName == field_name: if well_name is not None and user_task_item.wellName == well_name: self.__cluster_apis.processing_api.migrate_user_task(user_task_item.id, new_user_task_definition_id) if well_name is None: self.__cluster_apis.processing_api.migrate_user_task(user_task_item.id, new_user_task_definition_id) if field_name is None: self.__cluster_apis.processing_api.migrate_user_task(user_task_item.id, new_user_task_definition_id)