canal php_canal使用 數(shù)據(jù)同步解耦

java的生態(tài)就是好,實(shí)際開(kāi)發(fā)的時(shí)候經(jīng)常會(huì)用到數(shù)據(jù)同步,傳統(tǒng)的做法就是寫很多觸發(fā)器,監(jiān)聽(tīng)到變動(dòng)的時(shí)候,處理同步邏輯,或者定時(shí)任務(wù)處理每天跑全量同步邏輯。canal這個(gè)確實(shí)模仿主從復(fù)制,直接數(shù)據(jù)庫(kù)級(jí)別,效率和解耦程度就非常好用的。

Windows環(huán)境本地搭建canal php框架采用thinkphp8

前置 phpstudy 安裝php8.2.9,composer安裝好tp項(xiàng)目。

開(kāi)源項(xiàng)目源碼

https://github.com/xingwenge/canal-php
https://github.com/alibaba/canal

1.下載java客戶端canal

image.png

這里我下載是這個(gè)最新穩(wěn)定版本的這個(gè)。
解壓后可以看到項(xiàng)目。


image.png

Windows雙擊啟動(dòng) (需要先配置主從復(fù)制的從庫(kù))


image.png

2.配置主從復(fù)制的從庫(kù)

在主庫(kù)創(chuàng)建canal用戶,并分配權(quán)限。

-- 創(chuàng)建無(wú)IP限制的用戶(% 表示任意主機(jī))
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';

-- 授予復(fù)制權(quán)限
GRANT REPLICATION SLAVE ON *.* TO 'canal'@'%';


-- REPLICATION CLIENT :允許 canal 查詢主從同步狀態(tài)(如 SHOW MASTER STATUS )。
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
--要給 canal 用戶賦予讀取 所有庫(kù)、所有表 的權(quán)限,需在 MySQL 中執(zhí)行授權(quán)語(yǔ)句,確保 canal 能正常采集 binlog ,以下是具體操作:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

-- 刷新權(quán)限使更改生效
FLUSH PRIVILEGES;

雙擊啟動(dòng)canal


image.png

查看日志是否執(zhí)行成功。


image.png

3.thinkphp8里運(yùn)行監(jiān)聽(tīng)

安裝擴(kuò)展
composer show xingwenge/canal_php -a 可以查看最新版,截止當(dāng)前最新版本為v1.0.3

composer require xingwenge/canal_php:^1.0.3

安裝成功后創(chuàng)建命令,然后配置命令

<?php
declare (strict_types = 1);

namespace app\command;
use Com\Alibaba\Otter\Canal\Protocol\Entry;
use Com\Alibaba\Otter\Canal\Protocol\EntryType;
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;

/***
 * php think canal
 */
class Canal extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('canal')
            ->setDescription('the canal command');
    }

    protected function execute(Input $input, Output $output)
    {
        try {
            $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
            # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);

            $client->connect("127.0.0.1", 11111);
            $client->checkValid();
            $client->subscribe("1001", "example", ".*\\..*");
            # $client->subscribe("1001", "example", "db_name.tb_name"); # 設(shè)置過(guò)濾

            while (true) {
                $message = $client->get(100);
                if ($entries = $message->getEntries()) {
                    /**
                     * @var Entry $entry
                     */
                    foreach ($entries as $entry) {
                        $entryType = $entry->getEntryType();
                        if ($entryType !== EntryType::ROWDATA) {
                            continue; // 跳過(guò)事務(wù)頭之類的
                        }

                        $rowChange = new RowChange();
                        $rowChange->mergeFromString($entry->getStoreValue());
                        $eventType = $rowChange->getEventType();

                        $dbName = $entry->getHeader()->getSchemaName();      // 數(shù)據(jù)庫(kù)名
                        $tableName = $entry->getHeader()->getTableName();    // 表名
                        $eventName = $this->getEventName($eventType);               // insert / update / delete

                        /**
                         * @var \Com\Alibaba\Otter\Canal\Protocol\RowData $rowData
                         */
                        foreach ($rowChange->getRowDatas() as $rowData) {
                            $beforeData = $this->parseColumns($rowData->getBeforeColumns()); // 更新前
                            $afterData = $this->parseColumns($rowData->getAfterColumns());   // 更新后

                            echo "=====================\n";
                            echo "數(shù)據(jù)庫(kù): {$dbName}\n";
                            echo "數(shù)據(jù)表: {$tableName}\n";
                            echo "操作類型: {$eventName}\n";

                            echo "更新前數(shù)據(jù):\n";
                            print_r($beforeData);

                            echo "更新后數(shù)據(jù):\n";
                            print_r($afterData);

                            // 示例:你可以根據(jù)表名和操作類型做處理
                            if ($tableName === 'user') {
                                if ($eventName === 'insert') {
                                    // 新增邏輯
                                } elseif ($eventName === 'update') {
                                    // 修改邏輯
                                } elseif ($eventName === 'delete') {
                                    // 刪除邏輯
                                }
                            }
                        }
                    }

                }
                sleep(1);
            }

            $client->disConnect();
        } catch (\Exception $e) {
            echo $e->getMessage(), PHP_EOL;
        }
    }


    // 將 ColumnList 轉(zhuǎn)為數(shù)組:["字段名" => "字段值"]
    function parseColumns($columns): array {
        $result = [];
        foreach ($columns as $column) {
            $result[$column->getName()] = $column->getValue();
        }
        return $result;
    }

// 轉(zhuǎn)換事件類型為字符串
    function getEventName($eventType): string {
        switch ($eventType) {
            case EventType::INSERT:
                return 'insert';
            case EventType::UPDATE:
                return 'update';
            case EventType::DELETE:
                return 'delete';
            default:
                return 'unknown';
        }
    }
}

被AI坑了,說(shuō)是要ack確認(rèn)消息。實(shí)際使用getId()獲取消息的時(shí)候,源碼已經(jīng)自動(dòng)確認(rèn)了。


image.png

4.測(cè)試。

使用navicate連接數(shù)據(jù)庫(kù),修改數(shù)據(jù)表,增刪改查。查看監(jiān)聽(tīng)結(jié)果。成功監(jiān)聽(tīng)。


image.png

5.其他

canal自帶支持寫入消息隊(duì)列,處理消息,當(dāng)業(yè)務(wù)量大的時(shí)候可以直接使用消息隊(duì)列處理,比這種處理方式高效多,這種如果處理速度慢,會(huì)出現(xiàn)積壓。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容