Source code for kappa_sdk.incremental_rta
from typing import List
from .document import Document
from .data import Data
from ._private._automation_api import AutomationAPI
from .automation_status_enum import AutomationStatus
[docs]
class IncrementalRTA:
    """ Incremental RTA object.
    Presents a KAPPA Automate incremental RTA workflow object.
    """
[docs]
    def __init__(self, user_task_id: str, name: str, output_documents: List[Document], outputs: List[Data], script_instance_id: str, automation_api: AutomationAPI):
        self.__id = user_task_id
        self.__script_instance_id = script_instance_id
        self.__name = name
        self.__output_documents: List[Document] = output_documents
        self.__outputs: List[Data] = outputs
        self.__automation_api = automation_api 
    @property
    def id(self) -> str:
        """ Gets the id of the :class:`IncrementalRTA` object.
        """
        return self.__id
    @property
    def name(self) -> str:
        """ Gets the name of the :class:`IncrementalRTA`.
        """
        return self.__name
    @property
    def output_documents(self) -> List[Document]:
        """ Gets the list of output documents of the :class:`IncrementalRTA`.
        """
        return self.__output_documents
    @property
    def outputs(self) -> List[Data]:
        """Gets the list of outputs of the :class: `IncrementalRTA`."""
        return self.__outputs
[docs]
    def get_automation_status(self) -> AutomationStatus:
        """Get the automation status
        Returns
        -------
        :class:`AutomationStatus`
            Automation status
        """
        job_properties_dto = self.__automation_api.get_object_properties(self.__script_instance_id)
        status_dto = next(x for x in job_properties_dto if x.name == "status")
        if status_dto is None:
            raise ValueError("There is an issue with the Automation Service")
        return AutomationStatus(status_dto.value)