引言
在實際的kafka開發(fā)中,我們會發(fā)現(xiàn),無論是生產(chǎn)者還是消費者,都需要構(gòu)建一個Properties對象,里面設(shè)置了很多參數(shù)。對于很多初學(xué)者來說,會看不懂這些參數(shù)分別代表什么含義。
在本篇文章我們就來詳細(xì)地了解一下這些參數(shù)的作用,并探討下如何使用合理的配置去優(yōu)化提高生產(chǎn)/消費效率。
正文
1.kafka消費者參數(shù)
我們先來看一段消費者的構(gòu)建代碼。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("fetch.min.bytes", "1048576");
props.put("fetch.max.wait.ms", "2000");
props.put("max.partition.fetch.bytes", "2097152");
props.put("max.poll.records", "10000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
在這段代碼中有很多常用的參數(shù)配置,在線上使用時,我們要根據(jù)實際的數(shù)據(jù)量和數(shù)據(jù)大小來決定這些配置的具體值。下面來挑出其中比較重要的幾個參數(shù)來詳細(xì)解析一下。
1.1 enable.auto.commit
指定了消費者是否自動提交偏移量,默認(rèn)值是true,為了盡量避免重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,有自己控制合適提交偏移量,如果設(shè)置為true, 可以通過設(shè)置 auto.commit.interval.ms屬性來控制提交的頻率。
詳細(xì)地來說:
當(dāng)一個consumer因某種原因退出Group時,進(jìn)行重新分配partition后,同一group中的另一個consumer在讀取該partition時,怎么能夠知道上一個consumer該從哪個offset的message讀取呢?也是是如何保證同一個group內(nèi)的consumer不重復(fù)消費消息呢?上面說了一次走網(wǎng)絡(luò)的fetch請求會拉取到一定量的數(shù)據(jù),但是這些數(shù)據(jù)還沒有被消息完畢,Consumer就掛掉了,下一次進(jìn)行數(shù)據(jù)fetch時,是否會從上次讀到的數(shù)據(jù)開始讀取,而導(dǎo)致Consumer消費的數(shù)據(jù)丟失嗎?
為了做到這一點,當(dāng)使用完poll從本地緩存拉取到數(shù)據(jù)之后,需要client調(diào)用commitSync方法(或者commitAsync方法)去commit 下一次該去讀取 哪一個offset的message。
而這個commit方法會通過走網(wǎng)絡(luò)的commit請求將offset在coordinator中保留,這樣就能夠保證下一次讀?。ú徽撨M(jìn)行了rebalance)時,既不會重復(fù)消費消息,也不會遺漏消息。
對于offset的commit,Kafka Consumer Java Client支持兩種模式:由KafkaConsumer自動提交,或者是用戶通過調(diào)用commitSync、commitAsync方法的方式完成offset的提交。
自動提交的例子:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
手動提交的栗子:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
在手動提交單個partition的offset時,需要注意的一點是:要提交的是下一次要讀取的offset,例如:
try {
while(running) {
// 取得消息
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
// 根據(jù)分區(qū)來遍歷數(shù)據(jù):
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
// 數(shù)據(jù)處理
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
// 取得當(dāng)前讀取到的最后一條記錄的offset
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
// 提交offset,記得要 + 1
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
1.2 auto.offset.reset
該屬性指定了消費者在讀取一個沒有偏移量后者偏移量無效(消費者長時間失效當(dāng)前的偏移量已經(jīng)過時并且被刪除了)的分區(qū)的情況下,應(yīng)該作何處理,默認(rèn)值是latest,也就是從最新記錄讀取數(shù)據(jù)(消費者啟動之后生成的記錄),另一個值是earliest,意思是在偏移量無效的情況下,消費者從起始位置開始讀取數(shù)據(jù)。
1.3 session.timeout.ms
該屬性指定了當(dāng)消費者被認(rèn)為已經(jīng)掛掉之前可以與服務(wù)器斷開連接的時間。默認(rèn)是3s,消費者在3s之內(nèi)沒有再次向服務(wù)器發(fā)送心跳,那么將會被認(rèn)為已經(jīng)死亡。此時,協(xié)調(diào)器將會出發(fā)再均衡,把它的分區(qū)分配給其他的消費者,該屬性與heartbeat.interval.ms緊密相關(guān),該參數(shù)定義了消費者發(fā)送心跳的時間間隔,也就是心跳頻率,一般要同時修改這兩個參數(shù),heartbeat.interval.ms參數(shù)值必須要小于session.timeout.ms,一般是session.timeout.ms的三分之一,比如,session.timeout.ms設(shè)置成3min,那么heartbeat.interval.ms一般設(shè)置成1min,這樣,可以更快的檢測以及恢復(fù)崩潰的節(jié)點,不過長時間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的再均衡(有一種情況就是網(wǎng)絡(luò)延遲,本身消費者是沒有掛掉的,但是網(wǎng)絡(luò)延遲造成了心跳超時,這樣本不該發(fā)生再均衡,但是因為網(wǎng)絡(luò)原因造成了非預(yù)期的再均衡),把該屬性的值設(shè)置得大一些,可以減少意外的再均衡,不過檢測節(jié)點崩憤-需要更長的時間。
1.4 max.partition.fetch.bytes
該屬性指定了服務(wù)器從每個分區(qū)里返回給消費者的最大字節(jié)數(shù)。它的默認(rèn)值是lMB , 也
就是說,kafkaConsumer.poll() 方法從每個分區(qū)里返回的記錄最多不超max.partitions.fetch.bytes 指定的字節(jié)。如果一個主題有20 個分區(qū)和5 個消費者,那么每個消費者需要至少4MB 的可用內(nèi)存來接收記錄。在為消費者分配內(nèi)存時,可以給它們多分配一些,因為如果群組里有消費者發(fā)生奔潰,剩下的消費者需要處理更多的分區(qū)。max.partition.fetch.bytes 的值必須比broker 能夠接收的最大消息的字節(jié)數(shù)(通過max.message.size 屬性配置)大, 否則消費者可能無法讀取這些消息,導(dǎo)致消費者一直掛起重試,例如,max.message.size設(shè)置為2MB,而該屬性設(shè)置為1MB,那么當(dāng)一個生產(chǎn)者可能就會生產(chǎn)一條大小為2MB的消息,那么就會出現(xiàn)問題,消費者能從分區(qū)取回的最大消息大小就只有1MB,但是數(shù)據(jù)量是2MB,所以就會導(dǎo)致消費者一直掛起重試。
在設(shè)置該屬性時,另一個需要考慮的因素是消費者處理數(shù)據(jù)的時間。消費者需要頻繁調(diào)用poll()方法
來避免會話過期和發(fā)生分區(qū)再均衡,如果單次調(diào)用poll()返回的數(shù)據(jù)太多,消費者需要更多的時間來處理,可能無怯及時進(jìn)行下一個輪詢來避免會話過期。如果出現(xiàn)這種情況, 可以把max.partitioin.fetch.bytes 值改小,或者延長會話過期時間。
1.5 fetch.min.bytes
消費者從服務(wù)器獲取記錄的最小字節(jié)數(shù),broker收到消費者拉取數(shù)據(jù)的請求的時候,如果可用數(shù)據(jù)量小于設(shè)置的值,那么broker將會等待有足夠可用的數(shù)據(jù)的時候才返回給消費者,這樣可以降低消費者和broker的工作負(fù)載。
因為當(dāng)主題不是很活躍的情況下,就不需要來來回回的處理消息,如果沒有很多可用數(shù)據(jù),但消費者的CPU 使用率卻很高,那么就需要把該屬性的值設(shè)得比默認(rèn)值大。如果消費者的數(shù)量比較多,把該屬性的值設(shè)置得大一點可以降低broker 的工作負(fù)載。
1.6 fetch.max.wait.ms
fetch.min.bytes設(shè)置了broker返回給消費者最小的數(shù)據(jù)量,而fetch.max.wait.ms設(shè)置的則是broker的等待時間,兩個屬性只要滿足了任何一條,broker都會將數(shù)據(jù)返回給消費者,也就是說舉個例子,fetch.min.bytes設(shè)置成1MB,fetch.max.wait.ms設(shè)置成1000ms,那么如果在1000ms時間內(nèi),如果數(shù)據(jù)量達(dá)到了1MB,broker將會把數(shù)據(jù)返回給消費者;如果已經(jīng)過了1000ms,但是數(shù)據(jù)量還沒有達(dá)到1MB,那么broker仍然會把當(dāng)前積累的所有數(shù)據(jù)返回給消費者。
1.7 max.poll.records
控制單次調(diào)用call方法能夠返回的記錄數(shù)量,幫助控制在輪詢里需要處理的數(shù)據(jù)量。
1.8 receive.buffer.bytes + send.buffer.bytes
socket 在讀寫數(shù)據(jù)時用到的TCP 緩沖區(qū)也可以設(shè)置大小。如果它們被設(shè)為-1 ,就使用操作系統(tǒng)的默認(rèn)值。如果生產(chǎn)者或消費者與broker 處于不同的數(shù)據(jù)中心內(nèi),可以適當(dāng)增大這些值,因為跨數(shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬。
1.9 partition.assignment.strategy
分區(qū)分配策略,kafka有兩個默認(rèn)策略:
- Range:該策略會把主題的若干個連續(xù)的分區(qū)分配給消費者
- Robin:該策略把主題的所有分區(qū)逐個分配給消費者
分區(qū)策略默認(rèn)是:org.apache.kafka.clients.consumer.RangeAssignor=>Range策略
org.apache.kafka.clients.consumer.RoundRobinAssignor=>Robin策略
1.10 client.id
Consumer進(jìn)程的標(biāo)識。如果設(shè)置一個人為可讀的值,跟蹤問題會比較方便。