Kafka Connect JDBC Connector學(xué)習(xí)文檔

前言:

最近需要調(diào)研Kafka Connect可連接哪些數(shù)據(jù)源,特此學(xué)習(xí)官網(wǎng)并翻譯了下文檔。
Confluent JDBC Connector官網(wǎng)地址(官網(wǎng)布局可能有些凌亂)
本文為純理論學(xué)習(xí),實(shí)踐請(qǐng)看下文 Kafka Connect 實(shí)現(xiàn)MySQL增量同步
希望看過(guò)的小伙伴點(diǎn)個(gè)?,謝謝。

JDBC Connector

JDBC connector允許您通過(guò)JDBC驅(qū)動(dòng)程序?qū)⑷魏侮P(guān)系型數(shù)據(jù)庫(kù)中的數(shù)據(jù)導(dǎo)入到Kafka的主題Topic中。通過(guò)使用JDBC,這個(gè)連接器可以支持各種數(shù)據(jù)庫(kù),不需要為每個(gè)數(shù)據(jù)庫(kù)定制代碼。
通過(guò)定期地執(zhí)行SQL查詢語(yǔ)句并為結(jié)果集中的每一行創(chuàng)建輸出記錄來(lái)加載數(shù)據(jù)。在默認(rèn)情況下,在一個(gè)數(shù)據(jù)庫(kù)中的所有表都會(huì)被復(fù)制,每個(gè)表都復(fù)制到自己的輸出主題。數(shù)據(jù)庫(kù)那些新的或刪除的表被監(jiān)視并自動(dòng)適應(yīng)調(diào)整。當(dāng)從表中復(fù)制數(shù)據(jù)時(shí),連接器可以僅僅加載新增修改的行通過(guò)指定哪些列應(yīng)當(dāng)被用來(lái)發(fā)現(xiàn)新增或修改的數(shù)據(jù)。

Quickstart

為了了解連接器的基本功能,我們將從本地SQLite數(shù)據(jù)庫(kù)中復(fù)制單個(gè)表。在這個(gè)簡(jiǎn)單的示例中,我們假設(shè)表中的每個(gè)條目都被分配了一個(gè)唯一的ID,并且在創(chuàng)建之后不會(huì)對(duì)其進(jìn)行修改。Confluent Platform內(nèi)置包含了用于SQLitePostgreSQL的JDBC驅(qū)動(dòng)程序,但是如果使用的是不同的數(shù)據(jù)庫(kù)(如MySQL),則還需要確保JDBC驅(qū)動(dòng)程序在Kafka Connect進(jìn)程的CLASSPATH上可用。

快速開(kāi)始

創(chuàng)建表中測(cè)試數(shù)據(jù)

sqlite> CREATE TABLE accounts(id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, name VARCHAR(255));
sqlite> INSERT INTO accounts(name) VALUES('alice');
sqlite> INSERT INTO accounts(name) VALUES('bob');

創(chuàng)建一個(gè)配置文件,用于從該數(shù)據(jù)庫(kù)中加載數(shù)據(jù)。此文件包含在etc/kafka-connect-jdbc/quickstart-sqlite.properties中的連接器中,并包含以下設(shè)置:
(學(xué)習(xí)了解配置結(jié)構(gòu)即可)

name=test-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlite:test.db
mode=incrementing
incrementing.column.name=id
topic.prefix=test-sqlite-jdbc-

前幾個(gè)設(shè)置是您將為所有連接器指定的常見(jiàn)設(shè)置。connection.url指定要連接的數(shù)據(jù)庫(kù),在本例中是本地SQLite數(shù)據(jù)庫(kù)文件。mode指示我們想要如何查詢數(shù)據(jù)。在本例中,我們有一個(gè)自增的唯一ID,因此我們選擇incrementing遞增模式并設(shè)置incrementing.column.name遞增列的列名為id。在這種mode模式下,每次查詢新的數(shù)據(jù)將只返回那些ID大于之前查詢中最大ID的行。最后,我們可以通過(guò)設(shè)置topic.prefix,來(lái)控制每個(gè)表的輸出發(fā)送到的Topic主題的名稱。因?yàn)楝F(xiàn)在我們只有一個(gè)表,所以本例中唯一的輸出主題將會(huì)是test-sqlite-jdbc-accounts。

在另一個(gè)終端的獨(dú)立的Kafka Connect中運(yùn)行(運(yùn)行剛剛的配置文件)

$ ./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/quickstart-sqlite.properties

可以看到進(jìn)程啟動(dòng)并記錄一些消息,然后它將開(kāi)始執(zhí)行查詢并將結(jié)果發(fā)送到Topic

$ ./bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic test-sqlite-jdbc-accounts --from-beginning
{"id":1,"name":{"string":"alice"}}
{"id":2,"name":{"string":"bob"}}

默認(rèn)的輪詢間隔是5秒,因此可能需要幾秒鐘才能顯示。根據(jù)您預(yù)期的速率更新或期望的延遲,可以使用較短的輪詢間隔更快地顯示。

Features 特性

JDBC connector支持使用各種JDBC數(shù)據(jù)類型來(lái)復(fù)制表,動(dòng)態(tài)地從數(shù)據(jù)庫(kù)中添加和刪除表、whitelists白名單和blacklists黑名單、更改輪詢間隔以及其他設(shè)置。然而,對(duì)于大多數(shù)用戶來(lái)說(shuō),最重要的特性是控制如何從數(shù)據(jù)庫(kù)增量地復(fù)制數(shù)據(jù)的設(shè)置。
Kafka Connect跟蹤監(jiān)測(cè)從每個(gè)表中檢索到的最新記錄,因此在下一次迭代循環(huán)中(或者在崩潰的情況下),它可以在正確的位置開(kāi)始。JDBC connector使用此功能,僅在每次迭代中從表(或從自定義查詢的輸出)獲取更新過(guò)updated的行。支持多種模式,每一種模式在修改行上的不同都能被發(fā)現(xiàn)查明。

Incremental Query Modes 增量查詢模式

每一種增量查詢模式跟蹤每行的一組列,用途來(lái)跟蹤哪些行已被處理以及哪些行是新增的或者已經(jīng)被更新的。mode模式的設(shè)置控制它的行為,并支持以下選項(xiàng):

  1. Incrementing Column 自增列
    每行包含唯一ID的列,它的較新的行保證具有較大的ID,即AUTOINCREMENT自增列。注意,此模式只能檢測(cè)發(fā)現(xiàn)新增的行。對(duì)于存在的記錄行的更新是無(wú)法檢測(cè)到的,因此此模式應(yīng)當(dāng)僅用于那些不變的數(shù)據(jù)行。舉個(gè)例子,是在數(shù)據(jù)倉(cāng)庫(kù)中流式傳輸事實(shí)表時(shí),因?yàn)槭聦?shí)表通常是只插入的。
  2. Timestamp Column 時(shí)間戳列
    在這種模式下,一個(gè)列包含修改時(shí)間戳用來(lái)跟蹤最近一次latest time處理的數(shù)據(jù),并且僅查詢從那時(shí)起已經(jīng)修改的行。注意,由于時(shí)間戳不一定是唯一的,因此此模式不能保證所有更新的數(shù)據(jù)都將被傳遞:如果2行共有相同的時(shí)間戳,并且通過(guò)增量查詢返回,在崩潰之前只處理了一行,那么當(dāng)系統(tǒng)恢復(fù)時(shí),將錯(cuò)過(guò)第二個(gè)修改更新。
  3. Timestamp and Incrementing Columns 時(shí)間戳和自增列
    這是將一個(gè)遞增列和一個(gè)時(shí)間戳列組合在一起的最穩(wěn)健和精確的模式。通過(guò)組合這兩者,只要時(shí)間戳足夠細(xì),每個(gè)(id,timestamp)元組將唯一地標(biāo)識(shí)對(duì)行的更新。即使部分完成后更新失敗,當(dāng)系統(tǒng)恢復(fù)時(shí),仍然可以正確地檢測(cè)和傳遞未處理的更新。
  4. Custom Query 定制化查詢
    JDBC connector支持使用定制查詢而不是復(fù)制整個(gè)表。通過(guò)自定義查詢,可以使用其他自動(dòng)更新模式之一,只要必要的WHERE子句可以正確地附加到查詢中。或者,指定的查詢可以處理對(duì)新的修改更新本身的過(guò)濾;然而,請(qǐng)注意,將不執(zhí)行offset偏移量跟蹤(不同于自動(dòng)模式,其中為每個(gè)記錄記錄遞增和/或時(shí)間戳列值),因此查詢必須自己跟蹤offset偏移量。
  5. Bulk 大塊查詢
    此模式未經(jīng)過(guò)濾,因此根本不遞增。它將在每次迭代中從表中加載所有行。如果希望周期性地消費(fèi)轉(zhuǎn)儲(chǔ)整個(gè)表,其中條目最終會(huì)被刪除,并且下游系統(tǒng)可以安全地處理副本,那么這非常有用。

Configuration 配置

JDBC Drivers

JDBC connector在可以從數(shù)據(jù)庫(kù)導(dǎo)入數(shù)據(jù)以及如何導(dǎo)入數(shù)據(jù)的數(shù)據(jù)庫(kù)中提供了相當(dāng)大的靈活性。本節(jié)首先描述如何訪問(wèn)那些其驅(qū)動(dòng)程序不包含在Confluent Platform中的數(shù)據(jù)庫(kù),然后給出幾個(gè)覆蓋常見(jiàn)場(chǎng)景的示例配置文件,然后提供可用配置選項(xiàng)的詳盡描述。

JDBC connector在通用JDBC API上實(shí)現(xiàn)數(shù)據(jù)復(fù)制的功能,但是依賴于JDBC驅(qū)動(dòng)來(lái)處理這些API的數(shù)據(jù)庫(kù)特定實(shí)現(xiàn)。Confluent Platform附帶了一些JDBC驅(qū)動(dòng)程序,但是如果沒(méi)有包括你用的數(shù)據(jù)庫(kù)的驅(qū)動(dòng)程序,則需要通過(guò)CLASSPATH使其可用。

一種選擇是在連接器安裝JDBC驅(qū)動(dòng)程序jar。相對(duì)于安裝目錄,打包的連接器安裝在share/java/kafka-connect-jdbc目錄中。如果您已經(jīng)從DebianRPM包安裝,連接器將安裝在/usr/share/java/kafka-connect-jdbc中。如果你是從zip或tar文件安裝,連接器將安裝在上面給出的路徑中,該路徑位于您解壓縮Confluent Platform歸檔的目錄下。
另一種選擇,你也可以設(shè)置CLASSPATH變量在運(yùn)行copycat-standalonecopycat-distributed

$ CLASSPATH=/usr/local/firebird/* ./bin/copycat-distributed ./config/copycat-distributed.properties
jdbc:firebirdsql:localhost/3050:/var/lib/firebird/example.db

Examples*

使用whitelist白名單來(lái)限制MySQL數(shù)據(jù)庫(kù)表中子集的更改,對(duì)于所有白名單表上符合標(biāo)準(zhǔn)的,使用idmodified兩列列來(lái)檢測(cè)已修改的行。這種模式是最穩(wěn)健的,因?yàn)樗梢詫⑽ㄒ?、不可變的?code>id與修改時(shí)間戳相結(jié)合,以確保即使過(guò)程在增量更新查詢中死亡,也不會(huì)錯(cuò)過(guò)這些更新。

name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10

connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=alice&password=secret
table.whitelist=users,products,transactions

mode=timestamp+incrementing
timestamp.column.name=modified
incrementing.column.name=id

topic.prefix=mysql-

下面這個(gè)例子使用了自定義查詢query代替加載整個(gè)表,允許您從多個(gè)表中`join``連接數(shù)據(jù)。只要查詢不包含其自身的過(guò)濾,您仍然可以使用內(nèi)置模式進(jìn)行增量查詢(在本例中,使用時(shí)間戳列)。注意,這限制了您對(duì)每個(gè)連接器的單個(gè)輸出,并且因?yàn)闆](méi)有表名,所以本例中主題“prefix”實(shí)際上是完整的主題名。

name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10

connection.url=jdbc:postgresql://postgres.example.com/test_db?user=bob&password=secret&ssl=true
query=SELECT users.id, users.name, transactions.timestamp, transactions.user_id, transactions.payment FROM users JOIN transactions ON (users.id = transactions.user_id)
mode=timestamp
timestamp.column.name=timestamp

topic.prefix=mysql-joined-data

Configuration Options 配置項(xiàng)

connection.url
要加載數(shù)據(jù)庫(kù)的JDBC連接URL
-類型: String
-默認(rèn): ""
-重要度: 高

topic.prefix
眾多表名之前的前綴,向其發(fā)布數(shù)據(jù)的KafkaTopic主題的名稱,或者在自定義查詢的情況下,生成要向其發(fā)布的Topic主題的全名。
-類型: String
-默認(rèn): ""
-重要度: 高

mode
incrementing、timestamp、timestamp+incrementing、bulk
-類型: String
-默認(rèn): ""
-重要度: 高

poll.interval.ms
以ms為單位輪詢每個(gè)表中的新數(shù)據(jù)
-類型: int
-默認(rèn): "5000"
-重要度: 高

incrementing.column.name
用于檢測(cè)新增行的遞增列的名稱。此列可能不為空。
-類型: String
-默認(rèn): ""
-重要度: 中

query
使用自定義查詢代替加載表
-類型: String
-默認(rèn): ""
-重要度: 中

table.blacklist
不允許復(fù)制的表清單。如果指定table.blacklist,則不能設(shè)置table.whitelist。
-類型: List
-默認(rèn): ""
-重要度: 中

table.whitelist
允許復(fù)制的表清單。如果指定table.whitelist,則不能設(shè)置table.blacklist
-類型: List
-默認(rèn): ""
-重要度: 中

timestamp.column.name
時(shí)間戳列的名字,用于檢測(cè)新增或修改的行記錄。此列可能不為空。
-類型: List
-默認(rèn): ""
-重要度: 中

batch.max.rows
在輪詢新數(shù)據(jù)時(shí),單個(gè)批處理中包括的最大行數(shù)。此設(shè)置可用于限制連接器內(nèi)部緩沖的數(shù)據(jù)量。
-類型: int
-默認(rèn): 100
-重要度: 低

table.poll.interval.ms
以ms為單位輪詢新表或已刪除表的頻率,這可能導(dǎo)致更新任務(wù)配置,以開(kāi)始輪詢已添加表中的數(shù)據(jù)或停止輪詢已刪除表中的數(shù)據(jù)。
-類型: long
-默認(rèn): 60000
-重要度: 低

謝謝閱讀,有幫助的點(diǎn)個(gè)?!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容