Source code for kappa_sdk.user_tasks.scheduler
from typing import Optional
from datetime import timedelta
[docs]
class Scheduler:
    """Records how the current user task asks to be run again.

    Exactly one request is kept at a time: either "re-run immediately" or a
    schedule string. Each ``reschedule_*`` call replaces the previous request.

    .. note:: Should not be instantiated directly.
    """

    def __init__(self) -> None:
        self.__reschedule_immediate = False
        self.__cron_schedule: Optional[str] = None

    def reschedule_immediate(self) -> None:
        """
        Asks KAPPA Automate to re-run this user task right after it finishes its current execution.

        Any previously stored schedule string is cleared.
        """
        self.__reschedule_immediate = True
        self.__cron_schedule = None

    def reschedule_at_time_delta(self, time_delta: timedelta) -> None:
        """
        Asks KAPPA Automate to re-run this user task once the given delay has elapsed.

        The delay is stored as an ISO 8601 duration string (for example
        ``'P1DT2H3M4S'``) and is readable through :attr:`cron_schedule`.
        Any pending immediate-reschedule request is cleared.

        :param time_delta: Non-negative delay; fractions of a second are dropped.
        :raises ValueError: If ``time_delta`` is negative. The stored state is
            left unchanged in that case.
        """
        # Build first so that a rejected value does not alter the stored state.
        schedule = self.__build_cron_from_time_delta(time_delta)
        self.__cron_schedule = schedule
        self.__reschedule_immediate = False

    def reschedule_with_cron(self, cron_schedule: str) -> None:
        """
        Stores the given schedule string for the next run.

        The string is kept as given, without validation, and any pending
        immediate-reschedule request is cleared.

        :param cron_schedule: Schedule expression to store.
        """
        self.__cron_schedule = cron_schedule
        self.__reschedule_immediate = False

    @property
    def cron_schedule(self) -> Optional[str]:
        """Schedule string currently stored, or ``None`` when there is none."""
        return self.__cron_schedule

    @property
    def is_reschedule_immediate(self) -> bool:
        """``True`` when an immediate re-run was the last request made."""
        return self.__reschedule_immediate

    def __build_cron_from_time_delta(self, time_delta: timedelta) -> str:
        """Format ``time_delta`` as an ISO 8601 duration (``PnDTnHnMnS``).

        Components equal to zero are omitted; a delay shorter than one second
        is rendered as ``'PT0S'`` so the result is never the bare, invalid
        string ``'P'``.

        :raises ValueError: If ``time_delta`` is negative.
        """
        if time_delta < timedelta(0):
            raise ValueError(f"time_delta must not be negative, got {time_delta!r}")

        # timedelta is normalized: ``days`` holds whole days and ``seconds``
        # the remainder in [0, 86400), so integer arithmetic is exact here.
        days = time_delta.days
        hours, remainder = divmod(time_delta.seconds, 3600)
        minutes, seconds = divmod(remainder, 60)

        date_part = f"{days}D" if days else ""
        time_part = "".join(
            f"{value}{unit}"
            for value, unit in ((hours, "H"), (minutes, "M"), (seconds, "S"))
            if value
        )

        if not date_part and not time_part:
            # Zero (or sub-second) delay: emit an explicit zero duration.
            return "PT0S"
        if time_part:
            return f"P{date_part}T{time_part}"
        return f"P{date_part}"