Canal同步es

CanalAdapter同步ES7

預(yù)準(zhǔn)備

  1. Mysql5.7
  2. ElasticSearch7.12.1集群或單擊(7.x +)
  3. 一個(gè)數(shù)據(jù)庫和一個(gè)表結(jié)構(gòu)

1.下載

下載Canal1.1.5,支持es7

canal.deployer-1.1.5.tar.gz對應(yīng)的是canal的server端,負(fù)責(zé)訂閱并解析Mysql-Binlog
canal.adapter-1.1.5.tar.gz對應(yīng)的是適配器,負(fù)責(zé)將server的binlog轉(zhuǎn)換并發(fā)送給對應(yīng)的應(yīng)用
canal.admin-1.1.5.tar.gz一個(gè)可視化webui可以不安裝
額外需要下載v1.1.5-alpha-2快照版本的canal.adapter-1.1.5.tar.gz(release1.1.5版本的jar包有bug)


分別解壓縮后,將v1.1.5-alpha-2解壓縮文件夾下plugin文件夾中的 client-adapter.es7x-1.1.5-SNAPSHOT-jar-with-dependencies.jar 替換掉release版本的plugin文件的 client-adapter.es7x-1.1.5-jar-with-dependencies.jar

2. 開啟Mysql-Binlog

進(jìn)入mysql終端執(zhí)行

show variables like 'log_bin';

如果是ON就代表已經(jīng)開啟。
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+

3. 修改配置

  1. 先修改canal.deployer的配置
    參考官方中文文檔: https://github.com/alibaba/canal/wiki/QuickStart
  2. 修改canal.adapter配置 conf/application.yml
    更多請參考官方中文文檔:https://github.com/alibaba/canal/tree/master/client-adapter
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
   # 省略.... 默認(rèn)即可
  srcDataSources:   # 注意打開注釋,然后要和上面對齊否則會報(bào)錯(但是卻又不告訴你是格式問題..)
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: es7 # 這里要改為es7 他會去es7下去找配置
        key: exampleKey #這里是全量導(dǎo)入時(shí)的taskid
        hosts: es1:9201,es2:9202,es3:9203 # 我這里是集群,如果單機(jī)寫一個(gè)就行ip:port
        properties:
          mode: rest # restful 模式
          cluster.name: es-cluster # 集群名字 GET _cat/health?v 可以看到對應(yīng)的名字
  1. 修改es7下模板文件 conf/es7/mytest_user.yml
dataSourceKey: defaultDS #和上面canal.conf.srcDataSources.defaultDS要一樣
outerAdapterKey: exampleKey #和上面canal.conf.canalAdapters.instance.groups.outerAdapters.key要一樣
destination: example #和上面canal.conf.canalAdapters.instance要一樣
groupId: g1 #和上面canal.conf.canalAdapters.instance.groups.groupId要一樣
esMapping:
  _index: test #索引名稱
  _id: _id #documentid
  _type: _doc # type
#  upsert: true
#  pk: id
  sql: "select a.id as _id, a.name_cn, a.name_en, a.email
        from user a"  # 查詢的sql返回的結(jié)構(gòu)要使用as別名和es的filed對應(yīng)
#  objFields:
#    _labels: array:;
  commitBatch: 3000 #批量提交數(shù)量

4. 創(chuàng)建對應(yīng)的索引

PUT test
{
  "settings":{
        "number_of_shards":1,
        "number_of_replicas":2
    },
    "mappings":{
        "properties":{
            "name_cn" : {
          "type" : "keyword"
        },
        "name_en" : {
          "type" : "keyword"
        },
        "email" : {
          "type" : "keyword"
        }
      }
        }
    }

5. 啟動

均在對應(yīng)目錄下 /bin/startup.sh

  1. 先啟動deployer,查看/logs/canal/canal.log 無異常后再查看/logs/example/example.log均無異常則啟動成功
  2. 啟動adapter,查看/logs/adapter/adapter.log 無異常則啟動成功

6.首次全量導(dǎo)入

curl http://localhost:8081/etl/es7/exampleKey/mytest_user.yml -X POST

 //執(zhí)行完對應(yīng)在/logs/adapter/adapter.log會有批量導(dǎo)入成功的日志
 start etl to import data to index: test
 數(shù)據(jù)全量導(dǎo)入完成, 一共導(dǎo)入 1585 條數(shù)據(jù), 耗時(shí): 643
 /**
     * ETL curl http://127.0.0.1:8081/etl/rdb/oracle1/mytest_user.yml -X POST
     *
     * @param type 類型 hbase, es
     * @param key adapter key
     * @param task 任務(wù)名對應(yīng)配置文件名 mytest_user.yml
     * @param params etl where條件參數(shù), 為空全部導(dǎo)入,匹配etlCondition中的參數(shù),使用;分號多個(gè)值
     */
    @PostMapping("/etl/{type}/{key}/{task}")
    public EtlResult etl(@PathVariable String type, @PathVariable String key, @PathVariable String task,
                         @RequestParam(name = "params", required = false) String params) 

7.增量添加更新刪除自動同步

  1. 如果以上操作在程序運(yùn)行期間無任何異常則自動就開啟了同步,可以手動添加更新刪除一個(gè)記錄試試。
  2. 如果出現(xiàn)提示表異常,可能是因?yàn)閹焯嗄承煊袉栴}

修改/deployer/conf/example/instance.properties的匹配策略

#只同步test庫的全部表數(shù)據(jù)
canal.instance.filter.regex=test\\..*

如果解決不了則建議去看官方的issue,https://github.com/alibaba/canal/issues。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容