java的生態(tài)就是好,實(shí)際開(kāi)發(fā)的時(shí)候經(jīng)常會(huì)用到數(shù)據(jù)同步,傳統(tǒng)的做法就是寫很多觸發(fā)器,監(jiān)聽(tīng)到變動(dòng)的時(shí)候,處理同步邏輯,或者定時(shí)任務(wù)處理每天跑全量同步邏輯。canal這個(gè)確實(shí)模仿主從復(fù)制,直接數(shù)據(jù)庫(kù)級(jí)別,效率和解耦程度就非常好用的。
Windows環(huán)境本地搭建canal php框架采用thinkphp8
前置 phpstudy 安裝php8.2.9,composer安裝好tp項(xiàng)目。
開(kāi)源項(xiàng)目源碼
https://github.com/xingwenge/canal-php
https://github.com/alibaba/canal
1.下載java客戶端canal

這里我下載是這個(gè)最新穩(wěn)定版本的這個(gè)。
解壓后可以看到項(xiàng)目。

Windows雙擊啟動(dòng) (需要先配置主從復(fù)制的從庫(kù))

2.配置主從復(fù)制的從庫(kù)
在主庫(kù)創(chuàng)建canal用戶,并分配權(quán)限。
-- 創(chuàng)建無(wú)IP限制的用戶(% 表示任意主機(jī))
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
-- 授予復(fù)制權(quán)限
GRANT REPLICATION SLAVE ON *.* TO 'canal'@'%';
-- REPLICATION CLIENT :允許 canal 查詢主從同步狀態(tài)(如 SHOW MASTER STATUS )。
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
--要給 canal 用戶賦予讀取 所有庫(kù)、所有表 的權(quán)限,需在 MySQL 中執(zhí)行授權(quán)語(yǔ)句,確保 canal 能正常采集 binlog ,以下是具體操作:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- 刷新權(quán)限使更改生效
FLUSH PRIVILEGES;
雙擊啟動(dòng)canal

查看日志是否執(zhí)行成功。

3.thinkphp8里運(yùn)行監(jiān)聽(tīng)
安裝擴(kuò)展
composer show xingwenge/canal_php -a 可以查看最新版,截止當(dāng)前最新版本為v1.0.3
composer require xingwenge/canal_php:^1.0.3
安裝成功后創(chuàng)建命令,然后配置命令
<?php
declare (strict_types = 1);
namespace app\command;
use Com\Alibaba\Otter\Canal\Protocol\Entry;
use Com\Alibaba\Otter\Canal\Protocol\EntryType;
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
/***
* php think canal
*/
class Canal extends Command
{
protected function configure()
{
// 指令配置
$this->setName('canal')
->setDescription('the canal command');
}
protected function execute(Input $input, Output $output)
{
try {
$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);
$client->connect("127.0.0.1", 11111);
$client->checkValid();
$client->subscribe("1001", "example", ".*\\..*");
# $client->subscribe("1001", "example", "db_name.tb_name"); # 設(shè)置過(guò)濾
while (true) {
$message = $client->get(100);
if ($entries = $message->getEntries()) {
/**
* @var Entry $entry
*/
foreach ($entries as $entry) {
$entryType = $entry->getEntryType();
if ($entryType !== EntryType::ROWDATA) {
continue; // 跳過(guò)事務(wù)頭之類的
}
$rowChange = new RowChange();
$rowChange->mergeFromString($entry->getStoreValue());
$eventType = $rowChange->getEventType();
$dbName = $entry->getHeader()->getSchemaName(); // 數(shù)據(jù)庫(kù)名
$tableName = $entry->getHeader()->getTableName(); // 表名
$eventName = $this->getEventName($eventType); // insert / update / delete
/**
* @var \Com\Alibaba\Otter\Canal\Protocol\RowData $rowData
*/
foreach ($rowChange->getRowDatas() as $rowData) {
$beforeData = $this->parseColumns($rowData->getBeforeColumns()); // 更新前
$afterData = $this->parseColumns($rowData->getAfterColumns()); // 更新后
echo "=====================\n";
echo "數(shù)據(jù)庫(kù): {$dbName}\n";
echo "數(shù)據(jù)表: {$tableName}\n";
echo "操作類型: {$eventName}\n";
echo "更新前數(shù)據(jù):\n";
print_r($beforeData);
echo "更新后數(shù)據(jù):\n";
print_r($afterData);
// 示例:你可以根據(jù)表名和操作類型做處理
if ($tableName === 'user') {
if ($eventName === 'insert') {
// 新增邏輯
} elseif ($eventName === 'update') {
// 修改邏輯
} elseif ($eventName === 'delete') {
// 刪除邏輯
}
}
}
}
}
sleep(1);
}
$client->disConnect();
} catch (\Exception $e) {
echo $e->getMessage(), PHP_EOL;
}
}
// 將 ColumnList 轉(zhuǎn)為數(shù)組:["字段名" => "字段值"]
function parseColumns($columns): array {
$result = [];
foreach ($columns as $column) {
$result[$column->getName()] = $column->getValue();
}
return $result;
}
// 轉(zhuǎn)換事件類型為字符串
function getEventName($eventType): string {
switch ($eventType) {
case EventType::INSERT:
return 'insert';
case EventType::UPDATE:
return 'update';
case EventType::DELETE:
return 'delete';
default:
return 'unknown';
}
}
}
被AI坑了,說(shuō)是要ack確認(rèn)消息。實(shí)際使用getId()獲取消息的時(shí)候,源碼已經(jīng)自動(dòng)確認(rèn)了。

4.測(cè)試。
使用navicate連接數(shù)據(jù)庫(kù),修改數(shù)據(jù)表,增刪改查。查看監(jiān)聽(tīng)結(jié)果。成功監(jiān)聽(tīng)。

5.其他
canal自帶支持寫入消息隊(duì)列,處理消息,當(dāng)業(yè)務(wù)量大的時(shí)候可以直接使用消息隊(duì)列處理,比這種處理方式高效多,這種如果處理速度慢,會(huì)出現(xiàn)積壓。