快速理解Celery

1.什么是Celery

Celery是一個(gè)簡單、靈活且可靠的,處理大量消息的分布式系統(tǒng)

專注于實(shí)時(shí)處理的異步任務(wù)隊(duì)列

同時(shí)也支持任務(wù)調(diào)度

Celery架構(gòu)

Celery架構(gòu)圖

Celery的架構(gòu)由三部分組成,消息中間件(message broker),任務(wù)執(zhí)行單元(worker)和任務(wù)執(zhí)行結(jié)果存儲(task result store)組成。

消息中間件

Celery本身不提供消息服務(wù),但是可以方便的和第三方提供的消息中間件集成。包括,RabbitMQ, Redis等等

任務(wù)執(zhí)行單元

Worker是Celery提供的任務(wù)執(zhí)行的單元,worker并發(fā)的運(yùn)行在分布式的系統(tǒng)節(jié)點(diǎn)中。

任務(wù)結(jié)果存儲

Task result store用來存儲Worker執(zhí)行的任務(wù)的結(jié)果,Celery支持以不同方式存儲任務(wù)的結(jié)果,包括AMQP, redis等

版本支持情況

Celery version 4.0 runs on
        Python ?2.7, 3.4, 3.5?
        PyPy ?5.4, 5.5?
    This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer is required.

    If you’re running an older version of Python, you need to be running an older version of Celery:

        Python 2.6: Celery series 3.1 or earlier.
        Python 2.5: Celery series 3.0 or earlier.
        Python 2.4 was Celery series 2.2 or earlier.

    Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

2.使用場景

異步任務(wù):將耗時(shí)操作任務(wù)提交給Celery去異步執(zhí)行,比如發(fā)送短信/郵件、消息推送、音視頻處理等等

定時(shí)任務(wù):定時(shí)執(zhí)行某件事情,比如每天數(shù)據(jù)統(tǒng)計(jì)

3.Celery的安裝配置

pip install celery

消息中間件:RabbitMQ/Redis

app=Celery('任務(wù)名',backend='xxx',broker='xxx')

4.Celery執(zhí)行異步任務(wù)

基本使用

創(chuàng)建項(xiàng)目celerytest

創(chuàng)建py文件:celery_app_task.py

import celery
import time
# broker='redis://127.0.0.1:6379/2' 不加密碼
backend='redis://:123456@127.0.0.1:6379/1'
broker='redis://:123456@127.0.0.1:6379/2'
cel=celery.Celery('test',backend=backend,broker=broker)
@cel.task
def add(x,y):
    return x+y


創(chuàng)建py文件:add_task.py,添加任務(wù)

from celery_app_task import add
result = add.delay(4,5)
print(result.id)

創(chuàng)建py文件:run.py,執(zhí)行任務(wù),或者使用命令執(zhí)行:celery worker -A celery_app_task -l info

注:windows下:celery worker -A celery_app_task -l info -P eventlet

from celery_app_task import cel
if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info')

創(chuàng)建py文件:result.py,查看任務(wù)執(zhí)行結(jié)果

from celery.result import AsyncResult
from celery_app_task import cel

async = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結(jié)果刪除
elif async.failed():
    print('執(zhí)行失敗')
elif async.status == 'PENDING':
    print('任務(wù)等待中被執(zhí)行')
elif async.status == 'RETRY':
    print('任務(wù)異常后正在重試')
elif async.status == 'STARTED':
    print('任務(wù)已經(jīng)開始被執(zhí)行')

執(zhí)行 add_task.py,添加任務(wù),并獲取任務(wù)ID

執(zhí)行 run.py ,或者執(zhí)行命令:celery worker -A celery_app_task -l info

執(zhí)行 result.py,檢查任務(wù)狀態(tài)并獲取結(jié)果

多任務(wù)結(jié)構(gòu)

pro_cel
    ├── celery_task# celery相關(guān)文件夾
    │   ├── celery.py   # celery連接和配置相關(guān)文件,必須叫這個(gè)名字
    │   └── tasks1.py    #  所有任務(wù)函數(shù)
    │   └── tasks2.py    #  所有任務(wù)函數(shù)
    ├── check_result.py # 檢查結(jié)果
    └── send_task.py    # 觸發(fā)任務(wù)

celery.py

from celery import Celery

cel = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             # 包含以下兩個(gè)任務(wù)文件,去相應(yīng)的py文件中找任務(wù),對多個(gè)任務(wù)做分類
             include=['celery_task.tasks1',
                      'celery_task.tasks2'
                      ])

# 時(shí)區(qū)
cel.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
cel.conf.enable_utc = False

tasks1.py

import time
from celery_task.celery import cel

@cel.task
def test_celery(res):
    time.sleep(5)
    return "test_celery任務(wù)結(jié)果:%s"%res

tasks2.py

import time
from celery_task.celery import cel
@cel.task
def test_celery2(res):
    time.sleep(5)
    return "test_celery2任務(wù)結(jié)果:%s"%res

check_result.py

from celery.result import AsyncResult
from celery_task.celery import cel

async = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)

if async.successful():
    result = async.get()
    print(result)
    # result.forget() # 將結(jié)果刪除,執(zhí)行完成,結(jié)果不會自動(dòng)刪除
    # async.revoke(terminate=True)  # 無論現(xiàn)在是什么時(shí)候,都要終止
    # async.revoke(terminate=False) # 如果任務(wù)還沒有開始執(zhí)行呢,那么就可以終止。
elif async.failed():
    print('執(zhí)行失敗')
elif async.status == 'PENDING':
    print('任務(wù)等待中被執(zhí)行')
elif async.status == 'RETRY':
    print('任務(wù)異常后正在重試')
elif async.status == 'STARTED':
    print('任務(wù)已經(jīng)開始被執(zhí)行')

send_task.py

from celery_task.tasks1 import test_celery
from celery_task.tasks2 import test_celery2

# 立即告知celery去執(zhí)行test_celery任務(wù),并傳入一個(gè)參數(shù)
result = test_celery.delay('第一個(gè)的執(zhí)行')
print(result.id)
result = test_celery2.delay('第二個(gè)的執(zhí)行')
print(result.id)

添加任務(wù)(執(zhí)行send_task.py),開啟work:celery worker -A celery_task -l info -P eventlet,檢查任務(wù)執(zhí)行結(jié)果(執(zhí)行check_result.py)

5.Celery執(zhí)行定時(shí)任務(wù)

設(shè)定時(shí)間讓celery執(zhí)行一個(gè)任務(wù)

add_task.py

from celery_app_task import add
from datetime import datetime

# 方式一
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# 方式二
ctime = datetime.now()
# 默認(rèn)用utc時(shí)間
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# 使用apply_async并設(shè)定時(shí)間
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)

類似于contab的定時(shí)任務(wù)

多任務(wù)結(jié)構(gòu)中celery.py修改如下

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
    'celery_task.tasks1',
    'celery_task.tasks2',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # 名字隨意命名
    'add-every-10-seconds': {
        # 執(zhí)行tasks1下的test_celery函數(shù)
        'task': 'celery_task.tasks1.test_celery',
        # 每隔2秒執(zhí)行一次
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # 傳遞參數(shù)
        'args': ('test',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.tasks1.test_celery',
    #     每年4月11號,8點(diǎn)42分執(zhí)行
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (16, 16)
    # },
}

啟動(dòng)一個(gè)beat:celery beat -A celery_task -l info

啟動(dòng)work執(zhí)行:celery worker -A celery_task -l info -P eventlet

6.Django中使用Celery

在項(xiàng)目目錄下創(chuàng)建celeryconfig.py

import djcelery
djcelery.setup_loader()
CELERY_IMPORTS=(
    'app01.tasks',
)
#有些情況可以防止死鎖
CELERYD_FORCE_EXECV=True
# 設(shè)置并發(fā)worker數(shù)量
CELERYD_CONCURRENCY=4
#允許重試
CELERY_ACKS_LATE=True
# 每個(gè)worker最多執(zhí)行100個(gè)任務(wù)被銷毀,可以防止內(nèi)存泄漏
CELERYD_MAX_TASKS_PER_CHILD=100
# 超時(shí)時(shí)間
CELERYD_TASK_TIME_LIMIT=12*30

在app01目錄下創(chuàng)建tasks.py

from celery import task
@task
def add(a,b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write('a')
    print(a+b)

視圖函數(shù)views.py

from django.shortcuts import render,HttpResponse
from app01.tasks import add
from datetime import datetime
def test(request):
    # result=add.delay(2,3)
    ctime = datetime.now()
    # 默認(rèn)用utc時(shí)間
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')

settings.py


INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

...

from djagocele import celeryconfig
BROKER_BACKEND='redis'
BOOKER_URL='redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/2'
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容