從基本架構、高可用高可靠、負載均衡、刷盤機制、消息順序以及分布式事務對RocketMQ和Kafka進行深入比較

基本架構:

RocketMQ:

每個Broker與所有NameServer保持心跳
每個Producer/Consumer與其中一個NameServer建立長連接,與當前生產/消費的Topic涉及到的所有Broker保持心跳
NameServer之間互不通信
每組Broker(Master/Slave)互不通信

Kafka:

多個Broker連接到相同的ZooKeeper集群
一個Topic對應一個或多個Partition。
Topic是邏輯單位。Partition是物理單位。
Partition有多個Replica,均衡分布在不同Broker,其中一個為Leader,負責消息讀寫。
其他Replica則是Follower,Follower定期到Leader上同步數據。
每個Partition副本對應一個磁盤上的日志文件夾

高可用高可靠機制:

Kafka:

保證Producer發(fā)送消息可靠性

  • producer.type=sync
  • request.require.acks:設置為ALL或者-1,等待所有ISR接收到消息后再給Producer發(fā)送Response。要配合設置Broker的ISR相關參數。

保證Broker消息可靠,高可用和吞吐量

通過如下配置來保證Broker消息可靠性:

  • default.replication.factor:設置為大于等于3,保證一個partition中至少有兩個Replica,并且replication.factor > min.insync.replicas
  • min.insync.replicas:設置為大于等于2,保證ISR中至少有兩個Replica
  • unclean.leader.election.enable=false,那么就意味著非ISR中的副本不能夠參與選舉,避免臟Leader。

Kafka的ISR機制可自動動態(tài)調整同步復制的Replica,將慢(可能是暫時的慢)Follower踢出ISR,將同步趕上的Follower拉回ISR,避免最慢的Follower拖慢整體速度,最大限度地兼顧了可靠性和可用性。

Kafka對環(huán)境的適應和機器的利用效率要強于RocketMQ。

保證Consumer消費消息的可靠性

enable.auto.commit=false 關閉自動提交位移,消息處理完成之后再提交offset

每個Consumer Group獨立維護offset,互不干擾,不存在線程安全問題。

RocketMQ:

多個Master模式同步刷盤(磁盤配置為RAID10)
多Msater多Slave模式,同步雙寫
Msater宕機Slave只提供存量消息的讀,后續(xù)的讀寫由其它Master承擔

負載均衡機制:

Kafka:

Partition的數量應該要大于對應Consumer的數量。并建議Partition的數量大于集群Broker的數量,這樣Leader partition可以均勻的分布在各個Broker中,最終使得集群負載均衡。

Producer和Topic下所有partition leader保持socket連接。Borker擴容時,Producer可以直接感知。

消息由producer直接通過socket發(fā)送到broker。producer決定消息被路由到哪個partition。可以采用、random、key-hash、輪詢等策略。Kafka提供了接口供用戶實現自定義Partition,用戶可以為每個消息指定一個partitionKey,通過這個key來實現一些hash分區(qū)算法。

Controller負責協(xié)調負載均衡
Kafka會依照默認和配置的策略,自動的均衡打散分布Partition、Leader和Follower。并在Broker宕機和擴容時自動觸發(fā)reloadbalancing,增加程序并行能力和高效。
創(chuàng)建Topic時可指定parition數量來適應不同Topic的消息量。

RocketMQ:

NameServer通過心跳監(jiān)控Broker,一旦Broker失聯,調整Topic跟Broker的對應關系
Broker通過心跳監(jiān)控Consumer,一旦Consumer失聯,如果該Consumer屬于某個消費組,則觸發(fā)消費組負載均衡

發(fā)送消息通過輪詢隊列的方式發(fā)送,每個隊列接收平均的消息量。
多個隊列可以部署在一臺機器上,也可以分別部署在多臺不同的機器
可通過控制臺命令配置Broker的Topic和隊列,實現調整負載

一個Consumer Group下包含多個Consumer實例,可以是多臺機器,也可以是多個進程,或者是一個進程的多個Consumer對象。
一個Consumer Group下的多個Consumer以均攤方式消費消息
Consumer數量要小于等于隊列數量,否則多余的Consumer將不能消費消息。

刷盤機制:

RocketMQ:

所有數據單獨儲存到commit Log ,同時只會寫一個文件,一個文件滿1G,再寫新文件,真正的完全順序寫盤。對最終用戶展現的隊列實際只儲存消息在Commit Log的位置信息。
隨機讀,讀取pagecache時,即使只訪問1K的消息,系統(tǒng)也會提前預讀出更多的數據,盡可能讓讀命中pagecache,減少IO操作,所以內存越大越好。

Kafka:

partition少的時候,基本上是順序寫;在partition特別多的時候,就變成了隨機寫,性能會急劇下降。盡量在業(yè)務上避免過多partition

保證消息順序:

盡可能從業(yè)務上避開消息的順序性
保證消息順序,前提是保證不丟消息,以及消息去重

在MQ的模型中,順序需要由3個階段去保障:

  • 消息被發(fā)送時保持順序

  • 消息被存儲時保持和發(fā)送的順序一致

  • 消息被消費時保持和存儲的順序一致

發(fā)送時保持順序意味著對于有順序要求的消息,用戶應該在同一個線程中采用同步的方式發(fā)送。存儲保持和發(fā)送的順序一致則要求在同一線程中被發(fā)送出來的消息A和B,存儲時在空間上A一定在B之前。而消費保持和存儲一致則要求消息A、B到達Consumer之后必須按照先A后B的順序被處理。

Kafka:

  • Producer端串行發(fā)送消息,max.in.flight.requests.per.connection = 1 限制客戶端在單個連接上能夠發(fā)送的未響應請求的個數。設置此值是1表示kafka broker在響應請求之前client不能再向同一個broker發(fā)送請求。
  • 所有發(fā)送的消息,用同一個key,這樣同樣的key會落在同一個partition里面。
  • consumer端,Kafka保證,1個partition只能被1個consumer消費。

RocketMQ:

  • produce在發(fā)送消息的時候,配置MessageQueueSelector,把消息發(fā)到同一個隊列(queue)中。
  • 消費者注冊消息監(jiān)聽器為MessageListenerOrderly,這樣就可以保證消費端只有一個線程去消費消息

分布式事務一致性:

保證at least once和業(yè)務上消息去重
將大事務拆分成小事務,通俗說
第一步、先保證本地事務和消息發(fā)送同時成功或失敗
第二步、如果第一步成功,確保消息被消費,同時遠程事務成功

第二步可通過消費狀態(tài)表確保消息成功消費,或者通過消息中間件的重試機制來實現
RocketMQ實現了重試隊列和死信隊列機制,死信隊列的消息需要人工干預處理
并且需要在業(yè)務上實現冪等和消息去重

實現分布式事務一致性的三種方式:

  • 事務消息表:
    先執(zhí)行本地事務。如果成功,記錄到事務消息表,通過定時輪詢事務消息表確保成功發(fā)送消息。第二步如上。

定時輪詢對數據庫讀寫壓力很大

  • 經典事務消息:
    將消息發(fā)送放在本地事務中,消息發(fā)送失敗則回滾事務。第二步如上。

發(fā)送消息可能響應遲緩,放在數據庫事務中值得考量

  • 類似于RocketMQ的分階段事務消息

RocketMQ:

首先發(fā)送Prepared消息,消息發(fā)送成功后才開始執(zhí)行本地事務。之后發(fā)送確認消息,本地事務成功則發(fā)送Commit消息,失敗則發(fā)送Rollback消息。

如果確認消息發(fā)送失???RocketMQ會定期掃描消息集群中的事務消息,這時候發(fā)現了Prepared消息,它會向消息發(fā)送者確認。
RocketMQ會根據發(fā)送端設置的策略來決定是回滾還是繼續(xù)發(fā)送確認消息。這樣就保證了消息發(fā)送與本地事務同時成功或同時失敗。
如果當前Producer宕機,RocketMQ會向該Producer所屬的生產組其他Producer發(fā)送確認消息。

java代碼中通過實現TransactionListener接口,來實現RocketMQ的事務消息機制。

public interface TransactionListener {
    /**
     * 發(fā)送prepare消息成功后回調該方法用于執(zhí)行本地事務
     * @param msg 回傳的消息,利用transactionId即可獲取到該消息的唯一Id
     * @param arg 調用send方法時傳遞的參數,當send時候若有額外的參數可以傳遞到send方法中,這里能獲取到
     * @return 返回事務狀態(tài),COMMIT:提交  ROLLBACK:回滾  UNKNOW:回調
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
    /**
     * @param msg 通過獲取transactionId來判斷這條消息的本地事務執(zhí)行狀態(tài)
     * @return 返回事務狀態(tài),COMMIT:提交  ROLLBACK:回滾  UNKNOW:回調
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

TransactionListener接口有兩個方法:
executeLocalTransaction方法用于在發(fā)送prepare消息成功后執(zhí)行本地事務。
checkLocalTransaction方法在確認消息發(fā)送失敗,RocketMQ掃描到Prepared消息并向消息發(fā)送者確認時調用,用來通知RocketMQ本地事務是否成功。

kafka:

不提供完整的事務一致性保證,需要使用者自行實現


本文草成,還有很多細節(jié)會逐步補充完善上去。

并且本人才疏學淺,在此拋磚引玉,如有錯漏,敬請不吝指正。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容