Introduction
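I had heard of Kafka for a long time but never used it; my hands-on experience so far was with RabbitMQ and Alibaba Cloud's DataHub queue service. This time I finally got to use Kafka in a real project. The project requires order-flow records to be dual-written to MySQL and Elasticsearch (ES): ES serves queries to the outside, and the database acts as the data backup.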
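I immediately came up with two strategies:
- Through Kafka: push the order data into Kafka and have consumers in separate consumer groups listen to the topic, so that MySQL and ES are both written
- Through a plugin: have a plugin listen to the MySQL binlog and let the plugin synchronize the data (one of our company's projects currently uses this approach)
In the end I chose option one. After comparing the configuration of several plugins, and given that the project already runs Kafka, adding a plugin for data synchronization seemed redundant (some of those plugins themselves use a message queue for asynchronous synchronization).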
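To make option one concrete, here is a minimal in-memory sketch (not Kafka itself) of why separate consumer groups give a dual write: each group keeps its own offset into the same log, so every group sees every message. The class `MiniTopic` and the group names `mysql-sync` and `es-sync` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for a single-partition topic; illustration only. */
class MiniTopic {
    private final List<String> log = new ArrayList<>();
    // One committed offset per consumer group (hypothetical group names below).
    private final Map<String, Integer> offsets = new HashMap<>();

    void publish(String message) {
        log.add(message);
    }

    /** Returns the records this group has not seen yet and advances its offset. */
    List<String> poll(String groupId) {
        int from = offsets.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(groupId, log.size());
        return batch;
    }
}

class MiniTopicDemo {
    public static void main(String[] args) {
        MiniTopic topic = new MiniTopic();
        topic.publish("order-1");
        topic.publish("order-2");
        // Each group reads the full log independently of the other.
        System.out.println("mysql group: " + topic.poll("mysql-sync"));
        System.out.println("es group:    " + topic.poll("es-sync"));
        // A second poll by the same group returns nothing new.
        System.out.println("mysql again: " + topic.poll("mysql-sync"));
    }
}
```

In the real project the same effect comes from giving the MySQL listener and the ES listener different `groupId` values on the same topic.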
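Problems introduced
Kafka brings asynchronous processing, peak shaving, and decoupling. It looks great on paper, but using it in practice raises other questions that have to be considered.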
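- How to handle duplicate message consumption
For the dual write, take the database first. Each message carries an order number generated by the snowflake algorithm, and that order number is also used as the MySQL primary key. Together with the database and table sharding strategy configured in shardingjdbc, which puts one day's data into a single table, the primary key in the database rejects a second insert of the same order, so duplicate consumption by the database consumer is handled by the database itself;
For ES, data is written through RestHighLevelClient: if the document already exists it is updated, and if it does not exist it is inserted. Duplicate consumption is therefore easy to deal with in this project.
- How to handle a message that cannot be consumed and blocks consumption
A Kafka consumer reads a partition in offset order, and it does not move on to the next offset until the current one has been acknowledged, which is similar to how DataHub consumes. With the default configuration, after the first failed delivery it retries 9 more times; if all 10 deliveries fail, it discards the message and moves on to the next offset. That design seems reasonable, but discarding a message means nobody handles it, which leaves MySQL and ES inconsistent in this project. That is a big problem. Fortunately the defaults can be changed. My strategy is to retry a message 5 times, waiting a fixed interval after each failure; if every attempt fails, the message is pushed to a dead-letter queue and operations staff are notified so they can handle it manually.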
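The duplicate-consumption argument above can be sketched with an in-memory map standing in for both stores. This is only an illustration under assumptions: `IdempotentOrderStore` and its methods are hypothetical, `insertIfAbsent` mimics a MySQL insert that a primary key rejects, and `upsert` mimics an ES write that creates or overwrites a document.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory store keyed by order number; illustration only. */
class IdempotentOrderStore {
    private final Map<Long, String> rows = new ConcurrentHashMap<>();

    /** MySQL-like: a duplicate primary key is rejected, so a redelivery changes nothing. */
    boolean insertIfAbsent(long orderNo, String payload) {
        return rows.putIfAbsent(orderNo, payload) == null;
    }

    /** ES-like: the write creates the document or overwrites the existing one. */
    void upsert(long orderNo, String payload) {
        rows.put(orderNo, payload);
    }

    String get(long orderNo) {
        return rows.get(orderNo);
    }

    int size() {
        return rows.size();
    }
}

class IdempotentOrderStoreDemo {
    public static void main(String[] args) {
        IdempotentOrderStore mysql = new IdempotentOrderStore();
        IdempotentOrderStore es = new IdempotentOrderStore();
        long orderNo = 1001L; // stands in for a snowflake-generated order number
        String payload = "{\"orderNo\":1001}";
        // Deliver the same message twice, as a redelivery would.
        for (int delivery = 1; delivery <= 2; delivery++) {
            boolean inserted = mysql.insertIfAbsent(orderNo, payload);
            es.upsert(orderNo, payload);
            System.out.println("delivery " + delivery + ": mysql inserted=" + inserted);
        }
        System.out.println("rows in mysql=" + mysql.size() + ", docs in es=" + es.size());
    }
}
```

Either way, processing the same order twice leaves exactly one row and one document, which is what makes redelivery safe here.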
Let's go straight to some of the code.
Add the dependency
<!-- Kafka dependency -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Add the configuration
spring:
  kafka:
    bootstrap-servers: 192.168.10.237:9092,192.168.10.238:9092,192.168.10.239:9092
    template:
      default-topic: kfpt-dev
    listener:
      ack-mode: MANUAL_IMMEDIATE
    death-topic: ${spring.kafka.template.default-topic}.DLX
Producer code
import com.alibaba.fastjson.JSON;
import com.xtm.platform.sharding.generator.entity.TOrderDetail;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
@Component
@Slf4j
public class KafkaProducer {
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.template.default-topic}")
    private String topicUser;

    /**
     * Sends an order detail message.
     *
     * @param tOrderDetail the order detail to publish
     */
    public void sendMessage(TOrderDetail tOrderDetail) {
        ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topicUser, JSON.toJSONString(tOrderDetail));
        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                RecordMetadata metadata = result.getRecordMetadata();
                log.info("message sent to {}, partition {}, offset {}", metadata.topic(), metadata.partition(), metadata.offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                // A failed send is an error, so log it at error level with the stack trace
                log.error("send message failed with {}", ex.getMessage(), ex);
            }
        });
    }
}
Consumer code
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xtm.platform.sharding.generator.entity.TOrderDetail;
import com.xtm.platform.sharding.service.OrderDetailService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;
import javax.annotation.Resource;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
 * @version : 1.0
 * @description: consumer that syncs order details to the database (SQL)
 * @author: tianwen
 * @create: 2021/5/11 17:18
 **/
@Component
@Slf4j
public class KafkaSqlConsumer {
    // Consumer group of the SQL sync; a constant so it can be used in @KafkaListener
    private static final String GROUPS = "cs";

    @Autowired
    private OrderDetailService orderDetailService;

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.death-topic}")
    private String deathTopic;

    @Value("${spring.kafka.template.default-topic}")
    private String topic;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Bean("sqlKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // How offsets are committed: manually, immediately on acknowledge()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        // Retry 5 times, 15 seconds apart; once the retries are exhausted, publish to the dead-letter topic
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
            log.error("Exception, discarding this message ============,{}", consumerRecord.toString(), e);
            kafkaTemplate.send(deathTopic, JSON.toJSONString(DeathMessage.builder().topic(topic).message(consumerRecord.value().toString()).desc("sync sql").createAt(new Date()).build()));
        }, new FixedBackOff(15000L, 5L));
        factory.setErrorHandler(seekToCurrentErrorHandler);
        return factory;
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(10);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPS);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Maximum number of messages returned by each poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @KafkaListener(topics = {"${spring.kafka.template.default-topic}"}, containerFactory = "sqlKafkaListenerContainerFactory", groupId = GROUPS, concurrency = "3")
    public void consumerMsg(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            log.info("topic: {}, offset: {}, value: {}", record.topic(), record.offset(), record.value());
            TOrderDetail tOrderDetail = JSONObject.parseObject(record.value(), TOrderDetail.class);
            orderDetailService.addOrderDetail(tOrderDetail);
            ack.acknowledge();
        } catch (Exception e) {
            log.error("kafka sync to sql fail, topic: {} group: {} body: {}", record.topic(), GROUPS, record.value(), e);
            // Rethrow: if the exception is swallowed here, the error handler never sees the failure,
            // so neither the retries nor the dead-letter publishing would run
            throw new IllegalStateException("failed to sync order detail", e);
        }
    }
}
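The consumer above configures its error handler with `new FixedBackOff(15000L, 5L)`, that is, a 15-second pause and at most 5 retries, which adds up to 6 deliveries before the recoverer publishes to the dead-letter topic. The following is a plain-Java simulation of that policy, not spring-kafka's implementation; `RetryThenDeadLetter` and `deliver` are hypothetical names, and the short interval in the demo is only there to keep it fast.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simulates "retry with a fixed pause, then dead-letter"; illustration only. */
class RetryThenDeadLetter {
    /**
     * Delivers the message once and retries up to maxRetries times.
     * Returns the total number of deliveries that were attempted.
     */
    static <T> int deliver(T message, Consumer<T> handler, Consumer<T> deadLetter,
                           long intervalMs, int maxRetries) {
        int deliveries = 0;
        while (true) {
            deliveries++;
            try {
                handler.accept(message);
                return deliveries; // consumed successfully
            } catch (RuntimeException e) {
                if (deliveries > maxRetries) {
                    deadLetter.accept(message); // retries exhausted: hand over for manual handling
                    return deliveries;
                }
                pause(intervalMs);
            }
        }
    }

    private static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", e);
        }
    }
}

class RetryThenDeadLetterDemo {
    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        int deliveries = RetryThenDeadLetter.deliver(
                "order-42",
                m -> { throw new IllegalStateException("database unavailable"); },
                deadLetters::add,
                10L, // the post uses 15000 ms
                5);
        System.out.println(deliveries + " deliveries, dead letters: " + deadLetters);
    }
}
```

The pause between attempts gives a temporarily unavailable database time to recover, while the dead-letter step keeps one bad message from blocking the partition indefinitely.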
Dead-letter queue consumer
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;
import javax.annotation.Resource;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
 * @version : 1.0
 * @description: dead-letter queue
 * @author: tianwen
 * @create: 2021/5/11 17:18
 **/
@Component
@Slf4j
public class KafkaDeathConsumer {
    // Consumer group of the dead-letter listener; a constant so it can be used in @KafkaListener
    private static final String GROUPS = "death";

    @Value("${spring.kafka.death-topic}")
    private String deathTopic;

    @Value("${spring.kafka.template.default-topic}")
    private String topic;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Bean("deathKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // How offsets are committed: manually, immediately on acknowledge()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
            log.error("Pushing to the dead-letter queue, discarding this message ============,{}", consumerRecord.toString(), e);
            kafkaTemplate.send(deathTopic, JSON.toJSONString(DeathMessage.builder().topic(topic).message(consumerRecord.value().toString()).desc("dead-letter queue").createAt(new Date()).build()));
        }, new FixedBackOff(15000L, 5L));
        factory.setErrorHandler(seekToCurrentErrorHandler);
        return factory;
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>(10);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPS);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Maximum number of messages returned by each poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @KafkaListener(topics = {"${spring.kafka.death-topic}"}, containerFactory = "deathKafkaListenerContainerFactory", groupId = GROUPS, concurrency = "1")
    public void consumerMsg(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("topic: {}, offset: {}, value: {}", record.topic(), record.offset(), record.value());
        // Notify operations staff through DingTalk so the failed message can be handled manually
        DingUtil.pushMsgToDing("");
        ack.acknowledge();
    }
}
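Both error handlers build a `DeathMessage`, which the post does not show. It is presumably a Lombok `@Builder` class. Under that assumption, a plain-Java sketch exposing the same four builder methods (`topic`, `message`, `desc`, `createAt`) might look like the following; the field types and getter names are guesses, not the author's code.

```java
import java.util.Date;

/** Hypothetical shape of the dead-letter payload; the real class is not shown in the post. */
class DeathMessage {
    private final String topic;    // topic the failed record came from
    private final String message;  // original record value
    private final String desc;     // which consumer gave up on it
    private final Date createAt;   // when it was dead-lettered

    private DeathMessage(Builder builder) {
        this.topic = builder.topic;
        this.message = builder.message;
        this.desc = builder.desc;
        this.createAt = builder.createAt;
    }

    static Builder builder() {
        return new Builder();
    }

    String getTopic() { return topic; }
    String getMessage() { return message; }
    String getDesc() { return desc; }
    Date getCreateAt() { return createAt; }

    static class Builder {
        private String topic;
        private String message;
        private String desc;
        private Date createAt;

        Builder topic(String topic) { this.topic = topic; return this; }
        Builder message(String message) { this.message = message; return this; }
        Builder desc(String desc) { this.desc = desc; return this; }
        Builder createAt(Date createAt) { this.createAt = createAt; return this; }

        DeathMessage build() { return new DeathMessage(this); }
    }
}

class DeathMessageDemo {
    public static void main(String[] args) {
        DeathMessage dead = DeathMessage.builder()
                .topic("kfpt-dev")
                .message("{\"orderNo\":1001}")
                .desc("sync sql")
                .createAt(new Date())
                .build();
        System.out.println(dead.getTopic() + " / " + dead.getDesc() + " / " + dead.getMessage());
    }
}
```

Carrying the source topic and a description alongside the raw payload lets whoever handles the dead letter see which consumer failed and replay the message to the right place.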