1from celery import Celery
2from celery.schedules import crontab
3
# Celery application object; the on_after_configure hook and the ``test``
# task defined below are both registered through it.
app = Celery()
5
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register the periodic schedule entries for the ``test`` task.

    Connected to ``app.on_after_configure``. Each entry is added through
    ``sender.add_periodic_task``; any extra keyword arguments delivered
    with the signal are accepted and ignored.
    """
    # Every 10 seconds: test('hello'), registered under an explicit name.
    hello_sig = test.s('hello')
    sender.add_periodic_task(10.0, hello_sig, name='add every 10')

    # Every 30 seconds: test('world'), with expires=10 passed for the entry.
    world_sig = test.s('world')
    sender.add_periodic_task(30.0, world_sig, expires=10)

    # Every Monday morning at 7:30 a.m.
    monday_morning = crontab(hour=7, minute=30, day_of_week=1)
    sender.add_periodic_task(monday_morning, test.s('Happy Mondays!'))
19
@app.task
def test(arg):
    """Print ``arg``; registered as a task on ``app``."""
    print(arg)
23
1from celery import Celery
2from celery.schedules import crontab
3
# Celery application object; the on_after_configure hook and the ``test``
# task defined below are both registered through it.
app = Celery()
5
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    """Register the periodic schedule entries for the ``test`` task.

    Connected to ``app.on_after_configure``. Each entry is added through
    ``sender.add_periodic_task``; any extra keyword arguments delivered
    with the signal are accepted and ignored.
    """
    # Every 10 seconds: test('hello'), registered under an explicit name.
    hello_sig = test.s('hello')
    sender.add_periodic_task(10.0, hello_sig, name='add every 10')

    # Every 30 seconds: test('world'), with expires=10 passed for the entry.
    world_sig = test.s('world')
    sender.add_periodic_task(30.0, world_sig, expires=10)

    # Every Monday morning at 7:30 a.m.
    monday_morning = crontab(hour=7, minute=30, day_of_week=1)
    sender.add_periodic_task(monday_morning, test.s('Happy Mondays!'))
19
@app.task
def test(arg):
    """Print ``arg``; registered as a task on ``app``."""
    print(arg)
23