thinkphp-queue 學(xué)習(xí)筆記

前言

當(dāng)前筆記中的內(nèi)容針對的是 thinkphp-queue 的 v2.0 版本, 下文中提到的幾個(gè)Bug在最新的master分支上均已修復(fù)。 筆記中的部分內(nèi)容還未更新。

傳統(tǒng)的程序執(zhí)行流程一般是 即時(shí)|同步|串行的,在某些場景下,會存在并發(fā)低,吞吐量低,響應(yīng)時(shí)間長等問題。在大型系統(tǒng)中,一般會引入消息隊(duì)列的組件,將流程中部分任務(wù)抽離出來放入消息隊(duì)列,并由專門的消費(fèi)者作針對性的處理,從而降低系統(tǒng)耦合度,提高系統(tǒng)性能和可用性。

一般來說,可以抽離的任務(wù)具有以下的特點(diǎn):

  • 允許延后|異步|并行處理 (相對于傳統(tǒng)的 即時(shí)|同步|串行 的執(zhí)行方式)

    • 允許延后

      搶購活動時(shí),先快速緩沖有限的參與人數(shù)到消息隊(duì)列,后續(xù)再排隊(duì)處理實(shí)際的搶購業(yè)務(wù);

    • 允許異步

      業(yè)務(wù)處理過程中的郵件,短信等通知

    • 允許并行

      用戶支付成功之后,郵件通知,微信通知,短信通知可以由多個(gè)不同的消費(fèi)者并行執(zhí)行,通知到達(dá)的時(shí)間不要求先后順序。

  • 允許失敗和重試

    • 強(qiáng)一致性的業(yè)務(wù)放入核心流程處理
    • 無一致性要求或最終一致即可的業(yè)務(wù)放入隊(duì)列處理

thinkphp-queuethinkphp 官方提供的一個(gè)消息隊(duì)列服務(wù),它支持消息隊(duì)列的一些基本特性:

  • 消息的發(fā)布獲取,執(zhí)行,刪除,重發(fā),失敗處理延遲執(zhí)行,超時(shí)控制
  • 隊(duì)列的多隊(duì)列, 內(nèi)存限制 ,啟動,停止,守護(hù)
  • 消息隊(duì)列可降級為同步執(zhí)行

thinkphp-queue 內(nèi)置了 Redis,Database,Topthink ,Sync這四種驅(qū)動。本文主要介紹 thinkphp-queue 結(jié)合其內(nèi)置的 redis 驅(qū)動的使用方式和基本原理。

注1:如無特殊說明,下文中的 ‘消息’ 和 ‘任務(wù)’兩個(gè)詞指代的是同一個(gè)概念,即隊(duì)列中的一個(gè)成員。該成員對消息隊(duì)列而言是其內(nèi)部保存的消息; 對業(yè)務(wù)應(yīng)用而言是一個(gè)待執(zhí)行的任務(wù)。請根據(jù)語境區(qū)分。

注2:本文編寫時(shí)(2017-02-15)使用的 thinkphp-queue 的版本號是 v1.1.2 。該版本中部分功能并未全部完成,如 subscribe 模式,以及存在幾個(gè)bug(稍后會提及)。如有變更,請以官方最新版為準(zhǔn)。

一 代碼示例

先通過一段代碼,了解一下 thinkphp-queue 的基本使用流程。

目標(biāo):

在業(yè)務(wù)控制器中推送一個(gè)新消息到一個(gè)名為 ‘helloJobQueue’ 的隊(duì)列中,該消息中包含我們自定義的業(yè)務(wù)數(shù)據(jù),然后,編寫一個(gè)名為 Hello 的消費(fèi)者類,并通過命令行去調(diào)用該消費(fèi)者類獲取這個(gè)消息,拿到定義的數(shù)據(jù)。

1.1 安裝 thinkphp-queue

composer install thinkphp-queue

1.2 搭建消息隊(duì)列的存儲環(huán)境

  • 使用 Redis [推薦]

    安裝并啟動 Redis 服務(wù)
    
  • 使用數(shù)據(jù)庫 [不推薦]

    CREATE TABLE `prefix_jobs` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `queue` varchar(255) NOT NULL,
      `payload` longtext NOT NULL,
      `attempts` tinyint(3) unsigned NOT NULL,
      `reserved` tinyint(3) unsigned NOT NULL,
      `reserved_at` int(10) unsigned DEFAULT NULL,
      `available_at` int(10) unsigned NOT NULL,
      `created_at` int(10) unsigned NOT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    

1.3 配置消息隊(duì)列的驅(qū)動

根據(jù)選擇的存儲方式,在 \application\extra\queue.php 這個(gè)配置文件中,添加消息隊(duì)列對應(yīng)的驅(qū)動配置

   return [
       'connector'  => 'Redis',         // Redis 驅(qū)動
       'expire'     => 60,              // 任務(wù)的過期時(shí)間,默認(rèn)為60秒; 若要禁用,則設(shè)置為 null 
       'default'    => 'default',       // 默認(rèn)的隊(duì)列名稱
       'host'       => '127.0.0.1',     // redis 主機(jī)ip
       'port'       => 6379,            // redis 端口
       'password'   => '',              // redis 密碼
       'select'     => 0,               // 使用哪一個(gè) db,默認(rèn)為 db0
       'timeout'    => 0,               // redis連接的超時(shí)時(shí)間
       'persistent' => false,           // 是否是長連接
     
   //    'connector' => 'Database',   // 數(shù)據(jù)庫驅(qū)動
   //    'expire'    => 60,           // 任務(wù)的過期時(shí)間,默認(rèn)為60秒; 若要禁用,則設(shè)置為 null
   //    'default'   => 'default',    // 默認(rèn)的隊(duì)列名稱
   //    'table'     => 'jobs',       // 存儲消息的表名,不帶前綴
   //    'dsn'       => [],

   //    'connector'   => 'Topthink',   // ThinkPHP內(nèi)部的隊(duì)列通知服務(wù)平臺 ,本文不作介紹
   //    'token'       => '',
   //    'project_id'  => '',
   //    'protocol'    => 'https',
   //    'host'        => 'qns.topthink.com',
   //    'port'        => 443,
   //    'api_version' => 1,
   //    'max_retries' => 3,
   //    'default'     => 'default',

   //    'connector'   => 'Sync',       // Sync 驅(qū)動,該驅(qū)動的實(shí)際作用是取消消息隊(duì)列,還原為同步執(zhí)行
   ];

1.3.1 配置文件中的 expire 參數(shù)說明

expire 參數(shù)指的是任務(wù)的過期時(shí)間。 過期的任務(wù),其準(zhǔn)確的定義是

  1. 任務(wù)的狀態(tài)為執(zhí)行中
  2. 任務(wù)的開始執(zhí)行的時(shí)刻 + expire > 當(dāng)前時(shí)刻

expire 不為null 時(shí) ,thinkphp-queue 會在每次獲取下一個(gè)任務(wù)之前檢查并重發(fā)過期(執(zhí)行超時(shí))的任務(wù)。

expire 為null 時(shí),thinkphp-queue 不會檢查過期的任務(wù),性能相對較高一點(diǎn)。但是需要注意:

  • 這些執(zhí)行超時(shí)的任務(wù)會一直留在消息隊(duì)列中,需要開發(fā)者另行處理(刪除或者重發(fā))!

對expire 參數(shù)理解或者使用不當(dāng)時(shí),很容易產(chǎn)生一些bug,后面會舉例提到。

1.4 消息的創(chuàng)建與推送

我們在業(yè)務(wù)控制器中創(chuàng)建一個(gè)新的消息,并推送到 helloJobQueue 隊(duì)列

新增 \application\index\controller\JobTest.php 控制器,在該控制器中添加 actionWithHelloJob 方法

<?php
/**
* 文件路徑: \application\index\controller\JobTest.php
* 該控制器的業(yè)務(wù)代碼中借助了thinkphp-queue 庫,將一個(gè)消息推送到消息隊(duì)列
*/
namespace application\index\controller;
  use think\Exception;

  use think\Queue;

  class JobTest {
  /**
   * 一個(gè)使用了隊(duì)列的 action
   */
  public function actionWithHelloJob(){
      
      // 1.當(dāng)前任務(wù)將由哪個(gè)類來負(fù)責(zé)處理。 
      //   當(dāng)輪到該任務(wù)時(shí),系統(tǒng)將生成一個(gè)該類的實(shí)例,并調(diào)用其 fire 方法
      $jobHandlerClassName  = 'application\index\job\Hello'; 
      // 2.當(dāng)前任務(wù)歸屬的隊(duì)列名稱,如果為新隊(duì)列,會自動創(chuàng)建
      $jobQueueName       = "helloJobQueue"; 
      // 3.當(dāng)前任務(wù)所需的業(yè)務(wù)數(shù)據(jù) . 不能為 resource 類型,其他類型最終將轉(zhuǎn)化為json形式的字符串
      //   ( jobData 為對象時(shí),需要在先在此處手動序列化,否則只存儲其public屬性的鍵值對)
      $jobData            = [ 'ts' => time(), 'bizId' => uniqid() , 'a' => 1 ] ;
      // 4.將該任務(wù)推送到消息隊(duì)列,等待對應(yīng)的消費(fèi)者去執(zhí)行
      $isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );   
      // database 驅(qū)動時(shí),返回值為 1|false  ;   redis 驅(qū)動時(shí),返回值為 隨機(jī)字符串|false
      if( $isPushed !== false ){  
          echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
      }else{
          echo 'Oops, something went wrong.';
      }
  }
 }

注意: 在這個(gè)例子當(dāng)中,我們是手動指定的 $jobHandlerClassName ,更合理的做法是先定義好消息名稱與消費(fèi)者類名的映射關(guān)系,然后由某個(gè)可以獲取該映射關(guān)系的類來推送這個(gè)消息。這樣,生產(chǎn)者只需要知道消息的名稱,而無需指定哪個(gè)消費(fèi)者類來處理。

除了 Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );這種方式之外,還可以直接傳入 Queue::push( $jobHandlerObject ,null , $jobQueueName ); 這時(shí),需要在 $jobHandlerObject 中定義一個(gè) handle() 方法,消息隊(duì)列在執(zhí)行到該任務(wù)時(shí)會自動反序列化該對象,并調(diào)用其 handle()方法。 該方式的缺點(diǎn)是無法傳入自定義數(shù)據(jù)。

1.5 消息的消費(fèi)與刪除

編寫 Hello 消費(fèi)者類,用于處理 helloJobQueue 隊(duì)列中的任務(wù)

新增 \application\index\job\Hello.php 消費(fèi)者類,并編寫其 fire() 方法

 <?php
  /**
   * 文件路徑: \application\index\job\Hello.php
   * 這是一個(gè)消費(fèi)者類,用于處理 helloJobQueue 隊(duì)列中的任務(wù)
   */
  namespace application\index\job;

  use think\queue\Job;

  class Hello {
      
      /**
       * fire方法是消息隊(duì)列默認(rèn)調(diào)用的方法
       * @param Job            $job      當(dāng)前的任務(wù)對象
       * @param array|mixed    $data     發(fā)布任務(wù)時(shí)自定義的數(shù)據(jù)
       */
      public function fire(Job $job,$data){
          // 如有必要,可以根據(jù)業(yè)務(wù)需求和數(shù)據(jù)庫中的最新數(shù)據(jù),判斷該任務(wù)是否仍有必要執(zhí)行.
          $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
          if(!isJobStillNeedToBeDone){
              $job->delete();
              return;
          }
        
          $isJobDone = $this->doHelloJob($data);
        
          if ($isJobDone) {
              //如果任務(wù)執(zhí)行成功, 記得刪除任務(wù)
              $job->delete();
              print("<info>Hello Job has been done and deleted"."</info>\n");
          }else{
              if ($job->attempts() > 3) {
                  //通過這個(gè)方法可以檢查這個(gè)任務(wù)已經(jīng)重試了幾次了
                  print("<warn>Hello Job has been retried more than 3 times!"."</warn>\n");
                $job->delete();
                  // 也可以重新發(fā)布這個(gè)任務(wù)
                  //print("<info>Hello Job will be availabe again after 2s."."</info>\n");
                  //$job->release(2); //$delay為延遲時(shí)間,表示該任務(wù)延遲2秒后再執(zhí)行
              }
          }
      }
      
      /**
       * 有些消息在到達(dá)消費(fèi)者時(shí),可能已經(jīng)不再需要執(zhí)行了
       * @param array|mixed    $data     發(fā)布任務(wù)時(shí)自定義的數(shù)據(jù)
       * @return boolean                 任務(wù)執(zhí)行的結(jié)果
       */
      private function checkDatabaseToSeeIfJobNeedToBeDone($data){
          return true;
      }

      /**
       * 根據(jù)消息中的數(shù)據(jù)進(jìn)行實(shí)際的業(yè)務(wù)處理
       * @param array|mixed    $data     發(fā)布任務(wù)時(shí)自定義的數(shù)據(jù)
       * @return boolean                 任務(wù)執(zhí)行的結(jié)果
       */
      private function doHelloJob($data) {
        // 根據(jù)消息中的數(shù)據(jù)進(jìn)行實(shí)際的業(yè)務(wù)處理...
        
          print("<info>Hello Job Started. job Data is: ".var_export($data,true)."</info> \n");
          print("<info>Hello Job is Fired at " . date('Y-m-d H:i:s') ."</info> \n");
          print("<info>Hello Job is Done!"."</info> \n");
          
          return true;
      }
  }

至此,所有的代碼都已準(zhǔn)備完畢,在運(yùn)行消息隊(duì)列之前,我們先看一下現(xiàn)在的目錄結(jié)構(gòu):

[圖片上傳失敗...(image-b435a1-1540961256653)]

1.6 發(fā)布任務(wù)

在瀏覽器中訪問 http://your.project.domain/index/job_test/actionWithHelloJob ,可以看到消息推送成功。

[圖片上傳失敗...(image-a1af6f-1540961256653)]

1.7 處理任務(wù)

切換當(dāng)前終端窗口的目錄到項(xiàng)目根目錄下,執(zhí)行

php think queue:work --queue helloJobQueue

可以看到執(zhí)行的結(jié)果類似如下:

[圖片上傳失敗...(image-cf70e7-1540961256653)]

?

至此,我們成功地經(jīng)歷了一個(gè)消息的 創(chuàng)建 -> 推送 -> 消費(fèi) -> 刪除 的基本流程

下文,將介紹 thinkphp-queue 的詳細(xì)使用方法。如配置介紹,基本原理,各種特殊情況的處理等

二 詳細(xì)介紹

2.1 命令模式

  • queue:subscribe 命令 [截至2017-02-15,作者暫未實(shí)現(xiàn)該模式,略過]

  • queue:work 命令

    work 命令: 該命令將啟動一個(gè) work 進(jìn)程來處理消息隊(duì)列。

    php think queue:work --queue helloJobQueue
    
  • queue:listen 命令

    listen 命令: 該命令將會創(chuàng)建一個(gè) listen 父進(jìn)程 ,然后由父進(jìn)程通過 proc_open(‘php think queue:work’) 的方式來創(chuàng)建一個(gè)work 子 進(jìn)程來處理消息隊(duì)列,且限制該work進(jìn)程的執(zhí)行時(shí)間。

    php think queue:listen --queue helloJobQueue
    

2.2 命令行參數(shù)

  • Work 模式

    php think queue:work \
    --daemon            //是否循環(huán)執(zhí)行,如果不加該參數(shù),則該命令處理完下一個(gè)消息就退出
    --queue  helloJobQueue  //要處理的隊(duì)列的名稱
    --delay  0 \        //如果本次任務(wù)執(zhí)行拋出異常且任務(wù)未被刪除時(shí),設(shè)置其下次執(zhí)行前延遲多少秒,默認(rèn)為0
    --force  \          //系統(tǒng)處于維護(hù)狀態(tài)時(shí)是否仍然處理任務(wù),并未找到相關(guān)說明
    --memory 128 \      //該進(jìn)程允許使用的內(nèi)存上限,以 M 為單位
    --sleep  3 \        //如果隊(duì)列中無任務(wù),則sleep多少秒后重新檢查(work+daemon模式)或者退出(listen或非daemon模式)
    --tries  2          //如果任務(wù)已經(jīng)超過嘗試次數(shù)上限,則觸發(fā)‘任務(wù)嘗試次數(shù)超限’事件,默認(rèn)為0
    
  • Listen 模式

    php think queue:listen \
    --queue  helloJobQueue \   //監(jiān)聽的隊(duì)列的名稱
    --delay  0 \         //如果本次任務(wù)執(zhí)行拋出異常且任務(wù)未被刪除時(shí),設(shè)置其下次執(zhí)行前延遲多少秒,默認(rèn)為0
    --memory 128 \       //該進(jìn)程允許使用的內(nèi)存上限,以 M 為單位
    --sleep  3 \         //如果隊(duì)列中無任務(wù),則多長時(shí)間后重新檢查,daemon模式下有效
    --tries  0 \         //如果任務(wù)已經(jīng)超過重發(fā)次數(shù)上限,則進(jìn)入失敗處理邏輯,默認(rèn)為0
    --timeout 60         //創(chuàng)建的work子進(jìn)程的允許執(zhí)行的最長時(shí)間,以秒為單位
    

    可以看到 listen 模式下,不包含 --deamon 參數(shù),原因下面會說明

2.3 work 模式和 listen 模式的區(qū)別

兩者都可以用于處理消息隊(duì)列中的任務(wù)

區(qū)別在于:

  • 2.3.1 執(zhí)行原理不同

    • work 命令是單進(jìn)程的處理模式。

      按照是否設(shè)置了 --daemon 參數(shù),work命令又可分為單次執(zhí)行和循環(huán)執(zhí)行兩種模式。

      • 單次執(zhí)行:不添加 --daemon參數(shù),該模式下,work進(jìn)程在處理完下一個(gè)消息后直接結(jié)束當(dāng)前進(jìn)程。當(dāng)不存在新消息時(shí),會sleep一段時(shí)間然后退出。
      • 循環(huán)執(zhí)行:添加了 --daemon參數(shù),該模式下,work進(jìn)程會循環(huán)地處理隊(duì)列中的消息,直到內(nèi)存超出參數(shù)配置才結(jié)束進(jìn)程。當(dāng)不存在新消息時(shí),會在每次循環(huán)中sleep一段時(shí)間。
    • listen 命令是 父進(jìn)程 + 子進(jìn)程 的處理模式。

      listen命令所在的父進(jìn)程會創(chuàng)建一個(gè)單次執(zhí)行模式的work子進(jìn)程,并通過該work子進(jìn)程來處理隊(duì)列中的下一個(gè)消息,當(dāng)這個(gè)work子進(jìn)程退出之后,listen命令所在的父進(jìn)程會監(jiān)聽到該子進(jìn)程的退出信號,并重新創(chuàng)建一個(gè)新的單次執(zhí)行的work子進(jìn)程

  • 2.3.2 退出時(shí)機(jī)不同

    • work 命令的退出時(shí)機(jī)在上面的執(zhí)行原理部分已敘述,此處不再重復(fù)
    • listen 命令中,listen所在的父進(jìn)程正常情況會一直運(yùn)行,除非遇到下面兩種情況:
      • 創(chuàng)建的某個(gè)work子進(jìn)程的執(zhí)行時(shí)間超過了 listen命令行中的--timeout 參數(shù)配置,此時(shí)work子進(jìn)程會被強(qiáng)制結(jié)束,listen所在的父進(jìn)程也會拋出一個(gè) ProcessTimeoutException 異常并退出。開發(fā)者可以選擇捕獲該異常,讓父進(jìn)程繼續(xù)執(zhí)行,也可以選擇通過 supervisor 等監(jiān)控軟件重啟一個(gè)新的listen命令。
      • listen 命令所在的父進(jìn)程因某種原因存在內(nèi)存泄露,則當(dāng)父進(jìn)程本身占用的內(nèi)存超過了命令行中的 --memory 參數(shù)配置時(shí),父子進(jìn)程均會退出。正常情況下,listen進(jìn)程本身占用的內(nèi)存是穩(wěn)定不變的。
  • 2.3.3 性能不同

    • work 命令是在腳本內(nèi)部做循環(huán),框架腳本在命令執(zhí)行的初期就已加載完畢;

    • 而listen模式則是處理完一個(gè)任務(wù)之后新開一個(gè)work進(jìn)程,此時(shí)會重新加載框架腳本。

      因此: work 模式的性能會比listen模式高。

      注意:當(dāng)代碼有更新時(shí),work 模式下需要手動去執(zhí)行 php think queue:restart 命令重啟隊(duì)列來使改動生效;而listen 模式會自動生效,無需其他操作。

  • 2.3.4 超時(shí)控制能力

    • work 模式本質(zhì)上既不能控制進(jìn)程自身的運(yùn)行時(shí)間,也無法限制執(zhí)行中的任務(wù)的執(zhí)行時(shí)間。

      舉例來說,假如你在某次上線之后,在上文中的 \application\index\job\Hello.php 消費(fèi)者的fire方法中添加了一段死循環(huán) :

      public function fire(){
         while(true){ //死循環(huán)
             $consoleOutPut->writeln("<info>I am looping forever inside a job.</info> \n");
             sleep(1);
         }
      }  
      

      那么這個(gè)循環(huán)將永遠(yuǎn)不能停止,直到任務(wù)所在的進(jìn)程超過內(nèi)存限制或者由管理員手動結(jié)束。這個(gè)過程不會有任何的告警。更嚴(yán)重的是,如果你配置了expire ,那么這個(gè)死循環(huán)的任務(wù)可能會污染到同樣處理 helloJobQueue 隊(duì)列的其他work進(jìn)程,最后好幾個(gè)work進(jìn)程將被卡死在這段死循環(huán)中。詳情后文會說明。

      work 模式下的超時(shí)控制能力,實(shí)際上應(yīng)該理解為 多個(gè)work 進(jìn)程配合下的過期任務(wù)重發(fā)能力。

    • 而 listen命令可以限制其創(chuàng)建的work子進(jìn)程的超時(shí)時(shí)間。

      listen 命令可通過 --timeout 參數(shù)限制work子進(jìn)程允許運(yùn)行的最長時(shí)間,超過該時(shí)間限制仍未結(jié)束的子進(jìn)程會被強(qiáng)制結(jié)束;

    • 這里有必要補(bǔ)充一下 expire 和 timeout 之間的區(qū)別:

      • expire 在配置文件中設(shè)置,timeout 在 listen命令 的命令行參數(shù)中設(shè)置,而且,expire 和 timeout 是兩個(gè)不同層次上的概念:
- expire 是指任務(wù)的過期時(shí)間。這個(gè)時(shí)間是全局的,影響到所有的work進(jìn)程。(不管是獨(dú)立的work命令還是 listen 模式下創(chuàng)建的的work子進(jìn)程) 。expire 針對的對象是 **任務(wù)**。
- timeout 是指work子進(jìn)程的超時(shí)時(shí)間。這個(gè)時(shí)間只對當(dāng)前執(zhí)行的listen 命令有效。timeout 針對的對象是 **work子進(jìn)程**。
  • 2.3.5 使用場景不同

    根據(jù)上面的介紹,可以看到,

    work 命令的適用場景是:

    • 任務(wù)數(shù)量較多
    • 性能要求較高
    • 任務(wù)的執(zhí)行時(shí)間較短
    • 消費(fèi)者類中不存在死循環(huán),sleep() ,exit() ,die() 等容易導(dǎo)致bug的邏輯

    listen命令的適用場景是:

    • 任務(wù)數(shù)量較少
    • 任務(wù)的執(zhí)行時(shí)間較長(如生成大型的excel報(bào)表等),
    • 任務(wù)的執(zhí)行時(shí)間需要有嚴(yán)格限制

2.4 消息隊(duì)列的開始,停止與重啟

  • 開始一個(gè)消息隊(duì)列:

    php think queue:work
    
  • 停止所有的消息隊(duì)列:

    php think queue:restart
    
  • 重啟所有的消息隊(duì)列:

    php think queue:restart 
    php think queue:work 
    

2.5 多模塊,多任務(wù)的處理

  • 多模塊

    單模塊項(xiàng)目推薦使用 app\job 作為任務(wù)類的命名空間

    多模塊項(xiàng)目可用使用 app\module\job 作為任務(wù)類的命名空間 也可以放在任意可以自動加載到的地方

  • 多任務(wù)

    如果一個(gè)任務(wù)類里有多個(gè)小任務(wù)的話,在發(fā)布任務(wù)時(shí),需要用 任務(wù)的類名@方法名app\lib\job\Job2@task1app\lib\job\Job2@task2

    注意:命令行中的 --queue 參數(shù)不支持@解析

    多任務(wù)例子:

    • \application\index\controller\JobTest.php 控制器中,添加 actionWithMultiTask()方法:
    public function actionWithMultiTask(){
          
      $taskType = $_GET['taskType'];
        switch ($whichTask) {
           case 'taskA':
               $jobHandlerClassName  = 'application\index\job\MultiTask@taskA';
               $jobDataArr = ['a' => '1'];
               $jobQueueName = "multiTaskJobQueue";   
               break;
           case 'taskB':
               $jobHandlerClassName  = 'application\index\job\MultiTask@taskB';
               $jobDataArr = ['b' => '2'];
               $jobQueueName = "multiTaskJobQueue";       
               break;
            default:
               break;
       }
      
      $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
      if ($isPushed !== false) {
        echo("the $taskType of MultiTask Job has been Pushed to ".$jobQueueName ."<br>");
      }else{
        throw new Exception("push a new $taskType of MultiTask Job Failed!");
      }
    }
    
    • 新增 \application\index\job\MultiTask.php 消費(fèi)者類,并編寫其 taskA()taskB()方法
    <?php
    /**
     * 文件路徑: \application\index\job\MultiTask.php
     * 這是一個(gè)消費(fèi)者類,用于處理 multiTaskJobQueue 隊(duì)列中的任務(wù)
     */
    namespace application\index\job;
    
    use think\queue\Job;
    
    class MultiTask {
     
        public function taskA(Job $job,$data){
            
            $isJobDone = $this->_doTaskA($data);
    
            if ($isJobDone) {
                $job->delete();
                print("Info: TaskA of Job MultiTask has been done and deleted"."\n");
            }else{
                if ($job->attempts() > 3) {
                    $job->delete();   
                }
            }
        }
      
        public function taskB(Job $job,$data){
            
            $isJobDone = $this->_doTaskA($data);
    
            if ($isJobDone) {
                $job->delete();
                print("Info: TaskB of Job MultiTask has been done and deleted"."\n");
            }else{
                if ($job->attempts() > 2) {
                    $job->release();  
                }
            }
        }
    
        private function _doTaskA($data) {
            print("Info: doing TaskA of Job MultiTask "."\n");
            return true;
        }
      
        private function _doTaskB($data) {
            print("Info: doing TaskB of Job MultiTask "."\n");
            return true;
        }
    

2.6 消息的延遲執(zhí)行與定時(shí)執(zhí)行

延遲執(zhí)行,相對于即時(shí)執(zhí)行,是用來限制某個(gè)任務(wù)的最早可執(zhí)行時(shí)刻。在到達(dá)該時(shí)刻之前,該任務(wù)會被跳過。

可以利用該功能實(shí)現(xiàn)定時(shí)任務(wù)

使用方式:

  • 在生產(chǎn)者業(yè)務(wù)代碼中:
// 即時(shí)執(zhí)行
$isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
// 延遲 2 秒執(zhí)行
$isPushed = Queue::later( 2, $jobHandlerClassName, $jobDataArr, $jobQueueName);
// 延遲到 2017-02-18 01:01:01 時(shí)刻執(zhí)行
$time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now');   
$isPushed = Queue::later($time2wait,$jobHandlerClassName, $jobDataArr, $jobQueueName);
  • 在消費(fèi)者類中:
// 重發(fā),即時(shí)執(zhí)行
$job->release();
// 重發(fā),延遲 2 秒執(zhí)行
$job->release(2);
// 延遲到 2017-02-18 01:01:01 時(shí)刻執(zhí)行
$time2wait = strtotime('2017-02-18 01:01:01') - strtotime('now');
$job->release($time2wait);
  • 在命令行中:
//如果消費(fèi)者類的fire()方法拋出了異常且任務(wù)未被刪除時(shí),將自動重發(fā)該任務(wù),重發(fā)時(shí),會設(shè)置其下次執(zhí)行前延遲多少秒,默認(rèn)為0
php think queue:work --delay 3  

2.7 消息的重發(fā)

thinkphp-queue 中,消息的重發(fā)時(shí)機(jī)有3種:

  • 2.7.1 在消費(fèi)者類中手動重發(fā):
if( $isJobDone === false){
    $job->release();
}
  • 2.7.2 work進(jìn)程自動重發(fā),需同時(shí)滿足以下兩個(gè)條件
    • 消費(fèi)者類的 fire() 方法拋出了異常
    • 任務(wù)未被刪除
  • 2.7.3 當(dāng)配置了 expire 不為 null 時(shí),work 進(jìn)程內(nèi)部每次查詢可用任務(wù)之前,會先自動重發(fā)已過期的任務(wù)。

補(bǔ)充:

在database 模式下,2.7.1 和 2.7.2 中的重發(fā)邏輯是先刪除原來的任務(wù),然后插入一個(gè)新的任務(wù)。2.7.3 中的重發(fā)時(shí)機(jī)是直接更新原任務(wù)。

而在redis 模式下,3種重發(fā)都是先刪除再插入。

不管是哪種重發(fā)方式,重發(fā)之后,任務(wù)的已嘗試次數(shù)會在原來的基礎(chǔ)上 +1 。

此外,消費(fèi)者類中需要注意,如果 fire() 方法中可能拋出異常,那么

  • 如果不需要自動重發(fā)的話, 請?jiān)趻伋霎惓V皩⑷蝿?wù)刪除 $job->delete() ,以免產(chǎn)生bug。
  • 如果需要自動重發(fā)的話,請直接拋出異常,不要在 fire() 方法中又手動使用 $job->release() , 這樣會導(dǎo)致該任務(wù)被重發(fā)兩次,產(chǎn)生兩個(gè)一樣的新任務(wù)。

2.8 任務(wù)的失敗回調(diào)及告警

當(dāng)同時(shí)滿足以下條件時(shí),將觸發(fā)任務(wù)失敗回調(diào):

  • 命令行的 --tries 參數(shù)的值大于0
  • 任務(wù)的已嘗試次數(shù)大于 命令行的 --tries 參數(shù)
  • 開發(fā)者添加了 queue_failed 事件標(biāo)簽及其對應(yīng)的回調(diào)代碼
  • 消費(fèi)者類中定義了 failed() 方法,用于接收任務(wù)失敗的通知

注意, queue_failed 標(biāo)簽需要在安裝了 thinkphp-queue 之后 手動\application\tags.php 文件中添加。

注意:該版本有bug,若想實(shí)現(xiàn)失敗任務(wù)回調(diào)功能,需要先修改位于 think-queue\src\queue\Worker.php 中的 logFailedJob方法 , 修改方式如下:

/**
 * Log a failed job into storage.
 * @param  \Think\Queue\Job $job
 * @return array
 */
    protected function logFailedJob(Job $job)
    {
        // 將原來的 queue.failed' 修改為 'queue_failed' 才可以觸發(fā)任務(wù)失敗回調(diào)
        if (Hook::listen('queue.failed', $job, null, true)) {  
            $job->delete();
            $job->failed();
        }

        return ['job' => $job, 'failed' => true];
    } 

首先,我們添加 queue_failed 事件標(biāo)簽, 及其對應(yīng)的回調(diào)方法

// 文件路徑: \application\tags.php
// 應(yīng)用行為擴(kuò)展定義文件
return [
    // 應(yīng)用初始化
    'app_init'     => [],
    // 應(yīng)用開始
    'app_begin'    => [],
    // 模塊初始化
    'module_init'  => [],
    // 操作開始執(zhí)行
    'action_begin' => [],
    // 視圖內(nèi)容過濾
    'view_filter'  => [],
    // 日志寫入
    'log_write'    => [],
    // 應(yīng)用結(jié)束
    'app_end'      => [],

    // 任務(wù)失敗統(tǒng)一回調(diào),有四種定義方式
    'queue_failed'=> [
      
         // 數(shù)組形式,[ 'ClassName' , 'methodName']
        ['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues']
         
         // 字符串(靜態(tài)方法),'StaicClassName::methodName'
         // 'MyQueueFailedLogger::logAllFailedQueues'   
      
         // 字符串(對象方法),'ClassName',此時(shí)需在對應(yīng)的ClassName類中添加一個(gè)名為 queueFailed 的方法
         // 'application\\behavior\\MyQueueFailedLogger'
         
         // 閉包形式
         /*
         function( &$jobObject , $extra){
             // var_dump($jobObject);
             return true;
         }
         */
    ]
];

這里,我們選擇數(shù)組形式的回調(diào)方式,新增 \application\behavior\MyQueueFailedLogger 類,添加一個(gè) logAllFailedQueues() 方法

<?php
/**
 * 文件路徑: \application\behavior\MyQueueFailedLogger.php
 * 這是一個(gè)行為類,用于處理所有的消息隊(duì)列中的任務(wù)失敗回調(diào)
 */

namespace application\behavior;


class MyQueueFailedLogger {
       
    const should_run_hook_callback = true;
  
    /**
     * @param $jobObject   \think\queue\Job   //任務(wù)對象,保存了該任務(wù)的執(zhí)行情況和業(yè)務(wù)數(shù)據(jù)
     * @return bool     true                  //是否需要刪除任務(wù)并觸發(fā)其failed() 方法
     */
    public function logAllFailedQueues(&$jobObject){
        
        $failedJobLog = [
            'jobHandlerClassName'   => $jobObject->getName(), // 'application\index\job\Hello'
            'queueName' => $jobObject->getQueue(),             // 'helloJobQueue'    
            'jobData'   => $jobObject->getRawBody()['data'],  // '{'a': 1 }'
            'attempts'  => $jobObject->attempts(),            // 3
        ];
        var_export(json_encode($failedJobLog,true));
        
           // $jobObject->release();     //重發(fā)任務(wù)
          //$jobObject->delete();         //刪除任務(wù)
          //$jobObject->failed();     //通知消費(fèi)者類任務(wù)執(zhí)行失敗
        
        return self::should_run_hook_callback;         
    }
}

需要注意該回調(diào)方法的返回值:

  • 返回 true 時(shí),系統(tǒng)會自動刪除該任務(wù),并且自動調(diào)用消費(fèi)者類中的 failed() 方法
  • 返回 false 時(shí),系統(tǒng)不會自動刪除該任務(wù),也不會自動調(diào)用消費(fèi)者類中的 failed() 方法,需要開發(fā)者另行處理失敗任務(wù)的刪除和通知。

最后,在消費(fèi)者類中,添加 failed() 方法

/**
 * 文件路徑: \application\index\job\HelloJob.php
 */

/**
 * 該方法用于接收任務(wù)執(zhí)行失敗的通知,你可以發(fā)送郵件給相應(yīng)的負(fù)責(zé)人員
 * @param $jobData  string|array|...      //發(fā)布任務(wù)時(shí)傳遞的 jobData 數(shù)據(jù)
 */
public function failed($jobData){
    send_mail_to_somebody() ; 
    
    print("Warning: Job failed after max retries. job data is :".var_export($data,true)."\n"; 
}

這樣,就可以做到任務(wù)失敗的記錄告警

2.9 處理過期的任務(wù)

過期這個(gè)概念用文字比較難描述清楚,建議先看一下 深入理解3.4 消息處理的詳細(xì)流程圖

三 深入理解

3.1 thinkphp-queue 中消息與隊(duì)列的保存方式

  • Redis

    在 Redis 中,每一個(gè) 隊(duì)列 都三個(gè)key 與之對應(yīng) ,以 helloJobQueue 隊(duì)列舉例,其在redis 中的保存方式為:

    key名 類型 說明
    queues:helloJobQueue List , 列表 待執(zhí)行的任務(wù)列表
    queues:helloJobQueue:delayed Sorted Set,有序集合 延遲執(zhí)行和定時(shí)執(zhí)行的任務(wù)集合
    queues:helloJobQueue:reserved Sorted Set,有序集合 執(zhí)行中的任務(wù)集合

    使用的:分隔符, 只是用來表示相關(guān)key的關(guān)聯(lián)性。本身沒有特殊含義。使用分隔符是一種常見的組織key的方式。

    其中,在queues:helloJobQueue 列表中,每個(gè)元素的形式如下:

    [圖片上傳失敗...(image-7e44df-1540961256653)]

    queues:helloJobQueue:delayedqueues:helloJobQueue:delayed 有序集合中,每個(gè)元素的形式如下:

    [圖片上傳失敗...(image-65278a-1540961256653)]

    可以看到,在有序集合中,每個(gè)元素代表一個(gè)任務(wù),該元素的 Score 為該任務(wù)的入隊(duì)時(shí)間戳,任務(wù)的 value 為json 格式,保存了任務(wù)的執(zhí)行情況和業(yè)務(wù)數(shù)據(jù)。將value decode 為數(shù)組后形式如下:

    [
      'job'  => 'application\\index\\job\\Hello' ,  // jobHandlerClassName,消費(fèi)者類的類名 
      'data' => [                   // 生產(chǎn)者傳入的業(yè)務(wù)數(shù)據(jù)
         'time' => '2017-02-18 16:20:10',
         'data' => 'I have 648 apples'
      ],
      'id'   => '77IasdasadIasdadadadKL8t',   // 一個(gè)隨機(jī)的32位字符串
      'attempts' => 2             // 任務(wù)的已嘗試次數(shù)
    ]
    

    redis驅(qū)動下,為了實(shí)現(xiàn)任務(wù)的延遲執(zhí)行和過期重發(fā),任務(wù)將在這三個(gè)key中來回轉(zhuǎn)移,詳情可見 3.5

  • Database

    在 Database 中,每個(gè)任務(wù)對應(yīng)到表中的一行,queue 字段用來區(qū)分不同的隊(duì)列。

    表的字段結(jié)構(gòu)如下:

    [圖片上傳失敗...(image-2ed0e-1540961256653)]

    其中,payLoad 字段保存了消息的執(zhí)行者和業(yè)務(wù)數(shù)據(jù),payLoad 字段采用 json 格式的字符串來保存消息,將其 decode 為數(shù)組后形式如下:

    [
     'job'   => 'application\\index\\job\\Hello', // jobHandlerClassName,消費(fèi)者類的類名 
     'data'  => string|array|integer|object       // 生產(chǎn)者傳入的業(yè)務(wù)數(shù)據(jù)
    ]
    

3.2 thinkphp-queue 的目錄結(jié)構(gòu)和類關(guān)系圖

[圖片上傳失敗...(image-8178aa-1540961256653)]

這些類構(gòu)成了消息隊(duì)列中的幾個(gè)角色:

角色 類名 說明
命令行 Command + Worker 負(fù)責(zé)解析命令行參數(shù),控制隊(duì)列的啟動,重啟
驅(qū)動 Queue + Connector 負(fù)責(zé)隊(duì)列的創(chuàng)建,以及消息的入隊(duì),出隊(duì)等操作
任務(wù) Job 用于將消息轉(zhuǎn)化為一個(gè)任務(wù)對象,供消費(fèi)者使用
生產(chǎn)者 業(yè)務(wù)代碼 負(fù)責(zé)消息的創(chuàng)建與發(fā)布
消費(fèi)者 業(yè)務(wù)代碼 負(fù)責(zé)任務(wù)的接收與執(zhí)行

各個(gè)類之間的關(guān)系圖如下:

[圖片上傳失敗...(image-7cae52-1540961256653)]

3.3 Deamon模式的執(zhí)行流程

[圖片上傳失敗...(image-863db9-1540961256653)]

3.4 Database模式下消息處理的詳細(xì)流程

下圖中,展示了database 模式下消息處理的詳細(xì)流程,redis 驅(qū)動下大體類似

[圖片上傳失敗...(image-fb9dee-1540961256653)]

3.5 redis 驅(qū)動下的任務(wù)重發(fā)細(xì)節(jié)

在redis驅(qū)動下,為了實(shí)現(xiàn)任務(wù)的延遲執(zhí)行和過期重發(fā),任務(wù)將在這三個(gè)key中來回轉(zhuǎn)移。

在3.4 Database模式下消息處理的消息流程中,我們知道,如果配置的expire 不是null ,那么 thinkphp-queue的work進(jìn)程每次在獲取下一個(gè)可執(zhí)行任務(wù)之前,會先嘗試重發(fā)所有過期的任務(wù)。而在redis驅(qū)動下,這個(gè)步驟則做了更多的事情,詳情如下:

  1. queue:xxx:delayed 的key中查詢出有哪些任務(wù)在當(dāng)前時(shí)刻已經(jīng)可以開始執(zhí)行,然后將這些任務(wù)轉(zhuǎn)移到 queue:xxx 的key的尾部。
  2. queue:xxx:reserved 的key中查詢出有哪些任務(wù)在當(dāng)前時(shí)刻已經(jīng)過期,然后將這些任務(wù)轉(zhuǎn)移到 queue:xxx的key的尾部。
  3. 嘗試從 queue:xxx 的key的頭部取出一個(gè)任務(wù),如果取出成功,那么,將這個(gè)任務(wù)轉(zhuǎn)移到 queue:xxx:reserved 的key 的頭部,同時(shí)將這個(gè)任務(wù)實(shí)例化成任務(wù)對象,交給消費(fèi)者去執(zhí)行。

用圖來表示這個(gè)步驟的具體過程如下:

redis隊(duì)列中的過期任務(wù)重發(fā)步驟--執(zhí)行前:

[圖片上傳失敗...(image-186d8d-1540961256653)]

redis隊(duì)列中的過期任務(wù)重發(fā)步驟--執(zhí)行后:

[圖片上傳失敗...(image-bdec70-1540961256653)]

3.6 thinkphp-queue的性能

  • 測試環(huán)境 :

    虛擬機(jī) Ubuntu 16.04 , PHP 7.1 ,TP5,Redis 3.2 , 雙核 I5 6400,3G 內(nèi)存

  • 測試方式 :

    使用 Redis 驅(qū)動,在一個(gè)控制器中循環(huán)推送 40000 條消息到消息隊(duì)列;

    使用php think queue:work --daemon去消費(fèi)這些消息,計(jì)算推送和消費(fèi)各自所耗的時(shí)間。

  • 測試結(jié)果 :

    在最簡單的邏輯下,平均每秒中可推送8000個(gè)消息,平均每秒可消費(fèi)200個(gè)消息。

注意:由于在測試時(shí),Host 機(jī)本身的cpu和內(nèi)存長期100%,并且虛擬機(jī)中的各項(xiàng)服務(wù)并未專門調(diào)優(yōu),因此該測試結(jié)果并不具備參考性。

3.7 thinkphp-queue 的N種錯誤使用姿勢

  • 3.7.1 在 消費(fèi)者類的 fire() 方法中,忘記使用 $job->delete() 去刪除消息,這種情況下,會產(chǎn)生一系列的bug:

    • 配置的 expire 為 null , 則該任務(wù)被執(zhí)行一次后會永遠(yuǎn)留在消息隊(duì)列中,占用消息隊(duì)列的空間 , 除非開發(fā)者另行處理。

    • 配置的 expire 不為 null ,該任務(wù)在 expire 秒后被認(rèn)為是過期任務(wù),并被消息隊(duì)列還原為待執(zhí)行狀態(tài),在消息隊(duì)列的后面的循環(huán)中繼續(xù)被獲取,這時(shí),如果

      • 命令行中的 --tries 參數(shù)為0 或者未設(shè)置,那么每隔 一段時(shí)間該任務(wù)就會被執(zhí)行一次。
      • 命令行中的 --tries 參數(shù) n 大于0 , 那么當(dāng)這個(gè)任務(wù)被誤執(zhí)行的次數(shù)超過n 時(shí),會由消息隊(duì)列嘗試去觸發(fā)失敗回調(diào)事件:
        • 如果開發(fā)者沒有編寫失敗處理的回調(diào)事件:那么該任務(wù)仍然不會被刪除,每隔一段時(shí)間就會被執(zhí)行一次。[這個(gè)可能屬于框架的Bug] ,
        • 如果編寫了失敗回調(diào)事件
          • 回調(diào)事件中刪除了任務(wù),則這個(gè)任務(wù)被誤執(zhí)行了 n 次。
          • 回調(diào)事件中未刪除任務(wù),這時(shí),如果:
            • 回調(diào)事件返回值是 false,那么該任務(wù)仍然不會被刪除,每隔一段時(shí)間就會被執(zhí)行一次
            • 回調(diào)事件返回值是 true, 那么該任務(wù)會先被刪除,然后觸發(fā)消費(fèi)者類的 failed() 方法,如果在 failed() 方法中設(shè)置了告警,那么這個(gè)告警就是一次誤報(bào)。

    因此,在 使用 thinkphp-queue 時(shí),請記得:

    • 任務(wù)完成后, 使用 $job->delete() 刪除任務(wù)
    • 在消費(fèi)者類的 fire() 方法中,使用 $job->attempt() 檢查任務(wù)已執(zhí)行次數(shù),對于次數(shù)異常的,作相應(yīng)的處理。
    • 在消費(fèi)者類的 fire() 方法中,根據(jù)業(yè)務(wù)數(shù)據(jù)來判斷該任務(wù)是否已經(jīng)執(zhí)行過,以避免該任務(wù)被重復(fù)執(zhí)行。
    • 編寫失敗回調(diào)事件,將事件中失敗的任務(wù)及時(shí)通知給開發(fā)人員。
  • 3.7.2 使用了 queue:work --daemon ,但更新代碼后沒有使用 queue:restart 重啟 work 進(jìn)程, 使得 work 進(jìn)程中的代碼與最新的代碼不同,出現(xiàn)各種問題。

  • 3.7.3 使用了 queue:work --daemon ,但是消費(fèi)者類的 fire() 方法中存在死循環(huán),或 sleep(n) 等邏輯,導(dǎo)致消息隊(duì)列被堵塞;或者使用了 exit() , die() 這樣的邏輯,導(dǎo)致work進(jìn)程直接終止 。

  • 3.7.4 配置的 expire 為 null ,這時(shí)如果采用的是 Redis 驅(qū)動且使用了延遲功能,如 later(n)release(n) 方法或者 --delay 參數(shù)不為0 , 那么將導(dǎo)致被延遲的任務(wù)永遠(yuǎn)無法處理。(這個(gè)可能屬于框架的Bug)

  • 3.7.5 配置的 expire 為null ,但并沒有自行處理過期的任務(wù),導(dǎo)致過期的任務(wù)得不到處理,且一直占用消息隊(duì)列的空間。

  • 3.7.6 配置的 expire 不為null ,但配置的 expire 時(shí)間太短,以至于 expire 時(shí)間 < 消費(fèi)者的 fire() 方法所需時(shí)間 + 刪除該任務(wù)所需的時(shí)間 ,那么任務(wù)將被誤認(rèn)為執(zhí)行超時(shí),從而被消息隊(duì)列還原為待執(zhí)行狀態(tài)。

  • 3.7.7 使用 Queue::push($jobHandlerClassName , $jobData, $jobQueueName ); 推送任務(wù)時(shí),$jobData 中包含未序列化的對象。這時(shí),在消費(fèi)者端拿到的 $jobData 中拿到的是該對象的public 屬性的鍵值對數(shù)組。因此,需要在推送前手動序列化對象,在消費(fèi)者端再手動反序列化還原為對象。

四 拓展

4.1 隊(duì)列的穩(wěn)定性和拓展性

  • 穩(wěn)定性:不管是 listen 模式還是 work 模式,都建議使用 supervisor 或者 自定義的cron 腳本,去定時(shí)檢查 work 進(jìn)程是否正常
  • 拓展性: 當(dāng)某個(gè)隊(duì)列的消費(fèi)者不足時(shí),再給這個(gè)隊(duì)列添加 work進(jìn)程即可。

4.2 消息隊(duì)列的可視化管理工具

  • 隊(duì)列管理,隊(duì)列的列表,隊(duì)列的 work 進(jìn)程數(shù)量控制,隊(duì)列的任務(wù)數(shù)量變化趨勢 //TBD
  • 任務(wù)管理,任務(wù)的列表,添加/撤回/查詢?nèi)蝿?wù),修改任務(wù)的 執(zhí)行者/執(zhí)行時(shí)間/優(yōu)先級/數(shù)據(jù) 等 //TBD

4.2 編寫自定義的 thinkphp-queue 驅(qū)動

//TBD

4.3 編寫消息隊(duì)列的單元測試

//TBD

4.4 與其他PHP消息隊(duì)列庫的對比

TP5的消息隊(duì)列與Laravel的消息隊(duì)列比較相似,下面是與laravel 中的消息隊(duì)列的一些對比:

thinkphp-queue (v1.1.2) laravel-queue (v5.3)
內(nèi)置的驅(qū)動 Database,Redis,Sync,TopThink Database,Redis, Sync(在laravel中稱為 null)。
Redis驅(qū)動要求 安裝redis的C擴(kuò)展 安裝 predis 包 + LUA腳本
推送任務(wù) 允許推送 消費(fèi)者類名,消費(fèi)者對象 允許推送消費(fèi)者類名,消費(fèi)者對象,閉包
失敗任務(wù)處理 觸發(fā)失敗回調(diào)事件 (有Bug) 觸發(fā)失敗回調(diào)事件 + 移動任務(wù)到 failed_jobs表?
消息訂閱 subscribe 命令+ Topthink驅(qū)動(注:未實(shí)現(xiàn)/未提供) subscribe 命令 + 安裝IronMQ 驅(qū)動
刪除任務(wù) 消費(fèi)者類中手動刪除 任務(wù)完成后自動刪除
推送到多個(gè)隊(duì)列 需自己實(shí)現(xiàn) 原生支持
延遲執(zhí)行 支持 (有Bug) 支持
消息重發(fā) 支持 支持
檢查已執(zhí)行次數(shù) 原生支持 需在消費(fèi)者類中顯式 use 相關(guān)的 trait
執(zhí)行方式 work 模式 + listen 模式 work 模式 + listen 模式
進(jìn)程命令 開啟,停止,重啟 開啟,停止,重啟
任務(wù)命令 展示失敗任務(wù)列表,重試某個(gè)失敗任務(wù),刪除某個(gè)失敗任務(wù)
支持的事件 失敗回調(diào)事件 失敗回調(diào)事件,支持消費(fèi)前事件,消費(fèi)后事件

五 待討論的問題

5.1 thinkphp-queue 中消息名稱與消費(fèi)者類名的綁定怎么實(shí)現(xiàn)?

5.1 消息中的業(yè)務(wù)數(shù)據(jù)的格式如何約定?

https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1.ios高性能編程 (1).內(nèi)層 最小的內(nèi)層平均值和峰值(2).耗電量 高效的算法和數(shù)據(jù)結(jié)構(gòu)(3).初始化時(shí)...
    歐辰_OSR閱讀 30,286評論 8 265
  • 傳統(tǒng)的程序執(zhí)行流程一般是 即時(shí)|同步|串行的,在某些場景下,會存在并發(fā)低,吞吐量低,響應(yīng)時(shí)間長等問題。在大型系統(tǒng)中...
    紅塵一落君莫笑閱讀 15,392評論 3 4
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,724評論 19 139
  • 必備的理論基礎(chǔ) 1.操作系統(tǒng)作用: 隱藏丑陋復(fù)雜的硬件接口,提供良好的抽象接口。 管理調(diào)度進(jìn)程,并將多個(gè)進(jìn)程對硬件...
    drfung閱讀 3,781評論 0 5
  • 憶江南 螃蟹(正韻) 螃蟹好,鮮美待輕烹。 火煮少時(shí)香味盛, 鍋掀稍候肉黃澄。 佐酒喜盈盈。 ~老財(cái)神
    老財(cái)神閱讀 216評論 0 3

友情鏈接更多精彩內(nèi)容