前言
???????? 上篇文章給大家講解了如何安裝一個Canal,以及講解了一部分的原理,今天我們就來深度聊一聊Canal的工作流程,以及他是怎么工作的,以及架構(gòu)師怎樣的。?????????
? ? ? ? ? ? 首先我們深度了解Canal時必須深度了解了一下MySQL主從復(fù)制原理。
一、MySQL主從復(fù)制

- MySQL master
將數(shù)據(jù)變更寫入二進制日志( binary log, 其中記錄叫做二進制日志事件 ?log events,可以通過show binlog events進行查看) - MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
- MySQL slave 重放 relay log 中事件,將數(shù)據(jù)變更反映它自己的數(shù)據(jù),以此來達到數(shù)據(jù)一致。
MySQL的binLog?????????
? ? ? ?它記錄了所有的DDL和DML(除了數(shù)據(jù)查詢語句)語句,以事件形式記錄,還包含語句所執(zhí)行的消耗的時間。主要用來備份和數(shù)據(jù)同步。binlog 有三種:STATEMENT、ROW、MIXED
- STATEMENT 記錄的是執(zhí)行的sql語句
- ROW 記錄的是真實的行數(shù)據(jù)記錄
- MIXED 記錄的是1+2,優(yōu)先按照1的模式記錄
名詞解釋:什么是中繼日志
???????? 從服務(wù)器I/O線程將主服務(wù)器的二進制日志讀取過來記錄到從服務(wù)器本地文件,然后從服務(wù)器SQL線程會讀取relay-log日志的內(nèi)容并應(yīng)用到從服務(wù)器,從而使從服務(wù)器和主服務(wù)器的數(shù)據(jù)保持一致
二、Canal架構(gòu)

- server 代表一個 canal 運行實例,對應(yīng)于一個 jvm
- instance 對應(yīng)于一個數(shù)據(jù)隊列 (1個 canal server 對應(yīng) 1..n 個 instance )
- instance 下的子模塊
- eventParser: 數(shù)據(jù)源接入,模擬 slave 協(xié)議和 master 進行交互,協(xié)議解析
- eventSink: Parser 和 Store 鏈接器,進行數(shù)據(jù)過濾,加工,分發(fā)的工作
- eventStore: 數(shù)據(jù)存儲
- metaManager: 增量訂閱 & 消費信息管理器

- EventSink起到一個類似channel的功能,可以對數(shù)據(jù)進行過濾、分發(fā)/路由(1:n)、歸并(n:1)和加工。EventSink是連接EventParser和EventStore的橋梁。
- EventStore實現(xiàn)模式是內(nèi)存模式,內(nèi)存結(jié)構(gòu)為環(huán)形隊列,由三個指針(Put、Get和Ack)標識數(shù)據(jù)存儲和讀取的位置。
- MetaManager是增量訂閱&消費信息管理器,增量訂閱和消費之間的協(xié)議包括get/ack/rollback,分別為:
- Message getWithoutAck(int batchSize),允許指定batchSize,一次可以獲取多條,每次返回的對象為Message,包含的內(nèi)容為:batch id[唯一標識]和entries[具體的數(shù)據(jù)對象]
- void rollback(long batchId),顧名思義,回滾上次的get請求,重新獲取數(shù)據(jù)?;趃et獲取的batchId進行提交,避免誤操作
- void ack(long batchId),顧名思議,確認已經(jīng)消費成功,通知server刪除數(shù)據(jù)。基于get獲取的batchId進行提交,避免誤操作
三、server/client交互協(xié)議
???????? canal client與canal server之間是C/S模式的通信,客戶端采用NIO,服務(wù)端采用Netty。canal server啟動后,如果沒有canal client,那么canal server不會去mysql拉取binlog。即Canal客戶端主動發(fā)起拉取請求,服務(wù)端才會模擬一個MySQL Slave節(jié)點去主節(jié)點拉取binlog。通常Canal客戶端是一個死循環(huán),這樣客戶端一直調(diào)用get方法,服務(wù)端也就會一直拉取binlog
BIO、NIO、AIO的區(qū)別
IO的方式通常分為幾種,同步阻塞的BIO、同步非阻塞的NIO、異步非阻塞的AIO。


異步非阻塞IO:在此種模式下,用戶進程只需要發(fā)起一個IO操作然后立即返回,等IO操作真正的完成以后,應(yīng)用程序會得到IO操作完成的通知,此時用戶進程只需要對數(shù)據(jù)進行處理就好了,不需要進行實際的IO讀寫操作,因為真正的IO讀取或者寫入操作已經(jīng)由內(nèi)核完成了。目前Java中還沒有支持此種IO模型。

- handshake,
- ClientAuthentication。
canal client調(diào)用subscribe()方法,類型為[subscription]。對應(yīng)服務(wù)端采用netty處理RPC請求(CanalServerWithNetty):
public?class?CanalServerWithNetty?extends?AbstractCanalLifeCycle?implements?CanalServer?{
????public?void?start()?{
????????bootstrap.setPipelineFactory(new?ChannelPipelineFactory()?{
????????????public?ChannelPipeline?getPipeline()?throws?Exception?{
????????????????ChannelPipeline?pipelines?=?Channels.pipeline();
????????????????pipelines.addLast(FixedHeaderFrameDecoder.class.getName(),?new?FixedHeaderFrameDecoder());
????????????????//?處理客戶端的HANDSHAKE請求
????????????????pipelines.addLast(HandshakeInitializationHandler.class.getName(),
????????????????????new?HandshakeInitializationHandler(childGroups));
????????????????//?處理客戶端的CLIENTAUTHENTICATION請求
????????????????pipelines.addLast(ClientAuthenticationHandler.class.getName(),
????????????????????new?ClientAuthenticationHandler(embeddedServer));
????????????????//?處理客戶端的會話請求,包括SUBSCRIPTION,GET等
????????????????SessionHandler?sessionHandler?=?new?SessionHandler(embeddedServer);
????????????????pipelines.addLast(SessionHandler.class.getName(),?sessionHandler);
????????????????return?pipelines;
????????????}
????????});
????}
}ClientAuthenticationHandler處理鑒權(quán)后,會移除HandshakeInitializationHandler和ClientAuthenticationHandler。最重要的是會話處理器SessionHandler。
以client發(fā)送GET,server從mysql得到binlog后,返回MESSAGES給client為例,說明client和server的rpc交互過程:
SimpleCanalConnector發(fā)送GET請求,并讀取響應(yīng)結(jié)果的流程:
public?Message?getWithoutAck(int?batchSize,?Long?timeout,?TimeUnit?unit)?throws?CanalClientException?{
????waitClientRunning();
????int?size?=?(batchSize?<=?0)???1000?:?batchSize;
????long?time?=?(timeout?==?null?||?timeout?<?0)???-1?:?timeout;?//?-1代表不做timeout控制
????if?(unit?==?null)?unit?=?TimeUnit.MILLISECONDS;??//默認是毫秒
????//?client發(fā)送GET請求
????writeWithHeader(Packet.newBuilder()
????????.setType(PacketType.GET)
????????.setBody(Get.newBuilder()
????????????.setAutoAck(false)
????????????.setDestination(clientIdentity.getDestination())
????????????.setClientId(String.valueOf(clientIdentity.getClientId()))
????????????.setFetchSize(size)
????????????.setTimeout(time)
????????????.setUnit(unit.ordinal())
????????????.build()
????????????.toByteString())
????????.build()
????????.toByteArray());
????//?client獲取GET結(jié)果????
????return?receiveMessages();
}
private?Message?receiveMessages()?throws?IOException?{
????//?讀取server發(fā)送的數(shù)據(jù)包
????Packet?p?=?Packet.parseFrom(readNextPacket());
????switch?(p.getType())?{
????????case?MESSAGES:?{
????????????Messages?messages?=?Messages.parseFrom(p.getBody());
????????????Message?result?=?new?Message(messages.getBatchId());
????????????for?(ByteString?byteString?:?messages.getMessagesList())?{
????????????????result.addEntry(Entry.parseFrom(byteString));
????????????}
????????????return?result;
????????}
????}
}服務(wù)端SessionHandler處理客戶端發(fā)送的GET請求流程:
case?GET:
????//?讀取客戶端發(fā)送的數(shù)據(jù)包,封裝為Get對象
????Get?get?=?CanalPacket.Get.parseFrom(packet.getBody());
????//?destination表示canal?instance
????if?(StringUtils.isNotEmpty(get.getDestination())?&&?StringUtils.isNotEmpty(get.getClientId()))?{
????????clientIdentity?=?new?ClientIdentity(get.getDestination(),?Short.valueOf(get.getClientId()));
????????Message?message?=?null;
????????if?(get.getTimeout()?==?-1)?{//?是否是初始值
????????????message?=?embeddedServer.getWithoutAck(clientIdentity,?get.getFetchSize());
????????}?else?{
????????????TimeUnit?unit?=?convertTimeUnit(get.getUnit());
????????????message?=?embeddedServer.getWithoutAck(clientIdentity,?get.getFetchSize(),?get.getTimeout(),?unit);
????????}
????????//?設(shè)置返回給客戶端的數(shù)據(jù)包類型為MESSAGES???
????????Packet.Builder?packetBuilder?=?CanalPacket.Packet.newBuilder();
????????packetBuilder.setType(PacketType.MESSAGES);
????????//?構(gòu)造Message
????????Messages.Builder?messageBuilder?=?CanalPacket.Messages.newBuilder();
????????messageBuilder.setBatchId(message.getId());
????????if?(message.getId()?!=?-1?&&?!CollectionUtils.isEmpty(message.getEntries()))?{
????????????for?(Entry?entry?:?message.getEntries())?{
????????????????messageBuilder.addMessages(entry.toByteString());
????????????}
????????}
????????packetBuilder.setBody(messageBuilder.build().toByteString());
????????//?輸出數(shù)據(jù),返回給客戶端
????????NettyUtils.write(ctx.getChannel(),?packetBuilder.build().toByteArray(),?null);
????}具體的網(wǎng)絡(luò)協(xié)議格式,可參見:CanalProtocol.proto
get/ack/rollback協(xié)議介紹:
- Message getWithoutAck(int batchSize)
- batch id 唯一標識
- entries 具體的數(shù)據(jù)對象,對應(yīng)的數(shù)據(jù)對象格式:EntryProtocol.proto
- 允許指定batchSize,一次可以獲取多條,每次返回的對象為Message,包含的內(nèi)容為:
- getWithoutAck(int batchSize, Long timeout, TimeUnit unit)
- 拿夠batchSize條記錄或者超過timeout時間
- timeout=0,阻塞等到足夠的batchSize
- 相比于getWithoutAck(int batchSize),允許設(shè)定獲取數(shù)據(jù)的timeout超時時間
- void rollback(long batchId)
- 回滾上次的get請求,重新獲取數(shù)據(jù)。基于get獲取的batchId進行提交,避免誤操作
- void ack(long batchId)
- 確認已經(jīng)消費成功,通知server刪除數(shù)據(jù)?;趃et獲取的batchId進行提交,避免誤操作
EntryProtocol.protod對應(yīng)的canal消息結(jié)構(gòu)如下:
Entry??
????Header??
????????logfileName?[binlog文件名]??
????????logfileOffset?[binlog?position]??
????????executeTime?[binlog里記錄變更發(fā)生的時間戳,精確到秒]??
????????schemaName???
????????tableName??
????????eventType?[insert/update/delete類型]??
????entryType???[事務(wù)頭BEGIN/事務(wù)尾END/數(shù)據(jù)ROWDATA]??
????storeValue??[byte數(shù)據(jù),可展開,對應(yīng)的類型為RowChange]??
??????
RowChange??
????isDdl???????[是否是ddl變更操作,比如create?table/drop?table]??
????sql?????????[具體的ddl?sql]??
????rowDatas????[具體insert/update/delete的變更數(shù)據(jù),可為多條,1個binlog?event事件可對應(yīng)多條變更,比如批處理]??
????????beforeColumns?[Column類型的數(shù)組,變更前的數(shù)據(jù)字段]??
????????afterColumns?[Column類型的數(shù)組,變更后的數(shù)據(jù)字段]??
??????????
Column???
????index?????????
????sqlType?????[jdbc?type]??
????name????????[column?name]??
????isKey???????[是否為主鍵]??
????updated?????[是否發(fā)生過變更]??
????isNull??????[值是否為null]??
????value???????[具體的內(nèi)容,注意為string文本]SessionHandler中服務(wù)端處理客戶端的其他類型請求,都會調(diào)用CanalServerWithEmbedded的相關(guān)方法:
case?SUBSCRIPTION:
????????Sub?sub?=?Sub.parseFrom(packet.getBody());
????????embeddedServer.subscribe(clientIdentity);
case?GET:
????????Get?get?=?CanalPacket.Get.parseFrom(packet.getBody());
????????message?=?embeddedServer.getWithoutAck(clientIdentity,?get.getFetchSize());
case?CLIENTACK:
????????ClientAck?ack?=?CanalPacket.ClientAck.parseFrom(packet.getBody());
????????embeddedServer.ack(clientIdentity,?ack.getBatchId());
case?CLIENTROLLBACK:
????????ClientRollback?rollback?=?CanalPacket.ClientRollback.parseFrom(packet.getBody());
????????embeddedServer.rollback(clientIdentity);//?回滾所有批次所以真正的處理邏輯在CanalServerWithEmbedded中,下面重點來了。。。
3.1 CanalServerWithEmbedded
???????? CanalServer包含多個Instance,它的成員變量canalInstances記錄了instance名稱與實例的映射關(guān)系。????????
? ? ? ? 因為是一個Map,所以同一個Server不允許出現(xiàn)相同instance名稱(本例中實例名稱為example),比如不能同時有兩個example在一個server上。但是允許一個Server上有example1和example2。
注意:CanalServer中最重要的是CanalServerWithEmbedded,而CanalServerWithEmbedded中最重要的是CanalInstance。

理解下各個組件的對應(yīng)關(guān)系:
- Canal Client通過destination找出Canal Server中對應(yīng)的Canal Instance。
- 一個Canal Server可以配置多個Canal Instances。
下面以CanalServerWithEmbedded的訂閱方法為例:
- 根據(jù)客戶端標識獲取CanalInstance
- 向CanalInstance的元數(shù)據(jù)管理器訂閱當(dāng)前客戶端
- 從元數(shù)據(jù)管理中獲取客戶端的游標
- 通知CanalInstance訂閱關(guān)系發(fā)生變化
注意:提供訂閱方法的作用是:MySQL新增了一張表,客戶端原先沒有同步這張表,現(xiàn)在需要同步,所以需要重新訂閱。
public?void?subscribe(ClientIdentity?clientIdentity)?throws?CanalServerException?{
???//?ClientIdentity表示Canal?Client客戶端,從中可以獲取出客戶端指定連接的Destination
???//?由于CanalServerWithEmbedded記錄了每個Destination對應(yīng)的Instance,可以獲取客戶端對應(yīng)的Instance
???CanalInstance?canalInstance?=?canalInstances.get(clientIdentity.getDestination());
???if?(!canalInstance.getMetaManager().isStart())?{
???????canalInstance.getMetaManager().start();?//?啟動Instance的元數(shù)據(jù)管理器
???}
???canalInstance.getMetaManager().subscribe(clientIdentity);?//?執(zhí)行一下meta訂閱
???Position?position?=?canalInstance.getMetaManager().getCursor(clientIdentity);
???if?(position?==?null)?{
???????position?=?canalInstance.getEventStore().getFirstPosition();//?獲取一下store中的第一條
???????if?(position?!=?null)?{
???????????canalInstance.getMetaManager().updateCursor(clientIdentity,?position);?//?更新一下cursor
???????}
???}
???//?通知下訂閱關(guān)系變化
???canalInstance.subscribeChange(clientIdentity);
}每個CanalInstance中包括了四個組件:EventParser、EventSink、EventStore、MetaManager。
服務(wù)端主要的處理方法包括get/ack/rollback,這三個方法都會用到Instance上面的幾個內(nèi)部組件,主要還是EventStore和MetaManager:
在這之前,要先理解EventStore的含義,EventStore是一個RingBuffer,有三個指針:Put、Get、Ack。
- Put: Canal Server從MySQL拉取到數(shù)據(jù)后,放到內(nèi)存中,Put增加
- Get: 消費者(Canal Client)從內(nèi)存中消費數(shù)據(jù),Get增加
- Ack: 消費者消費完成,Ack增加。并且會刪除Put中已經(jīng)被Ack的數(shù)據(jù)

- 如果timeout為null,則采用tryGet方式,即時獲取
- 如果timeout不為null
- timeout為0,則采用get阻塞方式,獲取數(shù)據(jù),不設(shè)置超時,直到有足夠的batchSize數(shù)據(jù)才返回
- timeout不為0,則采用get+timeout方式,獲取數(shù)據(jù),超時還沒有batchSize足夠的數(shù)據(jù),有多少返回多少
private?Events<Event>?getEvents(CanalEventStore?eventStore,?Position?start,?int?batchSize,?Long?timeout,????????????????????????????????TimeUnit?unit)?{
????if?(timeout?==?null)?{
????????return?eventStore.tryGet(start,?batchSize);?//?即時獲取
????}?else?if?(timeout?<=?0){
????????return?eventStore.get(start,?batchSize);?//?阻塞獲取
????}?else?{
????????return?eventStore.get(start,?batchSize,?timeout,?unit);?//?異步獲取
????}
}注意:EventStore的實現(xiàn)采用了類似Disruptor的RingBuffer環(huán)形緩沖區(qū)。RingBuffer的實現(xiàn)類是MemoryEventStoreWithBuffer
get方法和getWithoutAck方法的區(qū)別是:
- get方法會立即調(diào)用ack
- getWithoutAck方法不會調(diào)用ack
3.2 ?EventStore


下面是Put填充環(huán)形緩沖區(qū)的代碼,檢查可用slot(checkFreeSlotAt方法)在幾個put方法中。
public?class?MemoryEventStoreWithBuffer?extends?AbstractCanalStoreScavenge?implements?CanalEventStore<Event>,?CanalStoreScavenge?{
????private?static?final?long?INIT_SQEUENCE?=?-1;
????private?int???????????????bufferSize????=?16?*?1024;
????private?int???????????????bufferMemUnit?=?1024;?????????????????????????//?memsize的單位,默認為1kb大小
????private?int???????????????indexMask;
????private?Event[]???????????entries;
????//?記錄下put/get/ack操作的三個下標
????private?AtomicLong????????putSequence???=?new?AtomicLong(INIT_SQEUENCE);?//?代表當(dāng)前put操作最后一次寫操作發(fā)生的位置
????private?AtomicLong????????getSequence???=?new?AtomicLong(INIT_SQEUENCE);?//?代表當(dāng)前get操作讀取的最后一條的位置
????private?AtomicLong????????ackSequence???=?new?AtomicLong(INIT_SQEUENCE);?//?代表當(dāng)前ack操作的最后一條的位置
????//?啟動EventStore時,創(chuàng)建指定大小的緩沖區(qū),Event數(shù)組的大小是16*1024
????//?也就是說算個數(shù)的話,數(shù)組可以容納16000個事件。算內(nèi)存的話,大小為16MB
????public?void?start()?throws?CanalStoreException?{
????????super.start();
????????indexMask?=?bufferSize?-?1;
????????entries?=?new?Event[bufferSize];
????}
????//?EventParser解析后,會放入內(nèi)存中(Event數(shù)組,緩沖區(qū))
????private?void?doPut(List<Event>?data)?{
????????long?current?=?putSequence.get();?//?取得當(dāng)前的位置,初始時為-1,第一個元素為-1+1=0
????????long?end?=?current?+?data.size();?//?最末尾的位置,假設(shè)Put了10條數(shù)據(jù),end=-1+10=9
????????//?先寫數(shù)據(jù),再更新對應(yīng)的cursor,并發(fā)度高的情況,putSequence會被get請求可見,拿出了ringbuffer中的老的Entry值
????????for?(long?next?=?current?+?1;?next?<=?end;?next++)?{
????????????entries[getIndex(next)]?=?data.get((int)?(next?-?current?-?1));
????????}
????????putSequence.set(end);
????}?
}???????? Put是生產(chǎn)數(shù)據(jù),Get是消費數(shù)據(jù),Get一定不會超過Put。比如Put了10條數(shù)據(jù),Get最多只能獲取到10條數(shù)據(jù)。但有時候為了保證Get處理的速度,Put和Get并不會相等??梢园裀ut看做是生產(chǎn)者,Get看做是消費者。生產(chǎn)者速度可以很快,消費者則可以慢慢地消費。比如Put了1000條,而Get我們只需要每次處理10條數(shù)據(jù)。
???????? 仍然以前面的示例來說明Get的流程,初始時current=-1,假設(shè)Put了兩批數(shù)據(jù)一共15條,maxAbleSequence=14,而Get的BatchSize假設(shè)為10。初始時next=current=-1,end=-1。通過startPosition,會設(shè)置next=0。最后end又被賦值為9,即循環(huán)緩沖區(qū)[0,9]一共10個元素。
private?Events<Event>?doGet(Position?start,?int?batchSize)?throws?CanalStoreException?{
????LogPosition?startPosition?=?(LogPosition)?start;
????long?current?=?getSequence.get();
????long?maxAbleSequence?=?putSequence.get();
????long?next?=?current;
????long?end?=?current;
????//?如果startPosition為null,說明是第一次,默認+1處理
????if?(startPosition?==?null?||?!startPosition.getPostion().isIncluded())?{?//?第一次訂閱之后,需要包含一下start位置,防止丟失第一條記錄
????????next?=?next?+?1;
????}
????end?=?(next?+?batchSize?-?1)?<?maxAbleSequence???(next?+?batchSize?-?1)?:?maxAbleSequence;
????//?提取數(shù)據(jù)并返回
????for?(;?next?<=?end;?next++)?{
????????Event?event?=?entries[getIndex(next)];
????????if?(ddlIsolation?&&?isDdl(event.getEntry().getHeader().getEventType()))?{
????????????//?如果是ddl隔離,直接返回
????????????if?(entrys.size()?==?0)?{
????????????????entrys.add(event);//?如果沒有DML事件,加入當(dāng)前的DDL事件
????????????????end?=?next;?//?更新end為當(dāng)前
????????????}?else?{
????????????????//?如果之前已經(jīng)有DML事件,直接返回了,因為不包含當(dāng)前next這記錄,需要回退一個位置
????????????????end?=?next?-?1;?//?next-1一定大于current,不需要判斷
????????????}
????????????break;
????????}?else?{
????????????entrys.add(event);
????????}
????}
????//?處理PositionRange,然后設(shè)置getSequence為end
????getSequence.compareAndSet(current,?end)
}ack操作的上限是Get,假設(shè)Put了15條數(shù)據(jù),Get了10條數(shù)據(jù),最多也只能Ack10條數(shù)據(jù)。Ack的目的是清空緩沖區(qū)中已經(jīng)被Get過的數(shù)據(jù)
public?void?ack(Position?position)?throws?CanalStoreException?{
????cleanUntil(position);
}
public?void?cleanUntil(Position?position)?throws?CanalStoreException?{
????long?sequence?=?ackSequence.get();
????long?maxSequence?=?getSequence.get();
????boolean?hasMatch?=?false;
????long?memsize?=?0;
????for?(long?next?=?sequence?+?1;?next?<=?maxSequence;?next++)?{
????????Event?event?=?entries[getIndex(next)];
????????memsize?+=?calculateSize(event);
????????boolean?match?=?CanalEventUtils.checkPosition(event,?(LogPosition)?position);
????????if?(match)?{//?找到對應(yīng)的position,更新ack?seq
????????????hasMatch?=?true;
????????????if?(batchMode.isMemSize())?{
????????????????ackMemSize.addAndGet(memsize);
????????????????//?嘗試清空buffer中的內(nèi)存,將ack之前的內(nèi)存全部釋放掉
????????????????for?(long?index?=?sequence?+?1;?index?<?next;?index++)?{
????????????????????entries[getIndex(index)]?=?null;//?設(shè)置為null
????????????????}
????????????}
????????????ackSequence.compareAndSet(sequence,?next)
????????}
????}
}rollback回滾方法的實現(xiàn)則比較簡單,將getSequence回退到ack位置。
public?void?rollback()?throws?CanalStoreException?{
????getSequence.set(ackSequence.get());
????getMemSize.set(ackMemSize.get());
}
3.3 EventParser WorkFlow
EventStore負責(zé)存儲解析后的Binlog事件,而解析動作負責(zé)拉取Binlog,它的流程比較復(fù)雜。需要和MetaManager進行交互。比如要記錄每次拉取的Position,這樣下一次就可以從上一次的最后一個位置繼續(xù)拉取。所以MetaManager應(yīng)該是有狀態(tài)的。
EventParser的流程如下:
- Connection獲取上一次解析成功的位置 (如果第一次啟動,則獲取初始指定的位置或者是當(dāng)前數(shù)據(jù)庫的binlog位點)
- Connection建立鏈接,發(fā)送BINLOG_DUMP指令
- Mysql開始推送Binaly Log
- 接收到的Binaly Log的通過Binlog parser進行協(xié)議解析,補充一些特定信息
- 傳遞給EventSink模塊進行數(shù)據(jù)存儲,是一個阻塞操作,直到存儲成功
總結(jié)
???????? ?上述我們講了一些架構(gòu)和一些交互模式,和比較多原理,做為一名優(yōu)秀的程序員不能只單純的會使用,而是多去了解他的思想和為什么這么寫,這樣你的代碼能力才一天比一天強。我在這里為大家提供大數(shù)據(jù)的資源需要的朋友可以去下面GitHub去下載,信自己,努力和汗水總會能得到回報的。我是大數(shù)據(jù)老哥,我們下期見~~~
資源獲取 獲取Flink面試題,Spark面試題,程序員必備軟件,hive面試題,Hadoop面試題,Docker面試題,簡歷模板等資源請去?
GitHub自行下載 https://github.com/lhh2002/Framework-Of-BigData?
Gitee 自行下載 ?https://gitee.com/li_hey_hey/dashboard/projects?
實時數(shù)倉代碼:https://github.com/lhh2002/Real_Time_Data_WareHouse
