Spring Cloud Stream集成kafka問題

最近公司的項(xiàng)目需要用到kafka,因?yàn)檎麄€(gè)項(xiàng)目基于Spring Cloud,所以想著不如用Spring Cloud Stream來集成。Spring Cloud Stream封裝了一層抽象的接口,底層實(shí)現(xiàn)可以用kafka,也可以基于其他消息中間件。

環(huán)境

  • Spring Cloud:Edgware SR5
  • kafka-clients:0.10.1.1 這個(gè)是spring-cloud-stream-binder-kafka依賴的kafka-clients.jar的版本號(hào)
  • kafka:版本號(hào)未知

kafka binder連接本地zookeeper

kafka binder在啟動(dòng)時(shí)會(huì)嘗試連接本地zookeeper,如果本地沒有zookeeper服務(wù)的話就會(huì)報(bào)錯(cuò)導(dǎo)致啟動(dòng)失敗。解決方法是加入如下配置

spring.cloud.stream.kafka.binder.auto-create-topics=false

具體可以參考github issue#37

消費(fèi)者接收數(shù)據(jù)異常

消費(fèi)者接收數(shù)據(jù)用的是@StreamListener注解,主要參考如下示例

@SpringBootApplication
@EnableBinding(Sink.class)
public class VoteRecordingSinkApplication {

  public static void main(String[] args) {
    SpringApplication.run(VoteRecordingSinkApplication.class, args);
  }

  @StreamListener(Sink.INPUT)
  public void processVote(Vote vote) {
      votingService.recordVote(vote);
  }
}

本以為照著官方示例寫就萬事大吉了,結(jié)果解析數(shù)據(jù)的時(shí)候拋出了StringIndexOutOfBoundException。因?yàn)橹坝肎reenwich版本的Spring Cloud Stream運(yùn)行過同樣的程序,那個(gè)時(shí)候是沒有任何問題的。仔細(xì)研究了下官方文檔,發(fā)現(xiàn)Edgware版本生產(chǎn)者和消費(fèi)者的headerMode的默認(rèn)配置為embeddedHeaders,而Greenwich版本則默認(rèn)依賴于binder的實(shí)現(xiàn)。雖然不知道Spring Cloud Stream針對(duì)embeddedHeaders到底做了什么處理,不過可以想見應(yīng)該是這里的問題。果然把headerMode改為raw之后就正常了。

生產(chǎn)者發(fā)送到kafka的指定分區(qū)

關(guān)鍵的配置如下

spring.cloud.stream.bindings.<channelName>.producer.partitionKeyExpression
spring.cloud.stream.default.producer.partitionCount
  • 除了partitionKeyExpression,還包括partitionKeyExtractorClass、partitionSelectorClasspartitionSelectorExpression等。這些都是用來定制更為復(fù)雜的發(fā)送策略的。
  • partitionCount是取模的基數(shù),可以和kafka實(shí)際的分區(qū)數(shù)不一致。比如說如果配置為1的話,那么所有的數(shù)據(jù)都會(huì)發(fā)送到kafka的第0個(gè)分區(qū)。
  • 注意不能用spring.cloud.stream.default.producer.partitionKeyExpression,否則會(huì)提示
Failed to convert property value of type 'java.lang.String' to required type 'org.springframework.expression.Expression' for property 'producer.partitionKeyExpression'

具體原因可以參考github issue#1040github pull#1041

消費(fèi)者從kafka的指定分區(qū)接收數(shù)據(jù)

默認(rèn)情況下kafka會(huì)自動(dòng)平衡每個(gè)消費(fèi)者對(duì)應(yīng)的分區(qū)。比如說在只有一個(gè)消費(fèi)者的情況下,所有的分區(qū)數(shù)據(jù)都會(huì)發(fā)送給這個(gè)消費(fèi)者。這個(gè)時(shí)候如果再啟動(dòng)另一個(gè)消費(fèi)者,kafka會(huì)自動(dòng)進(jìn)行調(diào)整,把一部分分區(qū)的數(shù)據(jù)發(fā)送給新啟動(dòng)的消費(fèi)者。如果我們希望固定分區(qū)和消費(fèi)者的對(duì)應(yīng)關(guān)系,比如說處理的數(shù)據(jù)都是有狀態(tài)的,這個(gè)時(shí)候我們可以采取如下方式

  • 禁用kafka自動(dòng)平衡
spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled
  • 設(shè)置消費(fèi)節(jié)點(diǎn)的分區(qū)信息
spring.cloud.stream.bindings.input.consumer.partitioned
spring.cloud.stream.instanceCount
spring.cloud.stream.instanceIndex

需要注意的是每臺(tái)機(jī)器要有不同的instanceIndex

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容