Source code for kappa_sdk.user_tasks.scheduler

from typing import Optional
from datetime import timedelta


class Scheduler:
    """Scheduler.

    Records how a user task asks to be re-run once its current execution
    finishes: either immediately, or according to a schedule string.
    Each ``reschedule_*`` call replaces whatever was requested before.

    .. note:: Should not be instantiated directly.
    """

    def __init__(self) -> None:
        self.__reschedule_immediate = False
        self.__cron_schedule: Optional[str] = None

    def reschedule_immediate(self) -> None:
        """
        Asks KAPPA Automate to re-run this user task right after it finishes
        its current execution.

        Clears any schedule string set by a previous call.
        """
        self.__reschedule_immediate = True
        self.__cron_schedule = None

    def reschedule_at_time_delta(self, time_delta: timedelta) -> None:
        """
        Asks KAPPA Automate to re-run this user task once the given delay has
        elapsed after it finishes its current execution.

        The delay is stored in :attr:`cron_schedule` as an ISO 8601 duration
        string (for example ``P1DT2H3M4S``); fractions of a second are
        dropped. Clears any previous "immediate" request.

        :param time_delta: Delay before the next run. Must not be negative.
        :raises ValueError: If ``time_delta`` is negative. The scheduler state
            is left unchanged in that case.
        """
        # Build first: if the delta is rejected, no attribute has been touched.
        self.__cron_schedule = self.__build_cron_from_time_delta(time_delta)
        self.__reschedule_immediate = False

    def reschedule_with_cron(self, cron_schedule: str) -> None:
        """
        Asks KAPPA Automate to re-run this user task according to the given
        schedule string.

        The string is stored as-is; no validation is performed here.
        Clears any previous "immediate" request.

        :param cron_schedule: Schedule expression to store.
        """
        self.__cron_schedule = cron_schedule
        self.__reschedule_immediate = False

    @property
    def cron_schedule(self) -> Optional[str]:
        """Schedule string currently requested, or ``None`` if there is none."""
        return self.__cron_schedule

    @property
    def is_reschedule_immediate(self) -> bool:
        """``True`` if an immediate re-run has been requested."""
        return self.__reschedule_immediate

    def __build_cron_from_time_delta(self, time_delta: timedelta) -> str:
        """
        Format ``time_delta`` as an ISO 8601 duration (``PnDTnHnMnS``).

        Zero-valued components are omitted. A delta shorter than one second
        yields ``PT0S``, since a bare ``P`` is not a valid duration.

        :param time_delta: Non-negative duration to format.
        :return: The ISO 8601 duration string.
        :raises ValueError: If ``time_delta`` is negative.
        """
        if time_delta < timedelta(0):
            raise ValueError(
                f"time_delta must not be negative, got {time_delta!r}"
            )

        # timedelta keeps normalised integer fields (0 <= seconds < 86400),
        # so no float arithmetic is needed; microseconds are ignored.
        days = time_delta.days
        hours, remainder = divmod(time_delta.seconds, 3600)
        minutes, seconds = divmod(remainder, 60)

        date_part = f"{days}D" if days else ""
        time_part = "".join(
            f"{value}{unit}"
            for value, unit in ((hours, "H"), (minutes, "M"), (seconds, "S"))
            if value
        )

        if not date_part and not time_part:
            # Nothing left after dropping sub-second precision.
            return "PT0S"
        if time_part:
            return f"P{date_part}T{time_part}"
        return f"P{date_part}"