Flink源碼分析系列文檔目錄
請點擊:Flink 源碼分析系列文檔目錄
簡介
Flink一個常用的場景是維表關(guān)聯(lián)。一個流式的事實表和一個全量(可認(rèn)為是批)的維表關(guān)聯(lián)。對于條事實表數(shù)據(jù),如果每次join的時候,都去全量掃描維表。效率會非常底下。幸好Flink為我們提供了LookupTableSource??梢愿鶕?jù)join的字段值來查詢維表。避免了全表掃描。甚至還為我們提供了緩存功能。對于相同的join字段值,無需反復(fù)查詢維表。進一步提高了運行效率。
Flink自帶的數(shù)據(jù)源中JDBC,Hive和HBase實現(xiàn)了上述的LookupTableSource。
LookupTableSource
可以在運行時從外部存儲中按照指定的key查找數(shù)據(jù)。
和ScanTableSource不同的是,LookupTableSource無需掃描整個表,它可以按需從外部存儲的表中提取出數(shù)據(jù)。(有關(guān)ScanTableSource的內(nèi)容,參見Flink 源碼之 SQL TableSource 和 TableSink)
它具有一個方法getLookupRuntimeProvider,定義如下:
@PublicEvolving
public interface LookupTableSource extends DynamicTableSource {
LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
}
該方法要求返回一個provider,這個provider提供根據(jù)條件查詢外部數(shù)據(jù)源中對應(yīng)數(shù)據(jù)的邏輯。
LookupTableSource擁有4個實現(xiàn)類:
- JdbcDynamicTableSource
- HiveDynamicTableSource
- 兩個HBaseDynamicTableSource: 分別對應(yīng)HBase 1.x和2.x版本。
接下來我們以外部JDBC數(shù)據(jù)源為例。分析LookupTableSource查找數(shù)據(jù)的邏輯。
JdbcDynamicTableSource
JdbcDynamicTableSource是JDBC外部數(shù)據(jù)源專用的TableSource。它同時支持scan方式和lookup方式讀取。我們分析它實現(xiàn)LookupTableSource接口的getLookupRuntimeProvider方法。
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
// JDBC only support non-nested look up keys
// context.getKeys()獲取的是一個int[][]二維數(shù)組
// 例如:
// ROW < i INT, s STRING, r ROW < i2 INT, s2 STRING > >
// 如果key為i和s2的話
// context.getKeys()返回的是[[0], [2, 1]]
// 第一個[0]表示i字段的索引,從0開始
// 第二個[2, 1]中2代表s2位于最外層ROW的索引為2的元素,也是一個ROW。1代表在內(nèi)嵌ROW中,s2的索引為1
// 所以說對于無內(nèi)嵌結(jié)構(gòu)的數(shù)據(jù),內(nèi)層數(shù)組的長度一定是1
String[] keyNames = new String[context.getKeys().length];
for (int i = 0; i < keyNames.length; i++) {
int[] innerKeyArr = context.getKeys()[i];
// 不支持內(nèi)嵌數(shù)據(jù)結(jié)構(gòu)的查找,這里需要檢查
Preconditions.checkArgument(
innerKeyArr.length == 1, "JDBC only support non-nested look up keys");
// 獲取key字段的名字,存入數(shù)組
keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
}
// 獲取數(shù)據(jù)ROW每個字段的類型
final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
// 構(gòu)造出JdbcRowDataLookupFunction
JdbcRowDataLookupFunction lookupFunction =
new JdbcRowDataLookupFunction(
options,
lookupMaxRetryTimes,
// 所有字段名
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
// 所有字段類型,index和字段名一一對應(yīng)
DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
keyNames,
rowType);
if (cache != null) {
// 如果配置緩存,使用PartialCachingLookupProvider包裝lookupFunction
return PartialCachingLookupProvider.of(lookupFunction, cache);
} else {
return LookupFunctionProvider.of(lookupFunction);
}
}
在開始分析lookupFunction之前。我們先分析下帶有cache的LookupFunction的工作邏輯。
從上面看不出來Flink對PartialCachingLookupProvider包裝的lookupFunction做了什么。PartialCachingLookupProvider只是一個provider,顧名思義是一個提供方。具體怎么用還得看調(diào)用方。我們追蹤到調(diào)用getLookupRuntimeProvider的地方。它位于LookupJoinUtil的findLookupFunctionFromNewSource方法。該方法代碼很長,我們關(guān)心的片段如下:
if (provider instanceof LookupFunctionProvider) {
if (provider instanceof PartialCachingLookupProvider) {
PartialCachingLookupProvider partialCachingLookupProvider =
(PartialCachingLookupProvider) provider;
syncLookupFunction =
new CachingLookupFunction(
partialCachingLookupProvider.getCache(),
wrapSyncRetryDelegator(partialCachingLookupProvider, joinHintSpec));
} else if (provider instanceof FullCachingLookupProvider) {
// ...
} else {
syncLookupFunction =
wrapSyncRetryDelegator((LookupFunctionProvider) provider, joinHintSpec);
}
}
從上面代碼片段中我們發(fā)現(xiàn)如果Provider是PartialCachingLookupProvider類型,將其封裝到CachingLookupFunction中。
我們查看它的檢索數(shù)據(jù)lookup方法:
@Override
public Collection<RowData> lookup(RowData keyRow) throws IOException {
// cache中存儲了關(guān)鍵字?jǐn)?shù)據(jù)(keyRow)和根據(jù)關(guān)鍵字?jǐn)?shù)據(jù)查找出的數(shù)據(jù)的對應(yīng)關(guān)系
Collection<RowData> cachedValues = cache.getIfPresent(keyRow);
// 如果命中緩存,返回緩存數(shù)據(jù)
if (cachedValues != null) {
// Cache hit
return cachedValues;
} else {
// Cache miss
// 如果沒有找到,使用包裝的lookupFunction查找數(shù)據(jù)
Collection<RowData> lookupValues = lookupByDelegate(keyRow);
// Here we use keyRow as the cache key directly, as keyRow always contains the copy of
// key fields from left table, no matter if object reuse is enabled.
// 如果沒有查找到數(shù)據(jù),緩存空集合
if (lookupValues == null || lookupValues.isEmpty()) {
cache.put(keyRow, Collections.emptyList());
} else {
// 否則,緩存查詢到的數(shù)據(jù)
cache.put(keyRow, lookupValues);
}
// 返回查詢到的數(shù)據(jù)
return lookupValues;
}
}
上面的cache底層實現(xiàn)是Guava cache,可以配置cache數(shù)據(jù)寫入/讀取之后多長時間過期。防止外部數(shù)據(jù)源關(guān)聯(lián)數(shù)據(jù)變化之后永遠(yuǎn)沒有機會感知到。
JdbcRowDataLookupFunction
JdbcRowDataLookupFunction這個方法用于查找通過JDBC連接的外部數(shù)據(jù)源中相關(guān)數(shù)據(jù)記錄的。它實現(xiàn)了LookupFunction抽象類。這個接口中有一個抽象方法lookup,接受一個攜帶檢索關(guān)鍵字的數(shù)據(jù)行keyRow(keyRow的內(nèi)容為join從句關(guān)鍵字段的內(nèi)容),返回外部數(shù)據(jù)源中有關(guān)聯(lián)的數(shù)據(jù)。LookupFunction通過LookupJoinCodeGenerator代碼生成器,最終生成Flink的ProcessFunction。LookupFunction共有4個實現(xiàn)類:
- JdbcRowDataLookupFunction: 專用于查詢JDBC外部數(shù)據(jù)源
- HBaseRowDataLookupFunction: 專用于查詢HBase外部數(shù)據(jù)源
- CachingLookupFunction: 也是一個包裝類。加入了緩存功能。如果緩存命中,直接返回緩存中結(jié)果。如果沒有命中,查詢后將結(jié)果放入緩存。上面已經(jīng)分析過。
- RetryableLookupFunctionDelegator: 包裝類。通過外部的
ResultRetryStrategy配置重試策略。
接下來開始分析JdbcRowDataLookupFunction。它的構(gòu)造函數(shù)如下:
public JdbcRowDataLookupFunction(
JdbcConnectorOptions options,
int maxRetryTimes,
String[] fieldNames,
DataType[] fieldTypes,
String[] keyNames,
RowType rowType) {
checkNotNull(options, "No JdbcOptions supplied.");
checkNotNull(fieldNames, "No fieldNames supplied.");
checkNotNull(fieldTypes, "No fieldTypes supplied.");
checkNotNull(keyNames, "No keyNames supplied.");
// 創(chuàng)建JDBC連接Provider,用來獲取JDBC連接
this.connectionProvider = new SimpleJdbcConnectionProvider(options);
this.keyNames = keyNames;
List<String> nameList = Arrays.asList(fieldNames);
// 檢查keyName的內(nèi)容必須在nameList中
// 返回keyName對應(yīng)的字段名
DataType[] keyTypes =
Arrays.stream(keyNames)
.map(
s -> {
checkArgument(
nameList.contains(s),
"keyName %s can't find in fieldNames %s.",
s,
nameList);
return fieldTypes[nameList.indexOf(s)];
})
.toArray(DataType[]::new);
// 最大重試次數(shù)
this.maxRetryTimes = maxRetryTimes;
// 根據(jù)不同數(shù)據(jù)庫的dialect,創(chuàng)建按照key查找對應(yīng)數(shù)據(jù)的SQL語句
// JdbcDialect共支持Derby, MySQL, Oracle和Postgres四種數(shù)據(jù)庫
this.query =
options.getDialect()
.getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
String dbURL = options.getDbURL();
// 根據(jù)數(shù)據(jù)庫連接URL來判斷是哪個數(shù)據(jù)庫dialect
JdbcDialect jdbcDialect = JdbcDialectLoader.load(dbURL);
// 獲取當(dāng)前dialect對應(yīng)的數(shù)據(jù)庫數(shù)據(jù)類型和Flink內(nèi)部數(shù)據(jù)類型轉(zhuǎn)換器
this.jdbcRowConverter = jdbcDialect.getRowConverter(rowType);
// 獲取key字段對應(yīng)的類型轉(zhuǎn)換器
this.lookupKeyRowConverter =
jdbcDialect.getRowConverter(
RowType.of(
Arrays.stream(keyTypes)
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new)));
}
上面這段代碼的重點是getSelectFromStatement。
getSelectFromStatement方法創(chuàng)建按照key字段查詢數(shù)據(jù)的select語句。代碼如下所示:
@Override
public String getSelectFromStatement(
String tableName, String[] selectFields, String[] conditionFields) {
String selectExpressions =
// 獲取所有字段名
Arrays.stream(selectFields)
// 將他們按照對應(yīng)數(shù)據(jù)庫dialect的要求,加引號引起來
.map(this::quoteIdentifier)
// 逗號分隔
.collect(Collectors.joining(", "));
// 將conditionFields構(gòu)建為類似`key1` = :key1 AND `key2` = :key2 (以MySQL dialect為例)
// 冒號+命名是JDBC PreparedStatement的命名參數(shù),例如上面的:key1,:key2
String fieldExpressions =
Arrays.stream(conditionFields)
.map(f -> format("%s = :%s", quoteIdentifier(f), f))
.collect(Collectors.joining(" AND "));
// 返回拼裝的select語句
return "SELECT "
+ selectExpressions
+ " FROM "
+ quoteIdentifier(tableName)
+ (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
}
JdbcRowDataLookupFunction初始化的時候會執(zhí)行open方法。該方法開啟數(shù)據(jù)庫連接,同時將上面創(chuàng)建出的查詢SQL字符串轉(zhuǎn)化為PreparedStatement。代碼如下:
@Override
public void open(FunctionContext context) throws Exception {
try {
// 創(chuàng)建數(shù)據(jù)庫連接,構(gòu)建PreparedStatement
establishConnectionAndStatement();
} catch (SQLException sqe) {
throw new IllegalArgumentException("open() failed.", sqe);
} catch (ClassNotFoundException cnfe) {
throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
}
}
一切準(zhǔn)備工作完成之后,lookup方法的邏輯看起來就非常明了了。它根據(jù)keyRow查找對應(yīng)的數(shù)據(jù)。代碼如下:
@Override
public Collection<RowData> lookup(RowData keyRow) {
// 最多重試maxRetryTimes次
for (int retry = 0; retry <= maxRetryTimes; retry++) {
try {
// 清除綁定的參數(shù)
statement.clearParameters();
// 將keyRow中的數(shù)據(jù)set到statement中
statement = lookupKeyRowConverter.toExternal(keyRow, statement);
// 執(zhí)行查詢
try (ResultSet resultSet = statement.executeQuery()) {
ArrayList<RowData> rows = new ArrayList<>();
while (resultSet.next()) {
// 將數(shù)據(jù)逐條轉(zhuǎn)換為Flink內(nèi)部的類型
RowData row = jdbcRowConverter.toInternal(resultSet);
rows.add(row);
}
// 裁剪list大小到真實包含數(shù)據(jù)的條數(shù)
rows.trimToSize();
return rows;
}
} catch (SQLException e) {
LOG.error(String.format("JDBC executeBatch error, retry times = %d", retry), e);
// 如果超出重試次數(shù),報錯返回
if (retry >= maxRetryTimes) {
throw new RuntimeException("Execution of JDBC statement failed.", e);
}
try {
// 如果連接已關(guān)閉,創(chuàng)建新的連接
if (!connectionProvider.isConnectionValid()) {
statement.close();
connectionProvider.closeConnection();
establishConnectionAndStatement();
}
} catch (SQLException | ClassNotFoundException exception) {
LOG.error(
"JDBC connection is not valid, and reestablish connection failed",
exception);
throw new RuntimeException("Reestablish JDBC connection failed", exception);
}
try {
// 每次重試間隔時間遞增
Thread.sleep(1000L * retry);
} catch (InterruptedException e1) {
throw new RuntimeException(e1);
}
}
}
return Collections.emptyList();
}
本博客為作者原創(chuàng),歡迎大家參與討論和批評指正。如需轉(zhuǎn)載請注明出處。