(轉(zhuǎn)發(fā))搞定 SpringBoot 整合 Kafka(spring-kafka深入探秘)

前言

kafka是一個(gè)消息隊(duì)列產(chǎn)品,基于Topic partitions的設(shè)計(jì),能達(dá)到非常高的消息發(fā)送處理性能。Spring創(chuàng)建了一個(gè)項(xiàng)目Spring-kafka,封裝了Apache 的Kafka-client,用于在Spring項(xiàng)目里快速集成kafka。

除了簡(jiǎn)單的收發(fā)消息外,Spring-kafka還提供了很多高級(jí)功能,下面我們就來(lái)一一探秘這些用法。

項(xiàng)目地址:https://github.com/spring-projects/spring-kafka

簡(jiǎn)單集成

引入依賴

org.springframework.kafka

spring-kafka

2.2.6.RELEASE

添加配置

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092

測(cè)試發(fā)送和接收

/**

*@author:?kl@kailing.pub

*@date:?2019/5/30

*/

@SpringBootApplication

@RestController

publicclassApplication{

privatefinalLogger?logger?=?LoggerFactory.getLogger(Application.class);

publicstaticvoidmain(String[]?args){

SpringApplication.run(Application.class,?args);

}

@Autowired

privateKafkaTemplate?template;

@GetMapping("/send/{input}")

publicvoidsendFoo(@PathVariable?String?input){

this.template.send("topic_input",?input);

}

@KafkaListener(id?="webGroup",?topics?="topic_input")

publicvoidlisten(String?input){

logger.info("input?value:?{}",?input);

}

}

啟動(dòng)應(yīng)用后,在瀏覽器中輸入:http://localhost:8080/send/kl。就可以在控制臺(tái)看到有日志輸出了:input value: "kl"。基礎(chǔ)的使用就這么簡(jiǎn)單。發(fā)送消息時(shí)注入一個(gè)KafkaTemplate,接收消息時(shí)添加一個(gè)@KafkaListener注解即可。

Spring-kafka-test嵌入式Kafka Server

不過(guò)上面的代碼能夠啟動(dòng)成功,前提是你已經(jīng)有了Kafka Server的服務(wù)環(huán)境,我們知道Kafka是由Scala + Zookeeper構(gòu)建的,可以從官網(wǎng)下載部署包在本地部署。

但是,我想告訴你,為了簡(jiǎn)化開(kāi)發(fā)環(huán)節(jié)驗(yàn)證Kafka相關(guān)功能,Spring-Kafka-Test已經(jīng)封裝了Kafka-test提供了注解式的一鍵開(kāi)啟Kafka Server的功能,使用起來(lái)也是超級(jí)簡(jiǎn)單。本文后面的所有測(cè)試用例的Kafka都是使用這種嵌入式服務(wù)提供的。

引入依賴

org.springframework.kafka

spring-kafka-test

2.2.6.RELEASE

test

啟動(dòng)服務(wù)

下面使用Junit測(cè)試用例,直接啟動(dòng)一個(gè)Kafka Server服務(wù),包含四個(gè)Broker節(jié)點(diǎn)。

@RunWith(SpringRunner.class)

@SpringBootTest(classes?=?ApplicationTests.class)

@EmbeddedKafka(count?=4,ports?=?{9092,9093,9094,9095})

publicclassApplicationTests{

@Test

publicvoidcontextLoads()throwsIOException{

System.in.read();

}

}

如上:只需要一個(gè)注解@EmbeddedKafka即可,就可以啟動(dòng)一個(gè)功能完整的Kafka服務(wù),是不是很酷。默認(rèn)只寫(xiě)注解不加參數(shù)的情況下,是創(chuàng)建一個(gè)隨機(jī)端口的Broker,在啟動(dòng)的日志中會(huì)輸出具體的端口以及默認(rèn)的一些配置項(xiàng)。

不過(guò)這些我們?cè)贙afka安裝包配置文件中的配置項(xiàng),在注解參數(shù)中都可以配置,下面詳解下@EmbeddedKafka注解中的可設(shè)置參數(shù) :

value:broker節(jié)點(diǎn)數(shù)量

count:同value作用一樣,也是配置的broker的節(jié)點(diǎn)數(shù)量

controlledShutdown:控制關(guān)閉開(kāi)關(guān),主要用來(lái)在Broker意外關(guān)閉時(shí)減少此Broker上Partition的不可用時(shí)間

Kafka是多Broker架構(gòu)的高可用服務(wù),一個(gè)Topic對(duì)應(yīng)多個(gè)partition,一個(gè)Partition可以有多個(gè)副本Replication,這些Replication副本保存在多個(gè)Broker,用于高可用。

但是,雖然存在多個(gè)分區(qū)副本集,當(dāng)前工作副本集卻只有一個(gè),默認(rèn)就是首次分配的副本集【首選副本】為L(zhǎng)eader,負(fù)責(zé)寫(xiě)入和讀取數(shù)據(jù)。當(dāng)我們升級(jí)Broker或者更新Broker配置時(shí)需要重啟服務(wù),這個(gè)時(shí)候需要將partition轉(zhuǎn)移到可用的Broker。

下面涉及到三種情況

1、直接關(guān)閉Broker:當(dāng)Broker關(guān)閉時(shí),Broker集群會(huì)重新進(jìn)行選主操作,選出一個(gè)新的Broker來(lái)作為Partition Leader,選舉時(shí)此Broker上的Partition會(huì)短時(shí)不可用

2、開(kāi)啟controlledShutdown:當(dāng)Broker關(guān)閉時(shí),Broker本身會(huì)先嘗試將Leader角色轉(zhuǎn)移到其他可用的Broker上

3、使用命令行工具:使用bin/kafka-preferred-replica-election.sh,手動(dòng)觸發(fā)PartitionLeader角色轉(zhuǎn)移

ports:端口列表,是一個(gè)數(shù)組。對(duì)應(yīng)了count參數(shù),有幾個(gè)Broker,就要對(duì)應(yīng)幾個(gè)端口號(hào)

brokerProperties:Broker參數(shù)設(shè)置,是一個(gè)數(shù)組結(jié)構(gòu),支持如下方式進(jìn)行Broker參數(shù)設(shè)置:

@EmbeddedKafka(brokerProperties?=?{"log.index.interval.bytes?=?4096","num.io.threads?=?8"})

okerPropertiesLocation:Broker參數(shù)文件設(shè)置

功能同上面的brokerProperties,只是Kafka Broker的可設(shè)置參數(shù)達(dá)182個(gè)之多,都像上面這樣配置肯定不是最優(yōu)方案,所以提供了加載本地配置文件的功能,如:

@EmbeddedKafka(brokerPropertiesLocation?="classpath:application.properties")

創(chuàng)建新的Topic

默認(rèn)情況下,如果在使用KafkaTemplate發(fā)送消息時(shí),Topic不存在,會(huì)創(chuàng)建一個(gè)新的Topic,默認(rèn)的分區(qū)數(shù)和副本數(shù)為如下Broker參數(shù)來(lái)設(shè)定

num.partitions?=?1#默認(rèn)Topic分區(qū)數(shù)

num.replica.fetchers?=?1#默認(rèn)副本數(shù)

程序啟動(dòng)時(shí)創(chuàng)建Topic

/**

*@author:?kl@kailing.pub

*@date:?2019/5/31

*/

@Configuration

publicclassKafkaConfig{

@Bean

publicKafkaAdminadmin(KafkaProperties?properties){

KafkaAdmin?admin?=newKafkaAdmin(properties.buildAdminProperties());

admin.setFatalIfBrokerNotAvailable(true);

returnadmin;

}

@Bean

publicNewTopictopic2(){

returnnewNewTopic("topic-kl",1,?(short)1);

}

}

如果Kafka Broker支持(1.0.0或更高版本),則如果發(fā)現(xiàn)現(xiàn)有Topic的Partition 數(shù)少于設(shè)置的Partition 數(shù),則會(huì)新增新的Partition分區(qū)。

關(guān)于KafkaAdmin有幾個(gè)常用的用法如下:

setFatalIfBrokerNotAvailable(true):默認(rèn)這個(gè)值是False的,在Broker不可用時(shí),不影響Spring 上下文的初始化。如果你覺(jué)得Broker不可用影響正常業(yè)務(wù)需要顯示的將這個(gè)值設(shè)置為T(mén)rue

setAutoCreate(false) :?默認(rèn)值為T(mén)rue,也就是Kafka實(shí)例化后會(huì)自動(dòng)創(chuàng)建已經(jīng)實(shí)例化的NewTopic對(duì)象

initialize():當(dāng)setAutoCreate為false時(shí),需要我們程序顯示的調(diào)用admin的initialize()方法來(lái)初始化NewTopic對(duì)象

代碼邏輯中創(chuàng)建

有時(shí)候我們?cè)诔绦騿?dòng)時(shí)并不知道某個(gè)Topic需要多少Partition數(shù)合適,但是又不能一股腦的直接使用Broker的默認(rèn)設(shè)置,這個(gè)時(shí)候就需要使用Kafka-Client自帶的AdminClient來(lái)進(jìn)行處理。

上面的Spring封裝的KafkaAdmin也是使用的AdminClient來(lái)處理的。如:

@Autowired

privateKafkaProperties?properties;

@Test

publicvoidtestCreateToipc(){

AdminClient?client?=?AdminClient.create(properties.buildAdminProperties());

if(client?!=null){

try{

Collection?newTopics?=newArrayList<>(1);

newTopics.add(newNewTopic("topic-kl",1,(short)1));

client.createTopics(newTopics);

}catch(Throwable?e){

e.printStackTrace();

}finally{

client.close();

}

}

}

ps:其他的方式創(chuàng)建Topic

上面的這些創(chuàng)建Topic方式前提是你的spring boot版本到2.x以上了,因?yàn)閟pring-kafka2.x版本只支持spring boot2.x的版本。在1.x的版本中還沒(méi)有這些api。下面補(bǔ)充一種在程序中通過(guò)Kafka_2.10創(chuàng)建Topic的方式

引入依賴

org.apache.kafka

kafka_2.10

0.8.2.2

api方式創(chuàng)建

@Test

publicvoidtestCreateTopic()throwsException{

ZkClient?zkClient?=newZkClient("127.0.0.1:2181",3000,3000,?ZKStringSerializer$.MODULE$)

String?topicName?="topic-kl";

intpartitions?=1;

intreplication?=1;

AdminUtils.createTopic(zkClient,topicName,partitions,replication,newProperties());

}

注意下ZkClient最后一個(gè)構(gòu)造入?yún)?,是一個(gè)序列化反序列化的接口實(shí)現(xiàn),博主測(cè)試如果不填的話,創(chuàng)建的Topic在ZK上的數(shù)據(jù)是有問(wèn)題的,默認(rèn)的Kafka實(shí)現(xiàn)也很簡(jiǎn)單,就是做了字符串UTF-8編碼處理。

ZKStringSerializer$是Kafka中已經(jīng)實(shí)現(xiàn)好的一個(gè)接口實(shí)例,是一個(gè)Scala的伴生對(duì)象,在Java中直接調(diào)用點(diǎn)MODULE$就可以得到一個(gè)實(shí)例

命令方式創(chuàng)建

@Test

publicvoidtestCreateTopic(){

String?[]?options=newString[]{

"--create",

"--zookeeper","127.0.0.1:2181",

"--replication-factor","3",

"--partitions","3",

"--topic","topic-kl"

};

TopicCommand.main(options);

}

消息發(fā)送之KafkaTemplate探秘

獲取發(fā)送結(jié)果

異步獲取

template.send("","").addCallback(newListenableFutureCallback>()?{

@Override

publicvoidonFailure(Throwable?throwable){

......

}

@Override

publicvoidonSuccess(SendResult<Object,?Object>?objectObjectSendResult){

....

}

});

同步獲取

ListenableFuture>?future?=?template.send("topic-kl","kl");

try{

SendResult?result?=?future.get();

}catch(Throwable?e){

e.printStackTrace();

}

kafka事務(wù)消息

默認(rèn)情況下,Spring-kafka自動(dòng)生成的KafkaTemplate實(shí)例,是不具有事務(wù)消息發(fā)送能力的。需要使用如下配置激活事務(wù)特性。事務(wù)激活后,所有的消息發(fā)送只能在發(fā)生事務(wù)的方法內(nèi)執(zhí)行了,不然就會(huì)拋一個(gè)沒(méi)有事務(wù)交易的異常

spring.kafka.producer.transaction-id-prefix=kafka_tx.

當(dāng)發(fā)送消息有事務(wù)要求時(shí),比如,當(dāng)所有消息發(fā)送成功才算成功,如下面的例子:假設(shè)第一條消費(fèi)發(fā)送后,在發(fā)第二條消息前出現(xiàn)了異常,那么第一條已經(jīng)發(fā)送的消息也會(huì)回滾。

而且正常情況下,假設(shè)在消息一發(fā)送后休眠一段時(shí)間,在發(fā)送第二條消息,消費(fèi)端也只有在事務(wù)方法執(zhí)行完成后才會(huì)接收到消息

@GetMapping("/send/{input}")

publicvoidsendFoo(@PathVariable?String?input){

template.executeInTransaction(t?->{

t.send("topic_input","kl");

if("error".equals(input)){

thrownewRuntimeException("failed");

}

t.send("topic_input","ckl");

returntrue;

});

}

當(dāng)事務(wù)特性激活時(shí),同樣,在方法上面加@Transactional注解也會(huì)生效

@GetMapping("/send/{input}")

@Transactional(rollbackFor?=?RuntimeException.class)

publicvoidsendFoo(@PathVariable?String?input){

template.send("topic_input","kl");

if("error".equals(input))?{

thrownewRuntimeException("failed");

}

template.send("topic_input","ckl");

}

Spring-Kafka的事務(wù)消息是基于Kafka提供的事務(wù)消息功能的。而Kafka Broker默認(rèn)的配置針對(duì)的三個(gè)或以上Broker高可用服務(wù)而設(shè)置的。這邊在測(cè)試的時(shí)候?yàn)榱撕?jiǎn)單方便,使用了嵌入式服務(wù)新建了一個(gè)單Broker的Kafka服務(wù),出現(xiàn)了一些問(wèn)題:如

1、事務(wù)日志副本集大于Broker數(shù)量,會(huì)拋如下異常:

Number of alive brokers '1' does not meet the required replication factor '3'

for the transactions state topic (configured via 'transaction.state.log.replication.factor').

This error can be ignored if the cluster is starting up and not all brokers are up yet.

默認(rèn)Broker的配置transaction.state.log.replication.factor=3,單節(jié)點(diǎn)只能調(diào)整為1

2、副本數(shù)小于副本同步隊(duì)列數(shù)目,會(huì)拋如下異常

Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2]

默認(rèn)Broker的配置transaction.state.log.min.isr=2,單節(jié)點(diǎn)只能調(diào)整為1

ReplyingKafkaTemplate獲得消息回復(fù)

ReplyingKafkaTemplate是KafkaTemplate的一個(gè)子類,除了繼承父類的方法,新增了一個(gè)方法sendAndReceive,實(shí)現(xiàn)了消息發(fā)送\回復(fù)語(yǔ)義

RequestReplyFuturesendAndReceive(ProducerRecord<K,?V>?record);

也就是我發(fā)送一條消息,能夠拿到消費(fèi)者給我返回的結(jié)果。就像傳統(tǒng)的RPC交互那樣。當(dāng)消息的發(fā)送者需要知道消息消費(fèi)者的具體的消費(fèi)情況,非常適合這個(gè)api。

如,一條消息中發(fā)送一批數(shù)據(jù),需要知道消費(fèi)者成功處理了哪些數(shù)據(jù)。下面代碼演示了怎么集成以及使用ReplyingKafkaTemplate

/**

*@author:?kl@kailing.pub

*@date:?2019/5/30

*/

@SpringBootApplication

@RestController

publicclassApplication{

privatefinalLogger?logger?=?LoggerFactory.getLogger(Application.class);

publicstaticvoidmain(String[]?args){

SpringApplication.run(Application.class,?args);

}

@Bean

publicConcurrentMessageListenerContainerrepliesContainer(ConcurrentKafkaListenerContainerFactory<String,?String>?containerFactory){

ConcurrentMessageListenerContainer?repliesContainer?=?containerFactory.createContainer("replies");

repliesContainer.getContainerProperties().setGroupId("repliesGroup");

repliesContainer.setAutoStartup(false);

returnrepliesContainer;

}

@Bean

publicReplyingKafkaTemplatereplyingTemplate(ProducerFactory<String,?String>?pf,?ConcurrentMessageListenerContainer<String,?String>?repliesContainer){

returnnewReplyingKafkaTemplate(pf,?repliesContainer);

}

@Bean

publicKafkaTemplatekafkaTemplate(ProducerFactory<String,?String>?pf){

returnnewKafkaTemplate(pf);

}

@Autowired

privateReplyingKafkaTemplate?template;

@GetMapping("/send/{input}")

@Transactional(rollbackFor?=?RuntimeException.class)

publicvoidsendFoo(@PathVariable?String?input)throwsException{

ProducerRecord?record?=newProducerRecord<>("topic-kl",?input);

RequestReplyFuture?replyFuture?=?template.sendAndReceive(record);

ConsumerRecord?consumerRecord?=?replyFuture.get();

System.err.println("Return?value:?"+?consumerRecord.value());

}

@KafkaListener(id?="webGroup",?topics?="topic-kl")

@SendTo

publicStringlisten(String?input){

logger.info("input?value:?{}",?input);

return"successful";

}

}

Spring-kafka消息消費(fèi)用法探秘

@KafkaListener的使用

前面在簡(jiǎn)單集成中已經(jīng)演示過(guò)了@KafkaListener接收消息的能力,但是@KafkaListener的功能不止如此,其他的比較常見(jiàn)的,使用場(chǎng)景比較多的功能點(diǎn)如下:

顯示的指定消費(fèi)哪些Topic和分區(qū)的消息,

設(shè)置每個(gè)Topic以及分區(qū)初始化的偏移量,

設(shè)置消費(fèi)線程并發(fā)度

設(shè)置消息異常處理器

@KafkaListener(id?="webGroup",?topicPartitions?=?{

@TopicPartition(topic?="topic1",?partitions?=?{"0","1"}),

@TopicPartition(topic?="topic2",?partitions?="0",

partitionOffsets?=@PartitionOffset(partition?="1",?initialOffset?="100"))

},concurrency?="6",errorHandler?="myErrorHandler")

publicStringlisten(String?input){

logger.info("input?value:?{}",?input);

return"successful";

}

其他的注解參數(shù)都很好理解,errorHandler需要說(shuō)明下,設(shè)置這個(gè)參數(shù)需要實(shí)現(xiàn)一個(gè)接口KafkaListenerErrorHandler。而且注解里的配置,是你自定義實(shí)現(xiàn)實(shí)例在spring上下文中的Name。比如,上面配置為errorHandler = "myErrorHandler"。則在spring上線中應(yīng)該存在這樣一個(gè)實(shí)例:

/**

*@author:?kl@kailing.pub

*@date:?2019/5/31

*/

@Service("myErrorHandler")

publicclassMyKafkaListenerErrorHandlerimplementsKafkaListenerErrorHandler{

Logger?logger?=LoggerFactory.getLogger(getClass());

@Override

publicObject?handleError(Message<?>?message,?ListenerExecutionFailedExceptionexception)?{

logger.info(message.getPayload().toString());

returnnull;

}

@Override

publicObject?handleError(Message<?>?message,?ListenerExecutionFailedExceptionexception,?Consumer<?,?>consumer)?{

logger.info(message.getPayload().toString());

returnnull;

}

}

手動(dòng)Ack模式

手動(dòng)ACK模式,由業(yè)務(wù)邏輯控制提交偏移量。比如程序在消費(fèi)時(shí),有這種語(yǔ)義,特別異常情況下不確認(rèn)ack,也就是不提交偏移量,那么你只能使用手動(dòng)Ack模式來(lái)做了。開(kāi)啟手動(dòng)首先需要關(guān)閉自動(dòng)提交,然后設(shè)置下consumer的消費(fèi)模式

spring.kafka.consumer.enable-auto-commit=false

spring.kafka.listener.ack-mode=manual

上面的設(shè)置好后,在消費(fèi)時(shí),只需要在@KafkaListener監(jiān)聽(tīng)方法的入?yún)⒓尤階cknowledgment 即可,執(zhí)行到ack.acknowledge()代表提交了偏移量

@KafkaListener(id?="webGroup",?topics?="topic-kl")

publicStringlisten(String?input,?Acknowledgment?ack){

logger.info("input?value:?{}",?input);

if("kl".equals(input))?{

ack.acknowledge();

}

return"successful";

}

@KafkaListener注解監(jiān)聽(tīng)器生命周期

@KafkaListener注解的監(jiān)聽(tīng)器的生命周期是可以控制的,默認(rèn)情況下,@KafkaListener的參數(shù)autoStartup = "true"。也就是自動(dòng)啟動(dòng)消費(fèi),但是也可以同過(guò)KafkaListenerEndpointRegistry來(lái)干預(yù)他的生命周期。

KafkaListenerEndpointRegistry有三個(gè)動(dòng)作方法分別如:start(),pause(),resume()/啟動(dòng),停止,繼續(xù)。如下代碼詳細(xì)演示了這種功能。

/**

*@author:?kl@kailing.pub

*@date:?2019/5/30

*/

@SpringBootApplication

@RestController

publicclassApplication{

privatefinalLogger?logger?=?LoggerFactory.getLogger(Application.class);

publicstaticvoidmain(String[]?args){

SpringApplication.run(Application.class,?args);

}

@Autowired

privateKafkaTemplate?template;

@GetMapping("/send/{input}")

@Transactional(rollbackFor?=?RuntimeException.class)

publicvoidsendFoo(@PathVariable?String?input)throwsException{

ProducerRecord?record?=newProducerRecord<>("topic-kl",?input);

template.send(record);

}

@Autowired

privateKafkaListenerEndpointRegistry?registry;

@GetMapping("/stop/{listenerID}")

publicvoidstop(@PathVariable?String?listenerID){

registry.getListenerContainer(listenerID).pause();

}

@GetMapping("/resume/{listenerID}")

publicvoidresume(@PathVariable?String?listenerID){

registry.getListenerContainer(listenerID).resume();

}

@GetMapping("/start/{listenerID}")

publicvoidstart(@PathVariable?String?listenerID){

registry.getListenerContainer(listenerID).start();

}

@KafkaListener(id?="webGroup",?topics?="topic-kl",autoStartup?="false")

publicStringlisten(String?input){

logger.info("input?value:?{}",?input);

return"successful";

}

}

在上面的代碼中,listenerID就是@KafkaListener中的id值“webGroup”。項(xiàng)目啟動(dòng)好后,分別執(zhí)行如下url,就可以看到效果了。

先發(fā)送一條消息:http://localhost:8081/send/ckl。因?yàn)閍utoStartup = "false",所以并不會(huì)看到有消息進(jìn)入監(jiān)聽(tīng)器。

接著啟動(dòng)監(jiān)聽(tīng)器:http://localhost:8081/start/webGroup??梢钥吹接幸粭l消息進(jìn)來(lái)了。

暫停和繼續(xù)消費(fèi)的效果使用類似方法就可以測(cè)試出來(lái)了。

SendTo消息轉(zhuǎn)發(fā)

前面的消息發(fā)送響應(yīng)應(yīng)用里面已經(jīng)見(jiàn)過(guò)@SendTo,其實(shí)除了做發(fā)送響應(yīng)語(yǔ)義外,@SendTo注解還可以帶一個(gè)參數(shù),指定轉(zhuǎn)發(fā)的Topic隊(duì)列。

常見(jiàn)的場(chǎng)景如,一個(gè)消息需要做多重加工,不同的加工耗費(fèi)的cup等資源不一致,那么就可以通過(guò)跨不同Topic和部署在不同主機(jī)上的consumer來(lái)解決了。如:

@KafkaListener(id?="webGroup",?topics?="topic-kl")

@SendTo("topic-ckl")

publicStringlisten(String?input){

logger.info("input?value:?{}",?input);

returninput?+"hello!";

}

@KafkaListener(id?="webGroup1",?topics?="topic-ckl")

publicvoidlisten2(String?input){

logger.info("input?value:?{}",?input);

}

消息重試和死信隊(duì)列的應(yīng)用

除了上面談到的通過(guò)手動(dòng)Ack模式來(lái)控制消息偏移量外,其實(shí)Spring-kafka內(nèi)部還封裝了可重試消費(fèi)消息的語(yǔ)義,也就是可以設(shè)置為當(dāng)消費(fèi)數(shù)據(jù)出現(xiàn)異常時(shí),重試這個(gè)消息。而且可以設(shè)置重試達(dá)到多少次后,讓消息進(jìn)入預(yù)定好的Topic。也就是死信隊(duì)列里。

下面代碼演示了這種效果:

@Autowired

privateKafkaTemplate?template;

@Bean

publicConcurrentKafkaListenerContainerFactory?kafkaListenerContainerFactory(

ConcurrentKafkaListenerContainerFactoryConfigurer?configurer,

ConsumerFactory?kafkaConsumerFactory,

KafkaTemplate?template)?{

ConcurrentKafkaListenerContainerFactory?factory?=newConcurrentKafkaListenerContainerFactory<>();

configurer.configure(factory,?kafkaConsumerFactory);

//最大重試三次

factory.setErrorHandler(newSeekToCurrentErrorHandler(newDeadLetterPublishingRecoverer(template),3));

returnfactory;

}

@GetMapping("/send/{input}")

publicvoidsendFoo(@PathVariable?String?input){

template.send("topic-kl",?input);

}

@KafkaListener(id?="webGroup",?topics?="topic-kl")

publicStringlisten(String?input){

logger.info("input?value:?{}",?input);

thrownewRuntimeException("dlt");

}

@KafkaListener(id?="dltGroup",?topics?="topic-kl.DLT")

publicvoiddltListen(String?input){

logger.info("Received?from?DLT:?"+?input);

}

上面應(yīng)用,在topic-kl監(jiān)聽(tīng)到消息會(huì),會(huì)觸發(fā)運(yùn)行時(shí)異常,然后監(jiān)聽(tīng)器會(huì)嘗試三次調(diào)用,當(dāng)?shù)竭_(dá)最大的重試次數(shù)后。消息就會(huì)被丟掉重試死信隊(duì)列里面去。死信隊(duì)列的Topic的規(guī)則是,業(yè)務(wù)Topic名字+“.DLT”。

如上面業(yè)務(wù)Topic的name為“topic-kl”,那么對(duì)應(yīng)的死信隊(duì)列的Topic就是“topic-kl.DLT”

文末結(jié)語(yǔ)

最近業(yè)務(wù)上使用了kafka用到了Spring-kafka,所以系統(tǒng)性的探索了下Spring-kafka的各種用法,發(fā)現(xiàn)了很多好玩很酷的特性,比如,一個(gè)注解開(kāi)啟嵌入式的Kafka服務(wù)、像RPC調(diào)用一樣的發(fā)送\響應(yīng)語(yǔ)義調(diào)用、事務(wù)消息等功能。

希望此博文能夠幫助那些正在使用Spring-kafka或即將使用的人少走一些彎路少踩一點(diǎn)坑。

來(lái)源;https://mp.weixin.qq.com/s/P0Oevv6GmZMW_mnJT7J_7g

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文代碼格式不好調(diào)整,可以參考本人在其他地方的同篇博文 https://blog.csdn.net/russle/...
    不1見(jiàn)2不3散4閱讀 2,747評(píng)論 2 1
  • 一、入門(mén)1、簡(jiǎn)介Kafka is a distributed,partitioned,replicated com...
    HxLiang閱讀 3,700評(píng)論 0 9
  • Kafka是當(dāng)前分布式系統(tǒng)中最流行的消息中間件之一,憑借著其高吞吐量的設(shè)計(jì),在日志收集系統(tǒng)和消息系統(tǒng)的應(yīng)用場(chǎng)景中深...
    Dali王閱讀 536評(píng)論 0 0
  • Kafka史上最詳細(xì)原理總結(jié)分為上下兩部分,承上啟下 Kafka史上最詳細(xì)原理總結(jié)上 Kafka史上最詳細(xì)原理總結(jié)...
    小波同學(xué)閱讀 22,485評(píng)論 1 114
  • MQ(消息隊(duì)列)是跨進(jìn)程通信的方式之一,可理解為異步rpc,上游系統(tǒng)對(duì)調(diào)用結(jié)果的態(tài)度往往是重要不緊急。使用消息隊(duì)列...
    allin8116閱讀 641評(píng)論 0 0

友情鏈接更多精彩內(nèi)容