MySQL Binlog 同步工具go-mysql-transfer Lua模塊使用說明

一、go-mysql-transfer

go-mysql-transfer是一款MySQL實(shí)時(shí)、增量數(shù)據(jù)同步工具。能夠?qū)崟r(shí)解析MySQL二進(jìn)制日志binlog,并生成指定格式的消息,同步到接收端。

go-mysql-transfer具有如下特點(diǎn):

1、不依賴其它組件,一鍵部署

2、集成多種接收端,如:Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ,不需要再編寫客戶端,開箱即用

3、內(nèi)置豐富的數(shù)據(jù)解析、消息生成規(guī)則;支持Lua腳本,以處理更復(fù)雜的數(shù)據(jù)邏輯

4、支持監(jiān)控告警,集成Prometheus客戶端

5、高可用集群部署

6、數(shù)據(jù)同步失敗重試

7、全量數(shù)據(jù)初始化

詳情及安裝說明 請(qǐng)參見: MySQL Binlog 增量同步工具go-mysql-transfer實(shí)現(xiàn)詳解

項(xiàng)目開源地址:

gitee (速度更快) :go-mysql-transfer
github:go-mysql-transfer

如果此工具對(duì)你有幫助,請(qǐng)Star支持下

二、Lua腳本引擎

go-mysql-transfer中使用gopher-lua作為L(zhǎng)ua虛擬機(jī),支持Lua5.1規(guī)范。Lua作為專業(yè)的內(nèi)置腳本語言,其設(shè)計(jì)目的是為了嵌入應(yīng)用程序中,從而為應(yīng)用程序提供靈活的擴(kuò)展和定制功能。開發(fā)者只需要花費(fèi)少量時(shí)間就能大致掌握其用法。

基于Lua的高擴(kuò)展性,可以實(shí)現(xiàn)更為復(fù)雜的數(shù)據(jù)解析、消息生成、數(shù)據(jù)處理邏輯。

三、json模塊

提供json數(shù)據(jù)格式的序列化和反序列化功能,提供encode和decode兩個(gè)方法。
使用示例如下:

local json = require("json")   -- 加載json模塊
local ops = require("mqOps") --加載mq操作模塊

local row = ops.rawRow()  --當(dāng)前數(shù)據(jù)庫(kù)的一行數(shù)據(jù),table類型,key為列名稱
local action = ops.rawAction()  --當(dāng)前數(shù)據(jù)庫(kù)事件,包括:insert、updare、delete

local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值

local result = {}  -- 定義一個(gè)table,作為結(jié)果
result["id"] = id
result["action"] = action

if action == "delete" -- 刪除事件
then
    local val = json.encode(result) -- 將result轉(zhuǎn)為json
    ops.SEND("transfer_test_topic",val) -- 發(fā)送消息,第一個(gè)參數(shù)為topic(string類型),第二個(gè)參數(shù)為消息內(nèi)容
else 
    result["userName"] = userName
    result["password"] = password
    result["createTime"] = createTime
    result["source"] = "binlog" -- 數(shù)據(jù)來源
    local val = json.encode(result) -- 將result轉(zhuǎn)為json
    ops.SEND("transfer_test_topic",val) -- 發(fā)送消息,第一個(gè)參數(shù)為topic(string類型),第二個(gè)參數(shù)為消息內(nèi)容
    -- local obj = json.decode(val ) -- json反序列化
    -- print(obj ["createTime"])

四、db(數(shù)據(jù)庫(kù)操作)模塊

比如我們有角色表(t_role):

ID CODE NAME REMARK
1 r1 管理員 具有所有操作權(quán)限
2 r2 測(cè)試員 具有測(cè)試功能的操作權(quán)限

用戶表(t_user):

ID USER_NAME PASSWORD ROLE_CODE CREATE_TIME
1 admin 123456 r1 2020-10-20 22:00:10

我們需要監(jiān)聽t_user表,并向接收端發(fā)送如下格式的消息:

{
       
    "id": "1",
    "userName": "admin"
    "password": "123456",
    "createTime": 100001,
    "roleName": "系統(tǒng)管理員",
    "roleRemark": "管理后臺(tái)相關(guān)信息",
    "source": "binlog",
    
}

基于Binlog的數(shù)據(jù)同步工具,只能監(jiān)聽到一行數(shù)據(jù)的變更,進(jìn)行響應(yīng)。無法像基于SQL的ETL工具那樣具有多表連接的能力。如果要得到向上面那樣的聚合數(shù)據(jù),需要使用dbOps模塊,用法如下:

local json = require("json")   -- 加載json模塊
local ops = require("mqOps") --加載mq操作模塊
local db = require("dbOps") --加載數(shù)據(jù)庫(kù)(db)操作模塊

local row = ops.rawRow()  --當(dāng)前數(shù)據(jù)庫(kù)的一行數(shù)據(jù),table類型,key為列名稱
-- print(json.encode(row))
local action = ops.rawAction()  --當(dāng)前數(shù)據(jù)庫(kù)事件,包括:insert、updare、delete

local id = row["ID"] --獲取ID列的值
local userName = row["USER_NAME"] --獲取USER_NAME列的值
local password = row["PASSWORD"] --獲取USER_NAME列的值
local roleCode = row["ROLE_CODE"] --角色編碼
local createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值

local result = {}  -- 定義一個(gè)table,作為結(jié)果
result["id"] = id
result["action"] = action

if action == "delete" -- 刪除事件
then
    local val = json.encode(result) -- 將result轉(zhuǎn)為json
    ops.SEND("user_topic",val) -- 發(fā)送消息,第一個(gè)參數(shù)為topic(string類型),第二個(gè)參數(shù)為消息內(nèi)容
else 
    
    local sql = string.format("SELECT * FROM ESEAP.T_ROLE WHERE CODE = '%s'",roleCode) -- SQL語句,不能直接使用表名,要使用(數(shù)據(jù)庫(kù)名稱.表名稱),如:ESEAP.T_ROLE
    local roleRS = db.selectOne(sql) -- 執(zhí)行SQL查詢,返回一條查詢結(jié)果,table類型,結(jié)構(gòu)如:{"CODE":"a1","ID":"1","NAME":"系統(tǒng)管理員","REMARK":"管理后臺(tái)相關(guān)信息"}
    -- print(json.encode(roleRS))
    local roleName = roleRS["NAME"] --角色名稱
    local roleRemark = roleRS["REMARK"] --角色描述
    
    -- local roleListRS = db.select(sql) -- 執(zhí)行SQL查詢,返回多條條查詢結(jié)果,數(shù)組類型,元素為table,結(jié)構(gòu)如:[{"CODE":"a1","ID":"1","NAME":"系統(tǒng)管理員","REMARK":"管理后臺(tái)相關(guān)信息"}]
    -- print(json.encode(roleListRS))
    
    result["userName"] = userName
    result["password"] = password
    result["createTime"] = createTime
    result["source"] = "binlog" -- 數(shù)據(jù)來源
    result["roleName"] = roleName
    result["roleRemark"] = roleRemark
    local val = json.encode(result) -- 將result轉(zhuǎn)為json
    ops.SEND("user_topic",val) -- 發(fā)送消息,第一個(gè)參數(shù)為topic(string類型),第二個(gè)參數(shù)為消息內(nèi)容
end 

dbOps模塊的方法說明:
1、selectOne(sql) 查詢一條數(shù)據(jù),返回table類型的結(jié)果;如果查詢不到數(shù)據(jù),返回空table;如果查詢到多個(gè)結(jié)果,會(huì)出錯(cuò)
2、select(sql) 查詢多條數(shù)據(jù),返回?cái)?shù)組類型的結(jié)果,數(shù)組元素為tablem(格式如:[table1,table2]);查詢不到結(jié)果,返回空table;

四、http客戶端模塊

讓go-mysql-transfer具體發(fā)送任意http請(qǐng)求的能力,httpOps提供的方法說明:

1、get(url,headers) 發(fā)送get請(qǐng)求;url為請(qǐng)求地址;headers為請(qǐng)求頭參數(shù),table類型
2、delete(url,headers) 發(fā)送delete請(qǐng)求;url為請(qǐng)求地址;headers為請(qǐng)求頭參數(shù),table類型
3、post(url,headers,formItems) 發(fā)送post請(qǐng)求;url為請(qǐng)求地址;headers為請(qǐng)求頭參數(shù),table類型;formItems為表單數(shù)據(jù),table類型
4、put(url,headers,formItems) 發(fā)送put請(qǐng)求;url為請(qǐng)求地址;headers為請(qǐng)求頭參數(shù),table類型;formItems為表單數(shù)據(jù),table類型

上面4個(gè)方法的返回值為一個(gè)table類型的結(jié)果,元素"status_code"為http響應(yīng)狀態(tài),Number類型(如:200、401、403、500等);元素body為http響應(yīng)內(nèi)容,string類型

httpOps模塊具體用法如下:

local json = require("json")   -- 加載json模塊
local ops = require("redisOps") --加載redis操作模塊
local httpcli = require("httpOps") --加載http操作模塊

local row = ops.rawRow()  --數(shù)據(jù)庫(kù)當(dāng)前變更的一行數(shù)據(jù),table類型,key為列名稱
local action = ops.rawAction()  --當(dāng)前數(shù)據(jù)庫(kù)事件,包括:insert、updare、delete

local _id = row["ID"] --獲取ID列的值
local _userName = row["USER_NAME"] --獲取USER_NAME列的值
local _password = row["PASSWORD"] --獲取USER_NAME列的值
local _createTime = row["CREATE_TIME"] --獲取CREATE_TIME列的值
local key = "user_".._id -- 定義key

if action == "insert" -- 插入事件
then
    -- get
     local url = string.format("http://localhost:9999/http_tests?user_name=%s", userName) 
     local res = httpcli.get(url,{
        Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
     }) -- http get請(qǐng)求,第一個(gè)參數(shù)為URL,類型為string;第二個(gè)參數(shù)為header參數(shù),類型為table
     local status = res.status_code
    --print(res.status_code)  -- http響應(yīng)代碼,如:200、401、403、500等
    --print(res.body)-- http響應(yīng)內(nèi)容,string類型
    --local resObj = json.decode(res.body) -- json反序列化響應(yīng)內(nèi)容
    --print(resObj["msg"])
    
    
    -- delete
    --local url = string.format("http://localhost:9999/http_tests?user_name=%s", userName) 
    --local res = httpcli.delete(url,{
    --  Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
    --}) -- http delete請(qǐng)求,第一個(gè)參數(shù)為URL,類型為string;第二個(gè)參數(shù)為header參數(shù),類型為table
    
    -- post
    --local url = "http://localhost:9999/http_tests"
    --local res = httpcli.post(url,{
    --  Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
    --},{
    --  id=_id,
    --  userName=_userName,
    --  password=_password,
    --  createTime=_createTime
    --}) -- http post請(qǐng)求,第一個(gè)參數(shù)為URL,類型為string;第二個(gè)參數(shù)為header參數(shù),類型為table;第三個(gè)參數(shù)為post內(nèi)容,類型為table
    
    --put
    --local url = "http://localhost:9999/http_tests"
    --local res = httpcli.put(url,{
    --  Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="
    --},{
    --  id=_id,
    --  userName=_userName,
    --  password=_password,
    --  createTime=_createTime
    --}) -- http put請(qǐng)求,第一個(gè)參數(shù)為URL,類型為string;第二個(gè)參數(shù)為header參數(shù),類型為table;第三個(gè)參數(shù)為post內(nèi)容,類型為table
    
    if status == 200
    then 
        ops.SADD("user_set",userName.."|succeed") -- 對(duì)應(yīng)Redis的SADD命令,第一個(gè)參數(shù)為key(支持string類型),第二個(gè)參數(shù)為value
    else
        ops.SADD("user_set",userName.."|failed") -- 對(duì)應(yīng)Redis的SADD命令,第一個(gè)參數(shù)為key(支持string類型),第二個(gè)參數(shù)為value
    end
    
    
end 
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容