Source code for kappa_sdk.incremental_pta
from typing import List, Optional, cast
from .document import Document
from ._private._cluster_apis import ClusterAPIS
from ._private._well_dto import DataPropertyDto
from .automation_status_enum import AutomationStatus
[docs]
class IncrementalPTA:
    """ Incremental PTA object.

    Presents a KAPPA Automate incremental PTA workflow object.

    Values that require a call to the cluster APIs (the minimum shut-in
    duration and the script properties) are fetched on first access and then
    kept on the instance, so later reads do not trigger another request.

    Parameters
    ----------
    field_id:
        Id of the field. Stored on the instance; not read by any member
        visible in this class.
    well_id:
        Id of the well. Stored on the instance; not read by any member
        visible in this class.
    user_task_id:
        Id exposed through :attr:`id`.
    name:
        Name exposed through :attr:`name`.
    output_documents:
        Documents exposed through :attr:`output_documents`.
    script_instance_id:
        Id passed to the cluster APIs to query the workflow description,
        the script properties and the automation status.
    container_id:
        Id exposed through :attr:`container_id`.
    cluster_apis:
        Entry point to the ``kw_api`` and ``automation_api`` services.
    """

    def __init__(self, field_id: str, well_id: str, user_task_id: str, name: str, output_documents: List[Document], script_instance_id: str, container_id: str, cluster_apis: ClusterAPIS):
        self.__field_id = field_id
        self.__well_id = well_id
        self.__id = user_task_id
        self.__script_instance_id = script_instance_id
        self.__name = name
        self.__output_documents: List[Document] = output_documents
        self.__cluster_apis: ClusterAPIS = cluster_apis
        # Lazily populated caches; None means "not fetched yet".
        self.__minimum_shutin_duration: Optional[float] = None
        self.__container_id: str = container_id
        self.__script_properties: Optional[List[DataPropertyDto]] = None

    @property
    def id(self) -> str:
        """ Gets the id of the :class:`IncrementalPTA` object.
        """
        return self.__id

    @property
    def name(self) -> str:
        """ Gets the name of the :class:`IncrementalPTA`.
        """
        return self.__name

    @property
    def output_documents(self) -> List[Document]:
        """ Gets the list of output documents of the :class:`IncrementalPTA`.
        """
        return self.__output_documents

    @property
    def minimum_shutin_duration(self) -> float:
        """ Gets the minimum shut-in duration used to run the incremental PTA workflow.

        The value is read from the incremental PTA description the first time
        it is requested and cached afterwards.
        """
        if self.__minimum_shutin_duration is None:
            self.__minimum_shutin_duration = self.__cluster_apis.kw_api.get_ipta_description(self.__script_instance_id).minShutinDuration
        return self.__minimum_shutin_duration

    @property
    def container_id(self) -> str:
        """ Gets the id of the associated well properties container """
        return self.__container_id

    @property
    def pressure_vector_id(self) -> str:
        """ Gets the vector id of the pressures used by the incremental pta.

        Looks up the script property named "pressure" (case-insensitive) and
        returns its value as a string. Raises :class:`StopIteration` if no
        such property exists.
        """
        properties = self.__get_properties()
        pressure = next(x for x in properties if x.name.lower() == "pressure")
        return str(pressure.value)

    @property
    def shutin_vector_id(self) -> str:
        """ Gets the vector id of the shut-in used by the incremental pta.

        Looks up the script property named "shut-in" (case-insensitive) and
        returns its value as a string. Raises :class:`StopIteration` if no
        such property exists.
        """
        properties = self.__get_properties()
        shutin = next(x for x in properties if x.name.lower() == "shut-in")
        return str(shutin.value)

    @property
    def replicate_results_to_master(self) -> bool:
        """ Gets the value of the "replication of results" script property.

        The property is matched by name (case-insensitive). Raises
        :class:`StopIteration` if no such property exists.
        """
        properties = self.__get_properties()
        replicate_results = next(x for x in properties if x.name.lower() == "replication of results")
        # cast() only informs the type checker; the stored value is returned
        # unchanged. NOTE(review): presumably already a bool - confirm.
        return cast(bool, replicate_results.value)

    def __get_properties(self) -> List[DataPropertyDto]:
        """ Returns the script properties, fetching them on first use only. """
        if self.__script_properties is None:
            self.__script_properties = self.__cluster_apis.kw_api.get_script_properties_dto(self.__script_instance_id)
        return self.__script_properties

    def get_automation_status(self) -> AutomationStatus:
        """Get the automation status

        Returns
        -------
        :class:`AutomationStatus`
            Automation status

        Raises
        ------
        ValueError
            If the automation service returns no property named "status".
        """
        job_properties_dto = self.__cluster_apis.automation_api.get_object_properties(self.__script_instance_id)
        # A default is required here: without it next() raises StopIteration
        # on a missing entry and the explicit check below can never trigger.
        status_dto = next((x for x in job_properties_dto if x.name == "status"), None)
        if status_dto is None:
            raise ValueError("There is an issue with the Automation Service")
        return AutomationStatus(status_dto.value)