CELERY CELERY_QUEUES和CELERY_ROUTS的用法

1. 介紹

Celery非常容易設置,通常都是使用默認的queue來存放任務,寫法如下:

@app.task
def task1(x, y):
    for _ in range(10):
        time.sleep(1)
        print('x + y =', x + y)
        return x + y

@app.task
def task2():
    for _ in range(100):
        print('task2: ', datetime.now())
        time.sleep(1)

這兩個任務都會在同一個queue里面執(zhí)行,這樣寫很簡單,因為只需要一個decorator就能實現(xiàn)一個異步任務。但如果考慮到每個任務的執(zhí)行時間耗費資源或者重要程度不同,把兩個任務放到同一個queue中,可能造成執(zhí)行時間長但重要程度卻低的任務先執(zhí)行,極大的影響程序的功能。在使用同一個queue時,就算增加worker也無法解決該問題。
為了解決該問題,需要把task1放到queue1中,把task2放到queue2中去執(zhí)行。同時指定worker1去處理queue1的任務,worker2去處理queue2的任務。使用這種方式時,各個任務就能獲得足夠的worker去處理,同時一些優(yōu)先級worker也能很好的處理重要的任務而不需要等待了。

2. 定義queue和routes

首先手動定義queue

CELERY_QUEUES = (
    Queue('default', exchange=Exchange('default'), routing_key='default'),
    Queue('app_task1', exchange=Exchange('app_task1'), routing_key='app_task1'),
    Queue('app_task2', exchange=Exchange('app_task2'), routing_key='app_task2'),
)

然后定義routes用來決定不同的任務去哪一個queue

CELERY_ROUTES = {
    'celery_app.task.task1': {'queue': 'app_task1', 'routing_key': 'app_task1'},
    'celery_app.task.task2': {'queue': 'app_task2', 'routing_key': 'app_task2'}
}

在啟動worker時指定該worker執(zhí)行哪一個queue中的任務

celery -A celery_app worker -l info -Q app_task1 -P eventlet
celery -A celery_app worker -l info -Q app_task2 -P eventlet

舉例:
如果某項目中涉及到大量文件轉換問題,有大量小于1MB的文件轉換,同時也有少量的20MB的文件轉換,小文件轉換的優(yōu)先級是最高的,同時不用占用太多時間,但大文件的轉換很耗時。如果將轉換任務放到一個隊列里面,那么很有可能因為出現(xiàn)轉換大文件,導致耗時太嚴重造成小文件轉換延時的問題。
所以可以按照文件大小設置3個優(yōu)先隊列,并且每個隊列設置不用的worker。




自己的理解,若有問題日后修正:
celery的生產者會根據CELERY_ROUTES的值,將不同的任務放到不同的Exchange中,exchange根據CELERY_QUEUES的值將任務分配到不同的queue中,在worker指定取任務的queue后,那么就只從該queue中取出任務然后執(zhí)行。

3. 代碼

    celery_app
        __init__.py
        celeryconfig.py
        main.py
        task.py

3.1 init.py

from celery import Celery

app = Celery('celery_app')  # include=['celery_app.task']
app.config_from_object('celery_app.celeryconfig')

3.2 celeryconfig.py

配置文件

from kombu import Queue, Exchange

BROKER_URL = 'redis://127.0.0.1:6379/7'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/8'

CELERY_IMPORTS = (
    'celery_app.task'
)

CELERY_QUEUES = (
    Queue('default', exchange=Exchange('default'), routing_key='default'),
    Queue('app_task1', exchange=Exchange('app_task1'), routing_key='app_task1'),
    Queue('app_task2', exchange=Exchange('app_task2'), routing_key='app_task2'),
)

CELERY_ROUTES = {
    'celery_app.task.task1': {'queue': 'app_task1', 'routing_key': 'app_task1'},
    'celery_app.task.task2': {'queue': 'app_task2', 'routing_key': 'app_task2'}
}

3.3 task.py

具體任務

import time
from datetime import datetime
from celery_app import app


@app.task
def task1(x, y):
    for _ in range(10):
        time.sleep(1)
        print('x + y =', x + y)
        return x + y


@app.task
def task2():
    for _ in range(100):
        print('task2: ', datetime.now())
        time.sleep(1)

3.4 main.py

執(zhí)行任務

from celery_app.task import task1, task2

r = task1.apply_async(args=(1, 2))
r2 = task2.delay()

print(r.status)
print(r2.status)

3.5 執(zhí)行

先啟動虛擬環(huán)境,執(zhí)行

celery -A celery_app worker -l info -Q app_task1 -P eventlet
# 或者
celery -A celery_app worker -l info -Q app_task2 -P eventlet

然后執(zhí)行main.py文件
就可以看到兩個worker分別執(zhí)行不同的任務,并且只會執(zhí)行被分配的任務了。

4. 注意事項

4.1 CELERY_IMPORTS問題

CELERY_IMPORTS = (
    'celery_app.task'
)

這個屬性中配置的是需要執(zhí)行的任務的模塊,如果沒有配置,那么在啟動worker之后,便會報錯,因為CELERY_ROUTES中的任務將會無法找到。
或者不想配置這個,也可以在創(chuàng)建Celery對象時傳入參數(shù)配置,

app = Celery('celery_app',include=['celery_app.task'])

4.2 CELERY_ROUTES

CELERY_ROUTES的作用是,給任務分配queue和routing_key,然后根據給worker分配的queue值執(zhí)行相應的任務。
如果在celeryconfig.py中沒有配置該項,那么也可以這么寫,
啟動worker:

celery -A celery_app worker -l info -Q app_task1 -P eventlet

然后在生產任務時,主動傳入queue和routing_key的值

r = task1.apply_async(args=(1, 2), queue='app_task1', routing_key='app_task1')

4.3 Exchange

如果在使用redis做BROKEN時,在創(chuàng)建Queue對象時,其實可以不用傳入Exchange的值,即

CELERY_QUEUES = (
    Queue('default', routing_key='default'),
    Queue('app_task1', routing_key='app_task1'),
    Queue('app_task2', routing_key='app_task2'),
)

但如果使用了的是RabbitMQ,那么這個值就一定需要。
所以以防以后更改了BROKEN程序失效,那么在配置Queue時,默認將這個參數(shù)傳入,然后值跟Queue的名字一樣即可。

4.4 queue和routing_key

這兩個值的名字不需要保持一致,那么為了方便使用和檢查,最好還是保持一致。

4.5 定時任務

在上面添加代碼

CELERYBEAT_SCHEDULE = {
    'celery_app.task.task1': {
        'task': 'celery_app.task.task1',
        'schedule': timedelta(seconds=20),
        'args': (1, 10)
    },
    'celery_app.task.task2': {
        'task': 'celery_app.task.task2',
        'schedule': crontab(minute='*/2'),
        'args': ()
    }
}

屬性名稱不要寫錯了,是CELERYBEAT_SCHEDULE,不要寫成了BEAT_SCHEDULE或者CELERY_BEAT_SCHEDULE了。
啟動定時器:
CELERY -A celery_app beat
在啟動worker時,也可以指定queue,那么該worker就只執(zhí)行該queue中的任務。
CELERY -A celery_app worker -l info -Q app_task1 -P eventlet

5. 參考:

https://denibertovic.com/posts/celery-best-practices/
https://blog.csdn.net/siddontang/article/details/34447003
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容