canal Source Code Analysis (1): Implementation of HA Mode

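I have recently been reading the canal source code and had a few questions. How is canal's HA mode implemented? How is the MySQL dump position determined? How does a canal client fetch data and ack it? How does canal handle a MySQL master/standby switchover? I will write a series of source-code analyses to answer these questions; corrections and discussion are welcome. This article analyzes the HA implementation of the canal server and client. Before reading it, you should have a basic understanding of canal's overall architecture; see the official documentation. First, here is how the official documentation describes the HA mechanism.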

HA Mechanism Design

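canal's HA has two parts: the canal server and the canal client each have their own HA implementation.
canal server: to reduce dump requests against MySQL, among the copies of an instance deployed on different servers, only one may be running at any time; the others stay in standby.
canal client: to guarantee ordering, an instance may be operated on (get/ack/rollback) by only one canal client at a time; otherwise the client cannot receive data in order.
The whole HA mechanism relies mainly on two ZooKeeper features: watchers and EPHEMERAL nodes (whose lifetime is bound to the session).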


canal-server
  1. Before a canal server starts a canal instance, it first makes a start attempt through ZooKeeper (implementation: create an EPHEMERAL node; whoever creates it successfully is allowed to start).
  2. The canal server that successfully creates the ZooKeeper node starts the corresponding canal instance; on the servers that fail to create it, the canal instance stays in standby.
  3. As soon as ZooKeeper detects that the node created by canal server A has disappeared, it notifies the other canal servers, which repeat step 1 to elect a new canal server to start the instance.
  4. Each time the canal client connects, it first asks ZooKeeper which server is currently running the canal instance and then connects to it; if the connection becomes unavailable, it tries to connect again.

The canal client works much like the canal server: it is also controlled by competing to create an EPHEMERAL node in ZooKeeper. The rest of this article analyzes the HA source code.
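To make steps 1 to 3 concrete, here is a small single-threaded simulation. It is not ZooKeeper or canal code: `FakeZk` and `Candidate` are hypothetical classes that model only the two properties the design relies on, namely that an ephemeral node belongs to the session that created it and vanishes when that session ends, and that watchers are notified of the deletion. (Real ZooKeeper watches fire once and the ZkClient library re-registers them; the fake simply keeps them.)

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical single-threaded stand-in for ZooKeeper: ephemeral nodes and delete watchers only.
class FakeZk {
    private final Map<String, String> owner = new HashMap<>(); // path -> session that created it
    private final Map<String, String> data = new HashMap<>();  // path -> node payload
    private final Map<String, List<Consumer<String>>> watchers = new HashMap<>();
    private final Set<String> deadSessions = new HashSet<>();

    // Returns false if the node already exists (real ZooKeeper throws NodeExists) or the session is gone.
    boolean createEphemeral(String path, String payload, String session) {
        if (deadSessions.contains(session) || owner.containsKey(path)) {
            return false;
        }
        owner.put(path, session);
        data.put(path, payload);
        return true;
    }

    String read(String path) {
        return data.get(path);
    }

    void onDeleted(String path, Consumer<String> watcher) {
        watchers.computeIfAbsent(path, p -> new ArrayList<>()).add(watcher);
    }

    // Ending a session deletes every ephemeral node it owns, then notifies the watchers of each path.
    void expireSession(String session) {
        deadSessions.add(session);
        List<String> deleted = new ArrayList<>();
        for (Map.Entry<String, String> e : owner.entrySet()) {
            if (e.getValue().equals(session)) {
                deleted.add(e.getKey());
            }
        }
        for (String path : deleted) {
            owner.remove(path);
            data.remove(path);
            for (Consumer<String> w : new ArrayList<>(watchers.getOrDefault(path, new ArrayList<>()))) {
                w.accept(path);
            }
        }
    }
}

// One canal server competing for the running node of a single instance (session id == name).
class Candidate {
    final String name;
    final FakeZk zk;
    final String runningPath;
    boolean active;

    Candidate(String name, FakeZk zk, String runningPath) {
        this.name = name;
        this.zk = zk;
        this.runningPath = runningPath;
        // Step 3: when the node disappears, run step 1 again.
        zk.onDeleted(runningPath, p -> tryAcquire());
    }

    // Steps 1 and 2: whoever creates the ephemeral node starts the instance; the rest stand by.
    void tryAcquire() {
        active = zk.createEphemeral(runningPath, name, name);
    }
}
```

Because an expired session can no longer create nodes, A's own retry fails and B's watcher wins the new election, which is the standby-to-running switch described in step 3.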

Canal Server HA Implementation

First, how to enable HA mode in canal.
Add the following configuration to canal.properties:

# ZooKeeper address to register with
canal.zkServers = 127.0.0.1:2181
# This instance config uses PeriodMixedMetaManager to manage positions: ack positions are stored on ZooKeeper nodes, so after a failover consumption resumes from the last ack position
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

Next, let's look at how canal-server interacts with ZooKeeper on startup; this is the core of the server-side HA implementation.
The entry point is the main method of CanalLauncher; when canal.serverMode = tcp, the class that actually starts the server is CanalController.
When CanalController is constructed, it initializes canal's system directories in ZooKeeper.

public CanalController(final Properties properties) {
    // ... code unrelated to HA omitted ...
    final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
    if (StringUtils.isNotEmpty(zkServers)) {
        zkclientx = ZkClientx.getZkClient(zkServers);
        // Initialize the system directories if they do not exist
        // /otter/canal/destinations: stores instance information
        zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
        // /otter/canal/cluster: stores canal-server node information
        zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
    }
    // ... code unrelated to HA omitted ...
}
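In other words, HA mode is switched on simply by giving `canal.zkServers` a non-empty value. The sketch below reproduces that check with plain `java.util.Properties`; `HaConfig` is a hypothetical helper, not part of canal, and it does not model any extra lookup that canal's own `getProperty` helper may perform.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical helper mirroring the zkServers check in the CanalController constructor.
final class HaConfig {
    static final String ZK_SERVERS_KEY = "canal.zkServers";
    // Persistent roots created when HA is on (paths as given in the comments above).
    static final List<String> ROOT_NODES =
        Collections.unmodifiableList(Arrays.asList("/otter/canal/destinations", "/otter/canal/cluster"));

    private HaConfig() {}

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text)); // '#' lines are comments in .properties files
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Same test as StringUtils.isNotEmpty(zkServers): HA is on when an address is configured.
    static boolean haEnabled(Properties props) {
        String zkServers = props.getProperty(ZK_SERVERS_KEY);
        return zkServers != null && !zkServers.isEmpty();
    }

    static List<String> rootNodesToCreate(Properties props) {
        return haEnabled(props) ? ROOT_NODES : Collections.<String>emptyList();
    }
}
```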

Starting CanalController:

public void start() throws Throwable {
    logger.info("## start the canal server[{}:{}]", ip, port);
    // create the working node for the whole canal server
    final String path = ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port);
    // 1.
    initCid(path);
    if (zkclientx != null) {
        // 2.
        this.zkclientx.subscribeStateChanges(new IZkStateListener() {

            public void handleStateChanged(KeeperState state) throws Exception {

            }

            public void handleNewSession() throws Exception {
                initCid(path);
            }

            @Override
            public void handleSessionEstablishmentError(Throwable error) throws Exception {
                logger.error("failed to connect to zookeeper", error);
            }
        });
    }
    // start the embedded server first
    embededCanalServer.start();
    // try to start the instances that are not lazy
    for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
        final String destination = entry.getKey();
        InstanceConfig config = entry.getValue();
        // create the working node for the destination
        if (!embededCanalServer.isStart(destination)) {
            // 3. start the HA mechanism
            ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
            if (!config.getLazy() && !runningMonitor.isStart()) {
                runningMonitor.start();
            }
        }
    }

    // start the network interface
    if (canalServer != null) {
        canalServer.start();
    }
}

1. An ephemeral "ip:port" node is created under /otter/canal/cluster, e.g. /otter/canal/cluster/10.33.200.132:11111.
2. An IZkStateListener is registered to watch the connection state with ZooKeeper, so that when the session expires and a new session is established, the ephemeral "ip:port" node is created again.
3. The HA mechanism starts. For each instance, the canal-server and canal-client information is recorded under /otter/canal/destinations. Each canal-server delegates the management of each instance to the ServerRunningMonitor class.
3.1 Initializing ServerRunningMonitor: when ServerRunningMonitors.getRunningMonitor(destination) finds no monitor for the destination, the computing map calls the apply method of the Function below, which creates and initializes a ServerRunningMonitor.

        final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
        // set the canal-server information
        ServerRunningMonitors.setServerData(serverData);
        ServerRunningMonitors
            .setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {

                public ServerRunningMonitor apply(final String destination) {
                    ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
                    // set the instance name
                    runningMonitor.setDestination(destination);
                    runningMonitor.setListener(new ServerRunningListener() {

                        public void processActiveEnter() {
                            try {
                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
                                // start the instance
                                embededCanalServer.start(destination);
                                if (canalMQStarter != null) {
                                    canalMQStarter.startDestination(destination);
                                }
                            } finally {
                                MDC.remove(CanalConstants.MDC_DESTINATION);
                            }
                        }

                        public void processActiveExit() {
                            try {
                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
                                if (canalMQStarter != null) {
                                    canalMQStarter.stopDestination(destination);
                                }
                                embededCanalServer.stop(destination);
                            } finally {
                                MDC.remove(CanalConstants.MDC_DESTINATION);
                            }
                        }

                        public void processStart() {
                            try {
                                if (zkclientx != null) {
                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
                                        ip + ":" + port);

                                    initCid(path);
                                    zkclientx.subscribeStateChanges(new IZkStateListener() {

                                        public void handleStateChanged(KeeperState state) throws Exception {

                                        }

                                        public void handleNewSession() throws Exception {
                                            initCid(path);
                                        }

                                        @Override
                                        public void handleSessionEstablishmentError(Throwable error) throws Exception {
                                            logger.error("failed to connect to zookeeper", error);
                                        }
                                    });
                                }
                            } finally {
                                MDC.remove(CanalConstants.MDC_DESTINATION);
                            }
                        }

                        public void processStop() {
                            try {
                                MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
                                if (zkclientx != null) {
                                    final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
                                        ip + ":" + port);
                                    releaseCid(path);
                                }
                            } finally {
                                MDC.remove(CanalConstants.MDC_DESTINATION);
                            }
                        }

                    });
                    if (zkclientx != null) {
                        runningMonitor.setZkClient(zkclientx);
                    }
                    // trigger creation of the cid node
                    runningMonitor.init();
                    return runningMonitor;
                }
            }));
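`MigrateMap.makeComputingMap` gives `ServerRunningMonitors` a map that builds the monitor for a destination the first time it is looked up and returns the same object afterwards. A rough JDK-only analogue, using `ConcurrentHashMap.computeIfAbsent` in place of canal's map (an assumption about equivalent behaviour, not canal's implementation; `Monitor` and `LazyMonitors` are hypothetical):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for ServerRunningMonitor: just remembers its destination.
class Monitor {
    final String destination;

    Monitor(String destination) {
        this.destination = destination;
    }
}

// Lazily computing map: the factory runs once per key, on first lookup.
class LazyMonitors {
    private final Map<String, Monitor> monitors = new ConcurrentHashMap<>();
    private final Function<String, Monitor> factory;
    final AtomicInteger created = new AtomicInteger(); // how many times the factory ran

    LazyMonitors(Function<String, Monitor> factory) {
        this.factory = factory;
    }

    Monitor get(String destination) {
        return monitors.computeIfAbsent(destination, d -> {
            created.incrementAndGet();
            return factory.apply(d); // in canal, apply() wires the listener and calls init() here
        });
    }
}
```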

3.2 Starting ServerRunningMonitor

public synchronized void start() {
    super.start();
    try {
        // 3.2.1
        processStart();
        if (zkClient != null) {
            // 3.2.2 If instance resources should be released as far as possible, do not watch the running node;
            // otherwise, even after this machine is stopped, another machine would start the instance immediately.
            // Watches the /otter/canal/destinations/{destination}/running node
            String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
            zkClient.subscribeDataChanges(path, dataListener);

            initRunning();
        } else {
            processActiveEnter(); // no ZooKeeper: start directly
        }
    } catch (Exception e) {
        logger.error("start failed", e);
        // startup failed: reset the state so it does not interfere with the next start
        stop();
    }
}

3.2.1 processStart is called. It creates the ephemeral "ip:port" node under /otter/canal/destinations/{destination}/cluster and registers an IZkStateListener that watches the connection state with ZooKeeper, recreating the node when a new session is established. This ephemeral node mainly gives canal clients the list of available canal-server nodes.
3.2.2 A dataListener is registered on the /otter/canal/destinations/{destination}/running node to watch its data being created, changed or deleted.
3.2.3 initRunning initializes the canal-server information for the instance:

private void initRunning() {
        if (!isStart()) {
            return;
        }

        String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
        // serialize
        byte[] bytes = JsonUtils.marshalToByte(serverData);
        try {
            mutex.set(false);
            zkClient.create(path, bytes, CreateMode.EPHEMERAL);
            activeData = serverData;
            processActiveEnter(); // fire the event
            mutex.set(true);
        } catch (ZkNodeExistsException e) {
            bytes = zkClient.readData(path, true);
            if (bytes == null) { // the node no longer exists: retry immediately
                initRunning();
            } else {
                activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
            }
        } catch (ZkNoNodeException e) {
            zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // try to create the parent node
            initRunning();
        }
    }

initRunning tries to create the ephemeral node /otter/canal/destinations/{destination}/running. If the node cannot be created, a ZkNodeExistsException is thrown, meaning another canal-server is already responsible for this instance's binlog sync; the node's data is then read, and the canal-server serving the instance is recorded in activeData.

If the node is created successfully, the current canal-server is responsible for this instance's binlog sync, and processActiveEnter is called to start the instance.

So the ephemeral node /otter/canal/destinations/{destination}/running records which canal-server is currently serving the instance. If the canal-server's ZooKeeper session expires (for example, after a connection timeout), the ephemeral node is deleted. The dataListener that every canal-server registered on this node then observes the change and performs the failover.
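The three branches of `initRunning` (node created, node already exists, parent missing) can be replayed against an in-memory tree. In this sketch `Tree`, `RunningElection`, `NodeExists` and `NoNode` are hypothetical names; the two exceptions only mimic `ZkNodeExistsException` and `ZkNoNodeException`, and the `isStart()` check, the mutex and `processActiveEnter` are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Mimic ZkNodeExistsException / ZkNoNodeException (hypothetical, not the ZkClient classes).
class NodeExists extends RuntimeException {}
class NoNode extends RuntimeException {}

// Hypothetical in-memory tree: create() requires an existing parent, like ZooKeeper.
class Tree {
    final Map<String, String> nodes = new HashMap<>();

    Tree() {
        nodes.put("/", "");
    }

    static String parentOf(String path) {
        int i = path.lastIndexOf('/');
        return i == 0 ? "/" : path.substring(0, i);
    }

    void create(String path, String value) {
        if (nodes.containsKey(path)) throw new NodeExists();
        if (!nodes.containsKey(parentOf(path))) throw new NoNode();
        nodes.put(path, value);
    }

    // Like createPersistent(path, true): creates missing ancestors and ignores existing ones.
    void createParents(String path) {
        if (path.equals("/") || nodes.containsKey(path)) return;
        createParents(parentOf(path));
        nodes.put(path, "");
    }

    String read(String path) {
        return nodes.get(path);
    }
}

class RunningElection {
    // Returns the address recorded as active (activeData) after one initRunning-style attempt.
    static String initRunning(Tree zk, String destinationPath, String self) {
        String running = destinationPath + "/running";
        try {
            zk.create(running, self);
            return self; // we won: processActiveEnter() would start the instance here
        } catch (NodeExists e) {
            String current = zk.read(running);
            if (current == null) { // deleted in between: try again immediately
                return initRunning(zk, destinationPath, self);
            }
            return current; // another server is active: remember it
        } catch (NoNode e) {
            zk.createParents(destinationPath); // parent missing: create it, then retry
            return initRunning(zk, destinationPath, self);
        }
    }
}
```

The first call hits the missing-parent branch, creates the ancestors and retries; the second call lands in the node-exists branch and only records the current owner.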

// dataListener is created when ServerRunningMonitor is constructed
 public ServerRunningMonitor(){
        // create the parent node
        dataListener = new IZkDataListener() {

            public void handleDataChange(String dataPath, Object data) throws Exception {
                MDC.put("destination", destination);
                ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
                if (!isMine(runningData.getAddress())) {
                    mutex.set(false);
                }

                if (!runningData.isActive() && isMine(runningData.getAddress())) { // an active release happened, and this machine was previously active
                    release = true;
                    releaseRunning(); // fully release the mainstem
                }

                activeData = (ServerRunningData) runningData;
            }

            public void handleDataDeleted(String dataPath) throws Exception {
                MDC.put("destination", destination);
                mutex.set(false);
                if (!release && activeData != null && isMine(activeData.getAddress())) {
                    // if this machine was the last active one, immediately try to take over again
                    initRunning();
                } else {
                    // otherwise wait delayTime, to avoid frequent switching caused by brief network glitches or ZooKeeper errors
                    delayExector.schedule(new Runnable() {

                        public void run() {
                            initRunning();
                        }
                    }, delayTime, TimeUnit.SECONDS);
                }
            }

        };

    }

As shown, the listener watches two events on the ephemeral node /otter/canal/destinations/{destination}/running: node deletion (handleDataDeleted) and data change (handleDataChange).

When the node is deleted, if this machine was the last active one, initRunning is called to take over immediately. Otherwise it waits delayTime before competing, to avoid frequent switching caused by brief network glitches or ZooKeeper errors.

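When the node's data changes, the listener mainly records in activeData which canal-server is now active on /otter/canal/destinations/{destination}/running. If an active release has happened (the data is marked inactive and points to this machine), the instance is fully released: the ephemeral node is deleted from ZooKeeper and the instance is shut down. (I did not find any place in the code that sets release back to false.)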
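The timing decision in `handleDataDeleted` can be separated from ZooKeeper. `RepreemptPolicy` below is a hypothetical extraction, not canal code: it re-runs the election at once when this server was the last active one and did not release voluntarily, and otherwise defers it by `delayTime` seconds on a `ScheduledExecutorService`, playing the role of `delayExector` above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical extraction of the running-node delete handling.
class RepreemptPolicy {
    // Daemon thread so a pending delayed task never keeps the JVM alive.
    private final ScheduledExecutorService delayExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "delay-executor");
        t.setDaemon(true);
        return t;
    });
    private final String self;
    private final long delaySeconds;

    RepreemptPolicy(String self, long delaySeconds) {
        this.self = self;
        this.delaySeconds = delaySeconds;
    }

    // True when the former owner should retry at once: it held the node and did not release it on purpose.
    boolean retryImmediately(boolean released, String lastActiveAddress) {
        return !released && lastActiveAddress != null && lastActiveAddress.equals(self);
    }

    // Runs initRunning now or after the delay; returns the delay applied, in seconds.
    long onRunningNodeDeleted(boolean released, String lastActiveAddress, Runnable initRunning) {
        if (retryImmediately(released, lastActiveAddress)) {
            initRunning.run();
            return 0;
        }
        // Standby servers wait, so a brief glitch does not make the instance bounce between servers.
        delayExecutor.schedule(initRunning, delaySeconds, TimeUnit.SECONDS);
        return delaySeconds;
    }

    void shutdown() {
        delayExecutor.shutdownNow();
    }
}
```

Giving the previous owner a head start means that after a short session loss the same server usually ends up running the instance again.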

To summarize: the ephemeral children of /otter/canal/cluster represent the canal-servers currently running normally. The children of /otter/canal/destinations/{destination}/cluster list the canal servers available for the instance. The data of the ephemeral node /otter/canal/destinations/{destination}/running identifies the instance's active canal-server, and every running canal-server registers a dataListener on that node so it can fail over promptly.
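For reference, the three server-side paths in this summary can be produced by a tiny helper. The class and method names are hypothetical and only echo the `ZookeeperPathUtils` calls seen earlier; the path formats are the ones given in this article.

```java
// Hypothetical path builder for the server-side HA nodes described above.
final class CanalZkPaths {
    static final String ROOT = "/otter/canal";
    static final String CLUSTER_ROOT = ROOT + "/cluster";
    static final String DESTINATIONS_ROOT = ROOT + "/destinations";

    private CanalZkPaths() {}

    // Ephemeral: one child per live canal-server, e.g. /otter/canal/cluster/10.33.200.132:11111
    static String serverNode(String ip, int port) {
        return CLUSTER_ROOT + "/" + ip + ":" + port;
    }

    // Ephemeral: servers able to serve this instance (clients read this list)
    static String destinationClusterNode(String destination, String ip, int port) {
        return DESTINATIONS_ROOT + "/" + destination + "/cluster/" + ip + ":" + port;
    }

    // Ephemeral: its data names the canal-server currently running this instance
    static String destinationRunning(String destination) {
        return DESTINATIONS_ROOT + "/" + destination + "/running";
    }
}
```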

Canal Client HA Implementation

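In canal, an instance can be consumed by only one client at a time. Let's look at how canal-client HA is implemented on top of ZooKeeper. The official client test class ClusterCanalClientTest implements the incremental subscription and consumption flow described in the official documentation; its core code is: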

// Obtain the canal server address dynamically from ZooKeeper and connect; if one server crashes, failover is supported
CanalConnector connector = CanalConnectors.newClusterConnector("127.0.0.1:2181", destination, "", "");

protected void process() {
        int batchSize = 5 * 1024;
        while (running) {
            try {
                MDC.put("destination", destination);
                // 1. connect
                connector.connect();
                // 2. subscribe; no client-side filter is submitted, so the server-side filter applies
                connector.subscribe();
                while (running) {
                    // 3. Fetch events without specifying a position. The call tries to fetch batchSize records,
                    //    returns however many are available, and does not block waiting.
                    //    canal remembers this client's latest position.
                    //    On the first fetch, output starts from the oldest record saved in canal.
                    Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize records
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        // try {
                        // Thread.sleep(1000);
                        // } catch (InterruptedException e) {
                        // }
                    } else {
                        printSummary(message, batchId, size);
                        printEntry(message.getEntries());
                    }

                    connector.ack(batchId); // acknowledge the batch
                    // connector.rollback(batchId); // on processing failure, roll back the data
                }
            } catch (Exception e) {
                logger.error("process error!", e);
            } finally {
                connector.disconnect();
                MDC.remove("destination");
            }
        }
    }

1. When the canal client connects to a canal server, it reads the information under the cluster and running nodes from ZooKeeper and, through ClusterNodeAccessStrategy, registers listeners that track changes to this data in real time, so that connection retries can fail over.

public void connect() throws CanalClientException {
        while (currentConnector == null) {
            int times = 0;
            while (true) {
                try {
                    currentConnector = new SimpleCanalConnector(null, username, password, destination) {

                        @Override
                        public SocketAddress getNextAddress() {
                            return accessStrategy.nextNode();
                        }

                    };
                    currentConnector.setSoTimeout(soTimeout);
                    currentConnector.setIdleTimeout(idleTimeout);
                    if (filter != null) {
                        currentConnector.setFilter(filter);
                    }
                    if (accessStrategy instanceof ClusterNodeAccessStrategy) {
//1.1
                        currentConnector.setZkClientx(((ClusterNodeAccessStrategy) accessStrategy).getZkClient());
                    }

                    currentConnector.connect();
                    break;
                } catch (Exception e) {
                    logger.warn("failed to connect to:{} after retry {} times", accessStrategy.currentNode(), times);
                    currentConnector.disconnect();
                    currentConnector = null;
                    // retry for #retryTimes for each node when trying to
                    // connect to it.
                    times = times + 1;
                    if (times >= retryTimes) {
                        throw new CanalClientException(e);
                    } else {
                        // fixed issue #55: sleep between retries to avoid high CPU usage when reconnecting
                        try {
                            Thread.sleep(retryInterval);
                        } catch (InterruptedException e1) {
                            throw new CanalClientException(e1);
                        }
                    }
                }
            }
        }
    }
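Stripped of canal types, the inner loop is a bounded retry with a fixed pause between attempts (the pause being the fix for issue #55). A generic version, assuming a `Callable` stands in for creating and connecting a `SimpleCanalConnector`; `RetryingConnect` is a hypothetical name and `IllegalStateException` replaces `CanalClientException`:

```java
import java.util.concurrent.Callable;

// Hypothetical generic form of the connect loop: up to retryTimes attempts, sleeping between them.
final class RetryingConnect {
    private RetryingConnect() {}

    static <T> T connect(Callable<T> attempt, int retryTimes, long retryIntervalMillis) {
        int times = 0;
        while (true) {
            try {
                return attempt.call();
            } catch (Exception e) {
                times = times + 1;
                if (times >= retryTimes) {
                    throw new IllegalStateException("failed after " + times + " attempts", e);
                }
                try {
                    Thread.sleep(retryIntervalMillis); // avoid a busy loop between attempts
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(ie);
                }
            }
        }
    }
}
```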

1.1 Set the current connector's zkClient and initialize the client information in ZooKeeper.

public void setZkClientx(ZkClientx zkClientx) {
        this.zkClientx = zkClientx;
        initClientRunningMonitor(this.clientIdentity);
    }

 private synchronized void initClientRunningMonitor(ClientIdentity clientIdentity) {
        if (zkClientx != null && clientIdentity != null && runningMonitor == null) {
            ClientRunningData clientData = new ClientRunningData();
            clientData.setClientId(clientIdentity.getClientId());
            clientData.setAddress(AddressUtils.getHostIp());

            runningMonitor = new ClientRunningMonitor();
            runningMonitor.setDestination(clientIdentity.getDestination());
            runningMonitor.setZkClient(zkClientx);
            runningMonitor.setClientData(clientData);
            runningMonitor.setListener(new ClientRunningListener() {

                public InetSocketAddress processActiveEnter() {
                    InetSocketAddress address = doConnect();
                    mutex.set(true);
                    if (filter != null) { // a filter exists, so this is an automatic switch: re-subscribe with the previous filter
                        subscribe(filter);
                    }

                    if (rollbackOnConnect) {
                        rollback();
                    }

                    return address;
                }

                public void processActiveExit() {
                    mutex.set(false);
                    doDisconnect();
                }

            });
        }
    }

ClientRunningMonitor plays the same role as ServerRunningMonitor: it is the client-side manager for an instance. It is started when the connection is actually established in currentConnector.connect():

 public void connect() throws CanalClientException {
        if (connected) {
            return;
        }

        if (runningMonitor != null) {
            if (!runningMonitor.isStart()) {
                // start ClientRunningMonitor
                runningMonitor.start();
            }
        } else {
            waitClientRunning();
            if (!running) {
                return;
            }
            // use ClusterNodeAccessStrategy to pick the canal server currently running this instance and connect to it
            doConnect();
            if (filter != null) { // a filter exists, so this is an automatic switch: re-subscribe with the previous filter
                subscribe(filter);
            }
            if (rollbackOnConnect) {
                rollback();
            }
        }

        connected = true;
    }

So before connecting to the canal server, the client starts ClientRunningMonitor to obtain the right to consume the instance.

//ClientRunningMonitor.start
  public void start() {
        super.start();

        String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
        zkClient.subscribeDataChanges(path, dataListener);
        initRunning();
    }

The principle is the same as in ServerRunningMonitor: a dataListener is registered on the instance's client running node, /otter/canal/destinations/{destination}/{clientId}/running, and by watching that node's data it performs the client-side HA switching.

public ClientRunningMonitor(){
        dataListener = new IZkDataListener() {

            public void handleDataChange(String dataPath, Object data) throws Exception {
                MDC.put("destination", destination);
                ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
                if (!isMine(runningData.getAddress())) {
                    mutex.set(false);
                }

                if (!runningData.isActive() && isMine(runningData.getAddress())) { // an active release happened, and this machine was previously active
                    release = true;
                    releaseRunning(); // fully release the mainstem
                }

                activeData = (ClientRunningData) runningData;
            }

            public void handleDataDeleted(String dataPath) throws Exception {
                MDC.put("destination", destination);
                mutex.set(false);
                // fire an exit first: this may be a manual release or a session expiry caused by a network blip
                processActiveExit();
                if (!release && activeData != null && isMine(activeData.getAddress())) {
                    // if this machine was the last active one, immediately try to take over again
                    initRunning();
                } else {
                    // otherwise wait delayTime, to avoid frequent switching caused by brief network glitches or ZooKeeper errors
                    delayExector.schedule(new Runnable() {

                        public void run() {
                            initRunning();
                        }
                    }, delayTime, TimeUnit.SECONDS);
                }
            }

        };

    }

The ephemeral node is created in ClientRunningMonitor.initRunning; only the client that creates it successfully may connect to the canal server. The data written to the node is the client's IP, port and clientId.

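    // 1. The method is synchronized so that calls execute one at a time, in order.
    // 2. Check whether the activeData already on ZooKeeper belongs to this machine; if so, reset mutex to true, otherwise a deadlock occurs.
    // 3. Handle exceptions so the running node is deleted when something fails, otherwise a deadlock occurs.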
    public synchronized void initRunning() {
        if (!isStart()) {
            return;
        }

        String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
        // serialize
        byte[] bytes = JsonUtils.marshalToByte(clientData);
        try {
            mutex.set(false);
            zkClient.create(path, bytes, CreateMode.EPHEMERAL);

            processActiveEnter(); // fire the event: connect to the canal server
            activeData = clientData;
            mutex.set(true);
        } catch (ZkNodeExistsException e) {
            bytes = zkClient.readData(path, true);
            if (bytes == null) { // the node no longer exists: retry immediately
                initRunning();
            } else {
                activeData = JsonUtils.unmarshalFromByte(bytes, ClientRunningData.class);
                // the node already exists: check whether it is ours, to avoid a livelock
                if (activeData.getAddress().contains(":") && isMine(activeData.getAddress())) {
                    mutex.set(true);
                }
            }
        } catch (ZkNoNodeException e) {
            zkClient.createPersistent(ZookeeperPathUtils.getClientIdNodePath(this.destination, clientData.getClientId()),
                true); // try to create the parent node
            initRunning();
        } catch (Throwable t) {
            logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].",
                destination),
                t);

            // fixed issue 1220: avoid an endless loop when the server nodes are not working
            if (t instanceof ServerNotFoundException) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            }

            // on any exception, try to release the running node
            releaseRunning();
            throw new CanalClientException("something goes wrong in initRunning method. ", t);
        }
    }
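Compared with the server version, the client `initRunning` adds a catch-all branch: if anything fails after the running node has been created (for example, connecting to the canal server inside `processActiveEnter`), `releaseRunning` frees the node so it is not left held by a client that never connected. A hedged sketch of just that pattern, with a `Set` standing in for ZooKeeper; `ClientSlot` and `acquire` are hypothetical:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical model of the client running node: create, connect, and release on any failure.
class ClientSlot {
    final Set<String> ephemeralNodes = new HashSet<>(); // stands in for ZooKeeper
    final String runningPath;

    ClientSlot(String runningPath) {
        this.runningPath = runningPath;
    }

    // Returns true if this client became active; false if another client already holds the node.
    synchronized boolean acquire(Supplier<String> connectToServer) {
        if (!ephemeralNodes.add(runningPath)) {
            return false; // node exists: someone else is the active client
        }
        try {
            connectToServer.get(); // processActiveEnter(): connect (and subscribe) to the canal server
            return true;
        } catch (RuntimeException e) {
            ephemeralNodes.remove(runningPath); // releaseRunning(): free the node for other clients
            throw new IllegalStateException("something goes wrong in acquire", e);
        }
    }
}
```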
  2. After connecting, the client sends a SUBSCRIPTION request to the server. subscribe tells the canal server which filter to apply to the database's binlog, and also tells the server the current clientIdentity. Because an instance can correspond to only one client, clientIdentity is always initialized as:
this.clientIdentity = new ClientIdentity(destination, (short) 1001);
public ClientIdentity(String destination, short clientId){
        this.clientId = clientId;
        this.destination = destination;
    }
public void subscribe(String filter) throws CanalClientException {
        waitClientRunning();
        if (!running) {
            return;
        }
        try {
            writeWithHeader(Packet.newBuilder()
                .setType(PacketType.SUBSCRIPTION)
                .setBody(Sub.newBuilder()
                    .setDestination(clientIdentity.getDestination())
                    .setClientId(String.valueOf(clientIdentity.getClientId()))
                    .setFilter(filter != null ? filter : "")
                    .build()
                    .toByteString())
                .build()
                .toByteArray());
            //
            Packet p = Packet.parseFrom(readNextPacket());
            Ack ack = Ack.parseFrom(p.getBody());
            if (ack.getErrorCode() > 0) {
                throw new CanalClientException("failed to subscribe with reason: " + ack.getErrorMessage());
            }

            clientIdentity.setFilter(filter);
        } catch (IOException e) {
            throw new CanalClientException(e);
        }
    }

When the canal server receives the SUBSCRIPTION request, it registers the client information (clientIdentity) with the corresponding instance:

//embeddedServer.subscribe(clientIdentity)
    /**
     * Client subscription; subscribing again updates the corresponding filter.
     */
    @Override
    public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());

        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        if (!canalInstance.getMetaManager().isStart()) {
            canalInstance.getMetaManager().start();
        }
//1.
        canalInstance.getMetaManager().subscribe(clientIdentity); // perform the meta subscription

        Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
        if (position == null) {
            position = canalInstance.getEventStore().getFirstPosition(); // get the first position in the store
            if (position != null) {
//2.
                canalInstance.getMetaManager().updateCursor(clientIdentity, position); // update the cursor
            }
            logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
        } else {
            logger.info("subscribe successfully, use last cursor position:{} ", clientIdentity, position);
        }

        // notify that the subscription has changed
        canalInstance.subscribeChange(clientIdentity);
    }
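The cursor branch of `subscribe` (the part marked `//2.`) reduces to one rule: reuse the client's saved cursor if there is one, otherwise seed it with the oldest position still held in the event store. A minimal in-memory sketch; `CursorTable` is hypothetical and positions are simplified to `Long`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of MetaManager cursors plus the event store's first position.
class CursorTable {
    private final Map<String, Long> cursors = new HashMap<>(); // client identity -> cursor
    private final Long firstStorePosition;                      // null when the store is empty

    CursorTable(Long firstStorePosition) {
        this.firstStorePosition = firstStorePosition;
    }

    void updateCursor(String client, long position) {
        cursors.put(client, position);
    }

    // Mirrors the subscribe() branch: an existing cursor wins, else seed it from the store.
    Long subscribe(String client) {
        Long position = cursors.get(client);
        if (position == null) {
            position = firstStorePosition;
            if (position != null) {
                cursors.put(client, position);
            }
        }
        return position;
    }
}
```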

The main logic is:

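  1. Perform the meta subscription. metaManager manages the client's consumption positions and related information; in HA mode, the client's positions, filter and so on are stored in ZooKeeper so they can be shared when the canal server fails over.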

//ZooKeeperMetaManager.subscribe
 public void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
        String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
            clientIdentity.getClientId());

        try {
            zkClientx.createPersistent(path, true);
        } catch (ZkNodeExistsException e) {
            // ignore
        }
        // if the client has a filter, create the persistent node /otter/canal/destinations/{destination}/{clientId}/filter to store it
        if (clientIdentity.hasFilter()) {
            String filterPath = ZookeeperPathUtils.getFilterPath(clientIdentity.getDestination(),
                clientIdentity.getClientId());

            byte[] bytes = null;
            try {
                bytes = clientIdentity.getFilter().getBytes(ENCODE);
            } catch (UnsupportedEncodingException e) {
                throw new CanalMetaManagerException(e);
            }

            try {
                zkClientx.createPersistent(filterPath, bytes);
            } catch (ZkNodeExistsException e) {
                // ignore
                zkClientx.writeData(filterPath, bytes);
            }
        }
    }

If the client has a filter, the persistent node /otter/canal/destinations/{destination}/{clientId}/filter is created to store the client's filter.

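2. When a cursor position is available (here, the first position in the event store, used because the client has no saved cursor), updateCursor records it, and a background scheduled task flushes the position to the persistent node /otter/canal/destinations/{destination}/{clientId}/cursor.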
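The background flush can be pictured as a write-behind cache: `updateCursor` only touches memory and marks the client dirty, and a scheduled task periodically copies dirty cursors to the persistent store. This is an assumption about the general shape of a periodic meta manager, not `PeriodMixedMetaManager`'s code; `WriteBehindCursors` and `flushPeriodMillis` are hypothetical, and a `ConcurrentHashMap` stands in for the ZooKeeper cursor nodes.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical write-behind cursor store: memory first, periodic flush to "ZooKeeper".
class WriteBehindCursors {
    final Map<String, Long> memory = new ConcurrentHashMap<>();
    final Map<String, Long> zkCursorNodes = new ConcurrentHashMap<>(); // stands in for .../{clientId}/cursor
    private final Set<String> dirty = ConcurrentHashMap.newKeySet();
    private final ScheduledExecutorService flusher = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "cursor-flusher");
        t.setDaemon(true);
        return t;
    });

    WriteBehindCursors(long flushPeriodMillis) {
        flusher.scheduleAtFixedRate(this::flush, flushPeriodMillis, flushPeriodMillis, TimeUnit.MILLISECONDS);
    }

    // Fast path used on every ack: no ZooKeeper write here.
    void updateCursor(String client, long position) {
        memory.put(client, position);
        dirty.add(client);
    }

    // Copies the latest in-memory cursor of every dirty client to the persistent store.
    synchronized void flush() {
        for (String client : dirty) {
            dirty.remove(client);
            Long position = memory.get(client);
            if (position != null) {
                zkCursorNodes.put(client, position);
            }
        }
    }

    void stop() {
        flusher.shutdown();
        flush(); // final flush so the last position survives a clean shutdown
    }
}
```

In this model acks stay cheap, but after a crash the new server resumes from the last flushed cursor, so up to one flush period of data may be delivered again.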

To summarize: canal client HA is likewise built on an ephemeral node plus node data listeners (watches). Before a canal client connects to a canal server, it tries to create the ephemeral node /otter/canal/destinations/{destination}/{clientId}/running; the client that succeeds writes its IP, port and clientId into the node, marking itself as the instance's active client. Every canal client also registers a dataListener on the node, which watches its data to perform client-side HA switching. The active client uses ClusterNodeAccessStrategy to read the canal server information from ZooKeeper, learns which canal server is active for the instance, and connects to it. The client then sends a SUBSCRIPTION request to the canal server; the client's positions, filter and similar information are registered in ZooKeeper so they can be shared when the canal server fails over.

This concludes the analysis of canal's HA mode. The next article will analyze how the binlog position is determined while canal is running.

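Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's own views; Jianshu is an information publishing platform and only provides information storage services.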
