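I have been reading the canal source code lately and ran into a few questions: how canal's HA mode is implemented, how the MySQL dump position is determined, how the canal client fetches data and acks it, how MySQL primary/standby switchover is handled, and so on. I plan to write several source-code analyses to answer them; corrections and discussion are welcome. This article analyzes the HA implementation of the canal server and client. Before reading on, it helps to understand canal's overall architecture; see the official documentation. First, here is how the official documentation describes the HA mechanism.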
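HA mechanism design
canal's HA has two parts: the canal server and the canal client each have their own HA implementation.
canal server: to reduce the number of dump requests sent to MySQL, only one instance across the different servers may be running at any time; the others stay in standby.
canal client: to preserve ordering, only one canal client at a time may perform get/ack/rollback on an instance; otherwise the order in which clients receive data cannot be guaranteed.
The whole HA mechanism relies mainly on two ZooKeeper features: watchers and EPHEMERAL nodes (whose lifetime is bound to the session).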

- Whenever a canal server wants to start a canal instance, it first makes a start attempt through ZooKeeper (implementation: create an EPHEMERAL node; whoever creates it successfully is allowed to start).
- After the ZooKeeper node is created successfully, that canal server starts the canal instance; the canal instances on servers that failed to create the node stay in standby.
- As soon as ZooKeeper detects that the node created by canal server A has disappeared, it notifies the other canal servers, which repeat the first step and elect a new canal server to start the instance.
- Every time a canal client connects, it first asks ZooKeeper which server is currently running the canal instance and then connects to it; if the connection becomes unavailable, it tries to connect again.
The canal client works much like the canal server: it also uses ZooKeeper EPHEMERAL-node preemption for control. The rest of this article analyzes the source code of the HA implementation.
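The election steps above can be sketched without a real ZooKeeper. The following is a minimal in-memory model, not canal or ZooKeeper code: FakeEphemeralRegistry, ElectionDemo, and the session names are hypothetical, and the destination name "example" is assumed. It only illustrates "whoever creates the ephemeral node runs, and deletion triggers a new election".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** In-memory stand-in for EPHEMERAL nodes plus delete watchers (hypothetical, not the ZooKeeper API). */
final class FakeEphemeralRegistry {
    private final Map<String, String> ownerByPath = new HashMap<>();
    private final Map<String, List<Consumer<String>>> deleteWatchers = new HashMap<>();

    /** Returns true if the caller created the node, false if another session already holds it. */
    synchronized boolean tryCreate(String path, String sessionId) {
        return ownerByPath.putIfAbsent(path, sessionId) == null;
    }

    synchronized String ownerOf(String path) {
        return ownerByPath.get(path);
    }

    synchronized void watchDelete(String path, Consumer<String> watcher) {
        deleteWatchers.computeIfAbsent(path, k -> new ArrayList<>()).add(watcher);
    }

    /** Simulates session expiry: every node owned by the session disappears, then watchers fire. */
    void expireSession(String sessionId) {
        List<String> removed = new ArrayList<>();
        List<Consumer<String>> toNotify = new ArrayList<>();
        synchronized (this) {
            ownerByPath.entrySet().removeIf(e -> {
                if (e.getValue().equals(sessionId)) {
                    removed.add(e.getKey());
                    return true;
                }
                return false;
            });
            for (String path : removed) {
                toNotify.addAll(deleteWatchers.getOrDefault(path, new ArrayList<>()));
            }
        }
        // Notify outside the lock; each watcher re-runs the election for the running node.
        for (Consumer<String> watcher : toNotify) {
            watcher.accept(ElectionDemo.RUNNING);
        }
    }
}

final class ElectionDemo {
    static final String RUNNING = "/otter/canal/destinations/example/running";

    /** Returns the owner of the running node before and after server A's session expires. */
    static String[] run() {
        FakeEphemeralRegistry zk = new FakeEphemeralRegistry();
        // Standby server B repeats the start attempt when the running node is deleted.
        zk.watchDelete(RUNNING, path -> zk.tryCreate(path, "serverB"));
        zk.tryCreate(RUNNING, "serverA"); // A creates the node and runs the instance
        zk.tryCreate(RUNNING, "serverB"); // B fails to create it and stays standby
        String before = zk.ownerOf(RUNNING);
        zk.expireSession("serverA");
        String after = zk.ownerOf(RUNNING);
        return new String[] { before, after };
    }

    public static void main(String[] args) {
        String[] owners = run();
        System.out.println(owners[0] + " -> " + owners[1]);
    }
}
```

Only one path is watched in this sketch, so the watcher is always notified with the running path; a real watcher would receive the path it was registered on.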
Canal Server HA implementation
First, how to enable HA mode in canal:
Add the following to canal.properties:
# ZooKeeper address to register with
canal.zkServers = 127.0.0.1:2181
# With this setting, positions are managed by PeriodMixedMetaManager, which records the ack position on a zk node so that consumption can resume from the ack position after a failover
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
Next, let's look at how canal-server interacts with ZooKeeper at startup; this is the core of the server-side HA implementation.
The entry point is the main method of CanalLauncher; with canal.serverMode = tcp, the class that actually does the startup work is CanalController.
When CanalController is constructed, it initializes canal's system directories in ZooKeeper.
public CanalController(final Properties properties){
// ... code unrelated to HA omitted ...
final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
if (StringUtils.isNotEmpty(zkServers)) {
zkclientx = ZkClientx.getZkClient(zkServers);
// Initialize the system directories if they do not exist yet
// /otter/canal/destinations: stores instance information
zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
// /otter/canal/cluster: stores canal-server node information
zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
}
// ... code unrelated to HA omitted ...
}
When CanalController starts:
public void start() throws Throwable {
logger.info("## start the canal server[{}:{}]", ip, port);
// Create the working node for the whole canal server
final String path = ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port);
//1.
initCid(path);
if (zkclientx != null) {
//2.
this.zkclientx.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
}
public void handleNewSession() throws Exception {
initCid(path);
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
logger.error("failed to connect to zookeeper", error);
}
});
}
// Start the embedded server first
embededCanalServer.start();
// Try to start the non-lazy instances
for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
final String destination = entry.getKey();
InstanceConfig config = entry.getValue();
// Create the working node for the destination
if (!embededCanalServer.isStart(destination)) {
// 3. Start the HA mechanism
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
runningMonitor.start();
}
}
}
// Start the network interface
if (canalServer != null) {
canalServer.start();
}
}
1. Creates the ephemeral node "ip:port" under /otter/canal/cluster, for example /otter/canal/cluster/10.33.200.132:11111.
2. Registers an IZkStateListener to watch the state of the connection to zk, so that when the session expires and a new session is established, the "ip:port" ephemeral node is created again.
3. Starts the HA mechanism. Each instance records its canal-server and canal-client information under /otter/canal/destinations. Each canal-server delegates the management of each instance to the ServerRunningMonitor class.
3.1. Initializing ServerRunningMonitor: when ServerRunningMonitors.getRunningMonitor(destination) finds no monitor for the destination, the apply method of the Function registered through ServerRunningMonitors.setRunningMonitors is invoked to create one.
final ServerRunningData serverData = new ServerRunningData(cid, ip + ":" + port);
// Set the canal-server information
ServerRunningMonitors.setServerData(serverData);
ServerRunningMonitors
.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {
public ServerRunningMonitor apply(final String destination) {
ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
// Set the instance name
runningMonitor.setDestination(destination);
runningMonitor.setListener(new ServerRunningListener() {
public void processActiveEnter() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
// Start the instance
embededCanalServer.start(destination);
if (canalMQStarter != null) {
canalMQStarter.startDestination(destination);
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
public void processActiveExit() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
if (canalMQStarter != null) {
canalMQStarter.stopDestination(destination);
}
embededCanalServer.stop(destination);
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
public void processStart() {
try {
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
ip + ":" + port);
initCid(path);
zkclientx.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
}
public void handleNewSession() throws Exception {
initCid(path);
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
logger.error("failed to connect to zookeeper", error);
}
});
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
public void processStop() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
ip + ":" + port);
releaseCid(path);
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
});
if (zkclientx != null) {
runningMonitor.setZkClient(zkclientx);
}
// Trigger creation of the cid node
runningMonitor.init();
return runningMonitor;
}
}));
3.2 Starting ServerRunningMonitor
public synchronized void start() {
super.start();
try {
//3.2.1
processStart();
if (zkClient != null) {
//3.2.2 To release instance resources as fully as possible, do not watch the running node; otherwise, even if this machine is stopped, another machine will start immediately
// the /otter/canal/destinations/{destination}/running node
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
zkClient.subscribeDataChanges(path, dataListener);
initRunning();
} else {
processActiveEnter(); // no zk, start directly
}
} catch (Exception e) {
logger.error("start failed", e);
// Startup failed; reset the state so it does not interfere with the next start
stop();
}
}
3.2.1 Calls processStart, which registers an IZkStateListener to watch the zk connection state and creates the "ip:port" ephemeral node under /otter/canal/destinations/{destination}/cluster. This ephemeral node mainly provides canal-client with the list of available canal-server nodes.
3.2.2 Registers dataListener on the /otter/canal/destinations/{destination}/running node to watch for creation, deletion, and modification of the node's data.
3.2.3 Initializes the canal-server information for the instance (initRunning).
private void initRunning() {
if (!isStart()) {
return;
}
String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
// Serialize
byte[] bytes = JsonUtils.marshalToByte(serverData);
try {
mutex.set(false);
zkClient.create(path, bytes, CreateMode.EPHEMERAL);
activeData = serverData;
processActiveEnter(); // fire the active-enter event
mutex.set(true);
} catch (ZkNodeExistsException e) {
bytes = zkClient.readData(path, true);
if (bytes == null) { // the node no longer exists, retry immediately
initRunning();
} else {
activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
}
} catch (ZkNoNodeException e) {
zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // try to create the parent node
initRunning();
}
}
This creates the ephemeral node /otter/canal/destinations/{destination}/running. If creation fails, a ZkNodeExistsException is thrown, which means another canal-server is already handling binlog synchronization for this instance; in that case the data of the ephemeral node is read, and the canal-server that is serving the instance is recorded in activeData.
If creation succeeds, the current canal-server is responsible for binlog synchronization of this instance, and processActiveEnter is called to start the instance.
So the ephemeral node /otter/canal/destinations/{destination}/running identifies the canal-server currently serving the instance. If the canal-server's session with zk times out, the ephemeral node is deleted. The dataListener that every canal-server registered on this node then observes the change and performs the active/standby switchover.
// dataListener is initialized when ServerRunningMonitor is constructed
public ServerRunningMonitor(){
// Create the parent node
dataListener = new IZkDataListener() {
public void handleDataChange(String dataPath, Object data) throws Exception {
MDC.put("destination", destination);
ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
if (!isMine(runningData.getAddress())) {
mutex.set(false);
}
if (!runningData.isActive() && isMine(runningData.getAddress())) { // an explicit release happened and this machine was the active one
release = true;
releaseRunning(); // fully release the mainstem
}
activeData = (ServerRunningData) runningData;
}
public void handleDataDeleted(String dataPath) throws Exception {
MDC.put("destination", destination);
mutex.set(false);
if (!release && activeData != null && isMine(activeData.getAddress())) {
// If this machine was the last active one, try to take over again immediately
initRunning();
} else {
// Otherwise wait for delayTime, so that a brief network interruption or a zk hiccup does not cause frequent switchovers
delayExector.schedule(new Runnable() {
public void run() {
initRunning();
}
}, delayTime, TimeUnit.SECONDS);
}
}
};
}
As shown, the listener handles two events on the ephemeral node /otter/canal/destinations/{destination}/running: node deletion (handleDataDeleted) and node data change (handleDataChange).
When the node is deleted, a server that was the last active one calls initRunning immediately to try to take over again. Any other server waits for delayTime before competing, so that a brief network interruption or a zk hiccup does not cause frequent switchovers.
When the node data changes, the listener mainly records in activeData which canal-server is currently active under the /otter/canal/destinations/{destination}/running ephemeral node. If an explicit release has happened, the instance is released completely (the ephemeral node is deleted from zk and the instance is shut down; I did not find anywhere in the code that sets release back to false).
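The deletion branch boils down to one decision: how long to wait before calling initRunning again. A minimal sketch of that decision follows; ReacquirePolicy and delaySeconds are hypothetical names, not canal classes, and the delayTime value of 5 seconds used in main is only an assumed example.

```java
/** Hypothetical helper that isolates the "retry now or wait delayTime" decision. */
final class ReacquirePolicy {

    /**
     * Delay, in seconds, before competing for the running node again after it was deleted.
     * The previous active holder retries at once unless it released the node on purpose;
     * everyone else waits delayTimeSeconds.
     */
    static long delaySeconds(boolean released, String lastActiveAddress, String myAddress,
                             long delayTimeSeconds) {
        boolean wasMine = lastActiveAddress != null && lastActiveAddress.equals(myAddress);
        return (!released && wasMine) ? 0L : delayTimeSeconds;
    }

    public static void main(String[] args) {
        // Previous active server after a network blip: immediate retry.
        System.out.println(delaySeconds(false, "10.0.0.1:11111", "10.0.0.1:11111", 5));
        // Standby server: waits for delayTime.
        System.out.println(delaySeconds(false, "10.0.0.1:11111", "10.0.0.2:11111", 5));
        // Previous active server that released on purpose: also waits.
        System.out.println(delaySeconds(true, "10.0.0.1:11111", "10.0.0.1:11111", 5));
    }
}
```

The returned value is what a caller would pass to ScheduledExecutorService.schedule, which is how the listener in the article defers initRunning.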
To summarize: the ephemeral children of /otter/canal/cluster show how many canal-servers are currently running normally. /otter/canal/destinations/{destination}/cluster shows which canal-servers are available for the instance. The data of the ephemeral node /otter/canal/destinations/{destination}/running shows which canal-server is currently active for the instance; every running canal-server registers a dataListener on that node so that HA switchover happens promptly.
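For reference, the three server-side paths from this summary can be written as small string builders. This is an illustrative sketch only: CanalZkPaths and its method names are hypothetical and are not the API of canal's ZookeeperPathUtils; only the path layout is taken from the text above.

```java
/** Hypothetical path builders mirroring the zk layout described in this section. */
final class CanalZkPaths {
    static final String ROOT = "/otter/canal";

    /** Ephemeral node announcing a live canal-server, e.g. /otter/canal/cluster/10.33.200.132:11111. */
    static String clusterNode(String address) {
        return ROOT + "/cluster/" + address;
    }

    /** Ephemeral node listing a canal-server as available for one instance. */
    static String destinationClusterNode(String destination, String address) {
        return ROOT + "/destinations/" + destination + "/cluster/" + address;
    }

    /** Ephemeral node whose data names the active canal-server of the instance. */
    static String serverRunning(String destination) {
        return ROOT + "/destinations/" + destination + "/running";
    }

    public static void main(String[] args) {
        String address = "10.33.200.132:11111";
        System.out.println(clusterNode(address));
        System.out.println(destinationClusterNode("example", address));
        System.out.println(serverRunning("example"));
    }
}
```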
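Canal Client HA implementation
In canal, an instance can be consumed by only one client at a time. Next, let's see how canal-client HA is implemented with zk. The official code provides a client test class, ClusterCanalClientTest, whose core code carries out the incremental subscribe-and-consume flow described in the official documentation:
// Obtain the canal server address dynamically from zookeeper and connect; if one server crashes, failover is supported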
CanalConnector connector = CanalConnectors.newClusterConnector("127.0.0.1:2181", destination, "", "");
protected void process() {
int batchSize = 5 * 1024;
while (running) {
try {
MDC.put("destination", destination);
//1. Establish the connection
connector.connect();
//2. Subscribe without a client-side filter, so the server-side filter applies
connector.subscribe();
while (running) {
//3. Fetch events without specifying a position. Return condition: try to take batchSize records, return however many are available, and do not block waiting
// canal remembers the latest position of this client.
// On the first fetch, output starts from the oldest record stored in canal.
Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize records
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// }
} else {
printSummary(message, batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // acknowledge the batch
// connector.rollback(batchId); // processing failed, roll back
}
} catch (Exception e) {
logger.error("process error!", e);
} finally {
connector.disconnect();
MDC.remove("destination");
}
}
}
1. When the canal client connects to the canal server, it reads the information under the cluster and running nodes from ZooKeeper, and ClusterNodeAccessStrategy registers listeners to track changes to that data in real time, which is what makes reconnect-based failover possible.
public void connect() throws CanalClientException {
while (currentConnector == null) {
int times = 0;
while (true) {
try {
currentConnector = new SimpleCanalConnector(null, username, password, destination) {
@Override
public SocketAddress getNextAddress() {
return accessStrategy.nextNode();
}
};
currentConnector.setSoTimeout(soTimeout);
currentConnector.setIdleTimeout(idleTimeout);
if (filter != null) {
currentConnector.setFilter(filter);
}
if (accessStrategy instanceof ClusterNodeAccessStrategy) {
//1.1
currentConnector.setZkClientx(((ClusterNodeAccessStrategy) accessStrategy).getZkClient());
}
currentConnector.connect();
break;
} catch (Exception e) {
logger.warn("failed to connect to:{} after retry {} times", accessStrategy.currentNode(), times);
currentConnector.disconnect();
currentConnector = null;
// retry for #retryTimes for each node when trying to
// connect to it.
times = times + 1;
if (times >= retryTimes) {
throw new CanalClientException(e);
} else {
// fixed issue #55: sleep between retries to avoid high CPU usage while reconnecting
try {
Thread.sleep(retryInterval);
} catch (InterruptedException e1) {
throw new CanalClientException(e1);
}
}
}
}
}
}
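The retry loop in connect() can be reduced to the following self-contained sketch. It is a simplification under stated assumptions: RetryConnect is a hypothetical class, node selection is plain round-robin over a fixed list (the real ClusterNodeAccessStrategy consults zk), and the Predicate stands in for the actual socket connect.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical reduction of the connect-with-retry loop; not the canal client API. */
final class RetryConnect {

    /**
     * Tries nodes in round-robin order and returns the first address that accepts.
     * Throws IllegalStateException once retryTimes attempts have failed.
     */
    static String connect(List<String> nodes, Predicate<String> tryConnect,
                          int retryTimes, long retryIntervalMillis) {
        int times = 0;
        while (true) {
            String node = nodes.get(times % nodes.size());
            if (tryConnect.test(node)) {
                return node;
            }
            times++;
            if (times >= retryTimes) {
                throw new IllegalStateException("failed to connect after " + times + " attempts");
            }
            try {
                // Sleep between attempts so a dead cluster does not cause a busy loop.
                Thread.sleep(retryIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("10.0.0.1:11111", "10.0.0.2:11111");
        // Pretend only the second server is reachable.
        String connected = connect(nodes, node -> node.startsWith("10.0.0.2"), 3, 10L);
        System.out.println(connected);
    }
}
```

As in the article's code, the attempt counter is shared across nodes, so retryTimes bounds the total number of attempts rather than the attempts per node.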
1.1 Sets the zkClient of the current connector and initializes the client information on zk.
public void setZkClientx(ZkClientx zkClientx) {
this.zkClientx = zkClientx;
initClientRunningMonitor(this.clientIdentity);
}
private synchronized void initClientRunningMonitor(ClientIdentity clientIdentity) {
if (zkClientx != null && clientIdentity != null && runningMonitor == null) {
ClientRunningData clientData = new ClientRunningData();
clientData.setClientId(clientIdentity.getClientId());
clientData.setAddress(AddressUtils.getHostIp());
runningMonitor = new ClientRunningMonitor();
runningMonitor.setDestination(clientIdentity.getDestination());
runningMonitor.setZkClient(zkClientx);
runningMonitor.setClientData(clientData);
runningMonitor.setListener(new ClientRunningListener() {
public InetSocketAddress processActiveEnter() {
InetSocketAddress address = doConnect();
mutex.set(true);
if (filter != null) { // a filter exists, so this is an automatic switchover; resubscribe with the previous filter
subscribe(filter);
}
if (rollbackOnConnect) {
rollback();
}
return address;
}
public void processActiveExit() {
mutex.set(false);
doDisconnect();
}
});
}
}
ClientRunningMonitor is similar to ServerRunningMonitor: it is the client's manager for an instance. When the real connection is made with currentConnector.connect();, ClientRunningMonitor is started.
public void connect() throws CanalClientException {
if (connected) {
return;
}
if (runningMonitor != null) {
if (!runningMonitor.isStart()) {
// Start ClientRunningMonitor
runningMonitor.start();
}
} else {
waitClientRunning();
if (!running) {
return;
}
// Connect to the canal server currently serving the instance, as chosen through ClusterNodeAccessStrategy
doConnect();
if (filter != null) { // a filter exists, so this is an automatic switchover; resubscribe with the previous filter
subscribe(filter);
}
if (rollbackOnConnect) {
rollback();
}
}
connected = true;
}
As shown, before connecting to the canal server the client starts ClientRunningMonitor to obtain the right to consume the instance.
//ClientRunningMonitor.start
public void start() {
super.start();
String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
zkClient.subscribeDataChanges(path, dataListener);
initRunning();
}
The principle is the same as ServerRunningMonitor: a dataListener is registered on the client running node of the instance, /otter/canal/destinations/{destination}/{clientId}/running, and the dataListener watches the node data and drives client-side HA switchover.
public ClientRunningMonitor(){
dataListener = new IZkDataListener() {
public void handleDataChange(String dataPath, Object data) throws Exception {
MDC.put("destination", destination);
ClientRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ClientRunningData.class);
if (!isMine(runningData.getAddress())) {
mutex.set(false);
}
if (!runningData.isActive() && isMine(runningData.getAddress())) { // an explicit release happened and this machine was the active one
release = true;
releaseRunning(); // fully release the mainstem
}
activeData = (ClientRunningData) runningData;
}
public void handleDataDeleted(String dataPath) throws Exception {
MDC.put("destination", destination);
mutex.set(false);
// Trigger an exit; this may be a manual release or a session-expired timeout caused by a network blip
processActiveExit();
if (!release && activeData != null && isMine(activeData.getAddress())) {
// If this machine was the last active one, try to take over again immediately
initRunning();
} else {
// Otherwise wait for delayTime, so that a brief network interruption or a zk hiccup does not cause frequent switchovers
delayExector.schedule(new Runnable() {
public void run() {
initRunning();
}
}, delayTime, TimeUnit.SECONDS);
}
}
};
}
The ephemeral node is created in ClientRunningMonitor.initRunning; only after the node is created successfully can the client connect to the canal server. The data written to the node is the client's IP, port, and clientId.
// 1. The method is synchronized so that calls run one at a time, in order;
// 2. Check whether the activeData already on zk belongs to this machine; if so, reset mutex to true, otherwise a deadlock results
// 3. Exception handling ensures the running node is deleted when an exception occurs, otherwise a deadlock results
public synchronized void initRunning() {
if (!isStart()) {
return;
}
String path = ZookeeperPathUtils.getDestinationClientRunning(this.destination, clientData.getClientId());
// Serialize
byte[] bytes = JsonUtils.marshalToByte(clientData);
try {
mutex.set(false);
zkClient.create(path, bytes, CreateMode.EPHEMERAL);
processActiveEnter(); // fire the event and connect to the canal server
activeData = clientData;
mutex.set(true);
} catch (ZkNodeExistsException e) {
bytes = zkClient.readData(path, true);
if (bytes == null) { // the node no longer exists, retry immediately
initRunning();
} else {
activeData = JsonUtils.unmarshalFromByte(bytes, ClientRunningData.class);
// The node already exists; check whether it is ours, to avoid a livelock
if (activeData.getAddress().contains(":") && isMine(activeData.getAddress())) {
mutex.set(true);
}
}
} catch (ZkNoNodeException e) {
zkClient.createPersistent(ZookeeperPathUtils.getClientIdNodePath(this.destination, clientData.getClientId()),
true); // try to create the parent node
initRunning();
} catch (Throwable t) {
logger.error(MessageFormat.format("There is an error when execute initRunning method, with destination [{0}].",
destination),
t);
// fixed issue 1220: avoid a busy loop when no server node is working
if (t instanceof ServerNotFoundException) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
// On any exception, try to release
releaseRunning();
throw new CanalClientException("something goes wrong in initRunning method. ", t);
}
}
2. After the client connects to the server, it sends a SUBSCRIPTION request. subscribe tells the canal server which filter to apply to the binlog data of the database, and also passes the current clientIdentity to the server. Because an instance can have only one client, clientIdentity is always initialized as:
this.clientIdentity = new ClientIdentity(destination, (short) 1001);
public ClientIdentity(String destination, short clientId){
this.clientId = clientId;
this.destination = destination;
}
public void subscribe(String filter) throws CanalClientException {
waitClientRunning();
if (!running) {
return;
}
try {
writeWithHeader(Packet.newBuilder()
.setType(PacketType.SUBSCRIPTION)
.setBody(Sub.newBuilder()
.setDestination(clientIdentity.getDestination())
.setClientId(String.valueOf(clientIdentity.getClientId()))
.setFilter(filter != null ? filter : "")
.build()
.toByteString())
.build()
.toByteArray());
//
Packet p = Packet.parseFrom(readNextPacket());
Ack ack = Ack.parseFrom(p.getBody());
if (ack.getErrorCode() > 0) {
throw new CanalClientException("failed to subscribe with reason: " + ack.getErrorMessage());
}
clientIdentity.setFilter(filter);
} catch (IOException e) {
throw new CanalClientException(e);
}
}
When the canal server receives the SUBSCRIPTION request, it registers the client's clientIdentity with the corresponding instance.
//embeddedServer.subscribe(clientIdentity)
/**
* Client subscription; subscribing again updates the corresponding filter
*/
@Override
public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
checkStart(clientIdentity.getDestination());
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
if (!canalInstance.getMetaManager().isStart()) {
canalInstance.getMetaManager().start();
}
//1.
canalInstance.getMetaManager().subscribe(clientIdentity); // perform the meta subscription
Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
if (position == null) {
position = canalInstance.getEventStore().getFirstPosition(); // take the first position in the store
if (position != null) {
//2.
canalInstance.getMetaManager().updateCursor(clientIdentity, position); // update the cursor
}
logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
} else {
logger.info("subscribe successfully, {} use last cursor position:{} ", clientIdentity, position);
}
// Notify that the subscription has changed
canalInstance.subscribeChange(clientIdentity);
}
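The main logic is:
1. Perform the meta subscription. metaManager manages client information such as the consumption position. In HA mode, the client position, filter, and similar information are stored on zk so that they can be shared when the canal server switches over.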
//ZooKeeperMetaManager.subscribe
public void subscribe(ClientIdentity clientIdentity) throws CanalMetaManagerException {
String path = ZookeeperPathUtils.getClientIdNodePath(clientIdentity.getDestination(),
clientIdentity.getClientId());
try {
zkClientx.createPersistent(path, true);
} catch (ZkNodeExistsException e) {
// ignore
}
// If the client has a filter, create the persistent node /otter/canal/destinations/{destination}/{clientId}/filter to store the client's filter
if (clientIdentity.hasFilter()) {
String filterPath = ZookeeperPathUtils.getFilterPath(clientIdentity.getDestination(),
clientIdentity.getClientId());
byte[] bytes = null;
try {
bytes = clientIdentity.getFilter().getBytes(ENCODE);
} catch (UnsupportedEncodingException e) {
throw new CanalMetaManagerException(e);
}
try {
zkClientx.createPersistent(filterPath, bytes);
} catch (ZkNodeExistsException e) {
// node already exists, overwrite its data
zkClientx.writeData(filterPath, bytes);
}
}
}
If the client has a filter, the persistent node /otter/canal/destinations/{destination}/{clientId}/filter is created to store the client's filter.
2. If the canal server has existing position information, a background scheduled task flushes it to the persistent node /otter/canal/destinations/{destination}/{clientId}/cursor.
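Why the shared cursor matters for failover can be shown with a small model of the subscribe flow. This is a sketch under assumptions, not canal code: CursorStore is a hypothetical in-memory map standing in for the zk-backed meta manager, positions are simplified to long values (canal uses a log position with file name and offset), and the key "example/1001" is just an illustrative destination/clientId pair.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for cursor storage shared between canal servers (zk in the real system). */
final class CursorStore {
    private final Map<String, Long> cursorByClient = new HashMap<>();

    /** Mirrors the subscribe flow: reuse the recorded cursor, else start from the store's first position. */
    long subscribe(String clientKey, long firstPositionInStore) {
        Long cursor = cursorByClient.get(clientKey);
        if (cursor == null) {
            cursor = firstPositionInStore;
            cursorByClient.put(clientKey, cursor);
        }
        return cursor;
    }

    /** Records the position the client has acknowledged. */
    void ack(String clientKey, long position) {
        cursorByClient.put(clientKey, position);
    }
}

final class CursorFailoverDemo {
    /** Returns the start position on the first server and the resume position after failover. */
    static long[] run() {
        CursorStore shared = new CursorStore();
        String client = "example/1001";
        long start = shared.subscribe(client, 100L);   // no cursor yet: start at the first stored position
        shared.ack(client, 250L);                      // client acknowledges up to 250
        // The active server dies; a standby with a different local store takes over.
        long resumed = shared.subscribe(client, 400L); // the recorded cursor wins over the new first position
        return new long[] { start, resumed };
    }

    public static void main(String[] args) {
        long[] positions = run();
        System.out.println(positions[0] + " then " + positions[1]);
    }
}
```

Because the cursor lives outside any single server, the standby resumes from the last acknowledged position instead of from whatever its own store happens to hold first.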
To summarize: canal client HA is likewise built on ephemeral nodes and node listeners (watches). Before a canal client connects to a canal server, it tries to create the ephemeral node /otter/canal/destinations/{destination}/{clientId}/running; the client that succeeds writes its IP, port, and clientId into the node, marking itself as the active client of the instance. Every canal client also registers a dataListener on the node, which watches the node data and drives client-side HA switchover. The active client obtains the canal server information on zk through ClusterNodeAccessStrategy, learns which canal server is active for the instance, and connects to it. The client then sends a SUBSCRIPTION request to the canal server; the client position, filter, and similar information, if present, are registered on zk so that they can be shared when the canal server switches over.
This concludes the analysis of canal's HA mode. The next article will analyze how the binlog position is determined while canal is running.