Source code for kappa_sdk.incremental_rta
from typing import List
from .document import Document
from .data import Data
from ._private._automation_api import AutomationAPI
from .automation_status_enum import AutomationStatus
[docs]
class IncrementalRTA:
""" Incremental RTA object.
Presents a KAPPA Automate incremental RTA workflow object.
"""
def __init__(self, user_task_id: str, name: str, output_documents: List[Document], outputs: List[Data], script_instance_id: str, automation_api: AutomationAPI):
self.__id = user_task_id
self.__script_instance_id = script_instance_id
self.__name = name
self.__output_documents: List[Document] = output_documents
self.__outputs: List[Data] = outputs
self.__automation_api = automation_api
@property
def id(self) -> str:
""" Gets the id of the :class:`IncrementalRTA` object.
"""
return self.__id
@property
def name(self) -> str:
""" Gets the name of the :class:`IncrementalRTA`.
"""
return self.__name
@property
def output_documents(self) -> List[Document]:
""" Gets the list of output documents of the :class:`IncrementalRTA`.
"""
return self.__output_documents
@property
def outputs(self) -> List[Data]:
"""Gets the list of outputs of the :class: `IncrementalRTA`."""
return self.__outputs
def get_automation_status(self) -> AutomationStatus:
"""Get the automation status
Returns
-------
:class:`AutomationStatus`
Automation status
"""
job_properties_dto = self.__automation_api.get_object_properties(self.__script_instance_id)
status_dto = next(x for x in job_properties_dto if x.name == "status")
if status_dto is None:
raise ValueError("There is an issue with the Automation Service")
return AutomationStatus(status_dto.value)