python huey 輕量級異步任務(wù)隊列簡介

簡介并安裝

huey, a little task queue.
輕量級異步任務(wù)隊列。

下載安裝huey。

$ pip install huey

下載安裝redis依賴(huey暫時只支持redis)。

$ pip install redis

利用huey定義并執(zhí)行一些任務(wù)時,可以分成這幾個文件。

  • config.py: 定義使用huey的一些配置,任務(wù)的redis存儲。

The first step is to configure your queue. The consumer needs to be pointed at a Huey
instance, which specifies which backend to use.

  • task.py: 定義你需要執(zhí)行的一些異步任務(wù)。
  • huey_consumer.py: 開啟huey consumer的入口(huey提供)。
  • huey_main.py: 執(zhí)行異步任務(wù)。

config.py

實(shí)際上就是利用redis創(chuàng)建consumer所指向的huey實(shí)例,huey實(shí)際上包含了一個隊列queue,用來存儲和取出消息。

from huey import RedisHuey

huey = RedisHuey('base_app', host='127.0.0.1')

或者

from huey import RedisHuey
from redis import ConnectionPool

import settings

redis_pool = ConnectionPool(host=settings.REDIS_ADDRESS, port=settings.REDIS_PORT, db=0)

huey = RedisHuey('base_app', connection_pool=redis_pool)
task.py

利用config.py所創(chuàng)建的huey來修飾普通函數(shù)使之成為huey任務(wù)。
這樣就定義了一個最基本的異步任務(wù)。(base_huey.py 及上述的 config.py)

from base.base_huey import huey

@huey.task()
def count_beans(num):
    print('-- counted %s beans --' % num)
    for n in range(num):
        print(n)
    return 'Counted %s beans' % num
huey_consumer.py

之前習(xí)慣把huey包里的huey_consumer.py文件直接拿出來到主目錄然后執(zhí)行,新版的的huey_consumer.py與舊版的稍微有點(diǎn)區(qū)別。
新版本增加了一個consumer_options.py,用來定義封裝了一些consumer相關(guān)的配置和命令行解析的處理類;在舊版中,這些都是直接定義在huey_consumer.py中。
查看OptionParserHandler源碼可知,huey_consumer可以包含很多參數(shù),主要分為三個group(Logging日志記錄, Workers任務(wù)worker相關(guān), Scheduler計劃任務(wù)相關(guān))。

    def get_option_parser(self):
        parser = optparse.OptionParser('Usage: %prog [options] '
                                       'path.to.huey_instance')

        def add_group(name, description, options):
            group = parser.add_option_group(name, description)
            for abbrev, name, kwargs in options:
                group.add_option(abbrev, name, **kwargs)

        add_group('Logging', 'The following options pertain to logging.',
                  self.get_logging_options())

        add_group('Workers', (
            'By default huey uses a single worker thread. To specify a '
            'different number of workers, or a different execution model (such'
            ' as multiple processes or greenlets), use the options below.'),
            self.get_worker_options())

        add_group('Scheduler', (
            'By default Huey will run the scheduler once every second to check'
            ' for tasks scheduled in the future, or tasks set to run at '
            'specfic intervals (periodic tasks). Use the options below to '
            'configure the scheduler or to disable periodic task scheduling.'),
            self.get_scheduler_options())

        return parser

最常用的一些參數(shù):

  • -l 指定huey異步任務(wù)執(zhí)行時的日志文件(也可以通過ConsumerConfigsetup_logger()來定義logger)。
  • -w 執(zhí)行器worker隊列的數(shù)量
  • -k worker的類型(process, thread, greenlet,默認(rèn)是thread)
  • -d 輪詢隊列的最短時間間隔
huey_main.py

定義需要執(zhí)行huey任務(wù)的方法。

from tasks.huey_task import count_beans

# base test
def test_1():
    count_beans(10)  # no block

    count_beans.schedule(args=(5, ), delay=5)  # delay 5s

    res = count_beans.schedule(args=(5, ), delay=5)
    # res.get(blocking=True)
    res(blocking=True)  # block

if __name__ == '__main__':

    test_1()

    print('end')
執(zhí)行腳本
  1. 開啟consumer輪詢:python huey_consumer_new.py tasks.huey_task.huey -l logs/base_huey.log -w 1
    tasks.task.huey即上述的task.py,在此時后綴名需要替換成 .huey。
  2. 執(zhí)行異步方法:pyton huey_main.py

ps:官方文檔中是將 huey 實(shí)例和task任務(wù)都引入到main.py中。

main.py

from config import huey # import our "huey" object
from tasks import count_beans # import our task
if name == 'main':
beans = raw_input('How many beans? ')
count_beans(int(beans))
print('Enqueued job to count %s beans' % beans)

To run these scripts, follow these steps:
1. Ensure you have [Redis](http://redis.io/) running locally
2. Ensure you have [installed huey](http://huey.readthedocs.io/en/latest/installation.html#installation)
3. Start the consumer: huey_consumer.py main.huey
 (notice this is “main.huey” and not “config.huey”).
4. Run the main program: python main.py

---
#####huey task簡單api介紹
利用```@huey.task()```能來定義一些基本異步任務(wù),當(dāng)然還有其他延時任務(wù),周期性任務(wù)等。
1. 延時執(zhí)行:下例展示了兩種延時執(zhí)行的方法:第一種時直接執(zhí)行延時時間n秒并傳入?yún)?shù);第二種是指定了eta參數(shù),即estimated time of arrival,傳入未來的某個時間點(diǎn),使其在計劃時間點(diǎn)執(zhí)行。

import datetime

from tasks.huey_task import count_beans

count_beans(3) # normal

count_beans.schedule(args=(3, ), delay=5) # delay 5s

in_a_minute = datetime.datetime.now() + datetime.timedelta(seconds=60)
count_beans.schedule(args=(100,), eta=in_a_minute)

2. 阻塞:利用block參數(shù)能夠使其阻塞。

res = count_beans(100)

res.get(blocking=True)

res(blocking=True) # block

3. 異常重試:當(dāng)任務(wù)出現(xiàn)異常時進(jìn)行retry,并且可以指定重試延時時間。

from base.base_huey import huey

retry 3 times delay 5s

@huey.task(retries=3, retry_delay=5)
def try_reties_by_delay():
print('trying %s' % datetime.now())
raise Exception('try_reties_by_delay')

4. 周期性任務(wù):利用```@huey.periodic_task()```來定義一個周期性任務(wù)。

from base.base_huey import huey
from huey import crontab

@huey.periodic_task(crontab(minute='*'))
def print_time():
print(datetime.now())

5. 取消或暫停任務(wù):revoke。
---
#####嘗試簡單閱讀源碼
...
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容