Source code for kappa_sdk.incremental_pta

from typing import List, Optional, cast
from .document import Document
from ._private._cluster_apis import ClusterAPIS
from ._private._well_dto import DataPropertyDto
from .automation_status_enum import AutomationStatus


[docs] class IncrementalPTA: """ Incremental PTA object. Presents a KAPPA Automate incremental PTA workflow object. """ def __init__(self, field_id: str, well_id: str, user_task_id: str, name: str, output_documents: List[Document], script_instance_id: str, container_id: str, cluster_apis: ClusterAPIS): self.__field_id = field_id self.__well_id = well_id self.__id = user_task_id self.__script_instance_id = script_instance_id self.__name = name self.__output_documents: List[Document] = output_documents self.__cluster_apis: ClusterAPIS = cluster_apis self.__minimum_shutin_duration: Optional[float] = None self.__container_id: str = container_id self.__script_properties: Optional[List[DataPropertyDto]] = None @property def id(self) -> str: """ Gets the id of the :class:`IncrementalPTA` object. """ return self.__id @property def name(self) -> str: """ Gets the name of the :class:`IncrementalPTA`. """ return self.__name @property def output_documents(self) -> List[Document]: """ Gets the list of output documents of the :class:`IncrementalPTA`. """ return self.__output_documents @property def minimum_shutin_duration(self) -> float: """Get the minimum shutin duration used to run the incremental PTA workflow""" if self.__minimum_shutin_duration is None: self.__minimum_shutin_duration = self.__cluster_apis.kw_api.get_ipta_description(self.__script_instance_id).minShutinDuration return self.__minimum_shutin_duration @property def container_id(self) -> str: """ Gets the id of the associated well properties container """ return self.__container_id @property def pressure_vector_id(self) -> str: """ Gets the vector id of the pressures used by the incremental pta """ properties = self.__get_properties() pressure = next(x for x in properties if x.name.lower() == "pressure") return str(pressure.value) @property def shutin_vector_id(self) -> str: """ Gets the vector id of the shut-in used by the incremental pta """ properties = self.__get_properties() shutin = next(x for x in properties if x.name.lower() == "shut-in") return str(shutin.value) @property def replicate_results_to_master(self) -> bool: properties = self.__get_properties() replicate_results = next(x for x in properties if x.name.lower() == "replication of results") return cast(bool, replicate_results.value) def __get_properties(self) -> List[DataPropertyDto]: if self.__script_properties is None: self.__script_properties = self.__cluster_apis.kw_api.get_script_properties_dto(self.__script_instance_id) return self.__script_properties def get_automation_status(self) -> AutomationStatus: """Get the automation status Returns ------- :class:`AutomationStatus` Automation status """ job_properties_dto = self.__cluster_apis.automation_api.get_object_properties(self.__script_instance_id) status_dto = next(x for x in job_properties_dto if x.name == "status") if status_dto is None: raise ValueError("There is an issue with the Automation Service") return AutomationStatus(status_dto.value)