RocketMQ(二)RocketMQ高級(jí)特性

一、 關(guān)鍵特性

1 消息發(fā)送和消費(fèi)

1)消息發(fā)送者步驟分析:

  • 創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名
  • 指定NameServer地址
  • 啟動(dòng)producer
  • 創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體
  • 發(fā)送消息
  • 關(guān)閉生產(chǎn)者producer

2)消息消費(fèi)者步驟分析:

  • 創(chuàng)建消費(fèi)者consumer,制定消費(fèi)者組名
  • 指定NameServer地址
  • 訂閱主題Topic和Tag
  • 設(shè)置回調(diào)函數(shù),處理消息
  • 啟動(dòng)消費(fèi)者consumer
2 消息類型

使用RocketMQ可以發(fā)送普通消息、順序消息、事務(wù)消息,順序消息能實(shí)現(xiàn)有序消費(fèi),事務(wù)消息可以解決分布式事務(wù)實(shí)現(xiàn)數(shù)據(jù)最終一致。

1)普通消息
消息隊(duì)列 MQ 提供三種方式來(lái)發(fā)送普通消息:

  • 可靠同步發(fā)送
    同步發(fā)送是指消息發(fā)送方發(fā)出數(shù)據(jù)后,會(huì)在收到接收方發(fā)回響應(yīng)之后才發(fā)下一個(gè)數(shù)據(jù)包的通訊方式。這種可靠的消息發(fā)送方式使用的比較廣泛,比如:重要的消息通知,短信通知。
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        //- 創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //- 指定NameServer地址
        producer.setNamesrvAddr("192.168.217.130:9876");
        //- 啟動(dòng)producer
        producer.start();
        //- 創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體
        Message message = new Message("base","Tag1","keys_1",("hello").getBytes());
        //- 發(fā)送消息
        SendResult result = producer.send(message);
        //發(fā)送狀態(tài)
        SendStatus sendStatus = result.getSendStatus();
        //消息id
        String msgId = result.getMsgId();
        //消息接受隊(duì)列id
        int queueId = result.getMessageQueue().getQueueId();
        TimeUnit.SECONDS.sleep(3);
        System.out.println("發(fā)送狀態(tài)"+result+",消息id"+msgId+",隊(duì)列"+queueId);

        //- 關(guān)閉生產(chǎn)者producer
        producer.shutdown();
    }
}
  • 可靠異步發(fā)送
    異步發(fā)送是指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng),接著發(fā)送下個(gè)數(shù)據(jù)包的通訊方式,發(fā)送方通過(guò)回調(diào)接口接收服務(wù)器響應(yīng),并對(duì)響應(yīng)結(jié)果進(jìn)行處理。異步消息通常用在對(duì)響應(yīng)時(shí)間敏感的業(yè)務(wù)場(chǎng)景,即發(fā)送端不能容忍長(zhǎng)時(shí)間地等待Broker的響應(yīng)。
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //- 創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //- 指定NameServer地址
        producer.setNamesrvAddr("192.168.217.130:9876");
        //- 啟動(dòng)producer
        producer.start();
        //- 創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體
        for (int i = 0; i < 3; i++) {
            Message message = new Message("base","Tag2",("hello"+i).getBytes());
            //- 發(fā)送異步消息
            producer.send(message, new SendCallback() {
                public void onSuccess(SendResult sendResult) {
                    System.out.println("發(fā)送成功:"+sendResult);
                }

                public void onException(Throwable throwable) {
                    System.out.println("發(fā)送異常:"+throwable);
                }
            });

            TimeUnit.SECONDS.sleep(3);
        }
        //- 關(guān)閉生產(chǎn)者producer
        producer.shutdown();
    }
}
  • 單向發(fā)送消息
    這種方式注意用在不特別關(guān)心發(fā)送結(jié)果的場(chǎng)景,例如日志發(fā)送。
public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        //- 創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //- 指定NameServer地址
        producer.setNamesrvAddr("192.168.217.130:9876");
        //- 啟動(dòng)producer
        producer.start();
        //- 創(chuàng)建消息對(duì)象,指定主題Topic、Tag和消息體
        for (int i = 0; i < 3; i++) {
            Message message = new Message("base","Tag3",("hello"+i).getBytes());
            //- 發(fā)送單向消息
            producer.sendOneway(message);
            TimeUnit.SECONDS.sleep(3);
        }
        //- 關(guān)閉生產(chǎn)者producer
        producer.shutdown();
    }
}
  • 編寫(xiě)消息消費(fèi)者消費(fèi)消息( 啟動(dòng)時(shí)需要先啟動(dòng)消費(fèi)者監(jiān)聽(tīng))
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //- 創(chuàng)建消費(fèi)者consumer,制定消費(fèi)者組名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //- 指定NameServer地址
        consumer.setNamesrvAddr("192.168.217.130:9876");
        //- 訂閱主題Topic和Tag
        consumer.subscribe("base","*");
        //- 設(shè)置回調(diào)函數(shù),處理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //接收消息內(nèi)容
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //- 啟動(dòng)消費(fèi)者consumer
        consumer.start();
    }
}

RocketMQ 常見(jiàn)異常處理
2) 延時(shí)消息
消息在發(fā)送到消息隊(duì)列 MQ 服務(wù)端后并不會(huì)立馬投遞,而是根據(jù)消息中的屬性延遲固定時(shí)間后才投遞給消費(fèi)者。但是RocketMQ不支持任意時(shí)間精度,僅支持特定的 level,例如定時(shí) 5s, 10s, 1m 等。其中,level=0 級(jí)表示不延時(shí),level=1 表示 1 級(jí)延時(shí),level=2 表示 2 級(jí)延時(shí),以此類推。
在服務(wù)器端(rocketmq-broker端)的屬性配置文件中加入以下行:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

描述了各級(jí)別與延時(shí)時(shí)間的對(duì)應(yīng)映射關(guān)系。
? 這個(gè)配置項(xiàng)配置了從1級(jí)開(kāi)始,各級(jí)延時(shí)的時(shí)間,可以修改這個(gè)指定級(jí)別的延時(shí)時(shí)間;
? 時(shí)間單位支持:s、m、h、d,分別表示秒、分、時(shí)、天;
? 默認(rèn)值就是上面聲明的,可手工調(diào)整;
? 默認(rèn)值已夠用,不建議修改這個(gè)值。

public class DelayProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.217.130:9876");
        producer.start();
        //延時(shí)10s
        Message message = new Message("base","Tag1","keys_1",("hello").getBytes());
        message.setDelayTimeLevel(3);
        producer.send(message);
        producer.shutdown();
    }
}

如果你使用阿里云服務(wù)器,可以使用阿里封裝的api,它支持定時(shí)消息和延時(shí)消息,可以適應(yīng)更多場(chǎng)景。
詳細(xì)介紹和代碼示例

3) 順序消息

消息有序指的是可以按照消息的發(fā)送順序來(lái)消費(fèi)(FIFO)。RocketMQ可以嚴(yán)格的保證消息有序,可以分為分區(qū)有序或者全局有序。
詳細(xì)介紹


4) 事務(wù)消息


消息隊(duì)列 MQ 提供類似 X/Open XA 的分布式事務(wù)功能,通過(guò)消息隊(duì)列 MQ 事務(wù)消息能達(dá)到分布式事務(wù)的最終一致。上圖說(shuō)明了事務(wù)消息的大致流程:正常事務(wù)消息的發(fā)送和提交、事務(wù)消息的補(bǔ)償流程。

  • 事務(wù)消息發(fā)送及提交:
    ①發(fā)送消息(half消息);
    ②服務(wù)端響應(yīng)消息寫(xiě)入結(jié)果;
    ③根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)(如果寫(xiě)入失敗,此時(shí)half消息對(duì)業(yè)務(wù)不可見(jiàn),本地邏輯不執(zhí)行);
    ④根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或Rollback(Commit操作生成消息索引,消息對(duì)消費(fèi)者可見(jiàn))。

  • 事務(wù)消息的補(bǔ)償流程:

①對(duì)沒(méi)有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息),從服務(wù)端發(fā)起一次“回查”;
②Producer收到回查消息,檢查回查消息對(duì)應(yīng)的本地事務(wù)的狀態(tài)。
③根據(jù)本地事務(wù)狀態(tài),重新Commit或RollBack
其中,補(bǔ)償階段用于解決消息Commit或Rollback發(fā)生超時(shí)或者失敗的情況。

  • 事務(wù)消息狀態(tài):

事務(wù)消息共有三種狀態(tài):提交狀態(tài)、回滾狀態(tài)、中間狀態(tài):
①TransactionStatus.CommitTransaction:提交事務(wù),它允許消費(fèi)者消費(fèi)此消息。
②TransactionStatus.RollbackTransaction:回滾事務(wù),它代表該消息將被刪除,不允許被消費(fèi)。
③TransactionStatus.Unkonwn:中間狀態(tài),它代表需要檢查消息隊(duì)列來(lái)確定消息狀態(tài)。
詳細(xì)介紹和代碼示例

消息類型對(duì)比:

Topic的消息類型 是否支持事務(wù)消息 是否支持定時(shí)/延時(shí)消息 性能
無(wú)序消息(普通、事務(wù)、定時(shí)/延時(shí)消息) 最高
分區(qū)順序消息
全局順序消息 一般

發(fā)送方式對(duì)比:

消息類型 是否支持同步發(fā)送 是否支持異步發(fā)送 是否支持單向發(fā)送
無(wú)序消息(普通、事務(wù)、定時(shí)/延時(shí)消息) 最高
分區(qū)順序消息
全局順序消息
3 批量消息

批量發(fā)送消息能顯著提高傳遞消息的性能,限制是這些消息應(yīng)該具有相同的topic,相同的waitStoreMsgOK,而且不能是延時(shí)消息。此外,這一批量消息的總大小不應(yīng)超過(guò)1MB。如果超過(guò),需要把消息分割。
不超過(guò)1M,直接producer.send(msg)就可以了。
超過(guò)IM,消息分割代碼示例

4 消息消費(fèi)方式

(1)負(fù)載均衡模式
消費(fèi)者默認(rèn)采用負(fù)載均衡方式,多個(gè)消費(fèi)者共同消費(fèi)隊(duì)列消息,每個(gè)消費(fèi)者處理的消息不同。
(2)廣播模式
消費(fèi)者采用廣播的方式消費(fèi)消息,每個(gè)消費(fèi)者消費(fèi)的消息都是相同的。

Producer負(fù)載均衡

Producer端,每個(gè)實(shí)例在發(fā)消息的時(shí)候,默認(rèn)會(huì)輪詢所有的message queue發(fā)送,以達(dá) 到讓消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息 就發(fā)送到不同的broker下,如下圖:



圖中箭頭線條上的標(biāo)號(hào)代表順序,發(fā)布方會(huì)把第一條消息發(fā)送至 Queue 0,然后第二條 消息發(fā)送至 Queue 1,以此類推。

Consumer負(fù)載均衡

1)集群模式
在集群消費(fèi)模式下,每條消息只需要投遞到訂閱這個(gè)topic的Consumer Group下的一個(gè) 實(shí)例即可。RocketMQ采用主動(dòng)拉取的方式拉取并消費(fèi)消息,在拉取的時(shí)候需要明確指定 拉取哪一條message queue。 而每當(dāng)實(shí)例的數(shù)量有變更,都會(huì)觸發(fā)一次所有實(shí)例的負(fù)載均衡,這時(shí)候會(huì)按照queue的 數(shù)量和實(shí)例的數(shù)量平均分配queue給每個(gè)實(shí)例。 默認(rèn)的分配算法是AllocateMessageQueueAveragely,如下圖:


還有另外一種平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分?jǐn)?每一條queue,只是以環(huán)狀輪流分queue的形式,如下圖:

需要注意的是,集群模式下,queue都是只允許分配只一個(gè)實(shí)例,這是由于如果多個(gè)實(shí) 例同時(shí)消費(fèi)一個(gè)queue的消息,由于拉取哪些消息是consumer主動(dòng)控制的,那樣會(huì)導(dǎo)致 同一個(gè)消息在不同的實(shí)例下被消費(fèi)多次,所以算法上都是一個(gè)queue只分給一個(gè) consumer實(shí)例,一個(gè)consumer實(shí)例可以允許同時(shí)分到不同的queue。 通過(guò)增加consumer實(shí)例去分?jǐn)俼ueue的消費(fèi),可以起到水平擴(kuò)展的消費(fèi)能力的作用。而 有實(shí)例下線的時(shí)候,會(huì)重新觸發(fā)負(fù)載均衡,這時(shí)候原來(lái)分配到的queue將分配到其他實(shí) 例上繼續(xù)消費(fèi)。 但是如consumer實(shí)例的數(shù)量比message queue的總數(shù)量還多的話,多出來(lái)的 consumer實(shí)例將無(wú)法分到queue,也就無(wú)法消費(fèi)到消息,也就無(wú)法起到分?jǐn)傌?fù)載的作用了。所以需要控制讓queue的總數(shù)量大于等于consumer的數(shù)量。
2)廣播模式
由于廣播模式下要求一條消息需要投遞到一個(gè)消費(fèi)組下面所有的消費(fèi)者實(shí)例,所以也就 沒(méi)有消息被分?jǐn)傁M(fèi)的說(shuō)法。 在實(shí)現(xiàn)上,就是在consumer分配queue的時(shí)候,所有consumer都分到所 有的queue。

《深入理解RocketMQ》- MQ消息的投遞機(jī)制

5 簡(jiǎn)單消息過(guò)濾

1) Tag過(guò)濾

RocketMQ 的消息過(guò)濾方式有別于其他消息中間件,是在訂閱時(shí),再做過(guò)濾,先來(lái)看下 Consume Queue 的存儲(chǔ)結(jié)構(gòu)。

(1)在 Broker 端進(jìn)行 Message Tag 比對(duì),先遍歷 Consume Queue,如果存儲(chǔ)的 Message Tag 與訂閱的 Message Tag 不符合,則跳過(guò),繼續(xù)比對(duì)下一個(gè),符合則傳輸給 Consumer。注意:Message Tag 是字符串形式,Consume Queue 中存儲(chǔ)的是其對(duì)應(yīng)的 hashcode,比對(duì)時(shí)也是比對(duì) hashcode。

(2)Consumer 收到過(guò)濾后的消息后,同樣也要執(zhí)行在 Broker 端的操作,但是比對(duì)的是真實(shí)的 Message Tag 字 符串,而不是 Hashcode。

為什么過(guò)濾要這樣做?

(1)Message Tag 存儲(chǔ) Hashcode,是為了在 Consume Queue 定長(zhǎng)方式存儲(chǔ),節(jié)約空間。

(2)過(guò)濾過(guò)程中不會(huì)訪問(wèn) Commit Log 數(shù)據(jù),可以保證堆積情況下也能高效過(guò)濾。

(3) 即使存在 Hash 沖突,也可以在 Consumer 端進(jìn)行修正,保證萬(wàn)無(wú)一失。

簡(jiǎn)單消息過(guò)濾通過(guò)指定多個(gè) Tag 來(lái)過(guò)濾消息,過(guò)濾動(dòng)作在服務(wù)器進(jìn)行。如:

consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

以上方式對(duì)于復(fù)雜的場(chǎng)景可能不起作用,因?yàn)橐粋€(gè)消息只能有一個(gè)tag。這種情況下,可以使用SQL表達(dá)式篩選消息。

2) SQL語(yǔ)法過(guò)濾

consumer.subscribe("TopicTest",MessageSelector

.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + "and (a is not null and a between 0 3)"));

注意:只有使用push模式的消費(fèi)者此案使用SQL92標(biāo)準(zhǔn)的sql語(yǔ)句。

其他消息過(guò)濾知識(shí)

6 消息重試

1)順序消息的重試

對(duì)于順序消息,當(dāng)消費(fèi)者消費(fèi)消息失敗后,消息隊(duì)列 RocketMQ 會(huì)自動(dòng)不斷進(jìn)行消息重 試(每次間隔時(shí)間為 1 秒),這時(shí),應(yīng)用會(huì)出現(xiàn)消息消費(fèi)被阻塞的情況。因此,在使用順序消息時(shí),務(wù)必保證應(yīng)用能夠及時(shí)監(jiān)控并處理消費(fèi)失敗的情況,避免阻塞現(xiàn)象的發(fā)生。

2) 無(wú)序消息的重試

對(duì)于無(wú)序消息(普通、定時(shí)、延時(shí)、事務(wù)消息),當(dāng)消費(fèi)者消費(fèi)消息失敗時(shí),您可以通過(guò)設(shè)置返回狀態(tài)達(dá)到消息重試的結(jié)果。

無(wú)序消息的重試只針對(duì)集群消費(fèi)方式生效;廣播方式不提供失敗重試特性,即消費(fèi)失敗后,失敗消息不再重試,繼續(xù)消費(fèi)新的消息。

消息重試詳細(xì)說(shuō)明

3)死信隊(duì)列

當(dāng)一條消息初次消費(fèi)失敗,消息隊(duì)列 RocketMQ 會(huì)自動(dòng)進(jìn)行消息重試;達(dá)到最大重試次 數(shù)后,若消費(fèi)依然失敗,則表明消費(fèi)者在正常情況下無(wú)法正確地消費(fèi)該消息,此時(shí),消息隊(duì)列 RocketMQ 不會(huì)立刻將消息丟棄,而是將其發(fā)送到該消費(fèi)者對(duì)應(yīng)的特殊隊(duì)列中。 在消息隊(duì)列 RocketMQ 中,這種正常情況下無(wú)法被消費(fèi)的消息稱為死信消息(Dead-Letter Message),存儲(chǔ)死信消息的特殊隊(duì)列稱為死信隊(duì)列(Dead-Letter Queue)。

死信隊(duì)列詳細(xì)說(shuō)明

7 消費(fèi)冪等

詳細(xì)說(shuō)明

二、消息存儲(chǔ)

分布式隊(duì)列因?yàn)橛懈呖煽啃缘囊?,所以?shù)據(jù)要進(jìn)行持久化存儲(chǔ)。



流程:
(1) 消息生成者發(fā)送消息;
(2) MQ收到消息,將消息進(jìn)行持久化,在存儲(chǔ)中新增一條記錄 ;
(3) 返回ACK給生產(chǎn)者;
(4) MQ push 消息給對(duì)應(yīng)的消費(fèi)者,然后等待消費(fèi)者返回ACK;
(5) 如果消息消費(fèi)者在指定時(shí)間內(nèi)成功返回ack,那么MQ認(rèn)為消息消費(fèi)成功,在存儲(chǔ)中 刪除消息,即執(zhí)行第6步;如果MQ在指定時(shí)間內(nèi)沒(méi)有收到ACK,則認(rèn)為消息消費(fèi)失 敗,會(huì)嘗試重新push消息,重復(fù)執(zhí)行4、5、6步驟
(6) MQ刪除消息。

1 存儲(chǔ)介質(zhì)

(1) 關(guān)系型數(shù)據(jù)庫(kù)DB
Apache下開(kāi)源的另外一款MQ—ActiveMQ(默認(rèn)采用的KahaDB做消息存儲(chǔ))可選用 JDBC的方式來(lái)做消息持久化,通過(guò)簡(jiǎn)單的xml配置信息即可實(shí)現(xiàn)JDBC消息存儲(chǔ)。由于, 普通關(guān)系型數(shù)據(jù)庫(kù)(如Mysql)在單表數(shù)據(jù)量達(dá)到千萬(wàn)級(jí)別的情況下,其IO讀寫(xiě)性能往往 會(huì)出現(xiàn)瓶頸。在可靠性方面,該種方案非常依賴DB,如果一旦DB出現(xiàn)故障,則MQ的消 息就無(wú)法落盤存儲(chǔ)會(huì)導(dǎo)致線上故障。
(2) 文件系統(tǒng)
目前業(yè)界較為常用的幾款產(chǎn)品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盤 至所部署虛擬機(jī)/物理機(jī)的文件系統(tǒng)來(lái)做持久化(刷盤一般可以分為異步刷盤和同步刷盤兩種模式)。消息刷盤為消息存儲(chǔ)提供了一種高效率、高可靠性和高性能的數(shù)據(jù) 持久化方式。除非部署MQ機(jī)器本身或是本地磁盤掛了,否則一般是不會(huì)出現(xiàn)無(wú)法持 久化的故障問(wèn)題。

2 性能對(duì)比

文件系統(tǒng)>關(guān)系型數(shù)據(jù)庫(kù)DB

3 消息的存儲(chǔ)和發(fā)送

1)消息存儲(chǔ)

磁盤如果使用得當(dāng),磁盤的速度完全可以匹配上網(wǎng)絡(luò) 的數(shù)據(jù)傳輸速度。目前的高性能磁 盤,順序?qū)懰俣瓤梢赃_(dá)到600MB/s, 超過(guò)了一般網(wǎng)卡的傳輸速度。但是磁盤隨機(jī)寫(xiě)的速 度只有大概100KB/s,和順序?qū)懙男阅芟嗖?000倍!因?yàn)橛腥绱司薮蟮乃俣炔顒e,好的 消息隊(duì)列系統(tǒng)會(huì)比普通的消息隊(duì)列系統(tǒng)速度快多個(gè)數(shù)量級(jí)。RocketMQ的消息用順序?qū)? 保證了消息存儲(chǔ)的速度。

2)消息發(fā)送

Linux操作系統(tǒng)分為【用戶態(tài)】和【內(nèi)核態(tài)】,文件操作、網(wǎng)絡(luò)操作需要涉及這兩種形態(tài) 的切換,免不了進(jìn)行數(shù)據(jù)復(fù)制。 一臺(tái)服務(wù)器 把本機(jī)磁盤文件的內(nèi)容發(fā)送到客戶端,一般分為兩個(gè)步驟:
1)read;讀取本地文件內(nèi)容;
2)write;將讀取的內(nèi)容通過(guò)網(wǎng)絡(luò)發(fā)送出去。
這兩個(gè)看似簡(jiǎn)單的操作,實(shí)際進(jìn)行了4 次數(shù)據(jù)復(fù)制,分別是:
1. 從磁盤復(fù)制數(shù)據(jù)到內(nèi)核態(tài)內(nèi)存;
2. 從內(nèi)核態(tài)內(nèi)存復(fù) 制到用戶態(tài)內(nèi)存;
3. 然后從用戶態(tài) 內(nèi)存復(fù)制到網(wǎng)絡(luò)驅(qū)動(dòng)的內(nèi)核態(tài)內(nèi)存;
4. 最后是從網(wǎng)絡(luò)驅(qū)動(dòng)的內(nèi)核態(tài)內(nèi)存復(fù) 制到網(wǎng)卡中進(jìn)行傳輸。


Consumer 消費(fèi)消息過(guò)程,使用了零拷貝,零拷貝包含以下兩種方式

  1. 使用 mmap + write 方式
    優(yōu)點(diǎn):即使頻繁調(diào)用,使用小塊文件傳輸,效率也很高
    缺點(diǎn):不能很好的利用 DMA 方式,會(huì)比 sendfile 多消耗 CPU,內(nèi)存安全性控制復(fù)雜,需要避免 JVM Crash問(wèn)題。
  2. 使用 sendfile 方式
    優(yōu)點(diǎn):可以利用 DMA 方式,消耗 CPU 較少,大塊文件傳輸效率高,無(wú)內(nèi)存安全新問(wèn)題。
    缺點(diǎn):小塊文件效率低于 mmap 方式,只能是 BIO 方式傳輸,不能使用 NIO。
    RocketMQ 選擇了第一種方式,mmap+write 方式,因?yàn)橛行K數(shù)據(jù)傳輸?shù)男枨?,效果?huì)比 sendfile 更好。
    關(guān)于 Zero Copy 的更詳細(xì)介紹,請(qǐng)參考以下文章
    http://www.linuxjournal.com/article/6345

通過(guò)使用mmap的方式,可以省去向用戶態(tài)的內(nèi)存復(fù)制,提高速度。這種機(jī)制在Java中是 通過(guò)MappedByteBuffer實(shí)現(xiàn)的 RocketMQ充分利用了上述特性,提高消息存盤和網(wǎng)絡(luò)發(fā)送 的速度。

這里需要注意的是,采用MappedByteBuffer這種內(nèi)存映射的方式有幾個(gè)限制,其 中之一是一次只能映射1.5~2G 的文件至用戶態(tài)的虛擬內(nèi)存,這也是為何RocketMQ 默認(rèn)設(shè)置單個(gè)CommitLog日志數(shù)據(jù)文件為1G的原因了。
MQ消息最終一致性解決方案

4 消息存儲(chǔ)結(jié)構(gòu)

RocketMQ消息的存儲(chǔ)是由ConsumeQueue和CommitLog配合完成 的,消息真正的物 理存儲(chǔ)文件是CommitLog,ConsumeQueue是消息的邏輯隊(duì)列,類似數(shù)據(jù)庫(kù)的索引文 件,存儲(chǔ)的是指向物理存儲(chǔ)的地址。每 個(gè)Topic下的每個(gè)Message Queue都有一個(gè)對(duì)應(yīng) 的ConsumeQueue文件。


  • CommitLog:存儲(chǔ)消息的元數(shù)據(jù)

  • ConsumerQueue:存儲(chǔ)消息在CommitLog的索引

  • IndexFile:為了消息查詢提供了一種通過(guò)key或時(shí)間區(qū)間來(lái)查詢消息的方法,這種通過(guò)IndexFile來(lái)查找消息的方法不影響發(fā)送與消費(fèi)消息的主流程

5 刷盤機(jī)制

RocketMQ的消息是存儲(chǔ)到磁盤上的,這樣既能保證斷電后恢復(fù), 又可以讓存儲(chǔ)的消息 量超出內(nèi)存的限制。RocketMQ為了提高性能,會(huì)盡可能地保證磁盤的順序?qū)?。消息在?過(guò)Producer寫(xiě)入RocketMQ的時(shí) 候,有兩種寫(xiě)磁盤方式,分布式同步刷盤和異步刷盤。


1)同步刷盤

在返回寫(xiě)成功狀態(tài)時(shí),消息已經(jīng)被寫(xiě)入磁盤。具體流程是,消息寫(xiě)入內(nèi)存的PAGECACHE 后,立刻通知刷盤線程刷盤, 然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線 程,返回消息寫(xiě) 成功的狀態(tài)。

2)異步刷盤

在返回寫(xiě)成功狀態(tài)時(shí),消息可能只是被寫(xiě)入了內(nèi)存的PAGECACHE,寫(xiě)操作的返回快,吞 吐量大;當(dāng)內(nèi)存里的消息量積累到一定程度時(shí),統(tǒng)一觸發(fā)寫(xiě)磁盤動(dòng)作,快速寫(xiě)入。

3)配置

同步刷盤還是異步刷盤,都是通過(guò)Broker配置文件里的flushDiskType 參數(shù)設(shè)置的, 這個(gè)參數(shù)被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一個(gè)。

三、高可用性機(jī)制

RocketMQ分布式集群是通過(guò)Master和Slave的配合達(dá)到高可用性的。

Master和Slave的區(qū)別:在Broker的配置文件中,參數(shù) brokerId的值為0表明這個(gè)Broker 是Master,大于0表明這個(gè)Broker是 Slave,同時(shí)brokerRole參數(shù)也會(huì)說(shuō)明這個(gè)Broker 是Master還是Slave。 Master角色的Broker支持讀和寫(xiě),Slave角色的Broker僅支持讀,也就是 Producer只能 和Master角色的Broker連接寫(xiě)入消息;Consumer可以連接 Master角色的Broker,也可 以連接Slave角色的Broker來(lái)讀取消息。

1 消息消費(fèi)高可用

在Consumer的配置文件中,并不需要設(shè)置是從Master讀還是從Slave 讀,當(dāng)Master不 可用或者繁忙的時(shí)候,Consumer會(huì)被自動(dòng)切換到從Slave 讀。有了自動(dòng)切換Consumer 這種機(jī)制,當(dāng)一個(gè)Master角色的機(jī)器出現(xiàn)故障后,Consumer仍然可以從Slave讀取消 息,不影響Consumer程序。這就達(dá)到了消費(fèi)端的高可用性。

2 消息發(fā)送高可用

在創(chuàng)建Topic的時(shí)候,把Topic的多個(gè)Message Queue創(chuàng)建在多個(gè)Broker組上(相同 Broker名稱,不同 brokerId的機(jī)器組成一個(gè)Broker組),這樣當(dāng)一個(gè)Broker組的 Master不可 用后,其他組的Master仍然可用,Producer仍然可以發(fā)送消息。 RocketMQ目前還不支持把Slave自動(dòng)轉(zhuǎn)成Master,如果機(jī)器資源不足, 需要把Slave轉(zhuǎn) 成Master,則要手動(dòng)停止Slave角色Broker,更改配置文 件,用新的配置文件啟動(dòng) Broker。

3 消息主從復(fù)制

如果一個(gè)Broker組有Master和Slave,消息需要從Master復(fù)制到Slave 上,有同步和異步兩種復(fù)制方式。

1)同步復(fù)制

同步復(fù)制方式是等Master和Slave均寫(xiě) 成功后才反饋給客戶端寫(xiě)成功狀態(tài);

在同步復(fù)制方式下,如果Master出故障, Slave上有全部的備份數(shù)據(jù),容易恢復(fù),但是同 步復(fù)制會(huì)增大數(shù)據(jù)寫(xiě)入 延遲,降低系統(tǒng)吞吐量。

2)異步復(fù)制

異步復(fù)制方式是只要Master寫(xiě)成功 即可反饋給客戶端寫(xiě)成功狀態(tài)。在異步復(fù)制方式下,系統(tǒng)擁有較低的延遲和較高的吞吐量,但是如果Master出了故障, 有些數(shù)據(jù)因?yàn)闆](méi)有被寫(xiě) 入Slave,有可能會(huì)丟失;

3)配置

同步復(fù)制和異步復(fù)制是通過(guò)Broker配置文件里的brokerRole參數(shù)進(jìn)行設(shè)置的,這個(gè)參數(shù) 可以被設(shè)置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三個(gè)值中的一個(gè)。

4) 總結(jié)


實(shí)際應(yīng)用中要結(jié)合業(yè)務(wù)場(chǎng)景,合理設(shè)置刷盤方式和主從復(fù)制方式, 尤其是SYNC_FLUSH 方式,由于頻繁地觸發(fā)磁盤寫(xiě)動(dòng)作,會(huì)明顯降低 性能。通常情況下,應(yīng)該把Master和 Save配置成ASYNC_FLUSH的刷盤 方式,主從之間配置成SYNC_MASTER的復(fù)制方式,這 樣即使有一臺(tái) 機(jī)器出故障,仍然能保證數(shù)據(jù)不丟,是個(gè)不錯(cuò)的選擇。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1 架構(gòu)原理 1.1 應(yīng)用場(chǎng)景 只支持發(fā)布訂閱模式。 大數(shù)據(jù)量的消息堆積能力,最終數(shù)據(jù)是持久化到磁盤上,理論上無(wú)限...
    可笑可樂(lè)閱讀 9,646評(píng)論 0 2
  • 目錄1 MQ? 1.1 MQ簡(jiǎn)介? 1.2 MQ的缺點(diǎn)? 1.3 主流消息隊(duì)列比較2 RocketMQ入門? 2....
    TiaNa_na閱讀 2,203評(píng)論 0 8
  • 什么是rocketmq RocketMQ 是阿里巴巴開(kāi)源的消息隊(duì)列中間件。具有下列特點(diǎn): 能夠保證嚴(yán)格的消息順序 ...
    millions_chan閱讀 11,124評(píng)論 2 10
  • 這是從阿里巴巴中間件博客中轉(zhuǎn)過(guò)來(lái)的入門RocketMQ的文章,原文鏈接 本文首先引出消息中間件通常需要解決哪些問(wèn)題...
    super_pcm閱讀 400評(píng)論 0 0
  • 夏天到了 西遼河的水漲了 尋遍每朵浪花 卻沒(méi)有你的影子 飛鳥(niǎo)說(shuō)你還在路上 因?yàn)槟銢](méi)有翱翔的翅膀 它們是你的使者 清...
    我是一片云_d288閱讀 238評(píng)論 2 4

友情鏈接更多精彩內(nèi)容