一、go-mysql-transfer
go-mysql-transfer是一款MySQL實(shí)時(shí)、增量數(shù)據(jù)同步工具。能夠?qū)崟r(shí)解析MySQL二進(jìn)制日志binlog,并生成指定格式的消息,同步到接收端。
go-mysql-transfer具有如下特點(diǎn):
1、不依賴其它組件,一鍵部署
2、集成多種接收端,如:Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ,不需要再編寫客戶端,開箱即用
3、內(nèi)置豐富的數(shù)據(jù)解析、消息生成規(guī)則;支持Lua腳本,以處理更復(fù)雜的數(shù)據(jù)邏輯
4、支持監(jiān)控告警,集成Prometheus客戶端
5、高可用集群部署
6、數(shù)據(jù)同步失敗重試
7、全量數(shù)據(jù)初始化
詳情及安裝說明 請(qǐng)參見: MySQL Binlog 增量同步工具go-mysql-transfer實(shí)現(xiàn)詳解
項(xiàng)目開源地址:
gitee (速度更快) :go-mysql-transfer
github:go-mysql-transfer
如果此工具對(duì)你有幫助,請(qǐng)Star支持下
二、Lua腳本引擎
go-mysql-transfer中使用gopher-lua作為L(zhǎng)ua虛擬機(jī),支持Lua5.1規(guī)范。Lua作為專業(yè)的內(nèi)置腳本語言,其設(shè)計(jì)目的是為了嵌入應(yīng)用程序中,從而為應(yīng)用程序提供靈活的擴(kuò)展和定制功能。開發(fā)者只需要花費(fèi)少量時(shí)間就能大致掌握其用法。
基于Lua的高擴(kuò)展性,可以實(shí)現(xiàn)更為復(fù)雜的數(shù)據(jù)解析、消息生成、數(shù)據(jù)處理邏輯。
三、json模塊
提供json數(shù)據(jù)格式的序列化和反序列化功能,提供encode和decode兩個(gè)方法。
使用示例如下:
local json = require("json") -- 加載json模塊
local ops = require("mqOps") --加載mq操作模塊
local row = ops.rawRow() --當(dāng)前數(shù)據(jù)庫(kù)的一行數(shù)據(jù),table類型,key為列名稱
local action = ops.rawAction() --當(dāng)前數(shù)據(jù)庫(kù)事件,包括:insert、updare、delete
local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local result = {} -- 定義一個(gè)table,作為結(jié)果
result["id"] = id
result["action"] = action
if action == "delete" -- 刪除事件
then
local val = json.encode(result) -- 將result轉(zhuǎn)為json
ops.SEND("transfer_test_topic",val) -- 發(fā)送消息,第一個(gè)參數(shù)為topic(string類型),第二個(gè)參數(shù)為消息內(nèi)容
else
result["userName"] = userName
result["password"] = password
result["createTime"] = createTime
result["source"] = "binlog" -- 數(shù)據(jù)來源
local val = json.encode(result) -- 將result轉(zhuǎn)為json
ops.SEND("transfer_test_topic",val) -- 發(fā)送消息,第一個(gè)參數(shù)為topic(string類型),第二個(gè)參數(shù)為消息內(nèi)容
-- local obj = json.decode(val ) -- json反序列化
-- print(obj ["createTime"])
四、db(數(shù)據(jù)庫(kù)操作)模塊
比如我們有角色表(t_role):
| ID | CODE | NAME | REMARK |
|---|---|---|---|
| 1 | r1 | 管理員 | 具有所有操作權(quán)限 |
| 2 | r2 | 測(cè)試員 | 具有測(cè)試功能的操作權(quán)限 |
用戶表(t_user):
| ID | USER_NAME | PASSWORD | ROLE_CODE | CREATE_TIME |
|---|---|---|---|---|
| 1 | admin | 123456 | r1 | 2020-10-20 22:00:10 |
我們需要監(jiān)聽t_user表,并向接收端發(fā)送如下格式的消息:
{
"id": "1",
"userName": "admin"
"password": "123456",
"createTime": 100001,
"roleName": "系統(tǒng)管理員",
"roleRemark": "管理后臺(tái)相關(guān)信息",
"source": "binlog",
}
基于Binlog的數(shù)據(jù)同步工具,只能監(jiān)聽到一行數(shù)據(jù)的變更,進(jìn)行響應(yīng)。無法像基于SQL的ETL工具那樣具有多表連接的能力。如果要得到向上面那樣的聚合數(shù)據(jù),需要使用dbOps模塊,用法如下:
local json = require("json") -- 加載json模塊
local ops = require("mqOps") --加載mq操作模塊
local db = require("dbOps") --加載數(shù)據(jù)庫(kù)(db)操作模塊
local row = ops.rawRow() --當(dāng)前數(shù)據(jù)庫(kù)的一行數(shù)據(jù),table類型,key為列名稱
-- print(json.encode(row))
local action = ops.rawAction() --當(dāng)前數(shù)據(jù)庫(kù)事件,包括:insert、updare、delete
local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local roleCode = row["ROLE_CODE"] --角色編碼
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local result = {} -- 定義一個(gè)table,作為結(jié)果
result["id"] = id
result["action"] = action
if action == "delete" -- 刪除事件
then
local val = json.encode(result) -- 將result轉(zhuǎn)為json
ops.SEND("user_topic",val) -- 發(fā)送消息,第一個(gè)參數(shù)為topic(string類型),第二個(gè)參數(shù)為消息內(nèi)容
else
local sql = string.format("SELECT * FROM ESEAP.T_ROLE WHERE CODE = '%s'",roleCode) -- SQL語句,不能直接使用表名,要使用(數(shù)據(jù)庫(kù)名稱.表名稱),如:ESEAP.T_ROLE
local roleRS = db.selectOne(sql) -- 執(zhí)行SQL查詢,返回一條查詢結(jié)果,table類型,結(jié)構(gòu)如:{"CODE":"a1","ID":"1","NAME":"系統(tǒng)管理員","REMARK":"管理后臺(tái)相關(guān)信息"}
-- print(json.encode(roleRS))
local roleName = roleRS["NAME"] --角色名稱
local roleRemark = roleRS["REMARK"] --角色描述
-- local roleListRS = db.select(sql) -- 執(zhí)行SQL查詢,返回多條條查詢結(jié)果,數(shù)組類型,元素為table,結(jié)構(gòu)如:[{"CODE":"a1","ID":"1","NAME":"系統(tǒng)管理員","REMARK":"管理后臺(tái)相關(guān)信息"}]
-- print(json.encode(roleListRS))
result["userName"] = userName
result["password"] = password
result["createTime"] = createTime
result["source"] = "binlog" -- 數(shù)據(jù)來源
result["roleName"] = roleName
result["roleRemark"] = roleRemark
local val = json.encode(result) -- 將result轉(zhuǎn)為json
ops.SEND("user_topic",val) -- 發(fā)送消息,第一個(gè)參數(shù)為topic(string類型),第二個(gè)參數(shù)為消息內(nèi)容
end
dbOps模塊的方法說明:
1、selectOne(sql) 查詢一條數(shù)據(jù),返回table類型的結(jié)果;如果查詢不到數(shù)據(jù),返回空table;如果查詢到多個(gè)結(jié)果,會(huì)出錯(cuò)
2、select(sql) 查詢多條數(shù)據(jù),返回?cái)?shù)組類型的結(jié)果,數(shù)組元素為tablem(格式如:[table1,table2]);查詢不到結(jié)果,返回空table;
四、http客戶端模塊
讓go-mysql-transfer具體發(fā)送任意http請(qǐng)求的能力,httpOps提供的方法說明:
1、get(url,headers) 發(fā)送get請(qǐng)求;url為請(qǐng)求地址;headers為請(qǐng)求頭參數(shù),table類型
2、delete(url,headers) 發(fā)送delete請(qǐng)求;url為請(qǐng)求地址;headers為請(qǐng)求頭參數(shù),table類型
3、post(url,headers,formItems) 發(fā)送post請(qǐng)求;url為請(qǐng)求地址;headers為請(qǐng)求頭參數(shù),table類型;formItems為表單數(shù)據(jù),table類型
4、put(url,headers,formItems) 發(fā)送put請(qǐng)求;url為請(qǐng)求地址;headers為請(qǐng)求頭參數(shù),table類型;formItems為表單數(shù)據(jù),table類型
上面4個(gè)方法的返回值為一個(gè)table類型的結(jié)果,元素"status_code"為http響應(yīng)狀態(tài),Number類型(如:200、401、403、500等);元素body為http響應(yīng)內(nèi)容,string類型
httpOps模塊具體用法如下:
local json = require("json") -- 加載json模塊
local ops = require("redisOps") --加載redis操作模塊
local httpcli = require("httpOps") --加載http操作模塊
local row = ops.rawRow() --數(shù)據(jù)庫(kù)當(dāng)前變更的一行數(shù)據(jù),table類型,key為列名稱
local action = ops.rawAction() --當(dāng)前數(shù)據(jù)庫(kù)事件,包括:insert、updare、delete
local _id = row["ID"] --獲取ID列的值
local _userName = row["USER_NAME"] --獲取USER_NAME列的值
local _password = row["PASSWORD"] --獲取USER_NAME列的值
local _createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local key = "user_".._id -- 定義key
if action == "insert" -- 插入事件
then
-- get
local url = string.format("http://localhost:9999/http_tests?user_name=%s", userName)
local res = httpcli.get(url,{
Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
}) -- http get請(qǐng)求,第一個(gè)參數(shù)為URL,類型為string;第二個(gè)參數(shù)為header參數(shù),類型為table
local status = res.status_code
--print(res.status_code) -- http響應(yīng)代碼,如:200、401、403、500等
--print(res.body)-- http響應(yīng)內(nèi)容,string類型
--local resObj = json.decode(res.body) -- json反序列化響應(yīng)內(nèi)容
--print(resObj["msg"])
-- delete
--local url = string.format("http://localhost:9999/http_tests?user_name=%s", userName)
--local res = httpcli.delete(url,{
-- Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
--}) -- http delete請(qǐng)求,第一個(gè)參數(shù)為URL,類型為string;第二個(gè)參數(shù)為header參數(shù),類型為table
-- post
--local url = "http://localhost:9999/http_tests"
--local res = httpcli.post(url,{
-- Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
--},{
-- id=_id,
-- userName=_userName,
-- password=_password,
-- createTime=_createTime
--}) -- http post請(qǐng)求,第一個(gè)參數(shù)為URL,類型為string;第二個(gè)參數(shù)為header參數(shù),類型為table;第三個(gè)參數(shù)為post內(nèi)容,類型為table
--put
--local url = "http://localhost:9999/http_tests"
--local res = httpcli.put(url,{
-- Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
--},{
-- id=_id,
-- userName=_userName,
-- password=_password,
-- createTime=_createTime
--}) -- http put請(qǐng)求,第一個(gè)參數(shù)為URL,類型為string;第二個(gè)參數(shù)為header參數(shù),類型為table;第三個(gè)參數(shù)為post內(nèi)容,類型為table
if status == 200
then
ops.SADD("user_set",userName.."|succeed") -- 對(duì)應(yīng)Redis的SADD命令,第一個(gè)參數(shù)為key(支持string類型),第二個(gè)參數(shù)為value
else
ops.SADD("user_set",userName.."|failed") -- 對(duì)應(yīng)Redis的SADD命令,第一個(gè)參數(shù)為key(支持string類型),第二個(gè)參數(shù)為value
end
end