(原創(chuàng))一種基于RabbitMQ的分布式任務系統(tǒng)的設計實現(xiàn)

雖然過去一年中的工作以爬蟲為主,但個人認為最大的收獲在于參考已有的系統(tǒng)、從零開始把整個爬蟲任務調度系統(tǒng)實現(xiàn)了一遍,因而有了一些架構上的理解,它具有非常強的可擴展性,或許這就是RPC(Remote procedure call)的優(yōu)點吧,因此在這里做一些歸納總結。
受限于本人薄弱的闡述能力,如果對RabbitMQ有所了解再來閱讀此文是比較合適的,如果能有所斧正就更好了。(參考RabbitMq python tutorial--語言可選)

1.架構概要

這個系統(tǒng)是基于RabbitMQ來實現(xiàn)的,因而它是跨平臺跨語言的、分布式的、可平行擴展的,較適合處理時效性不高的任務,在系統(tǒng)內部,任務處理當然是異步的,但對外的表現(xiàn)形式,則是可同步可異步的。
也因此,它的架構概要圖與RabbitMQ的生產者消費者模型在本質上是一樣的,只不過生產者和消費者的結構更豐富具體。

概要圖.png

producer_consumer.png

從概要設計圖可以看出,整個系統(tǒng)對外只暴露了Frontend,其余部分對Requester都是不可見的,他也不需要知道這些,他只要能夠通過Frontend發(fā)送請求(任務)、得到響應、查詢結果就ok了。
而對于RabbitMQ來說,圖中的mq_Clientmq_Server則相當于生產者和消費者了。

系統(tǒng)分為5個部分:

  • RabbitMQ負責傳遞消息(Message),一個消息對應一個任務,通過不同的Queue分發(fā)不同種類的任務,支持集群
  • DB負責存儲任務詳情或者結果,不論是mysql、MongoDB都支持集群
  • Redis負責緩存臨時信息,也是支持集群(也可以使用memcached)
  • Frontend負責對外提供RESTFul API,響應任務請求創(chuàng)建任務,并通過worker_manager_proxy將任務放進MQ??梢詫⒄麄€任務序列化放進MQ,也可以只把任務標識(如task_id)放進MQ,同時把信息放進Redis,或者創(chuàng)建后存入DB,只要記得處理任務時存在哪里就在哪里取。不過為了節(jié)省MQ的空間,不建議將整個任務信息放入MQ。
  • worker_manager負責從MQ里取任務,并為每個任務創(chuàng)建一個worker線程進行工作,期間會通過Redis、DB進行必要的讀寫或者增刪改查操作。

部署時就可以相應的分開部署。

2.特性分析

這里以上圖為例來解釋一下
1) 跨平臺、跨語言、分布式的

  • rabbitMQ可以部署在linux、windows等操作系統(tǒng)上,因此跨平臺
  • rabbitMQ教程代碼就給出了多達十來種語言的版本,當然是跨語言的,對應此系統(tǒng),就是你可以使用PythonJava版本的Frontend來接收請求,而同時使用PHPJavaScript寫的Worker來完成工作,只要它們的內在邏輯是一致的。
  • 我現(xiàn)在對于消息隊列(MQ)最大的理解就是:通過網(wǎng)絡實現(xiàn)了原來一個系統(tǒng)內部多個進程間的通信,如此一來各個進程(比如FrontendWorker)的運行就突破了空間限制,這當然是分布式的。

2)可擴展性
擴展的目的是為了應對高并發(fā),為了同時處理更多的任務。
假設系統(tǒng)出現(xiàn)性能瓶頸,rabbitMQ、redis、db都可以通過集群來擴展,而Frontend、worker_manager則直接加機器起服務就行。
眾所周知,分布式系統(tǒng)通過加機器并發(fā)4臺1核1G的低配機器,性能可視同于1臺4核4G的機器。

3.流程簡介

這里以最簡單的只處理一種任務(可能不只一個Frontend一個worker_manager)的處理過程來介紹一下系統(tǒng)的工作流程:

  • 先定義任務的字段
# python or node
task = {
  'task_id': uuid(),              # 唯一標識
  'status': 'new' or `done` ...,  # 任務狀態(tài)
  'type':  'crawler',             # 任務類型,假設為爬蟲類型
  'callback_url':  '',            # 回調地址,根據(jù)需要提供
  'data': {},                     # 更詳細的任務信息
  'correlation_id':uuid()         # 用于保持request和response的一致性
}
  • 啟動RabbitMQ、Redis、DB服務,以及Frontendwork_manager進程,后兩者在啟動時通過mq_client或者mq_server declare了兩個queue:crawler_task_queue、notify_queue,并立刻開始監(jiān)聽。

  • Requester通過Frontend提供的API發(fā)送任務請求,請求附帶必要的參數(shù)

  • Frontend根據(jù)請求創(chuàng)建task并存入DB(也可以是Redis),然后將task_tid、correlation_id作為MSG的內容發(fā)送至crawler_task_queue中,并將correlation_id記錄在outstandingRequests列表中,此時:

    • 一般異步處理時,此處FrontendRequester返回response即可
    • 想要同步化處理,則等到task完成后,再返回response,不過這樣Requester就出于阻塞狀態(tài),一般不會這么做。
  • worker_manager監(jiān)聽到 crawler_task_queue有新的MSG,取出來,根據(jù)task_idDB中讀取完整的task信息,由于type=='crawler',創(chuàng)建相應的Crawler_Worker進行工作。

  • Crawler_Worker完成工作后,將結果數(shù)據(jù)存入DB,更新task.status='done'。worker_managerMSG發(fā)送至notify_queue

  • Frontend監(jiān)聽notify_queue,讀取到MSG信息后,檢查correlation_id能否在outstandingRequests列表中匹配到:

    • 如果未匹配,則不予理會,
    • 如果匹配,則進行下一步處理,ack(MSG),并從outstandingRequests列表中刪除該correlation_id。
  • 如果task['callback_url']的值不為空,則Frontend向該callback_url發(fā)送任務結果(簡單的task或者根據(jù)type='crawler'查詢到的詳細的crawlerResultData)。

  • 對于沒有提供callback_urlRequester而言,則需要在一定時間后,調用Frontend的查詢接口進行查詢。

如上,一個任務的流程就算走完了。

如果理解了上述流程,就能明白通過type字段以及對應的work_manger可以擴展到多種任務的分布式異步處理。

4.進階

1)prefetch的使用
一個worker_manager只處理一個task當然不劃算,但如果來者不拒為每一個task都創(chuàng)建Crawler_Worker,在進行批跑時,很容把機器掛掉。這時就需要用到RabbitMQprefetchack機制了,怎么實現(xiàn)就不說了。需要提醒的是,worker_manager在通過ack機制限制當前機器上任務并發(fā)數(shù)在prefetch之下時,需要先緩存MSGtask完成后再進行ack操作。

2)關于correlation_id的用途
加入任務并發(fā)性要求較高,啟動了frontAfrontB兩個Frontend和多臺Crawler_Worker來處理任務。那么correlation_id就可以保證frontA受理的任務請求所需要的異步callback操作以及同步化response依然可以通過frontA完成。

3)異步任務的交互
系統(tǒng)處理任務是異步的,上述任務流程中沒有涉及到交互問題,但眾所周知,爬蟲工作過程中有一部分是需要人的交互的,這也是一種簡單的反爬措施,比如模擬登錄中需要提交短信驗證碼,那怎么辦呢?

  • 提示一下,從status入手,此外,Frontend的一致性在上一點說過了,當然還要考慮到爬蟲工作的連續(xù)性,就以補充短信驗證碼來說,必然要延用之前的cookiesession,這里就不細說了,再給個截圖作為提示。
    不要被queue的名字所迷惑

5.代碼示例

這里給出簡單的rabbitMQ_rpc_node_sample,只有rabbitmq的封裝、rabbitmq_clientrabbitmq_server的實現(xiàn)以及調用示例invoking_instance。
示例效果是進行簡單計算,對GET請求中的參數(shù)a進行求根操作,對POST請求中的參數(shù)a進行求3次方。
提示一下,示例代碼中有一個小小的不太影響使用的坑,不算bug,有心者可以發(fā)現(xiàn),我也是最近才發(fā)現(xiàn)的。

6.致謝

感謝魏總在這一年間給予的指導,讓我收獲頗豐。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容