Redis高級特性——消息隊列

消息隊列

消息隊列的設(shè)計方案在開發(fā)過程中應(yīng)用比較廣泛,一般情況都是為了實現(xiàn)程序解耦,生產(chǎn)者發(fā)送一個消息,消費者或多個客戶端消費者去消費消息,完成任務(wù)的異步處理。如果有多個消費者,一方面可以提高處理性能,另一方面可以使得消費者程序得容災(zāi)?;谶@兩方面得有點,消息隊列當(dāng)前應(yīng)用的非常廣泛,一般在啟用消息隊列時都會采用MQ服務(wù)中間件,比如RabbitMq、ZeroMQ等等,這些服務(wù)是專業(yè)的消息服務(wù),在互聯(lián)網(wǎng)公司應(yīng)用的非常多,因此服務(wù)質(zhì)量和分布式方案都是比較成熟的。

依據(jù)Redis的特點,他也有很多類型可以實現(xiàn)消息隊列方案,在之前的項目中也有應(yīng)用,并且有些場合是MQ無法替代的。同時Redis因為不是一個標準的MQ服務(wù),因此很多方面無法滿足需求,比如ACK,丟包的問題無法保證,有這方面的需求還是使用MQ中間件;即使有這方面的問題使用Redis作為消息中間件的團隊仍大量存在,基于這些原因,Redis的作者開發(fā)了一個新的MQ服務(wù),同Redis類似,使用方式也基本類似Disque,文檔,之前作者表示未來會將該組件加入到Redis標準組件中,這樣的話,未來Redis也可以作為MQ的一個首選項。

在之前的項目中分別使用過列表、有序集合作為簡單的消息隊列,這里對這幾種方式進行簡單描述。

列表作為消息隊列

列表作為消息隊列,是最簡單的選擇,其特性天然適合隊列。只要在一端分別使用LPUSH將信息添加到列表,在另一端使用RPOP消費一個消息即可。采用兩個cli模擬列表的消息隊列行為

# client 1  producer,對入庫的資源ID進行上架操作
LPUSH mymq 1
LPUSH mymq 2
# client 2  consumer
# 獲取消息,并消費
RPOP mymq
"1"
# 獲取到資源1后,進行上架操作,操作完成后丟棄
# 處理完成后,需要再次獲取消息
RPOP mymq
"2"
RPOP mymq
(nil)
# mymq中沒有新的消息,消費者客戶端,需要不斷重復(fù)RPOP去檢查有沒有消息

采用列表作為消息隊列,也有一個問題,如上述示例中演示的,消費者每次主動去獲取消息,有沒有消息沒有辦法判斷,只能不斷的讀取。對于服務(wù)端采用JAVA的話,一般這種任務(wù)型的都會采用定期執(zhí)行的方案,那么這種代碼很可能如下所示:

public void runTask() {
    while(true) {
        Object id = redis.rpop(myMqKey);
        if (null == id) {
            // 沒有消息了,需要停止,停止多久再次執(zhí)行,這個時間不太好處理,并且沒有什么經(jīng)驗值,因為資源的入庫沒有規(guī)律。
        }
        // 處理上架
        do(id);
    }
}

不斷的去檢查Redis是否有新消息,雖然不麻煩,但是也在耗費資源,并且是無效的投入。Redis的列表中有一個BRPOP方法,前面章節(jié)提到過,這里可以采用這個方法,解決不斷檢查Redis是否有數(shù)據(jù),消耗資源的問題。仍然使用上述示例

# 客戶端1,consumer
BRPOP mymq 0        # 0 表示永不過期
# mymq隊列沒有數(shù)據(jù),此時一直阻塞,客戶端2生產(chǎn)消息后,客戶端1理解消費消息
# 1) "mymq"
# 2) "1"
# (30.82s)
# 等待了30.82秒,mymq出現(xiàn)數(shù)據(jù)并進行了消費,一次讀取消息結(jié)束
# 客戶端2,producer
LPUSH mymq 1
# (integer) 1

這種方式很好的解決了不斷檢查耗費資源的問題,此時JAVA代碼的更改并不大

public void runTask() {
    while(true) {
        List<Object> ids = redis.brpop(myMqKey, 0);
        // 因為永久等待,只要彈出數(shù)據(jù),就不為空
        // 處理上架, 數(shù)據(jù)在位置1中
        do(ids.get(1));
    }
}

使用有序集合作為消息隊列

有序集合作為消息隊列,除了能夠解決列表中不斷檢查耗費資源的問題,同時將得分采用時間戳表示時,還能實現(xiàn)消息到期時間消亡或到期必須處理的需求。

在之前的產(chǎn)品開發(fā)中,有一個銷售課程的子模塊,用戶下訂單后,有一個支付時間的限制。在這個產(chǎn)品中要求用戶下訂單后15分鐘內(nèi)完成支付,如果15分鐘內(nèi)不完成支付,則取消訂單。

這里就采用了有序集合作為隊列的支撐,其具體實現(xiàn)流程為

  • 當(dāng)用戶下訂單時,將訂單號作為值,以訂單過期時間15分鐘作為得分,加入到有序隊列中
  • 完成訂單后,引導(dǎo)用戶進入支付流程,略
  • 起一個持續(xù)的任務(wù),用于處理定單消亡處理流程
  • 獲取第一個訂單信息,檢查其得分是否超時,如果超時則處理訂單消亡流程
  • 接收到一個過期訂單,處理訂單的消亡,由于訂單支付過程沒有更改隊列,因此需要檢查訂單的狀態(tài)再進行消亡
  • 如果得到的訂單沒有過期,則暫停,其時間可以比較準確,就是得分剩余的時間
  • 上述流程中,訂單過期稱為命中,一旦命中,則繼續(xù)讀取下一個訂單,直到隊列為空或一個未到期訂單為止

發(fā)布訂閱

Redis提供了發(fā)布訂閱模式,可以實現(xiàn)異步消息通信。發(fā)布訂閱模式中,訂閱者可以定于一個或多個頻道,發(fā)布者可以向一個或多個頻道發(fā)送消息,只要訂閱了這些頻道的客戶端,都能夠收到發(fā)布的消息。

Redis得發(fā)布訂閱模式?jīng)]有持久化消息的操作,如果某一個頻道沒有客戶端訂閱,則發(fā)布出去的消息就會被丟棄,即使該頻道后續(xù)有客戶端訂閱,之前發(fā)布出去的消息也無法被再次收到。

發(fā)布訂閱模式主要由以下指令完成

# 發(fā)布消息,將消息message發(fā)布到指定的頻道
PUBLISH channel message

# 訂閱頻道,客戶端訂閱一個或多個頻道
SUBSCRIBE channel [channel...]

# 按模式訂閱,客戶端按照channel得模式進行訂閱,類似MQ的queue模式
PSUBSCRIBE pattern [pattern...]

# 退訂一個或多個頻道,與SUBSCRIBE指令配合使用
UNSUBSCRIBE channel [channel...]

# 按給定的模式退訂一個或多個頻道,與PSUBSCRIBE指令配合使用
PUNSUBSCRIBE pattern [pattern...]

PUBLISH指令比較簡單,就是用于給一個頻道發(fā)布消息。其返回值表示當(dāng)前消息被幾個訂閱者接收。

# 向訂單處理channel發(fā)送一個訂單id=1的消息
PUBLISH mymq.order 1
# (integer) 0
# 由于此時沒有客戶端訂閱該頻道,因此返回值為0,訂單1得消息再不會被接收處理

對于其他幾個指令就比較復(fù)雜一些,不過SUBSCRIBE等幾個關(guān)于訂閱的指令,其收到的消息格式是類似的

UNSUBSCRIBE my
# 1) "unsubscribe"
# 2) "my"
# 3) (integer) 0

PUNSUBSCRIBE my*
# 1) "punsubscribe"
# 2) "my*"
# 3) (integer) 0

SUBSCRIBE mymq.order
# 1) "subscribe"
# 2) "mymq.order"
# 3) (integer) 1

PSUBSCRIBE my*
# 1) "psubscribe"
# 2) "my*"
# 3) (integer) 1

# 已訂閱成功,收到的消息
# 1) "message"
# 2) "mymq.order"
# 3) "2"

收到的消息是一個列表,長度為三,其具體得信息分別表示:

  • 當(dāng)調(diào)用的是指令時,第一個位置表示了當(dāng)前的指令信息,如SUBSCRIBE/UNSUBSCIRBE等;如果是消息的話,那么第一個位置為message,消息類型對業(yè)務(wù)來說是最重要的
  • 第二個位置,表示頻道或訂閱的頻道模式,如my/my*等
  • 第三個位置,如果是消息,則為消息內(nèi)容;如果是指令,訂閱指令,表示該頻道、頻道模式的訂閱數(shù)量;退訂指令表示,還有多少客戶端訂閱該頻道

分別啟用三個客戶端,一個客戶端用于訂閱頻道,一個用于按模式訂閱,一個用來發(fā)布消息。

# 客戶端1,SUBSCRIBE
SUBSCRIBE mymq.order
# Reading messages... (press Ctrl-C to quit)
# 1) "subscribe"
# 2) "mymq.order"
# 3) (integer) 1
# 成功訂閱,客戶端等待消息

### 客戶端3發(fā)送PUBLISH mymq.order world
# 1) "message"
# 2) "mymq.order"
# 3) "world"
# 客戶端2,PSUBSCRIBE mymq.order.? mymq.order*
PSUBSCRIBE mymq.order.? mymq.order*
# Reading messages... (press Ctrl-C to quit)
# 1) "psubscribe"
# 2) "mymq.order.?"
# 3) (integer) 1
# 1) "psubscribe"
# 2) "mymq.order*"
# 3) (integer) 2
# 對于mymq.order*已經(jīng)有兩個客戶端訂閱,因此該模式返回的訂閱者為2,等待發(fā)送消息
# 客戶端3發(fā)送 PUBLISH mymq.order.1 hello
# 1) "pmessage"
# 2) "mymq.order.?"
# 3) "mymq.order.1"
# 4) "hello"
# 1) "pmessage"
# 2) "mymq.order*"
# 3) "mymq.order.1"
# 4) "hello"

#### PUBLISH mymq.order world
# 1) "pmessage"
# 2) "mymq.order*"
# 3) "mymq.order"
# 4) "world"

# 客戶端3,發(fā)送消息
PUBLISH mymq.order.1 hello
# (integer) 2
# 由于客戶端2有兩個符合mymq.order.1的模式,因此接收到的客戶端為2
PUBLISH mymq.order world
# (integer) 2
# 客戶端1滿足訂閱頻道,客戶端2滿足mymq.order*的模式,因此接收到的客戶端仍然是2
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容