一、Kafka分區(qū)機制
Kafka的消息組織方式實際上是三級結構:主題-分區(qū)-消息。主題下的每條消息只會保存在某一個分區(qū)中,而不會在多個分區(qū)中被保存多份。

1. 默認分區(qū)策略
- 如果指定了Key,那么默認實現(xiàn)按消息鍵保序策略,即相同Key 的消息會發(fā)送到同一個Partition 中;
- 如果沒有指定Key,則在所有Partition 中使用輪詢策略;
2. 自定義策略
如果要自定義分區(qū)策略,你需要顯式地配置生產(chǎn)者端的參數(shù)partitioner.class,并編寫一個具體的類實現(xiàn)org.apache.kafka.clients.producer.Partitioner接口的partition方法;
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
這里的topic、key、keyBytes、value和valueBytes都屬于消息數(shù)據(jù),cluster則是集群信息(比如當前Kafka集群共有多少主題、多少Broker等)
- 隨機策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
- 消息鍵保序策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
- 根據(jù)Broker的特殊信息分區(qū)
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
3. 重試機制
調(diào)用KafkaProducer.send()發(fā)送消息,實際上只是把消息保存到RecordAccumulator中;后臺線程KafkaThread掃描到RecordAccumulator中有消息后,將消息發(fā)送到kafka集群;
如果發(fā)送失敗,那么判斷是否允許重試。如果允許重試,把消息再保存到RecordAccumulator中,等待后臺線程KafkaThread掃描再次發(fā)送;
二、RMQ發(fā)送消息路由機制
RMQ的消息組織方式為:主題-隊列-消息。主題下的每條消息只會保存在某一個隊列中,而不會在多個隊列中被保存多份。

1. 默認路由策略
public SendResult send(Message msg)
使用該方法發(fā)送消息時默認使用輪詢策略;Producer從namesrv獲取的到Topic_A路由信息TopicPublishInfo
// 主題的消息隊列
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
// 每選擇一次消息隊列,該值會自增1,達到Integer.MAX_VALUE則重置為0,用于選擇消息隊列
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
每次獲取queue都會通過sendWhichQueue加一來實現(xiàn)對所有queue的輪詢;
2. 自定義路由策略
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
使用該方法發(fā)送時需要實現(xiàn)MessageQueueSelector接口的select方法實現(xiàn)自定義路由策略;
例如:
- 通過指定值hash
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value = value % mqs.size();
return mqs.get(value);
}
- 隨機
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = random.nextInt(mqs.size());
return mqs.get(value);
}
3. 重試機制
- 對于
send(Message msg)方法發(fā)送的消息,如果發(fā)送失敗,默認重試2次,RocketMQ選擇隊列默認是通過MQFaultStrategy#selectOneMessageQueue來選擇一個的隊列,在未開啟延遲容錯的情況下,內(nèi)部會調(diào)用TopicPublishInfo#selectOneMessageQueue方法
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
// 消息第一次發(fā)送,直接輪詢
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
// 消息發(fā)送失敗重試,優(yōu)先選擇其他broker上的隊列
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
// 沒有其他broker可選,依然輪詢,有可能發(fā)到之前失敗的broker上
return selectOneMessageQueue();
}
}
- 對于自定義路由策略或者指定MessageQueue發(fā)送的消息
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
public SendResult send(Message msg, MessageQueue mq)
只能根據(jù)自定義策略發(fā)送到特定的Broker上的某個特定的Queue中,如果發(fā)送失敗,重試失敗的可能依然很大,所以默認不進行重試。如果需要重試,需要業(yè)務方自己來做,例如通過一個for循環(huán)最多重試幾次。
----------over---------