這個(gè)組件基于一個(gè)很現(xiàn)實(shí)的需求,系統(tǒng)隨著業(yè)務(wù)發(fā)展而擴(kuò)張時(shí),總會(huì)碰到新業(yè)務(wù)需要整合老業(yè)務(wù)數(shù)據(jù)的現(xiàn)象,一般的微服務(wù)會(huì)實(shí)現(xiàn)為新業(yè)務(wù)系統(tǒng)從老業(yè)務(wù)系統(tǒng)獲取數(shù)據(jù)。這里提供一種基于Mysql Binlog的思路,新業(yè)務(wù)系統(tǒng)可以通過(guò)監(jiān)聽(tīng)老業(yè)務(wù)系統(tǒng)的數(shù)據(jù)變化,這樣在不改變老業(yè)務(wù)系統(tǒng)代碼的前提下,新業(yè)務(wù)系統(tǒng)可以獲取到老業(yè)務(wù)系統(tǒng)數(shù)據(jù)變化的通知,及時(shí)更新自己的Redis或其他緩存。
也可以用于在同一個(gè)系統(tǒng)中,跨表整合數(shù)據(jù)。
我自己實(shí)現(xiàn)這個(gè)組件是為了跨微服務(wù)整合數(shù)據(jù)到Elastic Search。
組件使用起來(lái)比較簡(jiǎn)單,需要一個(gè)前提就是Mysql開(kāi)啟binlog且binlog格式為ROW,格式見(jiàn)https://zhuanlan.zhihu.com/p/26977878
1. 配置文件
binlog:
mysql:
hosts: #配置數(shù)據(jù)庫(kù)的連接信息
- name: test-db
host: localhost
port: 3306
username: root
password: root
timeOffset: 28800000 #時(shí)區(qū)偏移 單位:毫秒
- name: test-db2
host: localhost
port: 3306
username: root
password: root
timeOffset: 28800000
2. bean掃描
@SpringBootApplication(scanBasePackages = {"net.giafei"})
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
3.監(jiān)聽(tīng)
//hostName 即為配置文件中的name
//MysqlWatcher注解繼承有Component注解,所以會(huì)被Spring創(chuàng)建為Bean
@MysqlWatcher(hostName = "test-db", database = "test_binlog", table = "test_table1")
public class TestTable1Watcher implements IMysqlDataListener<TestTable1> {
private Logger logger = LoggerFactory.getLogger(TestTable1Watcher.class);
@Override
public void onUpdate(TestTable1 from, TestTable1 to) {
logger.info("ID 為 {} 的條目數(shù)據(jù)變更", from.getId());
logger.info("\t變化前:" + JSON.toJSONString(from, SerializerFeature.WriteDateUseDateFormat));
logger.info("\t變化后:" + JSON.toJSONString(to, SerializerFeature.WriteDateUseDateFormat));
}
@Override
public void onInsert(TestTable1 data) {
logger.info("插入ID為 {} 的數(shù)據(jù)", data.getId());
}
@Override
public void onDelete(TestTable1 data) {
logger.info("ID 為 {} 的數(shù)據(jù)被刪除", data.getId());
}
}
binlog解析通過(guò)組件mysql-binlog-connector-java,數(shù)據(jù)到Entity通過(guò)fastjson,其他部分比較簡(jiǎn)單就不再贅述了。