7.1 分區(qū)分配策略
在 3.1 節(jié)中講述了消費(fèi)者與消費(fèi)組的模型,并且在默認(rèn)分區(qū)分配策略的背景下通過(guò)案例進(jìn) 行了具體的分析。 Kafka 提供了消費(fèi)者客戶端參數(shù) partition . assignrr陽(yáng)1t . strategy 來(lái)設(shè) 置消費(fèi)者與訂閱主題之間的分區(qū)分配策略。默認(rèn)情況下,此參數(shù)的值為 org.apache.kafka. clients.consumer.RangeAssignor,即采用 RangeAssignor分配策略。除此之外, Kafka還提供了另 外兩種分配策略: RoundRobinAssignor 和 StickyAssignor。 消費(fèi)者客戶端參數(shù) partitio口- assignment.strategy 可以配置多個(gè)分配策略,彼此之間以逗號(hào)分隔。
7.1 .1 RangeAssignor 分配策略
RangeAssignor 分配策略的原理是按照消費(fèi)者總數(shù)和分區(qū)總數(shù)進(jìn)行整除運(yùn)算來(lái)獲得一個(gè)跨度,然后將分區(qū)按照跨度進(jìn)行平均分配, 以保證 分區(qū)盡可 能均勻地分配 給所 有的消費(fèi)者 。 對(duì)于每一個(gè)主題 , RangeAssignor 策略會(huì)將消費(fèi)組內(nèi)所有訂閱這個(gè)主題的消費(fèi)者按照名稱的字典序排 序 , 然后為每個(gè)消費(fèi)者劃分固定的分區(qū)范圍,如果不夠平均分配,那么字典序靠前的消費(fèi)者會(huì) 被多分配一個(gè)分區(qū)。假設(shè) n=分區(qū)數(shù)/消費(fèi)者數(shù)量, m=分區(qū)數(shù)%消費(fèi)者數(shù)量,那么前 m 個(gè)消費(fèi)者每個(gè)分配 n+l 個(gè) 分區(qū),后面的(消費(fèi)者數(shù)量-m)個(gè)消費(fèi)者每個(gè)分配 n個(gè)分區(qū)。
除了第 3.1 節(jié)的示例,為了更加通俗地講解 RangeAssignor 策略 , 我們不妨再舉一些示例。 假設(shè)消費(fèi)組內(nèi)有 2個(gè)消費(fèi)者 co和 Cl,都訂閱了主題 tO和 tl,并且每個(gè)主題都有 4個(gè)分區(qū), 那 么訂閱的所有分區(qū)可以標(biāo)識(shí)為 : tOpO、 tOpl、 t0p2、 t0p3、 tlpO、 tlp1、 tlp2、 tlp3。最終的分配 結(jié)果為 :
消費(fèi)者 CO: tOpO、 tOpl, t lpO、 tlpl
消貨者 Cl : t0p2、 t0p3, tlp2、 tlp3
這樣分配得很均勻,那么這個(gè)分配策略能夠一直保持這種良好的特性嗎?我們不妨再來(lái)看 另一種情況。假設(shè)上面例子中 2個(gè)主題都只有 3個(gè)分區(qū),那么訂閱的所有分區(qū)可以標(biāo)識(shí)為: tOpO、 tOpl、 t0p2、 tlpO、 tlp 1、 tlp2。最終的分配結(jié)果為 :
消費(fèi)者 CO: tOpO、 tOpL tlpO、 tlpl
消費(fèi)者 Cl: t0p2, tlp2
可以明顯地看到這樣的分配并不均勻,如果將類似的情形擴(kuò)大, 則有可能出現(xiàn)部分消費(fèi)者 過(guò)載的情況。對(duì)此我們?cè)賮?lái)看另一種 RoundRobinAssignor策略的分配效果如何。
7.1.2 RoundRobinAssignor 分配策略
RoundRobinAssignor 分配策略的原理是將消費(fèi)組內(nèi)所有消費(fèi)者及消費(fèi)者訂閱的所有主題的分 區(qū)按照字典序 排序,然后通過(guò)輪詢方式逐 個(gè)將分區(qū)依次分配給每個(gè)消費(fèi)者。 RoundRobinAssignor 分配策略對(duì)應(yīng)的 partition.assignment.strategy 參數(shù)值為 org.apache.kafka.clients.consumer.RoundRobinAssignor。
如果同一個(gè)消費(fèi)組內(nèi)所有的消費(fèi)者的訂閱信息都是相同的,那么 RoundRobinAssignor分配 策略 的分區(qū)分配會(huì)是均勻的。舉個(gè)例子,假設(shè)消費(fèi)組中有 2 個(gè)消費(fèi)者 co 和 Cl,都訂閱了主題 tO 和 tl,并且每個(gè)主題都有 3 個(gè)分區(qū) , 那么訂閱的所有分區(qū)可以標(biāo)識(shí)為: tOpO、 tOpl、 t0p2、 tlpO、 tlpl、 tlp2。最終的分配結(jié)果為 :
消費(fèi)者 CO: tOpO、 t0p2, tlpl
消費(fèi)者 Cl: tOpl, tlpO、 tlp2
如果同一個(gè)消費(fèi)組內(nèi)的消費(fèi)者訂閱的信息是不相同的,那么在執(zhí)行分區(qū)分配的時(shí)候就不是 完全的輪詢分配,有可能導(dǎo)致分區(qū)分配得不均勻。如果某個(gè)消費(fèi)者沒(méi)有訂閱消費(fèi)組內(nèi)的某個(gè)主 題,那么在分配分區(qū)的時(shí)候此消費(fèi)者將分配不到這個(gè)主題的任何分區(qū)。
舉個(gè)例子,假設(shè)消費(fèi)組內(nèi)有 3個(gè)消費(fèi)者 CCO、 Cl 和 C刀,它們共訂閱了 3個(gè)主題(tO、 tl、 t刀,這 3個(gè)主題分別有 l、 2、 3個(gè)分區(qū),即整個(gè)消費(fèi)組訂閱了 tOpO、 tlpO、 tlpl、 t2p0、 t2pl、 t2p2這6個(gè)分區(qū)。 具體而言,消費(fèi)者co訂閱的是主題tO,消費(fèi)者Cl訂閱的是主題tO和tl. 消費(fèi)者 C2 訂閱的是主題 tO、 ti 和 t2, 那么最終的分配結(jié)果為 :
消費(fèi)者CO: 消費(fèi)者Cl: 消費(fèi)者C2:
tOpO
tlpO
tlpL t2p0、 t2pL t2p2
可以看到 RoundRobinAssignor策略也不是十分完美,這樣分配其實(shí)并不是最優(yōu)解,因?yàn)橥?全可以將分區(qū) tlpl 分配給消費(fèi)者 Cl。
7.1.3 StickyAssignor分配策略
我們?cè)賮?lái)看一下 StickyAssignor分配策略,“sticky”這個(gè)單詞可以翻譯為“黠性的”, Kafka從 0.11.x 版本開始引入這種分配策略,它主要有兩個(gè)目的 :
Cl )分區(qū)的分配要盡可能均勻 。
(2)分區(qū)的分配盡可能與上次分配的保持相同。
當(dāng)兩者發(fā)生沖突時(shí),第 一個(gè)目標(biāo)優(yōu)先于第二個(gè)目標(biāo) 。 鑒于這兩個(gè)目標(biāo) , StickyAssignor 分配 策略的具體實(shí)現(xiàn)要比 RangeAssignor 和 RoundRobinAssignor 這兩種分配策略要復(fù)雜得多 。 我們 舉例來(lái)看一下 StickyAssignor 分配策略的實(shí)際效果 。
假設(shè)消費(fèi)組內(nèi)有3個(gè)消費(fèi)者 (CO、 Cl和C2),它們都訂閱了4個(gè)主題 (tO、 tl、 t2、 t3) ' 并且每個(gè)主題有 2 個(gè) 分區(qū) 。 也就是說(shuō),整個(gè)消費(fèi)組訂閱了 tOpO、 tOpl、 tlpO、 tlpl、 t2p0、 t2pl、 t3p0、 t3pl這8個(gè)分區(qū)。 最終的分配結(jié)果如下:

這樣初看上去似乎與采用 RoundRobinAssignor分配策略所分配的結(jié)果相同,但事實(shí)是否真 的如此呢?再假設(shè)此時(shí)消費(fèi)者 Cl 脫離了消費(fèi)組,那么消費(fèi)組就會(huì)執(zhí)行再均衡操作,進(jìn)而消費(fèi) 分區(qū)會(huì)重新分配。如果采用 RoundRobinAssignor分配策略,那么此時(shí)的分配結(jié)果如下:

可以看到分配結(jié)果中保留了上一次分配中對(duì)消費(fèi)者 co和 C2 的所有分配結(jié)果,并將原來(lái)消 費(fèi)者 Cl 的“ 負(fù)擔(dān)”分配給了剩余的兩個(gè)消費(fèi)者 co 和 C2, 最終 co 和 C2 的分配還保持了均衡 。
如果發(fā)生分區(qū)重分配,那么對(duì)于同一個(gè)分區(qū)而言,有可能之前的消費(fèi)者和新指派的消費(fèi)者 不是同一個(gè),之前消費(fèi)者進(jìn)行到 一半的處理還要在新指派的消費(fèi)者中再次復(fù)現(xiàn)一遍,這顯然很 瑯費(fèi)系統(tǒng)資源。 StickyAssignor 分配策略如同其名稱中的“ sticky” 一樣,讓分配策略具備一定 的“勤性”,盡可能地讓前后兩次分配相同,進(jìn)而減少系統(tǒng)資源的損耗及其他異常情況的發(fā)生 。
到目前為止,我們分析的都是消 費(fèi)者的訂閱信息都是相同的情況,我們來(lái)看一下訂閱信息 不同的情況下的處理。
舉個(gè)例子,同樣消費(fèi)組內(nèi)有3個(gè)消費(fèi)者 CCO、 Cl和C2), 集群中有3個(gè)主題 CtO、 tl和 t2),這 3 個(gè)主題分別有 1、 2、 3 個(gè)分區(qū) 。 也就是說(shuō), 集群中有 tOpO、 tlpO、 tlpl、 t2p0、 t2pl、 t2p2這6個(gè)分區(qū)。 消費(fèi)者co訂閱了主題tO,消費(fèi)者Cl訂閱了主題tO和tl,消費(fèi)者C2訂閱了 主題tO、 ti和t2。
如果此時(shí)采用 RoundRobinAssignor分配策略,那么最終的分配結(jié)果如分配清單 7-1 所示(和 講述 RoundRobinAssignor分配策略時(shí)的一樣,這樣不妨贅述一下):

可以看到這才是一個(gè)最優(yōu)解(消費(fèi)者 co 沒(méi)有訂閱主題 tl 和 t2,所以不能分配主題 tl 和 t2 中的任何分區(qū)給它,對(duì)于消費(fèi)者 Cl 也可同理推斷〉 。
StickyAssignor分配策略比另外兩者分配策略而言顯得更加優(yōu)異,這個(gè)策略的 代碼實(shí)現(xiàn)也異常復(fù)雜 ,
7.1.4 自定義分區(qū)分配策略
讀者不僅可以任意選用 Kafka提供的 3種分配策略, 還可以自定義分配策略來(lái)實(shí)現(xiàn)更多可 選的功能 。自定義 的 分配策略 必 須要實(shí) 現(xiàn) org.apache.kafka.clients.consumer.intemals. PartitionAssignor接口。 PartitionAssignor接口的定義如下:
Subscription subscription(Set<String> top工cs); String name() ;

PartitionAssignor接口中定義了兩個(gè)內(nèi)部類: Subscription和 Assignment。
Subscription 類用來(lái)表示消費(fèi)者 的訂閱 信息,類中有兩 個(gè)屬性: topics 和 userData,分 別表示消費(fèi)者 的訂閱主題列表和用戶自 定義信息。 PartitionAssignor 接口通過(guò) subscription()方法 來(lái)設(shè)置消費(fèi)者自身相關(guān)的 Subscription 信 息,注意到此方法中只有 一 個(gè)參數(shù) topics , 與 Subscription 類中的 topics 的相呼應(yīng),但并沒(méi)有體現(xiàn)有關(guān) userData 的參數(shù) 。 為了增強(qiáng)用戶 對(duì)分配結(jié)果的控制,可以在 subscription()方法內(nèi)部添加 一 些影 響 分配的用戶自定義信息賦予 userData,比如權(quán)重、 IP 地址 、 host 或機(jī)架 Crack)等 。
舉個(gè)例子,在 subscription()方法 中提供機(jī)架 信息 ,標(biāo)識(shí)此消費(fèi)者所部署的機(jī)架位置,在分 區(qū)分配時(shí)可以根據(jù)分區(qū)的 leader 副本所在的機(jī)架位置來(lái)實(shí)施具體的分配,這樣可以讓消費(fèi)者與 所需拉取消息的 broker 節(jié)點(diǎn)處于同 一機(jī)架 。參考圖 7-1, 消費(fèi)者 consumer!和 brokerl 都部署在 機(jī)架 rackl 上,消 費(fèi)者 consumer2 和 broker2 都部署在機(jī)架 rack2 上 。 如果分區(qū)的分配不是機(jī)架 感知的,那么有可能與圖 7”1 (上半部分)中的分配結(jié)果一樣, consumerl 消費(fèi) broker2 中的分 區(qū),而 consumer2 消費(fèi) brokerl 中的分區(qū) ; 如果分區(qū)的分配是機(jī)架感知的 , 那么就會(huì)出現(xiàn)圖 7-1(下半部分〉的分配結(jié)果, consumer!消 費(fèi) broker! 中的分區(qū),而 consumer2 消費(fèi) broker2 中的 分區(qū),這樣相 比前一種情形,既可以減少消費(fèi)延時(shí),又可以減少跨機(jī)架帶寬的占用 。
再來(lái)說(shuō)一下 Assignment類,它用來(lái)表示分配結(jié)果信息, 類中也有兩個(gè)屬性: partitions 和 userData, 分別表示所分配到的分區(qū)集合和用戶自定義的數(shù)據(jù) 。 PartitionAssignor 接口中的 onAssignment()方法是在每個(gè)消費(fèi)者收到消費(fèi)組 leader 分配結(jié)果時(shí)的回調(diào)函數(shù),例如在 StickyAssignor 分配策略中就是通過(guò)這個(gè)方法保存當(dāng)前的分配方案,以備在下次消費(fèi)組再均衡
(rebalance)時(shí)可以提供分配參考依據(jù) 。

接口中的 name()方法用來(lái)提供分配策略的名稱,對(duì) Kafka 提供的 3 種分配策略而言, RangeAssignor 對(duì)應(yīng)的 protocol_name 為“ range”, RoundRobinAssignor 對(duì)應(yīng)的 protocol name 為“ roundrobin”, StickyAssignor 對(duì)應(yīng)的 protocol_name 為“ sticky”,所以自定義的分配策略 中 要注意命名的時(shí)候不要與己存在的分配策略發(fā)生沖突。這個(gè)命名用來(lái)標(biāo)識(shí)分配策略的名稱, 在后面所描述的加入消費(fèi)組及選舉消費(fèi)組 leader 的時(shí)候會(huì)有涉及 。
真正的分區(qū)分配方案的實(shí)現(xiàn)是在 assign()方法中,方法中的參數(shù) metadata表示集群的元數(shù)據(jù) 信息,而 subscriptions 表示消費(fèi)組內(nèi)各個(gè)消費(fèi)者成員的訂閱信息,最終方法返 回各個(gè)消費(fèi)者的 分配信息。
Kafka 還提供了一個(gè)抽象類 org.apache.kafka.clients.consumer.intemals.AbstractPartitionAssignor, 它可以簡(jiǎn)化實(shí)現(xiàn) PartitionAssignor 接口的工作,井對(duì) assign()方法進(jìn)行了詳細(xì)實(shí)現(xiàn), 其中會(huì)將 Subscription中的 us巳rData信息去掉后再進(jìn)行分配。 Kafka提供的 3種分配策略都繼承自這個(gè)抽 象類 。 如果開發(fā)人員在自定義分區(qū)分配策略時(shí)需要使用 userData 信息來(lái)控制分區(qū)分配的結(jié)果, 那么就不能直接繼承 AbstractPartitionAssignor 這個(gè)抽象類,而需 要直接實(shí)現(xiàn) PartitionAssignor 接口 。
下面筆者參考 Kafka 的 RangeAssignor 分配策略來(lái)自定義 一個(gè)隨機(jī) 的分配策略,這里筆者稱 之為 RandomAssignor,具體代碼實(shí)現(xiàn)如下:
package chapter7;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
/**
*/
public class RandomAssignor extends AbstractPartitionAssignor {
@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) {
Map<String, List<String>> consumersPerTopic =
consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet()) {
assignment.put(memberId, new ArrayList<>());
}
//針對(duì)每一個(gè)主題進(jìn)行分區(qū)分配
for (Map.Entry<String, List<String>> topicEntry :
consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey();
List<String> consumersForTopic = topicEntry.getValue();
int consumerSize = consumersForTopic.size();
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null) {
continue;
}
//當(dāng)前主題下的所有分區(qū)
List<TopicPartition> partitions =
AbstractPartitionAssignor.partitions(topic,
numPartitionsForTopic);
//將每個(gè)分區(qū)隨機(jī)分配給一個(gè)消費(fèi)者
for (TopicPartition partition : partitions) {
int rand = new Random().nextInt(consumerSize);
String randomConsumer = consumersForTopic.get(rand);
assignment.get(randomConsumer).add(partition);
}
}
return assignment;
}
@Override
public String name() {
return "name";
}
private Map<String,List<String>> consumersPerTopic(Map<String,Subscription> consumerMetadata){
Map<String, List<String>> res = new HashMap<>();
for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
String consumerId = subscriptionEntry.getKey();
for (String topic : subscriptionEntry.getValue().topics()) {
put(res, topic, consumerId);
}
}
return res;
}
}
在使用時(shí), 消費(fèi)者客戶端需要添加相 應(yīng) 的 Properties 參數(shù) , 示例如 下 :
properties .put(ConsumerConfig . PARTITION ASSIGNMENT STRATEGY CONFIG,
RandomAssignor .cl ass.getName ());
這里只是演示如何自定義實(shí)現(xiàn)一個(gè)分區(qū)分配策略, RandomAssignor 的實(shí)現(xiàn)并不是特別理 想, 并不見(jiàn)得會(huì)比 Kafka 自身提供的 RangeAssignor之類的策略要好。
在第3章中陳述了一個(gè)事實(shí): 按照Kafka默認(rèn)的消費(fèi)邏輯設(shè)定, 一個(gè)分區(qū)只能被同一個(gè)消 費(fèi)組(ConsumerGroup) 內(nèi)的一個(gè)消費(fèi)者消費(fèi)。 但這一設(shè)定不是絕對(duì)的,我們可以通過(guò)自定義 分區(qū)分配策略使一個(gè)分區(qū)可以分配給多個(gè)消費(fèi)者消費(fèi)。
考慮一種極端情況, 同一消費(fèi)組 內(nèi)的任意消費(fèi)者都可以消費(fèi)訂閱主題的所有分區(qū), 從而實(shí) 現(xiàn)了一種“組內(nèi)廣播(消費(fèi))”的功能。 針對(duì)第3章中圖3-4的7個(gè)分區(qū)和3個(gè)消費(fèi)者的情形,
如果采用組內(nèi)廣播的分配策略 , 那么就會(huì)變成圖 7-2 中的這種分配結(jié)果。

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class BroadcastAssignor extends AbstractPartitionAssignor {
@Override
public String name() {
return "broadcast";
}
private Map<String, List<String>> consumersPerTopic(
Map<String, Subscription> consumerMetadata) {
//(具體實(shí)現(xiàn)請(qǐng)參考 RandomAssignor 中的 consumersPerTopic()方法)
return null;
}
@Override
public Map<String, List<TopicPartition>> assign(
Map<String, Integer> partitionsPerTopic,
Map<String, Subscription> subscriptions) {
Map<String, List<String>> consumersPerTopic =
consumersPerTopic(subscriptions);
Map<String, List<TopicPartition>> assignment = new HashMap<>();
//Java8
subscriptions.keySet().forEach(memberId ->
assignment.put(memberId, new ArrayList<>()));
//針對(duì)每一個(gè)主題,為每一個(gè)訂閱的消費(fèi)者分配所有的分區(qū)
consumersPerTopic.entrySet().forEach(topicEntry->{
String topic = topicEntry.getKey();
List<String> members = topicEntry.getValue();
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null || members.isEmpty())
return;
List<TopicPartition> partitions = AbstractPartitionAssignor
.partitions(topic, numPartitionsForTopic);
if (!partitions.isEmpty()) {
members.forEach(memberId ->
assignment.get(memberId).addAll(partitions));
}
});
return assignment;
}
}
注意組內(nèi)廣播的這種實(shí)現(xiàn)方式會(huì)有一個(gè)嚴(yán)重的問(wèn)題一一默認(rèn)的消費(fèi)位移的提交會(huì)失效。所 有的消費(fèi)者都會(huì)提交它自身的消費(fèi)位移到 consumer_offsets 中 , 后提交的消費(fèi)位移會(huì)覆蓋前面 提交的消費(fèi)位移。
假設(shè)消費(fèi)者 consumerl 提交了分區(qū) tpO 的消 費(fèi)位移為 10, 這時(shí)消費(fèi)者 consumer2 緊接著提 交了同一分區(qū) tpO 的消費(fèi)位移為 12,如果此時(shí)消費(fèi)者 consumer!由于某些原因重啟了 ,那么 consum巳rl 就會(huì)從位移 12 之后重新開始消費(fèi),這樣 consumer! 就丟失了部分消息。
再考慮另一種情況,同樣消費(fèi)者 consumerl 提交了分區(qū) tpO 的消費(fèi)位移為 10, 這時(shí)消費(fèi)者 consumer2 緊接著提交了同 一分區(qū)的消費(fèi)位移為 8,如果此時(shí)消費(fèi)者 consumerl 由于某些原因重 啟了,那么 consumerl 就會(huì)從位移 8 之后重新開始消費(fèi),這樣 consumerl 就重復(fù)消費(fèi)了消息。 很多情形下 , 重復(fù)消費(fèi)少量消息對(duì)于上層業(yè)務(wù)應(yīng)用來(lái)說(shuō)可以忍受。但是設(shè)想這樣一種情況 , 消 費(fèi)組 內(nèi)的消 費(fèi)者對(duì)于分區(qū) tpO 的 消費(fèi)位移都在 100000 之后了, 此時(shí) 又有一 個(gè)新的消 費(fèi)者 consumer3 加入進(jìn)來(lái),消費(fèi)了部分消息之后提交了 tpO 的消費(fèi)位移為 9, 那么此時(shí)原消費(fèi)組內(nèi)的 任何消 費(fèi)者重啟都會(huì)從這個(gè)消 費(fèi)位移 9 之后再開始重新消費(fèi) ,這樣大量的重復(fù)消息會(huì)讓上層業(yè) 務(wù)應(yīng)用猝不及防,同樣會(huì)造成計(jì)算資源的浪費(fèi) 。
針對(duì)上述這種情況,如果要真正實(shí)現(xiàn)組內(nèi)廣播,則需要自己保存每個(gè)消費(fèi)者的消費(fèi)位移 。 筆者的實(shí)踐經(jīng)驗(yàn)是,可以通過(guò)將消費(fèi)位移保存到本地文件或數(shù)據(jù)庫(kù)中等方法來(lái)實(shí)現(xiàn)組內(nèi)廣播的 位移提交。
7.2 消費(fèi)者協(xié)調(diào)器和組協(xié)調(diào)器
了解了 Kafka 中消費(fèi)者的分區(qū)分配策略之后是否會(huì)有這樣的疑問(wèn):如果消費(fèi)者客戶端中配 置了兩個(gè)分配策略,那么以哪個(gè)為準(zhǔn)呢?如果有 多個(gè)消費(fèi)者,彼此所配置的分配策略并不完全 相同,那么以哪個(gè)為準(zhǔn)?多個(gè)消費(fèi)者之間的分區(qū)分配是需要協(xié)同的,那么這個(gè)協(xié)同的過(guò)程又是 怎樣的呢?這一切都是交由消費(fèi)者協(xié)調(diào)器( ConsumerCoordinator )和組協(xié)調(diào)器(GroupCoordinator)來(lái)完成的,它們之間使用 一套組協(xié)調(diào)協(xié)議進(jìn)行交互 。
7.2.1 舊版消費(fèi)者客戶端的問(wèn)題
消費(fèi)者協(xié)調(diào)器和組協(xié)調(diào)器的概念是針對(duì)新版的消費(fèi)者客戶端而言的, Kafka 建立之初并沒(méi) 有它們。舊版的消費(fèi)者客戶端是使用 ZooKeeper 的監(jiān)聽器( Watcher〕來(lái)實(shí)現(xiàn)這些功能的。
每個(gè)消費(fèi)組(<group>)在 ZooKeeper 中都維護(hù)了 一個(gè)/ consumers/<group>/ids 路徑, 在此路徑下使用臨時(shí)節(jié)點(diǎn)記錄隸屬于此消費(fèi)組的消費(fèi)者的唯 一標(biāo)識(shí)( consumerldString) , consumerldString由消費(fèi)者啟動(dòng)時(shí)創(chuàng)建。消費(fèi)者的唯一標(biāo)識(shí)由 consumer.id+主機(jī)名+時(shí)間戳+UUID 的部分信息構(gòu)成,其中 consumer.id 是舊版消 費(fèi) 者客戶端中的配置,相當(dāng)于新版客戶端中的 client.id。比如某個(gè)消費(fèi)者的唯一標(biāo)識(shí)為 consumerld localhost-1510734527562-64b377f5, 那么其中 consumerId 為指定的 consumer.id, localhost 為計(jì)算機(jī)的主機(jī)名, 1510734527562 代表時(shí)間戳,而 64b377f5 表示 UUID 的部分信息 。
參考圖 7-4,與/consumers/<group>/工ds 同級(jí)的還有兩個(gè)節(jié)點(diǎn): owners 和 offsets, /consumers/<group>/owner 路徑下記錄了分區(qū)和消費(fèi)者的對(duì)應(yīng)關(guān)系,/ consumers/ <group>/offsets 路徑下記錄了 此消 費(fèi)組在分區(qū)中對(duì)應(yīng)的消費(fèi)位移。

每個(gè) broker、 主題和分區(qū)在 ZooKeeper 中也都對(duì)應(yīng)一個(gè)路徑 : /brokers/ids/<id>記錄 了 host、 port及分配在此 broker上的主題分區(qū)列表; /brokers/topics/<topic>記錄了每 個(gè)分區(qū)的 leader 副本、 ISR 集合等信息 。/brokers/topics/<topic>/partitions/ <partition>/state記錄了當(dāng)前 leader副本、 leader epoch等信息。
每個(gè)消費(fèi)者在啟動(dòng)時(shí)都會(huì)在/ consumers/<group>/ids 和/brokers/ids 路徑上注冊(cè) 一個(gè)監(jiān)聽器。當(dāng)/consumers/<group>/ids 路徑下的子節(jié)點(diǎn)發(fā)生變化時(shí),表示消費(fèi)組中的消 費(fèi)者發(fā)生了變化;當(dāng)/ brokers/ids 路徑下的子節(jié)點(diǎn)發(fā)生變化時(shí),表示 broker 出現(xiàn)了增減 。 這 樣通過(guò) ZooKeeper所提供的 Watcher, 每個(gè)消費(fèi)者就可以監(jiān)昕消費(fèi)組和 Kafka集群的狀態(tài)了 。
這種方式下每個(gè)消費(fèi)者對(duì) ZooKeeper 的相關(guān)路徑分別進(jìn)行監(jiān)聽, 當(dāng)觸發(fā)再均衡操作時(shí), 一 個(gè)消費(fèi)組下的所有消費(fèi)者會(huì)同時(shí)進(jìn)行再均衡操作,而消費(fèi)者之間并不知道彼此操作的結(jié)果,這 樣可能導(dǎo)致 Kafka工作在一個(gè)不正確的狀態(tài)。 與此同時(shí),這種嚴(yán)重依賴于 ZooKeeper集群的做 法還有兩個(gè) 比較嚴(yán)重的問(wèn)題 。
(1)羊群效應(yīng)(HerdEffect) : 所謂的羊群效應(yīng)是指 ZooK巳eper中一個(gè)被監(jiān)聽的節(jié)點(diǎn)變化, 大量的 Watcher 通知被發(fā)送到客戶端 , 導(dǎo)致在通知期間的其他操作延遲,也有可能發(fā)生類似死 鎖的情況 。
(2)腦裂問(wèn)題( Split Brain) : 消費(fèi)者進(jìn)行再均衡操作時(shí)每個(gè)消費(fèi)者都與 ZooKeeper 進(jìn)行 通信以判斷消費(fèi)者或 broker變化的情況,由于 ZooKeeper本身的特性,可能導(dǎo)致在同一時(shí)刻各 個(gè)消 費(fèi)者獲取的狀態(tài)不一致 , 這樣會(huì)導(dǎo)致異常 問(wèn)題 發(fā)生。
7.2.2 再均衡的原理
新版的消費(fèi)者客戶端對(duì)此進(jìn)行了重新設(shè)計(jì) , 將全部消費(fèi)組分成多個(gè)子集 , 每個(gè)消費(fèi)組的子 集在服務(wù)端對(duì)應(yīng)一個(gè) GroupCoordinator 對(duì)其進(jìn)行管理, GroupCoordinator 是 Kafka 服務(wù)端 中用于 管理消費(fèi)組的組件。而消費(fèi)者客戶端中的 ConsumerCoordinator組件負(fù)責(zé)與 GroupCoordinator進(jìn) 行交互 。
Consum巳rCoordinator 與 GroupCoordinator 之間最重要 的職 責(zé)就是負(fù)責(zé)執(zhí)行消費(fèi)者再 均衡的 操作,包括前面提及的分區(qū)分配的工作也是在再均衡期間完成的。就目前而言 , 一共有 如下幾 種情形會(huì)觸發(fā)再均衡的操作 :
- 有新的消 費(fèi)者 加入 消費(fèi)組。
- 有消費(fèi)者若機(jī)下線 。 消費(fèi)者并不一定需要真正下線,例如遇到長(zhǎng)時(shí)間的 GC、網(wǎng)絡(luò)延 遲導(dǎo)致消費(fèi)者長(zhǎng)時(shí)間未向 GroupCoordinator 發(fā)送心跳等情況時(shí), GroupCoordinator 會(huì)認(rèn) 為消費(fèi)者己經(jīng)下線。
- 有消費(fèi)者主動(dòng)退出消費(fèi)組(發(fā)送 LeaveGroupRequest 請(qǐng)求) 。 比如客戶端調(diào)用了 unsubscrible()方法取消對(duì)某些主題 的訂閱 。
- 消費(fèi)組所對(duì)應(yīng)的 GroupCoorinator節(jié)點(diǎn)發(fā)生了變更。
- 消費(fèi)組內(nèi)所訂閱的任一主題或者主題的分區(qū)數(shù)量發(fā)生變化。
下面就以一個(gè)簡(jiǎn)單的例子來(lái)講解一下再均衡操作的具體內(nèi)容 。當(dāng)有消費(fèi)者加入消費(fèi)組時(shí),消費(fèi)者、消費(fèi)組及組協(xié)調(diào)器之間會(huì)經(jīng)歷 一 下 幾個(gè) 階段。
第一階段( FIND_COORDINATOR)
消費(fèi)者需要確定它所屬的消費(fèi)組對(duì)應(yīng)的 GroupCoordinator所在的 brok巳r,并創(chuàng)建與該 broker 相互通信的網(wǎng)絡(luò)連接 。 如果消 費(fèi)者 己經(jīng)保存了與消費(fèi)組對(duì)應(yīng)的 GroupCoordinator 節(jié)點(diǎn)的信息, 并且與它之間的網(wǎng)絡(luò)連接是正常的,那么就可以進(jìn)入第二階段。否則, 就需要向集群中 的某個(gè) 節(jié)點(diǎn)發(fā)送 FindCoordinatorRequest請(qǐng)求來(lái)查找對(duì)應(yīng)的 GroupCoordinator, 這里的“某個(gè)節(jié)點(diǎn)” 并 非是集群中的任意節(jié)點(diǎn),而是負(fù)載最小的節(jié)點(diǎn),即 2.2.2 節(jié)中的leastLoadedNode。
如圖 7-5 所示, FindCoordinatorRequest請(qǐng)求體中只有兩個(gè)域( Field) : coordinator key 和 coord工nator_typeo coordinator_key 在這里就是消費(fèi)組的名稱 , 即 groupid, coordinator type 置為 0。

Kafka 在收到 FindCoordinatorRequest 請(qǐng)求之后,會(huì)根據(jù) coordinator_key (也就是 groupld)查找對(duì)應(yīng)的 GroupCoordinator 節(jié)點(diǎn),如果找到對(duì)應(yīng)的 GroupCoordinator 則會(huì)返回其相 對(duì)應(yīng)的 node_id、 host 和 port 信息。
具體查找 GroupCoordinator的方式是先根據(jù)消費(fèi)組 groupid的晗希值計(jì)算_consumer_offsets 中的分區(qū)編號(hào),具體算法如代碼清單 7-1 所示。 :
代碼清單 7-1 消費(fèi)組所對(duì)應(yīng)的分區(qū)號(hào)的計(jì)算方式 Utils.abs(groupid.hashCode ) % groupMetadataTopicPartit 工 onCount
其中 groupid.hashCode 就是使用 Java 中 String 類的 hashCode()方法獲得的, groupMetadataTopicPartitio口Cou口t 為主題 consumer一offsets 的分區(qū)個(gè)數(shù),這個(gè)可以 通過(guò) broker端參數(shù) offsets.topic.num.partitions 來(lái)配置,默認(rèn)值為 50。
找到對(duì)應(yīng)的 consumer offsets 中的分區(qū)之后,再尋找此分區(qū) leader副本所在的 broker節(jié)點(diǎn), 該 broker節(jié)點(diǎn)即為這個(gè) groupld所對(duì)應(yīng)的 GroupCoordinator節(jié)點(diǎn)。消費(fèi)者 groupId最終的分區(qū)分 配方案及組內(nèi)消費(fèi)者所提交的消費(fèi)位移信息都會(huì)發(fā)送給此分區(qū) leader副本所在的 broker節(jié)點(diǎn), 讓此 broker節(jié)點(diǎn)既扮演 GroupCoordinator 的角色,又扮演保存分區(qū)分配方案和組內(nèi)消費(fèi)者位移 的角色,這樣可以省去很多不必要的中間輪轉(zhuǎn)所帶來(lái)的開銷 。
第二階段(JOIN GROUP)
在成功找到消費(fèi)組所對(duì)應(yīng)的 GroupCoordinator 之后就進(jìn)入加入消費(fèi)組的階段,在此階段的 消費(fèi)者會(huì)向 GroupCoordinator 發(fā)送 JoinGroupRequest請(qǐng)求,并處理響應(yīng)。
如圖 7-6所示, JoinGroupR巳quest的結(jié)構(gòu)包含多個(gè)域:
- group_id 就是消費(fèi)組的 id,通常也表示為 groupld。
- sessioηtimout 對(duì)應(yīng)消費(fèi)端參數(shù) sess工on.timeout.ms,默認(rèn)值為 10000,即10 秒 。 GroupCoordinator 超過(guò) session_timeout 指 定 的時(shí)間內(nèi)沒(méi)有收到心跳報(bào)文則 認(rèn)為此消 費(fèi)者已經(jīng)下線。
- rebalance timeout 對(duì)應(yīng)消費(fèi)端參數(shù) max .poll . interval.ms , 默認(rèn)值 為 300000,即 5 分鐘 。表示當(dāng)消費(fèi)組再平衡的時(shí)候, GroupCoordinator 等待各個(gè)消費(fèi)者 重新加入的最長(zhǎng)等待時(shí)間 。
- member id 表示 GroupCoordinator 分配給消費(fèi)者的 id 標(biāo)識(shí)。 消費(fèi)者第一次發(fā)送 JoinGroupRequest請(qǐng)求的時(shí)候此字段設(shè)置為 nulla
- protocol_type 表示消費(fèi)組實(shí)現(xiàn)的協(xié)議,對(duì)于消費(fèi)者而言此字段值為“consumer”。

JoinGroupRequest 中的 group protocols 域?yàn)閿?shù)組類型,其中可以囊括多個(gè)分區(qū)分配策 略,這個(gè)主要取決于消 費(fèi)者客戶端參數(shù) pa:::-titio口 .assignment. strategy 的配置。 如果 配置了多種策略,那么 JoinGroupRequest 中就會(huì)包含多個(gè) protocol name 和 protocol metadata。其中 protocol name 對(duì)應(yīng)于 PartitionAssignor 接口中的 name()方法,我們?cè)谥v 述消費(fèi)者分區(qū)分配策略的時(shí)候提及過(guò)相關(guān)內(nèi)容(參考 7.1.4 節(jié)) 。 而 protocol metadata 和 PartitionAssignor接口中的 subscription()方法有直接關(guān)系, protocol_metadata 是一個(gè) bytes 類型,其實(shí)質(zhì)上還可以更細(xì)粒度地劃分為 version、 topics 和 user data,如圖 7-7 所示。

version 占 2個(gè)字節(jié),目前其值固定為 0; topics 對(duì)應(yīng) PartitionAssignor接口的 subscription() 方法返回值類型 Subscription 中的 topics,代表一個(gè)主題列表; user_data 對(duì)應(yīng) Subscription 中 的 userData,可以為空 。
如果是原有的消費(fèi)者重新加入消費(fèi)組,那么在真正發(fā)送 JoinGroupRequest 請(qǐng)求之前還要執(zhí) 行一些準(zhǔn)備工作:
(1)如果消費(fèi)端參數(shù)enable.auto.commit設(shè)置為true(默認(rèn)值也為true), 即開啟自 動(dòng)提交位移功能,那么在請(qǐng)求加入消費(fèi)組之前需要向 GroupCoordinator 提交消費(fèi)位移。這個(gè)過(guò) 程是阻塞執(zhí)行的,要么成功提交消費(fèi)位移,要么超時(shí)。
(2)如果消 費(fèi)者添加了自定義的再均衡監(jiān)聽器( ConsumerRebalanceListener),那么此時(shí) 會(huì)調(diào)用 onPartitionsRevoked()方法在重新加入消費(fèi)組之前實(shí)施自定義的規(guī)則邏輯,比如清除一些 狀態(tài),或者提交消費(fèi)位移 等。
(3)因?yàn)槭侵匦录尤胂M(fèi)組,之前與 GroupCoordinator節(jié)點(diǎn)之間的心跳檢測(cè)也就不需要了, 所以在成功地重新加入消費(fèi)組之前 需要禁止 心跳檢測(cè)的 運(yùn)作。
消費(fèi)者在發(fā)送 JoinGroupRequest 請(qǐng)求之后會(huì)阻塞等待 Kafka 服務(wù)端的響應(yīng)。服務(wù)端在收到JainGroupRequest 請(qǐng)求 后會(huì)交由 GroupCoordinator 來(lái)進(jìn)行處理 。 GroupCoordinator 首先 會(huì)對(duì) JoinGroupRequest 請(qǐng)求做合法性校驗(yàn),比如 group 工d 是否為空、當(dāng)前 broker 節(jié)點(diǎn)是否是請(qǐng)求 的消費(fèi)者組所對(duì)應(yīng)的組協(xié)調(diào)器、 rebalance timeout 的值是否在合理的范圍之內(nèi)。如果消費(fèi) 者是第一次請(qǐng)求加入消費(fèi)組,那么 JoinGroupRequest 請(qǐng)求中的 member_id 值為 null,即沒(méi)有 它自身的唯一標(biāo)志,此時(shí)組協(xié)調(diào)器負(fù)責(zé)為此消費(fèi)者生成一個(gè) member id。這個(gè)生成的算法很 簡(jiǎn)單,具體如以下偽代碼所示。
Stringmemberid= clientid+ ”+ UUID.randomUUID().toStr工ng();
其中 clientld 為消費(fèi)者客戶端的 clientld,對(duì)應(yīng)請(qǐng)求頭中的 client id。由此可見(jiàn)消費(fèi)者的member 工d 由 clientld 和 UUID 用“-” 字符拼接而成。
選舉消費(fèi)紐的 leader
GroupCoordinator 需要為消費(fèi)組內(nèi)的消費(fèi)者選舉出一個(gè)消費(fèi)組的 leader,這個(gè)選舉的算法也 很簡(jiǎn)單,分兩種情況分析。如果消費(fèi)組內(nèi)還沒(méi)有 leader,那么第一個(gè)加入消費(fèi)組的消費(fèi)者即為 消費(fèi)組的 leader。如果某一時(shí)刻 leader 消費(fèi)者由于某些原因退出了消費(fèi)組,1 那么會(huì)重新選舉一 個(gè)新的 leader,這個(gè)重新選舉 leader 的過(guò)程又更“隨 意”了,相關(guān)代碼如下 :
//scala code.
pr工vate val members = new mutable.HashMap[String, MemberMetadata] var leaderid = members.keys.head
解釋一下這 2 行代碼:在 GroupCoordinator 中消費(fèi)者的信息是以 HashMap 的形式存儲(chǔ)的,其中 key 為消 費(fèi)者的 member id,而 value 是消 費(fèi)者相關(guān)的元數(shù)據(jù)信息。 leaderld 表示 leader 消費(fèi)者的 member id,它的取值為 HashMap 中的第一個(gè)鍵值對(duì)的 key,這種選舉的方式基本 上和隨機(jī)無(wú)異。 總體上來(lái)說(shuō),消費(fèi)組的 l巳ader選舉過(guò)程是很隨意的。
選舉分區(qū)分配某咯
每個(gè)消費(fèi)者都可以設(shè)置自己的分區(qū)分配策略,對(duì)消費(fèi)組而言需要從各個(gè)消費(fèi)者呈報(bào)上來(lái)的 各個(gè)分配策略中選舉一個(gè)彼此都“信服”的策略來(lái)進(jìn)行整體上的分區(qū)分配 。 這個(gè)分區(qū)分配的選 舉并非由 leader消費(fèi)者決定,而是根據(jù)消費(fèi)組內(nèi)的各個(gè)消費(fèi)者投票來(lái)決定的。這里所說(shuō)的 “根據(jù)組內(nèi)的各個(gè)消費(fèi)者投票來(lái)決定”不是指 GroupCoordinator 還要再與各個(gè)消費(fèi)者進(jìn)行進(jìn)一步交 互,而是根據(jù)各個(gè)消費(fèi)者呈報(bào)的分配策略來(lái)實(shí)施。最終選舉的分配策略基本上可以看作被各個(gè) 消費(fèi)者支持的最多的策略,具體的選舉過(guò)程如下:
(1)收集各個(gè)消費(fèi)者支持的所有分配策略,組成候選集 candidates。 (2)每個(gè)消費(fèi)者從候選集 candidates 中找出第一個(gè)自身支持的策略,為這個(gè)策略投上一票。
(3)計(jì)算候選集中各個(gè)策略的選票數(shù),選票數(shù)最多的策略即為當(dāng)前消費(fèi)組的分配策略。
如果有消費(fèi)者并不支持選出的分配策略,那么就會(huì)報(bào)出異常 IllegalArgumentException: Member does not supp。此 protocol。 需要注意的是,這里所說(shuō)的“消費(fèi)者所支持的分配策略”是 指 partition.assignment.strategy 參數(shù)配置的策略,如果這個(gè)參數(shù)值只配置了 RangeAssignor, 那么這個(gè)消費(fèi)者客戶端只支持 RangeAssignor 分配策略,而不是消費(fèi)者客戶端 代碼中實(shí)現(xiàn)的 3 種分配策略及可能的自定義分配策略 。
在此之后, Kafka 服務(wù)端就要發(fā)送 JoinGroupResponse 響應(yīng)給各個(gè)消費(fèi)者, leader 消費(fèi)者和 其他普通消費(fèi)者收到的響應(yīng)內(nèi)容并不相同,首先我們看一下 JoinGroupResponse 的具體結(jié)構(gòu),如 圖 7-8 所 示。

JoinGroupRespons巳包含了多個(gè)域,其中 ge口eratio口一工d 用來(lái)標(biāo)識(shí)當(dāng)前消費(fèi)組的年代信息,避免受到過(guò)期請(qǐng)求的影響。 leader 工d 表示消費(fèi)組 leader 消費(fèi)者的 member id。
Kafka 發(fā)送給普通消費(fèi)者的 JoinGroupResponse 中的 members 內(nèi)容為空,而只有 leader 消 費(fèi)者的 JoinGroupResponse 中的 members 包含有效數(shù)據(jù)。members 為數(shù)組類型,其中包含各 個(gè)成員信息 。 member_metadata 為消費(fèi)者的訂閱信息,與 JoinGroupRequest 中的 protocol metadata 內(nèi)容相同,不同的是 JoinGroupR巳quest可以包含多個(gè)<protocol 口ame, protocol metadata>的鍵值對(duì),在收到 JoinGroupRequest 之后 , GroupCoordinator 已經(jīng)選舉 出唯一的分配策略。也就是說(shuō), protocol name 己經(jīng)確定( group protocol 〉 , 那么對(duì)應(yīng) 的 protocol metadata 也就確定了,最終各個(gè)消費(fèi)者收到的 JoinGroupResponse 響應(yīng)中的 member_metadata 就是這個(gè)確定了的 protocol_metadata。 由此可見(jiàn), Kafka 把分區(qū)分配 的具體分配交還給客戶端,自身并不參與具體的分配細(xì)節(jié),這樣即使以后分區(qū)分配的策略發(fā)生 了變更,也只需要重啟消費(fèi)端的應(yīng)用即可,而不需要重啟服務(wù)端。

第三階段( SYNC GROUP)
leader 消費(fèi)者根據(jù)在第二階段中選舉出來(lái)的分區(qū)分配策略來(lái)實(shí)施具體的分區(qū)分配,在此之 后需要將分配的方案同步給各個(gè)消費(fèi)者,此時(shí) leader 消費(fèi)者并不是直接和其余的普通消費(fèi)者同 步分配方案,而是通過(guò) GroupCoordinator 這個(gè)“中間人”來(lái)負(fù)責(zé)轉(zhuǎn)發(fā)同步分配方案的。 在第三 階段,也就是同步階段, 各個(gè)消 費(fèi)者會(huì) 向 GroupCoordinator 發(fā)送 SyncGroupRequest 請(qǐng)求來(lái)同步 分配方案,如圖 7-11 所示。

我們?cè)賮?lái)看一下SyncGroupRequest 請(qǐng)求的具體結(jié)構(gòu) ,如 圖 7-12 所示 。 SyncGroupRequest 中的 group id、 generation id 和 member id 前面都有涉及,這里不再贅述 。 只有 leader 消費(fèi)者發(fā)送的 SyncGroupRequest 請(qǐng)求中才包含具體的分區(qū)分配方案,這個(gè)分配方案保存在 group ass工gnment 中,而其余消費(fèi)者發(fā)送的 SyncGroupRequest請(qǐng)求中的 group assignment 為空。

group assignment是一個(gè)數(shù)組類型,其中包含了各個(gè)消費(fèi)者對(duì)應(yīng)的具體分配方案 :
member id 表示消費(fèi)者的唯一標(biāo)識(shí),而 member assignment 是與消費(fèi)者對(duì)應(yīng)的分配方案,它還可以做更具體的劃分,

服務(wù)端在收到消費(fèi)者發(fā)送的 SyncGroupRequest 請(qǐng)求之后 會(huì)交 由 GroupCoordinator 來(lái)負(fù)責(zé)具 體的邏輯處理。 GroupCoordinator 同樣會(huì)先對(duì) SyncGroupRequest 請(qǐng)求做合法性校驗(yàn),在此之后 會(huì)將從 leader 消費(fèi)者發(fā)送過(guò)來(lái)的分配方案提取出來(lái),連同整個(gè)消費(fèi)組的元數(shù)據(jù)信息一起存入 Kafka 的 consumer offsets 主題中 , 最后發(fā)送響應(yīng)給各個(gè)消費(fèi)者 以提供給各個(gè)消費(fèi)者各自所屬 的分配方案。
這里所說(shuō)的響應(yīng)就是指 SyncGroupRequest 請(qǐng)求對(duì)應(yīng) 的 SyncGroupResponse, SyncGroupResponse
的內(nèi)容很簡(jiǎn)單,里面包含的就是消費(fèi)者對(duì)應(yīng)的所屬分配方案, SyncGroupResponse 的結(jié)構(gòu)如圖
7-14 所示,具體字段的釋義可以從前面的內(nèi)容中推測(cè)出來(lái),這里就不贅述了。

當(dāng)消費(fèi)者收到所屬的分配方案之后會(huì)調(diào)用 PartitionAssignor 中的 onAssignment()方法。隨后再調(diào)用 ConsumerRebalanceListener 中的 OnPartitionAssigned()方法 。 之后開啟 心跳任務(wù) , 消費(fèi)者定期 向服 務(wù)端的 GroupCoordinator 發(fā)送 HeartbeatRequest 來(lái)確定彼此在線。
消費(fèi)組元數(shù)據(jù)信息
我們知道消費(fèi)者客戶端提交的消費(fèi)位移會(huì)保存在Kafka的_consumer_offsets主題中,這里也一樣,只不過(guò)保存的是消費(fèi)組的元數(shù)據(jù)信息(GroupMetadata)。
圖 7-15 中對(duì)應(yīng)的就是消費(fèi)組元數(shù)據(jù)信息的具體內(nèi)容格式,上面是消息的 key,下面是消息 的value??梢钥吹?key和 value中都包含 version字段,用來(lái)標(biāo)識(shí)具體的 key和 value的版本信 息 , 不同的版本對(duì)應(yīng)的內(nèi)容格式可能并不相同,就目前版本而言, key 的 version 為 2,而 value 的 version 為 1, 讀者在理解時(shí)其實(shí)可以忽略這個(gè)字段而探究其他具備特定含義的 內(nèi)容。 key 中除了 versio口就是 group 宇段,它表示消費(fèi)組的名稱,和 JoinGroupRequest 或 SyncGroupRequest 請(qǐng)求中的 group_id 是同 一個(gè)東西 。 雖然 key 中包含了 version 字段, 但確定這條信息所要存儲(chǔ)的分區(qū)還是根據(jù)單獨(dú)的 group 字段來(lái)計(jì)算的,這樣就可以保證消費(fèi)組 的元數(shù)據(jù)信息與消費(fèi)組對(duì)應(yīng)的 GroupCoordinator 處于同 一個(gè) broker 節(jié)點(diǎn)上,省去了中間輪轉(zhuǎn)的 開銷。

value 中包含的內(nèi)容有很多,可以參照和 JoinGroupRequest 或 SyncGroupRequest 請(qǐng)求中的內(nèi)容來(lái)理解,具體各個(gè)字段的釋義如下 。
- protocol type:消費(fèi)組實(shí)現(xiàn)的協(xié)議,這里的值為“ consumer”。 * generation:標(biāo)識(shí)當(dāng)前消費(fèi)組的年代信息,避免收到過(guò)期請(qǐng)求的影響。 protocol : 消費(fèi)組選取的分區(qū)分配策略。
- leader: 消費(fèi)組的 lead巳r消費(fèi)者的名稱。
- members: 數(shù)組類型,其中包含了消費(fèi)組的各個(gè)消費(fèi)者成員信息,圖 7-15 中右邊部分 就是消費(fèi)者成員的具體信息,每個(gè)具體字段都 比較容易辨 別, 需要著重說(shuō)明的是 subscription 和 assignment 這兩個(gè)字段, 分別代碼消費(fèi)者的訂閱信息和分配信息
第四階段( HEARTBEAT)
進(jìn)入這個(gè)階段之后,消 費(fèi)組中的所有消費(fèi)者就會(huì)處于正常工作狀態(tài)。在正式消費(fèi)之前 ,消 費(fèi)者還需要確定拉取消息的起始位置。假設(shè)之前已經(jīng)將最后的消費(fèi)位移提交到了 GroupCoordinator,并且GroupCoordinator將其保存到了Kafka內(nèi)部的一consumer_offsets主題中, 此時(shí)消費(fèi)者可以通過(guò) OffsetFetchRequest 請(qǐng)求獲取上次提交的消 費(fèi)位移并 從此處繼續(xù)消費(fèi) 。
消費(fèi)者通過(guò)向 GroupCoordinator 發(fā)送心跳來(lái)維持它們與消費(fèi)組的從屬關(guān)系,以及它們對(duì)分區(qū)的所有權(quán)關(guān)系。只要消費(fèi)者以正常的時(shí)間間隔發(fā)送 心跳 , 就被認(rèn)為是活躍的 ,說(shuō)明它還在讀 取分區(qū)中的消息。 心跳線程是一個(gè)獨(dú)立的線程,可以在輪詢消息的空檔發(fā)送心跳。如果消費(fèi)者停 止發(fā)送心跳的時(shí)間足夠長(zhǎng),則整個(gè)會(huì)話就被判定為過(guò)期, GroupCoordinator 也會(huì)認(rèn)為這個(gè)消費(fèi)者 己經(jīng)死亡,就會(huì)觸發(fā)一次再均衡行為。消費(fèi)者的心跳間隔時(shí)間由參數(shù) heartbeat.interval.ms 指定,默認(rèn)值為 3000,即 3 秒 , 這個(gè)參數(shù)必須比 session . timeout.ms 參數(shù)設(shè)定的值要小, 一般情況下 heartbeat. interval.ms 的配置值不能超過(guò) session.timeout.ms配置值的 1/3。這個(gè)參數(shù)可以調(diào)整得更低,以控制正常 重新平衡 的預(yù)期時(shí)間 。
消費(fèi)者通過(guò)向 GroupCoordinator 發(fā)送心跳來(lái)維持它們與消費(fèi)組的從屬關(guān)系,以及它們對(duì)分區(qū)的所有權(quán)關(guān)系。只要消費(fèi)者以正常的時(shí)間間隔發(fā)送 心跳 , 就被認(rèn)為是活躍的 ,說(shuō)明它還在讀 取分區(qū)中的消息。 心跳線程是一個(gè)獨(dú)立的線程,可以在輪詢消息的空檔發(fā)送心跳。如果消費(fèi)者停 止發(fā)送心跳的時(shí)間足夠長(zhǎng),則整個(gè)會(huì)話就被判定為過(guò)期, GroupCoordinator 也會(huì)認(rèn)為這個(gè)消費(fèi)者 己經(jīng)死亡,就會(huì)觸發(fā)一次再均衡行為。消費(fèi)者的心跳間隔時(shí)間由參數(shù) heartbeat.interval.ms 指定,默認(rèn)值為 3000,即 3 秒 , 這個(gè)參數(shù)必須比 session . timeout.ms 參數(shù)設(shè)定的值要小, 一般情況下 heartbeat. interval.ms 的配置值不能超過(guò) session.timeout.ms配置值的 1/3。這個(gè)參數(shù)可以調(diào)整得更低,以控制正常 重新平衡 的預(yù)期時(shí)間 。
如果一個(gè)消費(fèi)者發(fā)生崩潰,并停止讀取消息 , 那么 GroupCoordinator 會(huì)等待一小段時(shí)間 , 確認(rèn)這個(gè)消費(fèi)者死亡之后才會(huì)觸發(fā)再均衡。在這一小段 時(shí)間內(nèi), 死掉的消費(fèi)者井不會(huì)讀取分區(qū) 里的消息。這個(gè)一小段時(shí)間由 session . timeout.ms 參數(shù)控制,該參數(shù)的配置值必須在 broker 端參數(shù) group.m工n.sessio口.timeout.ms (默認(rèn)值為 6000,即 6 秒)和 group.max. session. timeout. ms (默認(rèn)值為 300000,即 5 分鐘)允許的范圍內(nèi)。
還有一個(gè)參數(shù) max.poll.interval.ms,它用來(lái)指定使用消費(fèi)者組管理時(shí) poll()方法調(diào) 用之 間的 最大延遲 ,也就是消費(fèi)者在獲取更多消息之前可以空閑的時(shí)間量的上限。如果 此超時(shí) 時(shí)間期滿之前 poll()沒(méi)有調(diào)用, 則消費(fèi)者被視為失敗,并且分組將重新平衡, 以便將分區(qū)重新分 配給別的成員。
除了被動(dòng)退 出消費(fèi)組,還可 以使用 Leav巳GroupRequest 請(qǐng)求主動(dòng)退出消費(fèi)組,比如客戶端調(diào)用了 unsubscrible()方法取消對(duì)某些主題的訂閱,這個(gè)比較簡(jiǎn)單,這里就不再贅述了 。
7.3 _consumer_offsets 剖析
位移提交是使用消費(fèi)者客戶端過(guò)程中一個(gè)比較“講究”的操作, 3.2.5 節(jié)也 使用了較大的篇 幅來(lái)介紹它。位移提交的內(nèi)容最終會(huì)保存到 Kafka 的內(nèi)部主題 consumer offsets 中,對(duì)于主題
consumer offsets 的深度掌握也可以讓我們更好地理解和使用好位移提交。
一般情況下,當(dāng)集群中第一次有消費(fèi)者消費(fèi)消息時(shí)會(huì)自動(dòng)創(chuàng)建主題 consumer offsets,不 過(guò)它的副本因子還受 offsets.topic.replication.factor參數(shù)的約束,這個(gè)參數(shù)的默認(rèn)值為 3 (下載安 裝的包中此值可能為 1),分區(qū)數(shù)可以通過(guò) offsets.topic.num.partitions參數(shù)設(shè)置,默認(rèn)為 50???戶端提交消費(fèi)位移是使用。他etCommitRequest 請(qǐng)求實(shí)現(xiàn)的, OffsetCommitRequest 的結(jié)構(gòu)如圖 7-16 所示 。

如果已經(jīng)掌握了 6.1 節(jié)和] 7.2 節(jié)的內(nèi)容,那么就很容易理解 OffsetCommitRequest 的結(jié)構(gòu) 。 請(qǐng)求體第一層中的 group id、 generation_id 和 member_id 在前面的內(nèi)容中已經(jīng)介紹過(guò) 多次了, retention time 表示當(dāng)前提交的消費(fèi)位移所能保留的時(shí)長(zhǎng),不過(guò)對(duì)于消費(fèi)者而言 這個(gè)值保持為 I。也就是說(shuō),按照 broker 端的配置 offsets . retention . minutes 來(lái)確定 保留時(shí)長(zhǎng) 。 offsets . retention .minutes 的默認(rèn)值為 10080,即 7 天,超過(guò)這個(gè)時(shí)間后消 費(fèi)位移的信息就會(huì)被刪除(使用墓碑消息和日志壓縮策略) 。 注意這個(gè)參數(shù)在 2.0.0 版本之前的 默認(rèn)值為 1440,即 1 天,很多關(guān)于消費(fèi)位移的異常也是由這個(gè)參數(shù)的值配置不當(dāng)造成的 。 有些 定時(shí)消費(fèi)的任務(wù)在執(zhí)行完某次消費(fèi)任務(wù)之后保存了消費(fèi)位移,之后隔了一段時(shí)間再次執(zhí)行消費(fèi)任務(wù),如果這個(gè)問(wèn)隔時(shí)間超過(guò) offsets.retent工on.minutes 的配置值,那么原先的位移信 息就會(huì)丟失, 最后只能根據(jù)客戶端參數(shù) auto . offset.reset 來(lái)決定開始消費(fèi)的位置,遇到 這種情況時(shí)就需要根據(jù)實(shí)際情況來(lái)調(diào)配 offsets.retention.minutes 參數(shù) 的值 。
OffsetCommitRequest 中的其余字段大抵也是按照分區(qū)的粒度來(lái)劃分消費(fèi)位移的 : topic 表 示主題名稱, partition 表示分區(qū)編號(hào)等。注意這里還有一個(gè) metadata 字段。在 3.2.5 節(jié)中 講到手動(dòng)位移提交時(shí)提到了可以通過(guò) Map<TopicPartition, OffsetAndMetadata> offsets 參數(shù)來(lái)指 定要提交的分區(qū)位移
同消費(fèi)組的元數(shù)據(jù)信息 一樣,最終提交的消費(fèi)位移也會(huì)以消息的形式發(fā)送至主題 _consumer_offsets,與消費(fèi)位移對(duì)應(yīng)的消息也只定義了 key 和 value 字段的具體內(nèi)容,它不依 賴于具體版本的消息格式,以此做到與具體的消息格式無(wú)關(guān) 。
圖 7”17 中展示了消費(fèi)位移對(duì)應(yīng)的消息內(nèi)容格式,上面是消息的 key,下面是消息的 value。
可以看到 key 和 value 中都包含了 version 宇段 ,這個(gè)用來(lái)標(biāo)識(shí)具體的 key 和 value 的版本信 息,不同的版本對(duì)應(yīng)的內(nèi)容格式可能并不相同 。就目前版本 而 言 , key 和 value 的 version 值 都為 l。 key 中除了 version 字段還有 group、 topic、 partition 字段,分別表示消費(fèi)組 的 groupId、 主題名稱和分區(qū)編號(hào)。雖然 key 中包含了 4 個(gè)字段,但最終確定這條消息所要存儲(chǔ) 的分區(qū)還是根據(jù)單獨(dú)的 group 字段來(lái)計(jì)算的,這樣就可以保證消費(fèi)位移信息與消費(fèi)組對(duì)應(yīng)的 GroupCoordinator 處于同 一個(gè) broker 節(jié)點(diǎn)上,省去了中間輪轉(zhuǎn)的開銷,這 一點(diǎn) 與消費(fèi)組的元數(shù) 據(jù)信息的存儲(chǔ)是一樣的 。
value 中包含了 5 個(gè)字段,除 version 宇段外,其余的 offset、 metadata、 commit
timestamp、 expire timestamp 宇段分別表示消費(fèi)位移、自定義的元數(shù)據(jù)信息、位移提交 到 Kafka 的時(shí)間戳、消費(fèi)位移被判定為超時(shí)的時(shí)間戳 。其 中 offset 和 metadata 與 OffsetCommitRequest 請(qǐng)求體中的 offset 和 metadata 對(duì)應(yīng),而 expire timestamp 和 OffsetCommitRequest 請(qǐng)求體中的 retention time 也有關(guān)聯(lián), commit timestamp 值與 offsets . retention .minutes 參數(shù)值之和即為 expire_timestamp (默認(rèn)情況下)。

在處理完消費(fèi)位移之后, Kafka返回 OffsetCommitResponse給客戶端 ,OffsetCommitResponse 的結(jié)構(gòu)如圖 7-18 所示 。 OffsetCornmitResponse 中各個(gè)域的具體含義可以通過(guò)前面內(nèi)容中推斷出 來(lái),這里就不再贅述了 。

我們可以通過(guò)kafka-console-consumer.sh腳本來(lái)查看 consumeroffsets中的內(nèi)容,不過(guò)要設(shè) 定 formatter 參數(shù)為 kafka.coordinator.group.GroupMetadataManager$0ffsetsMessageForrnatter。 假設(shè)我們要查看消費(fèi)組“consumerGroupid” 的位移提交信息 , 首先可 以根據(jù)代碼清單 7-1 中的 計(jì)算方式得出分區(qū)編號(hào)為 20, 然后查看這個(gè)分區(qū)中的消息,相關(guān)示例如下:

有時(shí)候在查看主題 consumer offsets 中的內(nèi)容時(shí)有可能出現(xiàn)下面這種情況:
[consumerGroupid, topic-offsets,21]: :null
這說(shuō)明對(duì)應(yīng)的消費(fèi)位移己經(jīng)過(guò)期了 。在 Kafka 中有一個(gè)名為“ delete-expired-group-metadata” 的定時(shí)任務(wù)來(lái)負(fù)責(zé)清理過(guò)期的消費(fèi)位移,這個(gè)定時(shí)任務(wù)的執(zhí)行周期由參數(shù) offsets . retention.check.interval . ms 控制,默認(rèn)值為 600000,即 10 分鐘。
事務(wù)
7.4.1 消息傳輸保障
一般而言,消息中間件的消息傳輸保障有 3個(gè)層級(jí),分別如下。
(1)at most once:至多 一次。消息可能會(huì)丟失,但絕對(duì)不會(huì)重復(fù)傳輸。
(2)at least once: 最少一次。消息絕不會(huì)丟失,但可能會(huì)重復(fù)傳輸。
(3) exactly once:恰好一次。每條消息肯定會(huì)被傳輸一次且僅傳輸一次。
Kafka 的消息傳輸保障機(jī)制非常直觀 。 當(dāng)生產(chǎn)者向 Kafka 發(fā)送消息時(shí),一旦消息被成功提 交到日志文件,由于多副本機(jī)制的存在,這條消息就不會(huì)丟失。如果生產(chǎn)者發(fā)送消息到 Kafka 之后,遇到了網(wǎng)絡(luò)問(wèn)題而造成通信中斷,那么生產(chǎn)者就無(wú)法判斷該消息是否己經(jīng)提交。雖然 Kafka 無(wú)法確定網(wǎng)絡(luò)故障期間發(fā)生了什么,但生產(chǎn)者可以進(jìn)行多次重試來(lái)確保消息已經(jīng)寫入 Kafka, 這個(gè)重試的過(guò)程中有可能會(huì)造成消息的重復(fù)寫入,所以這里 Kafka 提供的消息傳輸保障為 at least once。
對(duì)消費(fèi)者而言,消費(fèi)者處理消息和提 交消費(fèi)位移 的順序在很大程度上決定了消費(fèi)者提供哪 一種消息傳輸保障 。 如果消費(fèi)者在拉取完消息之后 ,應(yīng)用邏輯先處理消息后提交消費(fèi)位移 ,那 么在消息處理之后且在位移提交之前消費(fèi)者看機(jī)了,待它重新上線之后,會(huì)從上一次位移提交 的位置拉取,這樣就出現(xiàn)了重復(fù)消費(fèi),因?yàn)橛胁?分消息已經(jīng)處理過(guò)了只是還沒(méi)來(lái)得及提交消費(fèi) 位移,此時(shí)就對(duì)應(yīng) at least once。如果消費(fèi)者在拉完消息之后,應(yīng)用邏輯先提交消費(fèi)位移后進(jìn)行消息處理,那么在位移提交之后且在消息處理完成之前消費(fèi)者巖機(jī)了,待它重新上線之后,會(huì) 從己經(jīng)提交的位移處開始重新消費(fèi),但之前尚有部分消息未進(jìn)行消 費(fèi),如此就會(huì)發(fā)生消 息丟失, 此時(shí)就對(duì)應(yīng) atmost once。
Kafka 從 0.11.0.0 版本開始引 入了軍等和事務(wù)這兩個(gè)特性,以此來(lái)實(shí)現(xiàn) EOS ( exactly once semantics,精確 一次 處理語(yǔ)義) 。
7.4.2 幕等
所謂的幕等,簡(jiǎn)單地說(shuō)就是對(duì)接口的多次調(diào)用所產(chǎn)生的結(jié)果和調(diào)用 一次是一致 的 。生產(chǎn)者 在進(jìn)行重試的時(shí)候有可能會(huì)重復(fù)寫入消息,而使用 Kafka 的幕等性功能之后就可以避免這種情況。
開啟幕等性功能的方式很簡(jiǎn)單,只需要顯式地將生產(chǎn)者客戶端參數(shù) enable.idempotence 設(shè)置為 true 即可(這個(gè)參數(shù)的默認(rèn)值為 false),參考如下:
properties .put(ProducerConfig .ENABLE_IDEMPOTENCE CONFIG, true);
或者
properties.put ("enable . idempotence ” , true);
不過(guò)如果要確保軍等性功能正常,還需要確保生產(chǎn)者客戶端的 retries 、 acks 、 max.in. flight.requests.per. connect工on 這幾個(gè)參數(shù)不被配置錯(cuò)。實(shí)際上在使用幕等 性功能的時(shí)候,用戶完全可以不用配置(也不建議配置)這幾個(gè) 參數(shù)。
如果用戶顯式地指定了 retries 參數(shù),那么這個(gè)參數(shù)的值必須大于 0, 否則會(huì)報(bào)出 ConfigException:
如果用戶沒(méi)有顯式地指定 retries 參數(shù),那么 KafkaProducer 會(huì)將它置為 Integer.MAX_ VALUE。 同時(shí)還需要保證 max.in.flight.requests.per . connection 參數(shù)的值不能大 于 5 (這個(gè)參數(shù)的值默認(rèn)為 5, 在 2.2.1 節(jié)中有相關(guān)的介紹),否則也會(huì)報(bào)出 ConfigException:
如果用戶還顯式地指定了 acks 參數(shù),那么還需要保證這個(gè)參數(shù) 的值為一1 (all),如果不 為 1 (這個(gè)參數(shù)的值默認(rèn)為 1' 2.3 節(jié)中有相關(guān)的介紹),那么 也會(huì)報(bào)出 ConfigException:
org.apache.kafka.common.config.ConfigException: Must set acks to all in order
to use the jdempotent producer . Otherwise we cannot guarantee idempotence .
如果用戶沒(méi)有顯式地指定這個(gè) 參數(shù) ,那么 KafkaProducer 會(huì)將它置為 1。 開啟幕等性功能之 后 ,生 產(chǎn)者就可以如同未開啟幕等 時(shí) 一樣發(fā)送消息了。
為了實(shí)現(xiàn)生產(chǎn)者的幕等性,Kafka為此引入了 producerid(以下簡(jiǎn)稱 PID)和序列號(hào)(sequence number)這兩個(gè)概念,這兩個(gè)概念其實(shí)在 5.2.5 節(jié)中就講過(guò),分別對(duì)應(yīng) v2 版的日志格式中 RecordBatch 的 producer id 和 first seqe口ce 這兩個(gè)宇段(參考圖 5-7)。每個(gè)新的生產(chǎn) 者實(shí)例在初始化的時(shí)候都會(huì)被分配一個(gè) PID,這個(gè) PID 對(duì)用戶而言是完全透明的 。對(duì)于每個(gè) PID, 消息發(fā)送到的每一個(gè)分區(qū)都有對(duì)應(yīng)的序列號(hào),這些序列號(hào)從 0 開始單調(diào)遞增。生產(chǎn)者每發(fā)送一 條消息就會(huì)將<PID, 分區(qū)>對(duì)應(yīng)的序列號(hào)的值加 l。
broker 端會(huì)在內(nèi)存中為 每一對(duì)<PID,分區(qū)>維護(hù)一個(gè)序列號(hào)。對(duì)于收到的每一條消息,只有 當(dāng)它的序列號(hào)的值(SN new)比broker端中維護(hù)的對(duì)應(yīng)的序列號(hào)的值(SN old)大 1(即 SN new =SN old+1)時(shí), broker才會(huì)接收它。 如果SN new<SN old+I, 那么說(shuō)明消息被重復(fù)寫入, broker 可以直接將其丟棄 。 如果 SN new> SN old + l,那么說(shuō)明中間有數(shù)據(jù)尚未寫入, 出現(xiàn)了 亂序,暗示可能有消息丟失,對(duì)應(yīng)的生產(chǎn)者會(huì)拋出 OutOfOrderSequenceException,這個(gè)異常是 一個(gè)嚴(yán)重的異常,后續(xù)的諸如 send()、 beginTransaction()、 commitTransaction()等方法的調(diào)用都 會(huì)拋出 Illega!StateException 的異常 。
引入序列號(hào)來(lái)實(shí)現(xiàn)幕等也只 是針對(duì)每一對(duì)<PID, 分區(qū)>而言的,也就是說(shuō), Kafka 的霖等只 能保證單個(gè)生產(chǎn)者會(huì)話( session)中單分區(qū)的事等 。
ProducerRecord<String , String> record
=new ProducerRecord<>(topic, "key", ”msg” ) ;
producer . send(record) ;
producer . send (record ) ;
注意,上面示例中發(fā)送了兩條相同的消息,不過(guò)這僅僅是指消息 內(nèi) 容相同,但對(duì) Kafka 而 言是兩條不同 的消息,因?yàn)闀?huì)為這兩條消息分配不同的序列號(hào) 。 Kafka 并不會(huì)保證消息 內(nèi)容的 罪等。
7.4.3 事務(wù)
軍等性并不能跨多個(gè)分區(qū)運(yùn)作,而事務(wù)可以彌補(bǔ)這個(gè)缺陷。事務(wù)可以保證對(duì)多個(gè)分區(qū)寫操作的原子性。操作的原子性是指多個(gè)操作要么全部成功,要么全部失敗,不存在部分成功、部分失敗的可能。
對(duì)流式應(yīng)用( Stream Processing Applications )而 言 , 一 個(gè)典型的 應(yīng)用模 式為“ consume- transform-produce” 。在這種模式下消費(fèi)和生產(chǎn) 并存: 應(yīng)用程序從某個(gè)主題中消費(fèi)消息 , 然后經(jīng) 過(guò)一系列轉(zhuǎn)換后寫入另一個(gè)主題 ,消費(fèi)者可能在提交消費(fèi)位移的過(guò)程中出現(xiàn)問(wèn)題而導(dǎo)致重復(fù)消 費(fèi), 也有可能生產(chǎn)者重復(fù)生產(chǎn)消息 。 Kafka 中的事務(wù)可以使應(yīng)用程序?qū)⑾M(fèi)消息、生產(chǎn)消息 、 提交消費(fèi)位移當(dāng)作原子操作來(lái)處理,同時(shí)成功或失敗,即使該生產(chǎn)或消費(fèi)會(huì)跨多個(gè)分區(qū) 。
為了實(shí)現(xiàn)事務(wù),應(yīng)用程序必須提供唯一的 transactionalld,這個(gè) transactionalld 通過(guò)客戶端 參數(shù) transact工onal.id 來(lái)顯式設(shè)置,參考如下 :
properties.put(ProducerConfig.TRANSACTIONAL ID CONFIG,”transactionId”);
或者:
properties .put (”transactional .id”,”transactionid”),
事務(wù)要求生產(chǎn)者開啟幕等特性,因 此通過(guò)將 transactional . id 參數(shù)設(shè)置為非空從而開 啟事務(wù)特性的同時(shí) 需要將 enable.idempotence 設(shè)置為 true ( 如果未顯式設(shè)置 , 則 KafkaProducer 默認(rèn)會(huì)將它 的值設(shè) 置 為 true) ,如果用 戶顯式地將 enable . idempotence 設(shè)置 為 false,則會(huì)報(bào)出 ConfigException:
transactionalld 與 PID 一一對(duì)應(yīng),兩者之間所不同的是 transactionalld 由用戶顯式設(shè)置, 而 PID是由 Kafka內(nèi)部分配的。另外,為了保證新的生產(chǎn)者啟動(dòng)后具有相同 transactionalld的舊生 產(chǎn)者能夠立即失效 ,每個(gè)生產(chǎn)者通過(guò) transactionalld 獲取 PID 的 同時(shí),還會(huì)獲取一個(gè)單調(diào)遞增的 producer epoch (對(duì)應(yīng)下面 要講述 的 KafkaProducer.initTransactions()方法〉。如果使用 同 一 個(gè) transactionalld 開啟兩個(gè)生產(chǎn)者,那么前 一個(gè) 開啟的生產(chǎn)者會(huì)報(bào)出如下的錯(cuò)誤:
org . apache . kafka . common . errors . ProducerFencedExcept工on : Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalid, or the producer’s transaction has been expired by the broker.
從生產(chǎn)者 的角度分析,通過(guò)事務(wù) , Kafka 可以保證跨生產(chǎn)者會(huì)話的消息幕等發(fā)送,以及跨 生產(chǎn)者會(huì)話的事務(wù)恢復(fù) 。 前者表示具有相同 transactionalld 的新生產(chǎn)者實(shí)例被創(chuàng)建且工作的時(shí)候,舊的且擁有相同 transactionalld 的生產(chǎn)者實(shí)例將不再工作。后者指當(dāng)某個(gè)生產(chǎn)者實(shí)例君機(jī)后, 新的生產(chǎn)者實(shí)例可以保證任何未完成的舊事務(wù)要么被提交( Commit),要么被中止( Abo時(shí)), 如此可以使新的生產(chǎn)者實(shí)例從一個(gè)正常的狀態(tài)開始工作。
而從消費(fèi)者的角度分析, 事務(wù)能保證的語(yǔ)義相對(duì)偏弱。出于以下原因, Kafka 并不能保證 己提交的事務(wù)中的所有消息都能夠被消 費(fèi) :
- 對(duì)采用日志壓縮策略的主題而言,事務(wù)中的某些消息有可能被清理(相同 key 的消息, 后寫入的消息會(huì)覆蓋前面寫入的消息)。
- 事務(wù)中消息可能分布在同一個(gè)分區(qū)的多個(gè)日志分段( LogSegment)中,當(dāng)老的日志分 段被刪除時(shí),對(duì)應(yīng)的消息可能會(huì)丟失。
- 消費(fèi)者可以通過(guò) seekO方法訪問(wèn)任意 offset 的消息,從而可能遺漏事務(wù)中的部分消息。
- 消費(fèi)者在消費(fèi)時(shí)可能沒(méi)有分配到事務(wù)內(nèi)的所有分區(qū),如 此它也就不能讀取事務(wù)中的所 有消息。
initTransactions()方法用來(lái)初始化事務(wù),這個(gè)方法能夠執(zhí)行的前提是配置了 transactionalld, 如果沒(méi)有則會(huì)報(bào)出 IllegalStateException:
beginTransaction()方法用來(lái)開啟 事務(wù): sendOffsetsToTransaction()方法為消費(fèi)者提供在事務(wù) 內(nèi)的位移提交 的操作; commitTransaction()方法用來(lái)提交事務(wù) : abortTransaction()方法用來(lái)中止 事務(wù) ,類似于事務(wù)回滾 。
package chapter7;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* 代碼清單7-2
*/
public class TransactionOnlySend {
public static final String topic = "topic-transaction";
public static final String brokerList = "localhost:9092";
public static final String transactionId = "transactionId";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
producer.initTransactions();
producer.beginTransaction();
try {
//處理業(yè)務(wù)邏輯并創(chuàng)建ProducerRecord
ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1");
producer.send(record1);
ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2");
producer.send(record2);
ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3");
producer.send(record3);
//處理一些其它邏輯
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.abortTransaction();
}
producer.close();
}
}
在消費(fèi)端有一個(gè)參數(shù) isolation.level,與事務(wù)有著莫大的關(guān)聯(lián),這個(gè)參數(shù)的默認(rèn)值為“ read uncommitted”, 意思是說(shuō)消費(fèi)端應(yīng)用可 以看到(消費(fèi)到)未提交的事務(wù), 當(dāng)然對(duì)于己提 交的事務(wù)也是可見(jiàn)的。這個(gè)參數(shù)還可以設(shè)置為“ read committed”,表示消費(fèi)端應(yīng)用不可以看到 尚未提交的事務(wù)內(nèi)的消息。舉個(gè)例子,如果生產(chǎn)者開啟事務(wù)并向某個(gè)分區(qū)值發(fā)送 3 條消息 msgl 、 msg2 和 msg3,在執(zhí)行 commitTransaction()或 abortTransaction()方法前,設(shè)置為“read_committed” 的消費(fèi)端應(yīng)用是消費(fèi)不到這些消息的,不過(guò)在 KafkaConsumer 內(nèi)部會(huì)緩存這些消息,直到生產(chǎn) 者執(zhí)行 commitTransaction()方法之后它才能將這些消息推送給消費(fèi)端應(yīng)用。反之,如果生產(chǎn)者 執(zhí)行了 abortTransaction()方法,那么 KafkaConsumer 會(huì)將這些緩存的消息丟棄而不推送給消費(fèi) 端應(yīng)用。
日志文件中除了普通的消息,還有 一種消息專門用來(lái)標(biāo)志 一個(gè)事務(wù) 的結(jié)束,它就是控制消息( Contro!Batch)??刂葡⒁还灿袃煞N類型 : COMMIT 和 ABORT,分別用來(lái)表征事務(wù)己經(jīng) 成功提交或己經(jīng)被成功中止。 KafkaConsumer 可以通過(guò)這個(gè)控制消息來(lái)判斷對(duì)應(yīng)的事務(wù)是被提 交了還是被中止了,然后結(jié)合參數(shù) isolation.level 配置的隔離級(jí)別來(lái)決定是否將相應(yīng)的消 息返回給消費(fèi)端應(yīng)用,如圖 7-19所示。注意 Contro!Batch對(duì)消費(fèi)端應(yīng)用不可見(jiàn),后面還會(huì)對(duì)它 有更加詳細(xì)的介紹。

本節(jié)開頭就提及了 consume-transform-produce 這種應(yīng)用模式 ,這里還涉及在代碼清單 7-2 中尚未使用的 s巳ndOffsetsToTransaction()方法。該模式的具體結(jié)構(gòu)如圖 7-20 所示。與此對(duì)應(yīng)的 應(yīng)用示例如代碼清單 7-3 所示。

package chapter7;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.*;
/**
* 代碼清單7-3
*/
public class TransactionConsumeTransformProduce {
public static final String brokerList = "10.198.197.73:9092";
public static Properties getConsumerProperties(){
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
return props;
}
public static Properties getProducerProperties(){
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalId");
return props;
}
public static void main(String[] args) {
//初始化生產(chǎn)者和消費(fèi)者
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(getConsumerProperties());
consumer.subscribe(Collections.singletonList("topic-source"));
KafkaProducer<String, String> producer =
new KafkaProducer<>(getProducerProperties());
//初始化事務(wù)
producer.initTransactions();
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
if (!records.isEmpty()) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
//開啟事務(wù)
producer.beginTransaction();
try {
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords
= records.records(partition);
for (ConsumerRecord<String, String> record :
partitionRecords) {
//do some logical processing.
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>("topic-sink", record.key(),
record.value());
//消費(fèi)-生產(chǎn)模型
producer.send(producerRecord);
}
long lastConsumedOffset = partitionRecords.
get(partitionRecords.size() - 1).offset();
offsets.put(partition,
new OffsetAndMetadata(lastConsumedOffset + 1));
}
//提交消費(fèi)位移
producer.sendOffsetsToTransaction(offsets,"groupId");
//提交事務(wù)
producer.commitTransaction();
} catch (ProducerFencedException e) {
//log the exception
//中止事務(wù)
producer.abortTransaction();
}
}
}
}
}
注意 : 在使用 KafkaConsumer 的時(shí) 候要將 enable .auto .commit 參數(shù)設(shè)置為 false,代碼里也不能手動(dòng)提交消費(fèi)位移 。
為了實(shí)現(xiàn)事務(wù)的功能, Kafka還引入了事務(wù)協(xié)調(diào)器 CTransactionCoordinator)來(lái)負(fù)責(zé)處理事 務(wù),這一點(diǎn)可以類比一下組協(xié)調(diào)器( GroupCoordinator) 。每一個(gè)生產(chǎn)者都會(huì)被指派一個(gè)特定的 TransactionCoordinator,所有的事務(wù)邏輯包括分派 PID 等都是由 TransactionCoordinator 來(lái)負(fù)責(zé) 實(shí)施的。 TransactionCoordinator 會(huì)將事務(wù)狀態(tài)持久化到內(nèi)部主題 位ansaction state 中 。下面就以最復(fù)雜 的 consume-transform-produce 的流程 (參考圖 7-21 )為例來(lái)分析 Kafka 事務(wù)的 實(shí)現(xiàn)原 理。

1 查找 TransactionCoordinator
TransactionCoordinator 負(fù)責(zé)分配 PID 和管理事務(wù),因此生產(chǎn)者要做的第一件事情就是找出 對(duì)應(yīng)的 TransactionCoordinator 所在 的 broker 節(jié)點(diǎn) 。與查找 GroupCoordinator 節(jié)點(diǎn) 一樣 ,也是通 過(guò) FindCoordinatorRequest請(qǐng)求來(lái)實(shí)現(xiàn)的,只不過(guò) FindCoordinatorRequest 中的 coordinator_ type 就由原來(lái)的 0變成了 1,由此來(lái)表示與事務(wù)相關(guān)聯(lián)(FindCoordinatorRequest請(qǐng)求的具體結(jié) 構(gòu)參考圖 7-5)。
Kafka 在收到 FindCoorinatorRequest 請(qǐng)求之后 , 會(huì)根據(jù) coord工nator_key (也就是 transactionalld)查找對(duì)應(yīng)的TransactionCoordinator節(jié)點(diǎn)。如果找到,則會(huì)返回其相對(duì)應(yīng)的node id、 host 和 port 信息。具體查找 TransactionCoordinator 的方式是根據(jù) transactionalld 的 哈希值計(jì)算主 題 transaction state中的分區(qū)編號(hào), 具體算法如代碼清單 7-4所示。
代碼:青單 7-4 計(jì)算分區(qū)編號(hào)
Utils.abs(transactionalid.hashCode) % transactionTopicPartit工onCount
其中 transactionTopicPartitionCount 為主題一transaction_state 中的分區(qū)個(gè)數(shù) , 這 個(gè)可以通過(guò) brok巳r端參數(shù) transaction.state.log.num.partitions 來(lái)配置,默認(rèn)值為 50 。
找到對(duì)應(yīng)的分區(qū)之后,再尋找此分區(qū) leader 副本所在 的 broker 節(jié)點(diǎn),該 broker 節(jié)點(diǎn)即為這 個(gè) transactionalld對(duì)應(yīng)的 TransactionCoordinator節(jié)點(diǎn)。細(xì)心的讀者可以發(fā)現(xiàn),這一整套的邏輯和 查找 GroupCoordinator 的邏輯如出 一轍(參考 7.2.2 節(jié)) 。
- 獲取 PID
在找到 TransactionCoordinator 節(jié)點(diǎn)之后,就需要為當(dāng)前生產(chǎn)者分配一個(gè) PID 了 。凡是開啟 了罪等性功能的生產(chǎn)者都必須執(zhí)行這個(gè)操作,不需要考慮該生 產(chǎn)者是否還開啟了事務(wù)。生產(chǎn)者 獲取 PID 的操作是通過(guò) InitProducerldRequest 請(qǐng)求來(lái)實(shí)現(xiàn)的, InitProducerldRequest 請(qǐng)求體結(jié)構(gòu) 如圖 7-22 所示,其中 transactional id 表示 事務(wù) 的 transactiona!Id, transaction timeout ms 表示 TransactionCoordinaor等待事務(wù)狀態(tài)更新的超時(shí)時(shí)間,通過(guò)生產(chǎn)者客戶端參 數(shù) transact工on . timeout .ms 配置,默認(rèn)值為 60000。

保存 PID
生產(chǎn)者的 InitProducerldRequest請(qǐng)求會(huì)被發(fā)送給 TransactionCoordinator。 注意,如果未開啟 事務(wù)特性 而 只開啟幕等特性 , 那么 InitProducerldRequest 請(qǐng)求可以發(fā)送給任意的 broker。當(dāng) TransactionCoordinator 第一次收到包含該 transactiona!Id 的 InitProduc巳rldRequest 請(qǐng)求時(shí),它會(huì) 把 transactiona!Id 和對(duì)應(yīng)的 PID 以消息(我們習(xí) 慣性地把這類消息稱為“事務(wù)日志消息”〉的形 式保存到主題 transaction state 中,如圖 7-21 步驟 2.1 所示 。這樣可以保證<transaction Id, PID> 的對(duì)應(yīng)關(guān)系被持久化,從而保證即使 TransactionCoordinator 右機(jī)該對(duì)應(yīng)關(guān)系也不會(huì)丟失 。 存儲(chǔ) 到主題 transaction state 中的具體內(nèi)容格式如圖 7-23 所示 。
其中 transaction status 包含 Empty(O)、 Ongoing(l)、 PrepareCommit(2) 、 PrepareAbort(3)、 CompleteCommit(4)、 CompleteAbort(S)、 Dead(6)這 幾種狀態(tài) 。在存入主題
transaction state 之前,事務(wù)日志消息同樣會(huì)根據(jù)單獨(dú)的 transactiona!Id 來(lái)計(jì)算要發(fā)送的分區(qū), 算法同代碼清單 7-4 一樣。

與InitProducerldRequest 對(duì)應(yīng)的 InitProducerldResponse 響應(yīng)體結(jié)構(gòu)如圖 7-24 所示,

3. 開啟事務(wù)
通過(guò) KafkaProduc町的 beginTransactionO方法可以開啟一個(gè)事務(wù), 調(diào)用該方法后,生產(chǎn)者本 地會(huì)標(biāo)記己經(jīng)開啟了 一個(gè)新的事務(wù) ,只有在生產(chǎn)者發(fā)送第一條消息之后 TransactionCoordinator 才會(huì)認(rèn)為該事務(wù) 己經(jīng)開啟 。
4 . Consume-Transform-Produce
這個(gè)階段囊括了整個(gè)事務(wù)的數(shù)據(jù)處理過(guò)程,其中還涉及多種請(qǐng)求
5. 提交或者中止事務(wù)
一旦數(shù)據(jù)被寫入成功,我 們 就可以調(diào)用 KafkaProducer 的 commitTransaction()方法或 abortTransaction()方法來(lái)結(jié)束當(dāng)前 的 事務(wù) 。