import logging
from typing import List, Optional, Dict, Any
from ._private._cluster_apis import ClusterAPIS
from ._private.dto_converters._connection_dto_converter import ConnectionDtoConverter
from ._private.dto_converters._script_dto_converter import ScriptDtoConverter
from ._private._field_dto import GlobalDataTypeCatalogDto, WellPropertyCatalogDto, UnitSystemsDto
from ._private._user_task_dto import UserTaskDefinitionDto
from .field_data_types_catalog import FieldDataTypesCatalog
from .field_well_properties_catalog import FieldWellPropertiesCatalog
from .coordinate_system_enum import CoordinateSystemEnum
from .country_enum import CountryEnum
from .script_result import ScriptResult
from .field import Field
from .kw_module_enum import KWModuleEnum
from .event_bus import EventBus
from ._private._configuration import Configuration
from ._private._sdk_configuration import SdkConfiguration
from .unit_converter import UnitConverter
from .timezone_enum import TimezoneEnum
from .rest_api import RestAPI
from ._private._sdk_oauth_service import SDKOAuthService
from datetime import datetime
# Module-level logger named after this module; used below to report connection failures.
logger = logging.getLogger(__name__)
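# [docs]  (appears to be a leftover documentation-link marker from the rendered HTML page; not part of the module)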
class Connection:
    """
    Connection to the KAPPA Automate instance.

    Use this class to establish a connection to the KAPPA Automate instance.

    Parameters
    ----------
    server_address:
        URL of the KAPPA Automate instance.
        This is the same URL that you use to access the web interface of your KAPPA Automate instance.
        Surrounding whitespace and slashes are removed, and ``https://`` is assumed when no scheme is given.
    proxies:
        Optional proxy settings, forwarded unchanged to the SDK configuration.
    headers:
        Optional additional headers, forwarded to the SDK configuration.
        A new empty dictionary is used for each connection when this is omitted.
    verify_ssl:
        Set this to False if you need to disable SSL certificate verification (True by default).
    client_id:
        Optional client identifier, forwarded to the SDK configuration (presumably used for OAuth authentication).
    client_secret:
        Optional client secret, forwarded to the SDK configuration (presumably used for OAuth authentication).

    Raises
    ------
    TypeError
        If ``server_address`` is not a string.
    ConnectionError
        If a ``ConnectionError`` occurs while the unit systems are retrieved during initialization.
    """

    __event_bus: Optional[EventBus]

    def __init__(self,
                 server_address: str,
                 proxies: Optional[Dict[Any, Any]] = None,
                 headers: Optional[Dict[str, str]] = None,
                 verify_ssl: bool = True,
                 client_id: Optional[str] = None,
                 client_secret: Optional[str] = None) -> None:
        if not isinstance(server_address, str):
            raise TypeError('"server_address" parameter of string type is expected, but an instance of a different type was given.')
        # A ``dict()`` default would be evaluated once and shared by every connection created
        # without explicit headers, so a fresh dictionary is created per instance instead.
        if headers is None:
            headers = {}
        self._configuration: Configuration = SdkConfiguration(
            self.__get_corrected_server_address(server_address),
            proxies,
            headers,
            verify_ssl,
            client_id,
            client_secret,
        )
        self.__event_bus = None
        self.__authentication_service: SDKOAuthService = SDKOAuthService(self._configuration)
        self.__initialize_api()
        self.__initialize_converters()
        # The catalogs and the user task definitions are loaded lazily on first use.
        self.__data_types_catalog: Optional[FieldDataTypesCatalog] = None
        self.__well_properties_catalog: Optional[FieldWellPropertiesCatalog] = None
        self.__user_tasks_definition: Optional[List[UserTaskDefinitionDto]] = None

    def __get_corrected_server_address(self, server_address: str) -> str:
        """
        Normalizes the server address given by the user.

        Surrounding whitespace is removed, ``https://`` is prepended when the address
        contains no ``://`` scheme separator, and leading/trailing slashes are stripped.
        """
        # strip() rather than rstrip(): leading whitespace would otherwise end up inside the URL.
        server_address = server_address.strip()
        if "://" not in server_address:
            server_address = "https://" + server_address
        return server_address.strip("/")

    def __initialize_api(self) -> None:
        """Creates the REST API gateway and the cluster APIs built on top of it."""
        self.__rest_api = RestAPI(self._configuration, self.__authentication_service)
        self.__cluster_apis = ClusterAPIS(self.__rest_api)

    def __initialize_converters(self) -> None:
        """
        Creates the unit converter and the DTO converters.

        Building the unit converter requests the unit systems from the server; a
        ``ConnectionError`` raised at that point is logged and re-raised.
        """
        try:
            self._unit_converter = UnitConverter(self.__cluster_apis.field_api.get_unit_systems_dto())
        except ConnectionError as exception:
            # ``strerror`` is None when the error was built from a single message argument,
            # so fall back to the string form to avoid logging and raising "None".
            message = exception.strerror or str(exception)
            logger.error("%s", message, exc_info=True)
            raise ConnectionError(message) from exception
        self.__script_dto_converter = ScriptDtoConverter()
        self.__connection_dto_converter = ConnectionDtoConverter(self.__cluster_apis, self._unit_converter, self.__script_dto_converter)

    @property
    def rest_api(self) -> RestAPI:
        """
        Gets the REST API gateway.

        Returns
        -------
        RestAPI
            The REST API gateway used by this connection.
        """
        return self.__rest_api

    @property
    def unit_converter(self) -> UnitConverter:
        """
        Gets the unit converter.

        Returns
        -------
        UnitConverter
            The unit converter created when the connection was initialized.
        """
        return self._unit_converter

    @property
    def event_bus(self) -> EventBus:
        """
        Gets the event bus.

        The event bus is created and started on first access and reused afterwards.

        Returns
        -------
        EventBus
            The event bus
        """
        if self.__event_bus is None:
            hub_url = self._configuration.server_address.strip("/") + "/signalr/main"
            event_bus = EventBus(hub_url, self._get_access_token(), verify_ssl=self._configuration.verify_ssl)
            event_bus.start()
            # Cache only after a successful start so that a failed start is retried on the
            # next access instead of handing out an event bus that never started.
            self.__event_bus = event_bus
        return self.__event_bus

    @property
    def data_types_catalog(self) -> FieldDataTypesCatalog:
        """
        Gets the Global Data Types catalog.

        The catalog is requested from the server on first access and cached afterwards.

        Returns
        -------
        FieldDataTypesCatalog
            The Global Data Types catalog object.
        """
        if self.__data_types_catalog is None:
            field_dto_converter = self.__connection_dto_converter.field_dto_converter
            # None is passed as the first argument, as in the original call: the catalog is global.
            self.__data_types_catalog = field_dto_converter.get_field_data_types_catalog_from_data_type_catalog_dto(
                None, self.__get_global_data_types_catalog_dto())
        return self.__data_types_catalog

    @property
    def well_properties_catalog(self) -> FieldWellPropertiesCatalog:
        """
        Gets the Global Well Properties catalog.

        The catalog is requested from the server on first access and cached afterwards.

        Returns
        -------
        FieldWellPropertiesCatalog
            The Global Well Properties catalog object.
        """
        if self.__well_properties_catalog is None:
            field_dto_converter = self.__connection_dto_converter.field_dto_converter
            # None is passed as the first argument, as in the original call: the catalog is global.
            self.__well_properties_catalog = field_dto_converter.get_field_well_properties_catalog_from_well_property_catalog_dto(
                None, self.__get_global_well_properties_catalog_dto())
        return self.__well_properties_catalog

    def get_fields(self) -> List[Field]:
        """
        Lists all fields.

        Returns
        -------
        List[Field]
            A list of all fields.
        """
        fields_dto = self.__cluster_apis.field_api.get_fields_dto()
        return self.__connection_dto_converter.get_fields_from_field_descriptor_dto(fields_dto)

    def get_field_by_id(self, field_id: str) -> Field:
        """
        Gets a field by id.

        Parameters
        ----------
        field_id : str
            The unique identifier of the field to retrieve.

        Returns
        -------
        Field
            The field corresponding to the provided identifier.
        """
        field_dto = self.__cluster_apis.field_api.get_field_dto(field_id)
        return self.__connection_dto_converter.get_field_from_field_dto_hierarchy(field_dto)

    def run_kw_script(self,
                      field_id: str,
                      well_id: str,
                      module: KWModuleEnum,
                      script: str,
                      additional_content: Optional[str] = None,
                      name: Optional[str] = "Custom KW script",
                      time_to_live: float = 0.0001) -> ScriptResult:
        """
        Runs a KW script.

        Parameters
        ----------
        field_id:
            The id of the field
        well_id:
            The id of the well
        module:
            Type of the KW module that should execute the script.
        script:
            The KW script to execute.
        additional_content:
            Additional content that is supplied with the script (optional).
        name:
            The name of the script (optional).
        time_to_live:
            Life duration of the script in days

        Returns
        -------
        ScriptResult
            The result returned by the automation API for the executed script.
        """
        dto = self.__script_dto_converter.get_background_script_input_dto(field_id, well_id, module, script, time_to_live, additional_content, name)
        return self.__cluster_apis.automation_api.execute_background_script(dto)

    def _get_access_token(self) -> Optional[str]:
        """Returns the OAuth token from the authentication service, or None when there is no such service."""
        if not self.__authentication_service:
            return None
        return self.__authentication_service.get_oauth_token()

    def __get_global_data_types_catalog_dto(self) -> GlobalDataTypeCatalogDto:
        """Requests the global data types catalog DTO from the field API."""
        return self.__cluster_apis.field_api.get_global_data_types_catalog_dto()

    def __get_global_well_properties_catalog_dto(self) -> WellPropertyCatalogDto:
        """Requests the global well properties catalog DTO from the field API."""
        return self.__cluster_apis.field_api.get_global_well_properties_catalog_dto()

    def __get_unit_systems(self) -> UnitSystemsDto:
        """Requests the unit systems DTO from the field API."""
        return self.__cluster_apis.field_api.get_unit_systems_dto()

    def create_field(self,
                     name: str,
                     country: CountryEnum,
                     field_timezone: TimezoneEnum,
                     unit_system_name: str,
                     coordinate_system: CoordinateSystemEnum = CoordinateSystemEnum.wsg_84,
                     reference_date: Optional[datetime] = None,
                     asset: str = "",
                     latitude: Optional[float] = None,
                     longitude: Optional[float] = None,
                     datum: Optional[float] = None,
                     comment: str = "") -> Field:
        """
        Creates a new field with the specified parameters.

        Parameters
        ----------
        name : str
            The name of the field.
        country : CountryEnum
            The country where the field is located.
        field_timezone : TimezoneEnum
            The timezone of the field.
        unit_system_name : str
            The name of the unit system.
        coordinate_system : CoordinateSystemEnum, optional
            The coordinate system used (default is CoordinateSystemEnum.wsg_84).
        reference_date : Optional[datetime], optional
            The reference date for the field (default is None).
        asset : str, optional
            The asset associated with the field (default is an empty string).
        latitude : Optional[float], optional
            The latitude of the field (default is None).
        longitude : Optional[float], optional
            The longitude of the field (default is None).
        datum : Optional[float], optional
            The datum of the field (default is None).
        comment : str, optional
            Additional comments about the field (default is an empty string).

        Returns
        -------
        Field
            The created field.

        Raises
        ------
        ValueError
            If no unit system is named ``unit_system_name``.
        """
        well_property_catalog_dto = self.__get_global_well_properties_catalog_dto()
        data_type_catalog_dto = self.__get_global_data_types_catalog_dto()
        # A default is passed to next() so that an unknown name produces a descriptive
        # ValueError instead of a bare StopIteration escaping from this method.
        unit_system = next(
            (candidate for candidate in self.__get_unit_systems().unitSystems if candidate.name == unit_system_name),
            None,
        )
        if unit_system is None:
            raise ValueError(f'There is no unit system with the name "{unit_system_name}".')
        payload = self.__connection_dto_converter.create_field_dto(
            name,
            country.value,
            coordinate_system.value,
            reference_date,
            field_timezone.value,
            unit_system.id,
            asset,
            latitude,
            longitude,
            datum,
            comment,
            data_type_catalog_dto,
            well_property_catalog_dto,
        )
        response = self.__cluster_apis.field_api.create_field(payload)
        return self.__connection_dto_converter.get_field_from_field_dto_hierarchy(response)

    def __get_user_tasks_definition(self) -> List[UserTaskDefinitionDto]:
        """Returns all user task definitions, requesting them from the processing API only once."""
        if self.__user_tasks_definition is None:
            self.__user_tasks_definition = self.__cluster_apis.processing_api.get_all_user_task_definition()
        return self.__user_tasks_definition

    def __get_user_task_definition(self, user_task_definition_name: str) -> UserTaskDefinitionDto:
        """
        Returns the first user task definition with the given name.

        Raises
        ------
        ValueError
            If no user task definition has that name.
        """
        definition = next(
            (candidate for candidate in self.__get_user_tasks_definition() if candidate.name == user_task_definition_name),
            None,
        )
        if definition is None:
            raise ValueError(f"There is no user definition with the name {user_task_definition_name} in the database")
        return definition

    def migrate_user_tasks(self,
                           initial_user_task_definition_name: str,
                           new_user_task_definition_name: str,
                           field_name: Optional[str] = None,
                           well_name: Optional[str] = None) -> None:
        """
        Migrate user task items using a definition to another user task definition.

        Parameters
        ----------
        initial_user_task_definition_name:
            The name of the initial user task definition
        new_user_task_definition_name:
            The name of the new user task definition
        field_name:
            (Optional) Specify this to apply the migration to a specific field
        well_name:
            (Optional) Specify this to apply the migration to a specific well. Note that the field name must be specified if this parameter is used.

        Raises
        ------
        ValueError
            If ``well_name`` is given without ``field_name``, or if one of the user task
            definition names does not exist.
        """
        # Without this check a well name given on its own would be ignored and every
        # item of the initial definition, in every field, would be migrated.
        if well_name is not None and field_name is None:
            raise ValueError('"field_name" must be specified when "well_name" is given.')

        # Resolve both definitions first so that an unknown name fails before any item is requested.
        initial_definition_id = self.__get_user_task_definition(initial_user_task_definition_name).id
        new_definition_id = self.__get_user_task_definition(new_user_task_definition_name).id

        processing_api = self.__cluster_apis.processing_api
        for user_task_item in processing_api.get_user_task_items(initial_definition_id):
            if field_name is not None and user_task_item.fieldName != field_name:
                continue
            if well_name is not None and user_task_item.wellName != well_name:
                continue
            processing_api.migrate_user_task(user_task_item.id, new_definition_id)