Kafka/RocketMQ生產(chǎn)者路由對比

一、Kafka分區(qū)機制

Kafka的消息組織方式實際上是三級結構:主題-分區(qū)-消息。主題下的每條消息只會保存在某一個分區(qū)中,而不會在多個分區(qū)中被保存多份。


kafka分區(qū).png
1. 默認分區(qū)策略
  • 如果指定了Key,那么默認實現(xiàn)按消息鍵保序策略,即相同Key 的消息會發(fā)送到同一個Partition 中;
  • 如果沒有指定Key,則在所有Partition 中使用輪詢策略;
2. 自定義策略

如果要自定義分區(qū)策略,你需要顯式地配置生產(chǎn)者端的參數(shù)partitioner.class,并編寫一個具體的類實現(xiàn)org.apache.kafka.clients.producer.Partitioner接口的partition方法;

int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

這里的topic、key、keyBytes、value和valueBytes都屬于消息數(shù)據(jù),cluster則是集群信息(比如當前Kafka集群共有多少主題、多少Broker等)

  • 隨機策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
  • 消息鍵保序策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
  • 根據(jù)Broker的特殊信息分區(qū)
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
3. 重試機制

調(diào)用KafkaProducer.send()發(fā)送消息,實際上只是把消息保存到RecordAccumulator中;后臺線程KafkaThread掃描到RecordAccumulator中有消息后,將消息發(fā)送到kafka集群;
如果發(fā)送失敗,那么判斷是否允許重試。如果允許重試,把消息再保存到RecordAccumulator中,等待后臺線程KafkaThread掃描再次發(fā)送;

二、RMQ發(fā)送消息路由機制

RMQ的消息組織方式為:主題-隊列-消息。主題下的每條消息只會保存在某一個隊列中,而不會在多個隊列中被保存多份。


1. 默認路由策略
public SendResult send(Message msg)

使用該方法發(fā)送消息時默認使用輪詢策略;Producer從namesrv獲取的到Topic_A路由信息TopicPublishInfo

// 主題的消息隊列
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
// 每選擇一次消息隊列,該值會自增1,達到Integer.MAX_VALUE則重置為0,用于選擇消息隊列
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();

每次獲取queue都會通過sendWhichQueue加一來實現(xiàn)對所有queue的輪詢;

2. 自定義路由策略
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)

使用該方法發(fā)送時需要實現(xiàn)MessageQueueSelector接口的select方法實現(xiàn)自定義路由策略;
例如:

  • 通過指定值hash
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    int value = arg.hashCode();
    if (value < 0) {
        value = Math.abs(value);
    }
    value = value % mqs.size();
    return mqs.get(value);
}
  • 隨機
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    int value = random.nextInt(mqs.size());
    return mqs.get(value);
}
3. 重試機制
  • 對于send(Message msg)方法發(fā)送的消息,如果發(fā)送失敗,默認重試2次,RocketMQ選擇隊列默認是通過MQFaultStrategy#selectOneMessageQueue來選擇一個的隊列,在未開啟延遲容錯的情況下,內(nèi)部會調(diào)用TopicPublishInfo#selectOneMessageQueue方法
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // 消息第一次發(fā)送,直接輪詢
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        // 消息發(fā)送失敗重試,優(yōu)先選擇其他broker上的隊列
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        // 沒有其他broker可選,依然輪詢,有可能發(fā)到之前失敗的broker上
        return selectOneMessageQueue();
    }
}
  • 對于自定義路由策略或者指定MessageQueue發(fā)送的消息
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
public SendResult send(Message msg, MessageQueue mq)

只能根據(jù)自定義策略發(fā)送到特定的Broker上的某個特定的Queue中,如果發(fā)送失敗,重試失敗的可能依然很大,所以默認不進行重試。如果需要重試,需要業(yè)務方自己來做,例如通過一個for循環(huán)最多重試幾次。

----------over---------

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容