kafka consumer 通過偏移量來記錄消息的消費(fèi)進(jìn)度,當(dāng)consumer poll一次消息時(shí),
consumer內(nèi)部維護(hù)了一個(gè)指針,能夠探測(cè)到下一條要消費(fèi)的數(shù)據(jù),當(dāng)reblance的時(shí)候,
才會(huì)去GroupMetadata消費(fèi)者組的元數(shù)據(jù)里拿最近一次提交的offset當(dāng)初始o(jì)ffset。
通過消費(fèi)者組的機(jī)制,根據(jù)負(fù)載策略分配各消費(fèi)者的消費(fèi)分區(qū)以完成消費(fèi)負(fù)載,
默認(rèn)情況下采用平均分配
當(dāng)通過subscribe方法訂閱某些主題時(shí),此時(shí)該消費(fèi)者還未真正加入到訂閱組,只有當(dāng)
?consumeer#poll 方法被調(diào)用后,并且會(huì)向 broker 定時(shí)發(fā)送心跳包,如果 broker 在
?session.timeout.ms 時(shí)間內(nèi)未收到心跳包,則 broker 會(huì)任務(wù)該消費(fèi)者已宕機(jī),會(huì)將其剔除,
并觸發(fā)消費(fèi)端的分區(qū)重平衡。
消費(fèi)者也有活體鎖情況,就是消費(fèi)者正常與broker發(fā)送心跳,然后并沒有消費(fèi)進(jìn)展,要避免
這個(gè)消費(fèi)者占著分區(qū)不消費(fèi)的情況,可以通過配置max.poll.interval.ms參數(shù)觸發(fā)consumer的
pollTimeoutExpired(long now),原理就是通過減小我們poll的頻率,導(dǎo)致當(dāng)前時(shí)間-最近一次poll
時(shí)間大于max.poll.interval.ms,從而使整個(gè)消費(fèi)者退出消費(fèi)者組
Reblance 觸發(fā)條件
觸發(fā)reblance一般有三種情況,組成員數(shù)量發(fā)生變化、訂閱主題數(shù)量發(fā)生變化、訂閱主題
的分區(qū)數(shù)發(fā)生變化,最常見的是第一種情況,由于消費(fèi)者的心跳檢測(cè)判斷該消費(fèi)者該退出
亦或消費(fèi)者重啟后的加入,導(dǎo)致reblance
Reblance 通知機(jī)制
consumer的reblance是由broker的Coordinator協(xié)調(diào)器來完成整個(gè)消費(fèi)者組的重平衡的,
通過一個(gè)獨(dú)立的心跳線程檢測(cè)各消費(fèi)者的狀態(tài),當(dāng)Reblance觸發(fā),Coordinator想開始
一輪新的reblance,通過在心跳消息里封裝“REBALANCE_IN_PROGRESS”消息響應(yīng)
給各消費(fèi)者,從而開始重平衡,heartbeat.interval.ms是設(shè)置心跳間隔時(shí)間的參數(shù),同時(shí)
更多的是用來控制重平衡通知的頻率,想更快的讓消費(fèi)者組響應(yīng)重平衡就可以調(diào)小這個(gè)參數(shù)
消費(fèi)者組狀態(tài)機(jī)
通過消費(fèi)者組狀態(tài)機(jī),協(xié)調(diào)流轉(zhuǎn)整個(gè)reblance流程
Empty:組內(nèi)沒有成員,但是消費(fèi)者組可能存在已提交的位移數(shù)據(jù)且這些位移還未過期
Dead: 組內(nèi)無成員且已被Coordinator移除了
PreparingReblance: 準(zhǔn)備開始重平衡,所有成員必須重新請(qǐng)求加入消費(fèi)者組
CompletingReblance: 所有成員已經(jīng)加入,正在等待分配方案
Stable: 重平衡完成

一個(gè)消費(fèi)者組最開始是 Empty 狀態(tài),當(dāng)重平衡過程開啟后,它會(huì)被置于 PreparingRebalance
?狀態(tài)等待成員加入,之后變更到 CompletingRebalance 狀態(tài)等待分配方案,最后流轉(zhuǎn)到?
Stable 狀態(tài)完成重平衡
Consumer Reblance 流程
從上面的流程可以看出,重平衡第一步就是所有的消費(fèi)者重新加入消費(fèi)者組,這個(gè)過程
有一個(gè)超時(shí)時(shí)間,如果有成員在超時(shí)時(shí)間之內(nèi),無法完成加入組操作,它就會(huì)被排除在
這輪 Rebalance 之外,第二步是在消費(fèi)者中選出一個(gè)leader執(zhí)行重平衡策略
首先消費(fèi)者加入的時(shí)候,需要向Coordinator匯報(bào)自己的所有訂閱信息,收集完信息之后,
會(huì)將第一個(gè)加入的消費(fèi)者成為leader,然后Coordinator將收集到的訂閱信息發(fā)給leader,
leader根據(jù)各成員訂閱的主題以及各主題的分區(qū)數(shù),然后根據(jù)分配策略決定每個(gè)消費(fèi)者
對(duì)應(yīng)主題該消費(fèi)哪個(gè)分區(qū),然后發(fā)給Coordinator,然后Coordinator響應(yīng)給各個(gè)消費(fèi)者
完成分配工作。