1. 介紹
Celery非常容易設置,通常都是使用默認的queue來存放任務,寫法如下:
@app.task
def task1(x, y):
for _ in range(10):
time.sleep(1)
print('x + y =', x + y)
return x + y
@app.task
def task2():
for _ in range(100):
print('task2: ', datetime.now())
time.sleep(1)
這兩個任務都會在同一個queue里面執(zhí)行,這樣寫很簡單,因為只需要一個decorator就能實現(xiàn)一個異步任務。但如果考慮到每個任務的執(zhí)行時間耗費資源或者重要程度不同,把兩個任務放到同一個queue中,可能造成執(zhí)行時間長但重要程度卻低的任務先執(zhí)行,極大的影響程序的功能。在使用同一個queue時,就算增加worker也無法解決該問題。
為了解決該問題,需要把task1放到queue1中,把task2放到queue2中去執(zhí)行。同時指定worker1去處理queue1的任務,worker2去處理queue2的任務。使用這種方式時,各個任務就能獲得足夠的worker去處理,同時一些優(yōu)先級worker也能很好的處理重要的任務而不需要等待了。
2. 定義queue和routes
首先手動定義queue
CELERY_QUEUES = (
Queue('default', exchange=Exchange('default'), routing_key='default'),
Queue('app_task1', exchange=Exchange('app_task1'), routing_key='app_task1'),
Queue('app_task2', exchange=Exchange('app_task2'), routing_key='app_task2'),
)
然后定義routes用來決定不同的任務去哪一個queue
CELERY_ROUTES = {
'celery_app.task.task1': {'queue': 'app_task1', 'routing_key': 'app_task1'},
'celery_app.task.task2': {'queue': 'app_task2', 'routing_key': 'app_task2'}
}
在啟動worker時指定該worker執(zhí)行哪一個queue中的任務
celery -A celery_app worker -l info -Q app_task1 -P eventlet
celery -A celery_app worker -l info -Q app_task2 -P eventlet
舉例:
如果某項目中涉及到大量文件轉換問題,有大量小于1MB的文件轉換,同時也有少量的20MB的文件轉換,小文件轉換的優(yōu)先級是最高的,同時不用占用太多時間,但大文件的轉換很耗時。如果將轉換任務放到一個隊列里面,那么很有可能因為出現(xiàn)轉換大文件,導致耗時太嚴重造成小文件轉換延時的問題。
所以可以按照文件大小設置3個優(yōu)先隊列,并且每個隊列設置不用的worker。

自己的理解,若有問題日后修正:
celery的生產者會根據CELERY_ROUTES的值,將不同的任務放到不同的Exchange中,exchange根據CELERY_QUEUES的值將任務分配到不同的queue中,在worker指定取任務的queue后,那么就只從該queue中取出任務然后執(zhí)行。
3. 代碼
celery_app
__init__.py
celeryconfig.py
main.py
task.py
3.1 init.py
from celery import Celery
app = Celery('celery_app') # include=['celery_app.task']
app.config_from_object('celery_app.celeryconfig')
3.2 celeryconfig.py
配置文件
from kombu import Queue, Exchange
BROKER_URL = 'redis://127.0.0.1:6379/7'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/8'
CELERY_IMPORTS = (
'celery_app.task'
)
CELERY_QUEUES = (
Queue('default', exchange=Exchange('default'), routing_key='default'),
Queue('app_task1', exchange=Exchange('app_task1'), routing_key='app_task1'),
Queue('app_task2', exchange=Exchange('app_task2'), routing_key='app_task2'),
)
CELERY_ROUTES = {
'celery_app.task.task1': {'queue': 'app_task1', 'routing_key': 'app_task1'},
'celery_app.task.task2': {'queue': 'app_task2', 'routing_key': 'app_task2'}
}
3.3 task.py
具體任務
import time
from datetime import datetime
from celery_app import app
@app.task
def task1(x, y):
for _ in range(10):
time.sleep(1)
print('x + y =', x + y)
return x + y
@app.task
def task2():
for _ in range(100):
print('task2: ', datetime.now())
time.sleep(1)
3.4 main.py
執(zhí)行任務
from celery_app.task import task1, task2
r = task1.apply_async(args=(1, 2))
r2 = task2.delay()
print(r.status)
print(r2.status)
3.5 執(zhí)行
先啟動虛擬環(huán)境,執(zhí)行
celery -A celery_app worker -l info -Q app_task1 -P eventlet
# 或者
celery -A celery_app worker -l info -Q app_task2 -P eventlet
然后執(zhí)行main.py文件
就可以看到兩個worker分別執(zhí)行不同的任務,并且只會執(zhí)行被分配的任務了。
4. 注意事項
4.1 CELERY_IMPORTS問題
CELERY_IMPORTS = (
'celery_app.task'
)
這個屬性中配置的是需要執(zhí)行的任務的模塊,如果沒有配置,那么在啟動worker之后,便會報錯,因為CELERY_ROUTES中的任務將會無法找到。
或者不想配置這個,也可以在創(chuàng)建Celery對象時傳入參數(shù)配置,
app = Celery('celery_app',include=['celery_app.task'])
4.2 CELERY_ROUTES
CELERY_ROUTES的作用是,給任務分配queue和routing_key,然后根據給worker分配的queue值執(zhí)行相應的任務。
如果在celeryconfig.py中沒有配置該項,那么也可以這么寫,
啟動worker:
celery -A celery_app worker -l info -Q app_task1 -P eventlet
然后在生產任務時,主動傳入queue和routing_key的值
r = task1.apply_async(args=(1, 2), queue='app_task1', routing_key='app_task1')
4.3 Exchange
如果在使用redis做BROKEN時,在創(chuàng)建Queue對象時,其實可以不用傳入Exchange的值,即
CELERY_QUEUES = (
Queue('default', routing_key='default'),
Queue('app_task1', routing_key='app_task1'),
Queue('app_task2', routing_key='app_task2'),
)
但如果使用了的是RabbitMQ,那么這個值就一定需要。
所以以防以后更改了BROKEN程序失效,那么在配置Queue時,默認將這個參數(shù)傳入,然后值跟Queue的名字一樣即可。
4.4 queue和routing_key
這兩個值的名字不需要保持一致,那么為了方便使用和檢查,最好還是保持一致。
4.5 定時任務
在上面添加代碼
CELERYBEAT_SCHEDULE = {
'celery_app.task.task1': {
'task': 'celery_app.task.task1',
'schedule': timedelta(seconds=20),
'args': (1, 10)
},
'celery_app.task.task2': {
'task': 'celery_app.task.task2',
'schedule': crontab(minute='*/2'),
'args': ()
}
}
屬性名稱不要寫錯了,是CELERYBEAT_SCHEDULE,不要寫成了BEAT_SCHEDULE或者CELERY_BEAT_SCHEDULE了。
啟動定時器:
CELERY -A celery_app beat
在啟動worker時,也可以指定queue,那么該worker就只執(zhí)行該queue中的任務。
CELERY -A celery_app worker -l info -Q app_task1 -P eventlet
5. 參考:
https://denibertovic.com/posts/celery-best-practices/
https://blog.csdn.net/siddontang/article/details/34447003