kafka 初探

引言

kafka早有耳聞,未嘗一用,目前實(shí)際使用過rabbitmq和阿里云工作隊(duì)列datahup,這次在實(shí)際項(xiàng)目中終于使用上了。根據(jù)項(xiàng)目要求,將訂單流水對(duì)象進(jìn)行mysql和es的雙寫,由es對(duì)外提供查詢服務(wù),數(shù)據(jù)庫作為數(shù)據(jù)備份。
我這邊當(dāng)即想出兩個(gè)策略:

  • 通過kafka,把訂單數(shù)據(jù)推送進(jìn)入kafka,然后消費(fèi)者分組監(jiān)聽,實(shí)現(xiàn)mysql和es的雙寫
  • 通過插件,讓插件監(jiān)聽mysql的bin日志,由插件進(jìn)行數(shù)據(jù)同步(公司有個(gè)項(xiàng)目目
    前使用的就是這種方式)

最終還是選擇了方式一,在對(duì)比多方插件的配置情況下和項(xiàng)目存在kafka的情況下,再使用插件進(jìn)行數(shù)據(jù)同步顯得有些多余(有部分插件同步方式提供消息隊(duì)列進(jìn)行異步同步)

引入問題

kafka具有異步、消峰、解耦的效果,但是引入看樣子是很不錯(cuò),但是實(shí)際使用起來也需要考慮其他問題

  • 消息重復(fù)消費(fèi),如何處理

數(shù)據(jù)雙寫,先說數(shù)據(jù)庫消息對(duì)象中存在訂單號(hào),是由雪花算法生成的,同時(shí)把訂單號(hào)設(shè)置為mysql主鍵id,再根據(jù)shardingjdbc配置的分庫、分表策略,我們把一天的數(shù)據(jù)統(tǒng)一放在一張表里,通過數(shù)據(jù)庫來實(shí)現(xiàn)數(shù)據(jù)庫的消費(fèi)者的重復(fù)消費(fèi);
es 通過RestHighLevelClient向es寫入數(shù)據(jù),存在相同數(shù)據(jù)他更新,不存在相同數(shù)據(jù)他就更新,所以說重復(fù)消費(fèi)在當(dāng)前項(xiàng)目很容易解決

  • 消息消費(fèi)不掉阻塞消費(fèi),又該如何處理

kafka中的消費(fèi)者是按照索引按順序消費(fèi),這個(gè)索引未消費(fèi)確認(rèn)以前,是不會(huì)進(jìn)行下一個(gè)索引的消費(fèi),這個(gè)倒是和datahup的消費(fèi)方式類似。在默認(rèn)配置情況下,消費(fèi)首次消費(fèi)失敗后,他會(huì)繼續(xù)再嘗試9次,如果接連10次都消費(fèi)失敗,他就會(huì)舍棄當(dāng)前消息,從而進(jìn)行下一個(gè)索引的消費(fèi),他這么設(shè)計(jì)感覺是沒問題,但是,他舍棄了消息,就代表這個(gè)消息無人處理了,就會(huì)造成項(xiàng)目中mysql和es的數(shù)據(jù)不統(tǒng)一,這是個(gè)大麻煩呀!好在他提供了修改默認(rèn)配置的方法,我的策略是一個(gè)消息重復(fù)消費(fèi)5次,每一次消費(fèi)失敗后延遲消費(fèi)一段時(shí)間,如果5次都消費(fèi)失敗,就將其推入死信隊(duì)列中,通知運(yùn)營(yíng)人員去處理消費(fèi)失敗的消息,人工介入。

下面直接看部分代碼吧

引入依賴

<!--引入kafka依賴-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

添加配置文件

  kafka:
    bootstrap-servers: 192.168.10.237:9092,192.168.10.238:9092,192.168.10.239:9092
    template:
      default-topic: kfpt-dev
    listener:
      ack-model: MANUAL_IMMEDIATE
    death-topic: ${spring.kafka.template.default-topic}.DLX

生產(chǎn)者代碼


import com.alibaba.fastjson.JSON;
import com.xtm.platform.sharding.generator.entity.TOrderDetail;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import javax.annotation.Resource;

@Component
@Slf4j
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.template.default-topic}")
    private String topicUser;

    /**
     * 發(fā)送用戶消息
     *
     * @param tOrderDetail 用戶信息
     */
    public void sendMessage(TOrderDetail tOrderDetail) {
        ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topicUser, JSON.toJSONString(tOrderDetail));
        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                RecordMetadata metadata = result.getRecordMetadata();
                log.info("message sent to " + metadata.topic() + ", partition " + metadata.partition() + ", offset " + metadata.offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.info(("send message failed with " + ex.getMessage()));
            }

        });
    }
}

消費(fèi)者代碼

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xtm.platform.sharding.generator.entity.TOrderDetail;
import com.xtm.platform.sharding.service.OrderDetailService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;

import javax.annotation.Resource;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * @version : 1.0
 * @description: java類作用描述
 * @author: tianwen
 * @create: 2021/5/11 17:18
 **/
@Component
@Slf4j
public class KafkaSqlConsumer {

    @Autowired
    private OrderDetailService orderDetailService;

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.death-topic}")
    private String deathTopic;

    @Value("${spring.kafka.template.default-topic}")
    private String topic;

    private final String groups = "cs";

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Bean("sqlKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        //設(shè)置提交偏移量的方式
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
            log.error("異常.拋棄這個(gè)消息============,{}", consumerRecord.toString(), e);
            kafkaTemplate.send(deathTopic, JSON.toJSONString(DeathMessage.builder().topic(topic).message(consumerRecord.value().toString()).desc("同步sql").createAt(new Date()).build()));
        }, new FixedBackOff(15000L, 5L));
        factory.setErrorHandler(seekToCurrentErrorHandler);
        return factory;
    }


    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(10);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groups);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //設(shè)置每次接收Message的數(shù)量
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }


    @KafkaListener(topics = {"${spring.kafka.template.default-topic}"}, containerFactory = "sqlKafkaListenerContainerFactory", groupId = groups, concurrency = "3")
    public void consumerMsg(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            log.info("自動(dòng)topic是: {}, offset是: {}, value是: {}", record.topic(), record.offset(), record.value());
            TOrderDetail tOrderDetail = JSONObject.parseObject(record.value(), TOrderDetail.class);
            orderDetailService.addOrderDetail(tOrderDetail);
            ack.acknowledge();
        } catch (Exception e) {
            e.printStackTrace();
            log.error("kafka push to es fail,topic: {} group : {} body: {} ", record.topic(), "cs", record.value());
        }
    }
}

死信隊(duì)列消費(fèi)者


import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;

import javax.annotation.Resource;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * @version : 1.0
 * @description: 死信隊(duì)列
 * @author: tianwen
 * @create: 2021/5/11 17:18
 **/
@Component
@Slf4j
public class KafkaDeathConsumer {

    @Value("${spring.kafka.death-topic}")
    private String deathTopic;

    @Value("${spring.kafka.template.default-topic}")
    private String topic;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    private final String groups = "death";

    @Bean("deathKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        //設(shè)置提交偏移量的方式
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
            log.error("推入死信隊(duì)列.拋棄這個(gè)消息============,{}", consumerRecord.toString(), e);
            kafkaTemplate.send(deathTopic, JSON.toJSONString(DeathMessage.builder().topic(topic).message(consumerRecord.value().toString()).desc("死信隊(duì)列").createAt(new Date()).build()));
        }, new FixedBackOff(15000L, 5L));
        factory.setErrorHandler(seekToCurrentErrorHandler);
        return factory;
    }


    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(10);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groups);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //設(shè)置每次接收Message的數(shù)量
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @KafkaListener(topics = {"${spring.kafka.death-topic}"}, containerFactory = "deathKafkaListenerContainerFactory", groupId = groups, concurrency = "1")
    public void consumerMsg(ConsumerRecord record, Acknowledgment ack) {
        log.info("自動(dòng)topic是: {}, offset是: {}, value是: {}", record.topic(), record.offset(), record.value());
        DingUtil.pushMsgToDing("");
        ack.acknowledge();
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 閱讀路徑 谷歌搜索kafka中文介紹及翻譯 學(xué)習(xí)成熟的kafka的dockerfile及compose 閱讀官方文...
    ohmyadd閱讀 1,601評(píng)論 0 2
  • 簡(jiǎn)介 kafka是一個(gè)分布式的發(fā)布和訂閱的消息系統(tǒng)。也就是消息的發(fā)布者把消息進(jìn)行分類,然后發(fā)送到kafka上。而訂...
    單倍體閱讀 294評(píng)論 0 0
  • 消息隊(duì)列對(duì)比:https://www.cnblogs.com/qingyunzong/p/9004509.html...
    萌涼258閱讀 209評(píng)論 0 0
  • 主要對(duì)一些名詞進(jìn)行說明講解。 訂閱模式:sub/pub和負(fù)載均衡 消費(fèi)topic的對(duì)象是group。而具體的pat...
    機(jī)器不能學(xué)習(xí)閱讀 531評(píng)論 0 0
  • 背景 最近要把原來做的那套集中式日志監(jiān)控系統(tǒng)進(jìn)行遷移,原來的實(shí)現(xiàn)方案是: Log Agent => Log Ser...
    沒想好像閱讀 602評(píng)論 0 1

友情鏈接更多精彩內(nèi)容