最近公司的項(xiàng)目需要用到kafka,因?yàn)檎麄€(gè)項(xiàng)目基于Spring Cloud,所以想著不如用Spring Cloud Stream來集成。Spring Cloud Stream封裝了一層抽象的接口,底層實(shí)現(xiàn)可以用kafka,也可以基于其他消息中間件。
環(huán)境
- Spring Cloud:Edgware SR5
- kafka-clients:0.10.1.1 這個(gè)是spring-cloud-stream-binder-kafka依賴的kafka-clients.jar的版本號(hào)
- kafka:版本號(hào)未知
kafka binder連接本地zookeeper
kafka binder在啟動(dòng)時(shí)會(huì)嘗試連接本地zookeeper,如果本地沒有zookeeper服務(wù)的話就會(huì)報(bào)錯(cuò)導(dǎo)致啟動(dòng)失敗。解決方法是加入如下配置
spring.cloud.stream.kafka.binder.auto-create-topics=false
具體可以參考github issue#37
消費(fèi)者接收數(shù)據(jù)異常
消費(fèi)者接收數(shù)據(jù)用的是@StreamListener注解,主要參考如下示例
@SpringBootApplication
@EnableBinding(Sink.class)
public class VoteRecordingSinkApplication {
public static void main(String[] args) {
SpringApplication.run(VoteRecordingSinkApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void processVote(Vote vote) {
votingService.recordVote(vote);
}
}
本以為照著官方示例寫就萬事大吉了,結(jié)果解析數(shù)據(jù)的時(shí)候拋出了StringIndexOutOfBoundException。因?yàn)橹坝肎reenwich版本的Spring Cloud Stream運(yùn)行過同樣的程序,那個(gè)時(shí)候是沒有任何問題的。仔細(xì)研究了下官方文檔,發(fā)現(xiàn)Edgware版本生產(chǎn)者和消費(fèi)者的headerMode的默認(rèn)配置為embeddedHeaders,而Greenwich版本則默認(rèn)依賴于binder的實(shí)現(xiàn)。雖然不知道Spring Cloud Stream針對(duì)embeddedHeaders到底做了什么處理,不過可以想見應(yīng)該是這里的問題。果然把headerMode改為raw之后就正常了。
生產(chǎn)者發(fā)送到kafka的指定分區(qū)
關(guān)鍵的配置如下
spring.cloud.stream.bindings.<channelName>.producer.partitionKeyExpression
spring.cloud.stream.default.producer.partitionCount
- 除了
partitionKeyExpression,還包括partitionKeyExtractorClass、partitionSelectorClass和partitionSelectorExpression等。這些都是用來定制更為復(fù)雜的發(fā)送策略的。 -
partitionCount是取模的基數(shù),可以和kafka實(shí)際的分區(qū)數(shù)不一致。比如說如果配置為1的話,那么所有的數(shù)據(jù)都會(huì)發(fā)送到kafka的第0個(gè)分區(qū)。 - 注意不能用
spring.cloud.stream.default.producer.partitionKeyExpression,否則會(huì)提示
Failed to convert property value of type 'java.lang.String' to required type 'org.springframework.expression.Expression' for property 'producer.partitionKeyExpression'
具體原因可以參考github issue#1040和github pull#1041
消費(fèi)者從kafka的指定分區(qū)接收數(shù)據(jù)
默認(rèn)情況下kafka會(huì)自動(dòng)平衡每個(gè)消費(fèi)者對(duì)應(yīng)的分區(qū)。比如說在只有一個(gè)消費(fèi)者的情況下,所有的分區(qū)數(shù)據(jù)都會(huì)發(fā)送給這個(gè)消費(fèi)者。這個(gè)時(shí)候如果再啟動(dòng)另一個(gè)消費(fèi)者,kafka會(huì)自動(dòng)進(jìn)行調(diào)整,把一部分分區(qū)的數(shù)據(jù)發(fā)送給新啟動(dòng)的消費(fèi)者。如果我們希望固定分區(qū)和消費(fèi)者的對(duì)應(yīng)關(guān)系,比如說處理的數(shù)據(jù)都是有狀態(tài)的,這個(gè)時(shí)候我們可以采取如下方式
- 禁用kafka自動(dòng)平衡
spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled
- 設(shè)置消費(fèi)節(jié)點(diǎn)的分區(qū)信息
spring.cloud.stream.bindings.input.consumer.partitioned
spring.cloud.stream.instanceCount
spring.cloud.stream.instanceIndex
需要注意的是每臺(tái)機(jī)器要有不同的instanceIndex