kafka 消息堆積解決

一 :背景

線上kafka消費端因日志異常的解決導(dǎo)致消息堆積。

二 : 日志異常解決導(dǎo)致消息堆積

線上kafka消費端日志異常,頻繁打印錯誤日志,服務(wù)器磁盤一天就滿了,此時其他服務(wù)無法正常工作。報錯如下

java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:217)
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
 at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
 at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.lang.Thread.run(Thread.java:748)

2021-05-06 17:54:38.467 ERROR 4998 --- [teParkcyy-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

最終查找到原因,是 servlet 生命周期過期,重啟服務(wù)即可。

三 : 問題套娃

解決kafka消費端日志異常一天后,發(fā)現(xiàn)消息堆積,檢查kafka消費端日志,正常刷日志,無明顯異常信息 (最終發(fā)現(xiàn),自己還是不仔細,日志太多了,夾雜在里面的報錯信息被忽略了)
為了確認這個問題,我做了如下的測試來定位問題

1、查看kafka manager 是否有異常

管理界面大部分都看了,未發(fā)現(xiàn)異常情況,確認kafka 沒問題

2、查看消息是否被消費到

找一個積壓的topic ,對消費端的代碼進行日志打印,查詢消息是否被消費。
發(fā)現(xiàn)消息消費正常

3、查看數(shù)據(jù)庫是否有鎖表

查詢是否因為數(shù)據(jù)庫鎖表或其他原因,導(dǎo)致消息消費了,但是沒有入庫,給我們的錯覺是沒有消費
發(fā)現(xiàn)數(shù)據(jù)庫正常
此時發(fā)現(xiàn)一個重要的問題,重啟kafka 之后,消息消費正常,并入庫,但是到每個時間點入庫停止,消費卡主,打印的消費日志停止,不在消費。

4、 再次查看kafka消費端日志

此時在消費端日志打印停止為界,向下查詢?nèi)罩締栴},發(fā)現(xiàn)

org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [parkBook]

日志顯示parkBook topic 被拒絕訪問

5、代碼分析

parkBook topic 存在消費監(jiān)聽,此parkBook topic 應(yīng)該是之前被人取消了,而我們kafka消費端對 topic 的加載是在啟動時候直接加載到內(nèi)存中,所以取消了并不會立馬影響代碼的錯誤。
在我們解決日志異常重啟的時候,重新加載topic,導(dǎo)致此時內(nèi)存中沒有 parkBook topic ,監(jiān)聽失敗,導(dǎo)致其他topic 也失敗了

6、 問題解決

注釋 parkBook topic 監(jiān)聽,重啟kafka 消費端。

四 :優(yōu)化

線上的kafka,是分兩個Partition ,部署在一臺機器上,隨著數(shù)據(jù)的增加,消費能力不足以快速消費
我這邊消費端是使用線程池。兩倍了線程池的核心線程數(shù)、最大線程數(shù)
spring繼承的kafka,配置 消費者線程為2 :spring.kafka.listener.concurrency=2

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容