Source code for kappa_sdk.user_tasks.scheduler
from typing import Optional
from datetime import timedelta
[docs]
class Scheduler:
    """Scheduler.
    .. note:: Should not be instantiated directly.
    """
[docs]
    def __init__(self) -> None:
        self.__reschedule_immediate = False
        self.__cron_schedule: Optional[str] = None 
[docs]
    def reschedule_at_time_delta(self, time_delta: timedelta) -> None:
        """
        Asks KAPPA Automate to re-run this user task at date after it finishes its current execution.
        """
        self.__cron_schedule = self.__build_cron_from_time_delta(time_delta)
        self.__reschedule_immediate = False 
[docs]
    def reschedule_with_cron(self, cron_schedule: str) -> None:
        self.__cron_schedule = cron_schedule
        self.__reschedule_immediate = False 
    @property
    def cron_schedule(self) -> Optional[str]:
        return self.__cron_schedule
    @property
    def is_reschedule_immediate(self) -> bool:
        return self.__reschedule_immediate
    def __build_cron_from_time_delta(self, time_delta: timedelta) -> str:
        seconds = time_delta.total_seconds()
        minutes, seconds = divmod(seconds, 60)
        hours, minutes = divmod(minutes, 60)
        days, hours = divmod(hours, 24)
        seconds, minutes, hours, days = map(int, (seconds, minutes, hours, days))
        date = ''
        if days or days != 0:
            date = f'{days}D'
        time = ''
        if hours or hours != 0:
            time += f"{hours}H"
        if minutes or minutes != 0:
            time += f"{minutes}M"
        if seconds or seconds != 0:
            time += f"{seconds}S"
        output = date + 'T' + time if time else date
        return 'P' + output