消息消費方式
Consumer分為兩種,PullConsumer和PushConsumer。從名字就可以看出一種是拉取的方式,一種是主動Push的方式。具體實現(xiàn)如下:
- PullConsumer,由用戶主動調(diào)用pull方法來獲取消息,沒有則返回
- PushConsumer,在啟動后,Consumer客戶端會主動循環(huán)發(fā)送Pull請求到broker,如果沒有消息,broker會把請求放入等待隊列,新消息到達后返回response。
所以本質(zhì)上,兩種方式都是通過客戶端Pull來實現(xiàn)的。
消費模式
Consumer有兩種消費模式,broadcast和Cluster,由初始化consumer時設置。對于消費同一個topic的多個consumer,可以通過設置同一個consumerGroup來標識屬于同一個消費集群。
- 在Broadcast模式下,消息會發(fā)送給group內(nèi)所有consumer。
- 在Cluster模式下,每條消息只會發(fā)送給group內(nèi)的一個consumer,但是集群模式的支持消費失敗重發(fā),從而保證消息一定被消費。
這篇文章主要看下PushConsumer,以Cluster模式消費的源碼是如何實現(xiàn)的。這種方式相對來說是最復雜的一種。
Consumer 例子
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
首先,初始化一個DefaultMQPushConsumer,并設置group name。然后設置訂閱的topic,一個consumer可以訂閱多個topic。
設置Listener,當新消息到來時會回調(diào)consumeMessage()方法,用戶通過實現(xiàn)這個方法來做業(yè)務邏輯處理。
最后啟動consumer,開始接收消息。
下面我們看下Consumer的啟動過程的代碼。
Consumer啟動
DefaultPushConsumer是DefaultPushConsumerImpl的Facade類,啟動直接調(diào)用DefaultPushConsumerImpl.start()方法
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
//1、基本的參數(shù)檢查,group name不能是DEFAULT_CONSUMER
this.checkConfig();
//2、將DefaultMQPushConsumer的訂閱信息copy到RebalanceService中
//如果是cluster模式,如果訂閱了topic,則自動訂閱%RETRY%topic
this.copySubscription();
//3、修改InstanceName參數(shù)值為PID
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
//4、新建一個MQClientInstance,客戶端管理類,所有的i/o類操作由它管理
//緩存客戶端和topic信息,各種service
//一個進程只有一個實例
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
//5、Queue分配策略,默認AVG
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
//6、PullRequest封裝實現(xiàn)類,封裝了和broker的通信接口
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
//7、消息被客戶端過濾時會回調(diào)hook
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
//8、consumer客戶端消費offset持久化接口
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING://廣播消息本地持久化offset
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING://集群模式持久化到broker
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
//9、如果是本地持久化會從文件中l(wèi)oad
this.offsetStore.load();
//10、消費服務,順序和并發(fā)消息邏輯不同,接收消息并調(diào)用listener消費,處理消費結果
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
//11、只啟動了清理等待處理消息服務
this.consumeMessageService.start();
//12、注冊(緩存)consumer,保證CID單例
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//13、啟動MQClientInstance,會啟動PullMessageService和RebalanceService
mQClientFactory.start();
log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
...
...
default:
break;
}
//14、從NameServer更新topic路由和訂閱信息
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();//如果是SQL過濾,檢查broker是否支持SQL過濾
//15、發(fā)送心跳,同步consumer配置到broker,同步FilterClass到FilterServer(PushConsumer)
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
//16、做一次re-balance
this.mQClientFactory.rebalanceImmediately();
}
第2步,這里主要做兩件事,第一是將訂閱信息跟RebalanceImpl同步,這個類是consumer的核心邏輯實現(xiàn)類,這個后面會講到。另外一個就是如果consumer是cluster模式,并且訂閱了TopicA的消息,那客戶端會自動訂閱%RETRY%TopicA。
那這個%RETRY%開頭的topic是做什么的呢?我們知道consumer消費消息處理失敗的話,broker是會延時一定的時間重新推送的,重新推送不是跟其它新消息一起過來,而是通過單獨的%RETRY%的topic過來。
第4步,初始化一個MQClientInstance,這個跟producer共用一個實現(xiàn)。
第5步,對于同一個group內(nèi)的consumer,RebalanceImpl負責分配具體每個consumer應該消費哪些queue上的消息,以達到負載均衡的目的。Rebalance支持多種分配策略,比如平均分配、一致性Hash等(具體參考AllocateMessageQueueStrategy實現(xiàn)類)。默認采用平均分配策略(AVG)。
第7步,消息在broker端過濾后,到達客戶端consumer還會再檢查一次
第8步,consumer端會將消費進度保存下來,這樣可以保證在consumer重啟或者queue被分給集群內(nèi)其它consumer的時候能夠從上次的位置開始消費。對于broadcast的模式,采用文件的方式存到本地;cluster模式下,是同步到broker,由broker負責保存。
第10步,消息到達consumer后悔緩存到隊列中,ConsumeMessageService另起線程回調(diào)Listener消費。同時對于在緩存隊列中等待的消息,會定時檢查是否已超時,通知broker重發(fā)。
第16步,啟動RebalanceImpl,這里才真正開始的Pull消息的操作
在講Producer的時候已經(jīng)講過MQClientInstance的啟動過程中包含consumer的邏輯,下面再來看一下:
MQClientInstance啟動
首先看下初始化的部分
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
//前面的邏輯跟Producer相同
...
...
//1、Pull請求服務,異步發(fā)送請求到broker并負責將返回結果放到緩存隊列
this.pullMessageService = new PullMessageService(this);
//2、定時或者被觸發(fā)做subscribe queue的re-balance
this.rebalanceService = new RebalanceService(this);
//3、初始化一個自用的producer,`CLIENT_INNER_PRODUCER`
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
...
}
其中第三步中的自用producer,主要用于在消費失敗或者超時后發(fā)送重試的消息給broker。
下面看下啟動的過程中Consumer相關的部分:
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
...
// 1、Start various schedule tasks
this.startScheduledTask();
// 2、Start pull service,開始處理PullRequest
this.pullMessageService.start();
// 3、Start rebalance service
this.rebalanceService.start();
// 4、Start push service,consumer預留的producer,發(fā)送要求重新的消息
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
...
}
}
}
第1步,啟動的定時任務中,consumer相關的任務有兩個
//保存消費進度,廣播消息存在本地,集群消息上傳到所有的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
//對于`PushConsumer`,根據(jù)負載調(diào)整本地處理消息的線程池corePool大小
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
第3步,RebalanceService的任務主要是調(diào)用RebalanceImpl,來給consumer重新調(diào)整和分配queue。
- 定時觸發(fā)(20sec)做rebalance
- 接口觸發(fā),1)收到broker的consumer list發(fā)生變化通知后需要重新做負載均衡,比如同一個group中新加入了consumer或者有consumer下線;2)consumer啟動的時候
從以上的PushConsumer啟動邏輯可以看出,主要的消息讀取邏輯都是由RebalanceImpl完成的,通過調(diào)用doRebalance()來觸發(fā),下面看下具體實現(xiàn)。
RebalanceImpl觸發(fā)Pull消息
public void doRebalance(final boolean isOrder) {
//獲取該consumer的訂閱信息
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
//循環(huán)針對所有訂閱的topic,做rebalance
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
//做完rebalance后,檢查是否有的queue已經(jīng)不歸自己負責消費,是的話就釋放緩存message的queue
this.truncateMessageQueueNotMyTopic();
}
主要的邏輯都是在rebalanceByTopic()中實現(xiàn)的:
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
...
...
break;
}
case CLUSTERING: {
//1、從路由信息中獲取topic對應所有的Queue
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//2、從broker獲取所有同一個group的所有Consumer ID
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
//3、將MQ和cid都排好序
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
//4、按照初始化是指定的分配策略,獲取分配的MQ列表
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
//5、更新rebalanceImpl中的processQueue用來緩存收到的消息,對于新加入的Queue,提交一次PullRequest
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
...
...
//6、同步數(shù)據(jù)到broker,通過發(fā)送一次心跳實現(xiàn)
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
第4步,同一個topic的消息會分布于集群內(nèi)的多個broker的不同queue上。同一個group下面會有多個consumer,分配策略AllocateMessageQueueStrategy的作用就是計算當前consumer應該消費哪幾個queue的消息。
第5步,根據(jù)分配策略分配到queue之后,會查看是否是新增的queue,如果是則提交一次PullRequest去broker拉取消息。
注:對于新啟動的consumer來說,所有的queue都是新添加的,所以所有queue都會觸發(fā)PullRequest。
下面來看下分配策略和Pull請求的提交過程。
Queue分配策略AllocateMessageQueueStrategy
系統(tǒng)默認使用AVG策略(AllocateMessageQueueAveragely),就是將該topic所有Queue按照broker和queueId從小到大做排列,按照consumer的數(shù)量平均分成幾份。然后每個consumer分到一份,按照consumer排序后的順序來領取。代碼實現(xiàn)如下:
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
//AVG size計算方法,mq數(shù)量<=consumer數(shù)量,size=1,這種情況是很少的
//否則size=mq數(shù)量/consumer數(shù)量,余數(shù)是幾則前幾個consumer的size+1,這樣所有的queue都會有consumer消費
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
//從第一個consumer開始分配,每個分avgSize個連續(xù)的Queue,
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
RocketMQ提供其它的queue分配策略:
- AVG_BY_CIRCLE, 跟AVG類似,只是分到的queue不是連續(xù)的。比如一共12個Queue,3個consumer,則第一個consumer接收queue1,4,7,9的消息。
- CONSISTENT_HASH,使用一致性hash算法來分配Queue,用戶需自定義虛擬節(jié)點的數(shù)量
- MACHINE_ROOM,將queue先按照broker劃分幾個computer room,不同的consumer只消費某幾個broker上的消息
- CONFIG,用戶啟動時指定消費哪些Queue的消息
提交Pull請求
通過上面的策略分配到queue之后,RebalanceImpl通過updateProcessQueueTableInRebalance()方法來檢查新加入queue并提交pull請求。
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic)) {
if (!mqSet.contains(mq)) {//不再消費這個Queue的消息
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {//保存offset并
it.remove();
changed = true;
log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
}
} else if (pq.isPullExpired()) {//超過max idle時間
switch (this.consumeType()) {
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
consumerGroup, mq);
}
break;
default:
break;
}
}
}
}
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {//如果是新加入的Queue
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
//從offset store中移除過時的數(shù)據(jù)
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
//獲取起始消費offset
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
//為新的Queue初始化一個ProcessQueue,用來緩存收到的消息
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
//對新加的queue初始化一個PullRequest
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
//分發(fā)pull request到PullMessageService,拉取消息
this.dispatchPullRequest(pullRequestList);
return changed;
}
從以上的代碼可以看出,RebalanceImpl每次都會檢查分配到的queue列表,如果發(fā)現(xiàn)有新的queue加入,就會給這個queue初始化一個緩存隊列,然后新發(fā)起一個PullRequest給PullMessageService執(zhí)行。由此可見,新增的queue只有第一次Pull請求時RebalanceImpl發(fā)起的,后續(xù)請求是在broker返回數(shù)據(jù)后,處理線程發(fā)起的。
消息拉取服務PullMessageService
這個服務就是一個單獨運行的線程,在收到Pull請求后異步執(zhí)行。
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
上面的真正的實現(xiàn)是在DefaultMQPushConsumerImpl.pullMessage()里面。
public void pullMessage(final PullRequest pullRequest) {
final ProcessQueue processQueue = pullRequest.getProcessQueue();
...
...
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
...
...
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
//1、如果堆積未處理的消息過多,則扔回PullMessageService,延時執(zhí)行(默認50ms)
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(...);
}
return;
}
//2、如果堆積消息的size過大,同上面的邏輯
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(...);
}
return;
}
//3、無序消息,消息offset跨度過大,同上面的流控邏輯
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(...);
}
return;
}
} else {
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
pullRequest, offset);
}
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
}
} else {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
}
//4、檢查訂閱關系有沒有變化,有可能在延時期間,topic或者consumer的配置都發(fā)生了變化
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
final long beginTimestamp = System.currentTimeMillis();
//5、Pull Command發(fā)送后,返回結果處理
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
//6、消息預處理,客戶端再次過濾,set minOffset和maxOffset
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
//7、如果獲取到的消息數(shù)為0,則立即發(fā)起下一次pull
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
//8、消息放入ProcessQueue
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
//9、消費消息,調(diào)用messageListener處理,處理完成會通知ProcessQueue
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
//10、再次提交pull request
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL://Queue已經(jīng)不存在了
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
//存儲消費offset,從rebalance中移除ProcessQueue
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {
commitOffsetEnable = true;
}
}
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
} catch (Exception e) {//錯誤處理,延時重試(默認3sec)
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
}
以上邏輯中主要關注第5步,在消息返回后,會將消息放入ProcessQueue,然后通知ConsumeMessageService來異步處理消息,然后再次提交Pull請求。這樣對于用戶端來說,只有ConsumeMessageService回調(diào)listener這一步是可見的,其它都是透明的。
消息處理ConsumeMessageService
消息處理的邏輯比較簡單,就是回調(diào)Consumer啟動時注冊的Listener。無論Listener是否處理成功,消息都會從ProcessQueue中移除掉。我們看下對于Listener返回結果的處理方法。
ConsumeMessageConcurrentlyService.processConsumeResult()
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
...
...
switch (this.defaultMQPushConsumer.getMessageModel()) {
//broadcast模式,處理失敗,不做處理
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
//Cluster模式,將消息發(fā)回broker重新發(fā)送
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
//發(fā)回broker失敗,則再次嘗試本地消費
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
//將消費前緩存的消息清除
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
//更新offset
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
消息處理失敗后,consumer會將消息發(fā)給broker,broker會根據(jù)重試次數(shù)來重新投遞消息。sendback方法的實現(xiàn)如下
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
int delayLevel = context.getDelayLevelWhenNextConsume();
try {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
}
return false;
}
首先會根據(jù)這是第幾次消費失敗,設置延時多長時間重新投遞,然后調(diào)用DefaultMQPushConsumerImpl.sendMessageBack()的方法。默認設置下,最多會重新投遞16次。
//consumer把沒有消費的消息提交給broker,broker會延時一段時間后重新發(fā)送
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
//首先嘗試直接發(fā)送CONSUMER_SEND_MSG_BACK命令給broker
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
//如果發(fā)送失敗,則把消息發(fā)送到%RETRY%topic,重新發(fā)送
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
}
}
一共有兩種方式讓broker重發(fā),先嘗試給broker發(fā)送send_msg_back的命令,如果失敗了,則通過consumer預留的producer給%RETRY%topic發(fā)送消息,前面consumer啟動的時候已經(jīng)講過,所有consumer都訂閱%RETRY%topic,所以等于是自己給自己發(fā)一條消息。
整體流程圖
以上就是整個consumer啟動和消息消費的流程圖,這個邏輯還是比producer要復雜很多的,下面畫了一個流程圖,希望可以幫助加深下理解。
