Source code for kappa_sdk.incremental_rta

from typing import List
from .document import Document
from .data import Data
from ._private._automation_api import AutomationAPI
from .automation_status_enum import AutomationStatus


[docs] class IncrementalRTA: """ Incremental RTA object. Presents a KAPPA Automate incremental RTA workflow object. """ def __init__(self, user_task_id: str, name: str, output_documents: List[Document], outputs: List[Data], script_instance_id: str, automation_api: AutomationAPI): self.__id = user_task_id self.__script_instance_id = script_instance_id self.__name = name self.__output_documents: List[Document] = output_documents self.__outputs: List[Data] = outputs self.__automation_api = automation_api @property def id(self) -> str: """ Gets the id of the :class:`IncrementalRTA` object. """ return self.__id @property def name(self) -> str: """ Gets the name of the :class:`IncrementalRTA`. """ return self.__name @property def output_documents(self) -> List[Document]: """ Gets the list of output documents of the :class:`IncrementalRTA`. """ return self.__output_documents @property def outputs(self) -> List[Data]: """Gets the list of outputs of the :class: `IncrementalRTA`.""" return self.__outputs def get_automation_status(self) -> AutomationStatus: """Get the automation status Returns ------- :class:`AutomationStatus` Automation status """ job_properties_dto = self.__automation_api.get_object_properties(self.__script_instance_id) status_dto = next(x for x in job_properties_dto if x.name == "status") if status_dto is None: raise ValueError("There is an issue with the Automation Service") return AutomationStatus(status_dto.value)