Hi guys! I have dynamically scheduled tasks in my FastAPI project. The flow looks like this:
When an object is created, I call task.schedule_by_time(time=<some_time>). When the object is later updated, the schedule time should be updated as well. How do I do this?
Replies: 1 comment
After calling schedule_by_time, you get back a CreatedSchedule object like this (code source):

class CreatedSchedule(Generic[_ReturnType]):
    """A schedule that has been created."""

    def __init__(
        self,
        kicker: "AsyncKicker[Any, _ReturnType]",
        source: ScheduleSource,
        task: ScheduledTask,
    ) -> None:
        self.kicker = kicker
        self.source = source
        self.task = task
        self.schedule_id = task.schedule_id

To update the schedule time, you can use the fields of this object to delete the previous schedule and add a new one with a different time:

# This is pseudocode to demonstrate the idea:
import datetime

# 1) Create the schedule source
schedule_source = ...
await schedule_source.startup()

# 2) Schedule the task for the first time
created_schedule = await my_task.schedule_by_time(
    schedule_source,
    datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=1, seconds=0),
)

# 3) Reschedule the task: delete the old schedule, then add it back with a new time
await schedule_source.delete_schedule(created_schedule.schedule_id)
new_scheduled_task = created_schedule.task
new_scheduled_task.time = datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=2, seconds=0)
await schedule_source.add_schedule(new_scheduled_task)

These schedule source methods (delete_schedule and add_schedule) are optional according to the documentation, but most schedule sources implement them.
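If you want to try the delete-then-add flow without a real broker or schedule source, here is a minimal sketch that uses a hypothetical in-memory stand-in. FakeScheduledTask, InMemoryScheduleSource, and reschedule are illustrative names and not part of the library; only the add_schedule and delete_schedule method names are taken from the snippet in this answer, and a real source may behave differently.

```python
import asyncio
import datetime
from dataclasses import dataclass, replace


@dataclass
class FakeScheduledTask:
    """Hypothetical stand-in for the library's ScheduledTask."""

    schedule_id: str
    task_name: str
    time: datetime.datetime


class InMemoryScheduleSource:
    """Hypothetical source exposing only add_schedule and delete_schedule."""

    def __init__(self) -> None:
        self.schedules: dict[str, FakeScheduledTask] = {}

    async def add_schedule(self, schedule: FakeScheduledTask) -> None:
        self.schedules[schedule.schedule_id] = schedule

    async def delete_schedule(self, schedule_id: str) -> None:
        # Deleting an unknown id is treated as a no-op in this sketch.
        self.schedules.pop(schedule_id, None)


async def reschedule(source, task, new_time):
    """Delete the old schedule, then add a copy of it with the new time."""
    await source.delete_schedule(task.schedule_id)
    updated = replace(task, time=new_time)
    await source.add_schedule(updated)
    return updated


async def demo():
    source = InMemoryScheduleSource()
    now = datetime.datetime.now(datetime.timezone.utc)
    original = FakeScheduledTask(
        "sched-1", "my_task", now + datetime.timedelta(minutes=1)
    )
    await source.add_schedule(original)
    # Called from the "update object" handler: move the run time by one minute.
    updated = await reschedule(
        source, original, now + datetime.timedelta(minutes=2)
    )
    return source, original, updated


if __name__ == "__main__":
    source, original, updated = asyncio.run(demo())
    print(original.time, "->", source.schedules[updated.schedule_id].time)
```

In a FastAPI project, the reschedule call would typically live in the update endpoint, with the schedule_id stored next to the object so the old schedule can be found again.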