-
Hi,
It is vital that the script runs only after fresh data has been downloaded. I wrote it like this, but it seems to work incorrectly: the script runs before the download is complete.

    @app.task(
        every('30 minutes') &
        time_of_day.between('10:00', '16:15') &
        time_of_week.between("Mon", "Fri") &
        (time_of_hour.between("01:00", "02:00") | time_of_hour.between("31:00", "32:00"))
    )
    def task_download_data():
        download_data()

    @app.task(
        every('1 hour') &
        after_success(task_download_data) &
        time_of_day.between('10:00', '17:00') &
        time_of_week.between("Mon", "Fri") &
        time_of_hour.after('01:00')
    )
    def task_script():
        run_script()

    run_app()

Could you please give me a hint as to how to change the conditions so that task_script runs only after task_download_data has completed?

In general, I found that the docs (which are otherwise great) did not give me a good understanding of how different conditions work in combination. Any insight or mental framework for reasoning about them would be greatly appreciated too, because I'm pretty sure I'll keep using Rocketry as long as I write Python code :)

Thanks
Replies: 3 comments
-
Hello @gecko984,

As far as I understand, there are three possible statuses for a task, with a different condition for each one:

    from rocketry.conds import after_success, after_fail, after_finish

    @app.task(
        every('30 minutes') &
        time_of_day.between('10:00', '16:15') &
        time_of_week.between("Mon", "Fri") &
        (time_of_hour.between("01:00", "02:00") | time_of_hour.between("31:00", "32:00"))
    )
    def task_download_data():
        download_data()

    @app.task(after_finish(task_download_data))
    def task_script():
        run_script()

For more information, check the documentation: https://rocketry.readthedocs.io/en/stable/tutorial/intermediate.html#pipelining
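One possible reading of why the original setup misfires is that all of the script's conditions can become true on the strength of an older successful download. The sketch below is a toy model in plain Python, not Rocketry itself, and it rests on assumptions: that `after_success` means "the dependency has succeeded since the dependent task last started", that `every('1 hour')` means "at least an hour since the last start", and that download durations vary (the values in `DOWNLOAD_DURATIONS` are made up). The time-of-day and day-of-week windows are left out.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical download durations in minutes, keyed by start minute
# (minute 1 and 31 of the first hour, minute 1 of the second hour).
# The third download is slower than the first two.
DOWNLOAD_DURATIONS: Dict[int, int] = {1: 3, 31: 3, 61: 5}


def simulate(total_minutes: int) -> List[Tuple[int, str]]:
    """Replay a toy timeline, one tick per minute, and log what starts when."""
    events: List[Tuple[int, str]] = []
    download_ends_at: Optional[int] = None  # set only while the download runs
    download_last_success: Optional[int] = None
    script_last_start: Optional[int] = None

    for now in range(total_minutes):
        # A running download finishes successfully once its end minute arrives.
        if download_ends_at is not None and now >= download_ends_at:
            download_last_success = now
            download_ends_at = None
            events.append((now, "download finished"))

        # The download starts at the minutes listed in DOWNLOAD_DURATIONS.
        if download_ends_at is None and now in DOWNLOAD_DURATIONS:
            download_ends_at = now + DOWNLOAD_DURATIONS[now]
            events.append((now, "download started"))

        # The script's conditions, all joined with "&" in the original post.
        hour_elapsed = script_last_start is None or now - script_last_start >= 60
        succeeded_since_last_run = download_last_success is not None and (
            script_last_start is None or download_last_success > script_last_start
        )
        past_minute_one = now % 60 >= 1

        if hour_elapsed and succeeded_since_last_run and past_minute_one:
            label = "script started"
            if download_ends_at is not None:
                label += " while download is running"
            events.append((now, label))
            script_last_start = now

    return events


for minute, what in simulate(70):
    print(f"{minute:3d}  {what}")
```

In this toy run the script starts at minute 64 because the download that finished at minute 34 already satisfies the success condition, even though the download that began at minute 61 is still in progress. None of the combined conditions asks whether the dependency is running right now.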
-
I read the docs more thoroughly and, after some reflection, came to a solution using task return values, which I'll share here in case other people encounter similar problems. In my model example, time is scaled down by a factor of 60, so instead of running the first task every 30 minutes, I run it every 30 seconds (for quicker debugging).

    import datetime
    import random
    from time import sleep

    from rocketry import Rocketry
    from rocketry.args import Return
    from rocketry.conds import after_success, time_of_minute

    def log(msg):
        print(f'{datetime.datetime.now().isoformat()} | {msg}')

    app = Rocketry(execution='thread')

    # note that there is no every(...), it is not necessary!
    download_condition = time_of_minute.at('00') | time_of_minute.at('30')

    @app.task(download_condition)
    def task_download_data():
        # Get current second of minute and conclude whether this is the XX:00 run or XX:30 run.
        # Maybe you can do it with rocketry's tools without the datetime module, but I didn't figure out how.
        current_second = datetime.datetime.now().second
        is_00_run = current_second < 30
        # Simulate work
        log('downloading data...')
        sleep_duration = random.uniform(6, 9)
        sleep(sleep_duration)
        log('Finished downloading data')
        # We'll run or skip the second task depending on this task's return value.
        return {'is_00_run': is_00_run}

    # note that there are no clock-related conditions here
    script_condition = after_success(task_download_data)

    @app.task(script_condition)
    def task_run_script(arg=Return(task_download_data)):
        if arg['is_00_run']:
            log('It was the XX:00 download, running script')
            # simulate work
            sleep_duration = random.uniform(1, 2)
            sleep(sleep_duration)
            log('Finished running script')
        else:
            log('It was the XX:30 download, skipping script')

    app.run()

The output confirms that everything works as expected: the script runs right after the download at XX minutes 00 seconds is complete, and doesn't run after the XX:30 downloads.

As a side note, I got rid of the […]. So if you need a clock-bound schedule, you need to use […]
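The return-value approach hinges on classifying each download run by the clock. As a minimal sketch, that decision can be pulled into a small pure function so it can be checked without starting the scheduler. The function name placement and the `scaled_down` parameter are hypothetical, added here only to switch between the 60x scaled-down demo (seconds of the minute) and a real schedule (minutes of the hour).

```python
import datetime


def is_00_run(now: datetime.datetime, scaled_down: bool = True) -> bool:
    """Return True if `now` falls in the first half of the cycle.

    With scaled_down=True the cycle is one minute, so seconds 0-29 count as
    the XX:00 run. Otherwise the cycle is one hour and minutes 0-29 count.
    """
    position = now.second if scaled_down else now.minute
    return position < 30


# Scaled-down demo: second 3 belongs to the XX:00 run, second 42 to XX:30.
print(is_00_run(datetime.datetime(2023, 1, 2, 10, 15, 3)))
print(is_00_run(datetime.datetime(2023, 1, 2, 10, 15, 42)))
```

Inside `task_download_data` this would replace the two lines that compute `current_second` and `is_00_run`. Note that the classification is taken when the task starts, so a download that runs past the half-cycle boundary is still labelled by its start time.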
-
One more (simpler) solution is to create a custom condition that checks whether the first task is currently running, so that the second task can wait until the first task finishes. It is still somewhat of a crutch, though. The condition looks like this:

    from time import sleep

    from rocketry import Rocketry
    from rocketry.args import Task
    from rocketry.conds import time_of_minute

    app = Rocketry(execution='thread')

    @app.task(time_of_minute.at(0) | time_of_minute.at(30))
    def first_task():
        sleep(5)

    # Custom condition: true while first_task is executing
    @app.cond()
    def first_task_is_running(task=Task(first_task)):
        return task.is_running
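The post stops before showing how the condition is attached to the second task. As a mental model only, conditions can be pictured as predicates that the scheduler re-evaluates on every cycle, with `&`, `|` and `~` building a bigger predicate out of smaller ones. The sketch below is plain Python, not Rocketry's implementation: the `Cond` class, the `state` dictionary and `second_may_start` are all hypothetical names used for illustration.

```python
from typing import Callable, Dict


class Cond:
    """Toy condition: wraps a zero-argument check and supports &, | and ~."""

    def __init__(self, check: Callable[[], bool]) -> None:
        self._check = check

    def __call__(self) -> bool:
        return bool(self._check())

    def __and__(self, other: "Cond") -> "Cond":
        return Cond(lambda: self() and other())

    def __or__(self, other: "Cond") -> "Cond":
        return Cond(lambda: self() or other())

    def __invert__(self) -> "Cond":
        return Cond(lambda: not self())


# Stand-in for what a scheduler would know about the first task.
state: Dict[str, bool] = {"first_running": True, "first_succeeded": True}

first_task_is_running = Cond(lambda: state["first_running"])
first_task_succeeded = Cond(lambda: state["first_succeeded"])

# The second task may start only if the first has succeeded AND is not running.
second_may_start = first_task_succeeded & ~first_task_is_running

print(second_may_start())  # evaluated while the first task is still running
state["first_running"] = False
print(second_may_start())  # evaluated again after the first task has stopped
```

The point of the model is that the combined condition is checked afresh each time, so a term such as "has succeeded" can stay true from an earlier run while a term such as "is not running" reflects the present moment.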