Flink 源碼之 LookupTableSource

Flink源碼分析系列文檔目錄

請點擊:Flink 源碼分析系列文檔目錄

簡介

Flink一個常用的場景是維表關(guān)聯(lián)。一個流式的事實表和一個全量(可認(rèn)為是批)的維表關(guān)聯(lián)。對于條事實表數(shù)據(jù),如果每次join的時候,都去全量掃描維表。效率會非常底下。幸好Flink為我們提供了LookupTableSource??梢愿鶕?jù)join的字段值來查詢維表。避免了全表掃描。甚至還為我們提供了緩存功能。對于相同的join字段值,無需反復(fù)查詢維表。進一步提高了運行效率。

Flink自帶的數(shù)據(jù)源中JDBC,Hive和HBase實現(xiàn)了上述的LookupTableSource

LookupTableSource

可以在運行時從外部存儲中按照指定的key查找數(shù)據(jù)。

ScanTableSource不同的是,LookupTableSource無需掃描整個表,它可以按需從外部存儲的表中提取出數(shù)據(jù)。(有關(guān)ScanTableSource的內(nèi)容,參見Flink 源碼之 SQL TableSource 和 TableSink)

它具有一個方法getLookupRuntimeProvider,定義如下:

@PublicEvolving
public interface LookupTableSource extends DynamicTableSource {
    LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
}

該方法要求返回一個provider,這個provider提供根據(jù)條件查詢外部數(shù)據(jù)源中對應(yīng)數(shù)據(jù)的邏輯。

LookupTableSource擁有4個實現(xiàn)類:

  • JdbcDynamicTableSource
  • HiveDynamicTableSource
  • 兩個HBaseDynamicTableSource: 分別對應(yīng)HBase 1.x和2.x版本。

接下來我們以外部JDBC數(shù)據(jù)源為例。分析LookupTableSource查找數(shù)據(jù)的邏輯。

JdbcDynamicTableSource

JdbcDynamicTableSource是JDBC外部數(shù)據(jù)源專用的TableSource。它同時支持scan方式和lookup方式讀取。我們分析它實現(xiàn)LookupTableSource接口的getLookupRuntimeProvider方法。

@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
    // JDBC only support non-nested look up keys
    // context.getKeys()獲取的是一個int[][]二維數(shù)組
    // 例如:
    // ROW < i INT, s STRING, r ROW < i2 INT, s2 STRING > >
    // 如果key為i和s2的話
    // context.getKeys()返回的是[[0], [2, 1]]
    // 第一個[0]表示i字段的索引,從0開始
    // 第二個[2, 1]中2代表s2位于最外層ROW的索引為2的元素,也是一個ROW。1代表在內(nèi)嵌ROW中,s2的索引為1
    // 所以說對于無內(nèi)嵌結(jié)構(gòu)的數(shù)據(jù),內(nèi)層數(shù)組的長度一定是1
    String[] keyNames = new String[context.getKeys().length];
    for (int i = 0; i < keyNames.length; i++) {
        int[] innerKeyArr = context.getKeys()[i];
        // 不支持內(nèi)嵌數(shù)據(jù)結(jié)構(gòu)的查找,這里需要檢查
        Preconditions.checkArgument(
            innerKeyArr.length == 1, "JDBC only support non-nested look up keys");
        // 獲取key字段的名字,存入數(shù)組
        keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
    }
    // 獲取數(shù)據(jù)ROW每個字段的類型
    final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
    // 構(gòu)造出JdbcRowDataLookupFunction
    JdbcRowDataLookupFunction lookupFunction =
        new JdbcRowDataLookupFunction(
        options,
        lookupMaxRetryTimes,
        // 所有字段名
        DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
        // 所有字段類型,index和字段名一一對應(yīng)
        DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
        keyNames,
        rowType);
    if (cache != null) {
        // 如果配置緩存,使用PartialCachingLookupProvider包裝lookupFunction
        return PartialCachingLookupProvider.of(lookupFunction, cache);
    } else {
        return LookupFunctionProvider.of(lookupFunction);
    }
}

在開始分析lookupFunction之前。我們先分析下帶有cache的LookupFunction的工作邏輯。

從上面看不出來Flink對PartialCachingLookupProvider包裝的lookupFunction做了什么。PartialCachingLookupProvider只是一個provider,顧名思義是一個提供方。具體怎么用還得看調(diào)用方。我們追蹤到調(diào)用getLookupRuntimeProvider的地方。它位于LookupJoinUtilfindLookupFunctionFromNewSource方法。該方法代碼很長,我們關(guān)心的片段如下:

if (provider instanceof LookupFunctionProvider) {
    if (provider instanceof PartialCachingLookupProvider) {
        PartialCachingLookupProvider partialCachingLookupProvider =
            (PartialCachingLookupProvider) provider;
        syncLookupFunction =
            new CachingLookupFunction(
            partialCachingLookupProvider.getCache(),
            wrapSyncRetryDelegator(partialCachingLookupProvider, joinHintSpec));
    } else if (provider instanceof FullCachingLookupProvider) {
        // ...
    } else {
        syncLookupFunction =
            wrapSyncRetryDelegator((LookupFunctionProvider) provider, joinHintSpec);
    }
}

從上面代碼片段中我們發(fā)現(xiàn)如果Provider是PartialCachingLookupProvider類型,將其封裝到CachingLookupFunction中。

我們查看它的檢索數(shù)據(jù)lookup方法:

@Override
public Collection<RowData> lookup(RowData keyRow) throws IOException {
    // cache中存儲了關(guān)鍵字?jǐn)?shù)據(jù)(keyRow)和根據(jù)關(guān)鍵字?jǐn)?shù)據(jù)查找出的數(shù)據(jù)的對應(yīng)關(guān)系
    Collection<RowData> cachedValues = cache.getIfPresent(keyRow);
    // 如果命中緩存,返回緩存數(shù)據(jù)
    if (cachedValues != null) {
        // Cache hit
        return cachedValues;
    } else {
        // Cache miss
        // 如果沒有找到,使用包裝的lookupFunction查找數(shù)據(jù)
        Collection<RowData> lookupValues = lookupByDelegate(keyRow);
        // Here we use keyRow as the cache key directly, as keyRow always contains the copy of
        // key fields from left table, no matter if object reuse is enabled.
        // 如果沒有查找到數(shù)據(jù),緩存空集合
        if (lookupValues == null || lookupValues.isEmpty()) {
            cache.put(keyRow, Collections.emptyList());
        } else {
            // 否則,緩存查詢到的數(shù)據(jù)
            cache.put(keyRow, lookupValues);
        }
        // 返回查詢到的數(shù)據(jù)
        return lookupValues;
    }
}

上面的cache底層實現(xiàn)是Guava cache,可以配置cache數(shù)據(jù)寫入/讀取之后多長時間過期。防止外部數(shù)據(jù)源關(guān)聯(lián)數(shù)據(jù)變化之后永遠(yuǎn)沒有機會感知到。

JdbcRowDataLookupFunction

JdbcRowDataLookupFunction這個方法用于查找通過JDBC連接的外部數(shù)據(jù)源中相關(guān)數(shù)據(jù)記錄的。它實現(xiàn)了LookupFunction抽象類。這個接口中有一個抽象方法lookup,接受一個攜帶檢索關(guān)鍵字的數(shù)據(jù)行keyRowkeyRow的內(nèi)容為join從句關(guān)鍵字段的內(nèi)容),返回外部數(shù)據(jù)源中有關(guān)聯(lián)的數(shù)據(jù)。LookupFunction通過LookupJoinCodeGenerator代碼生成器,最終生成Flink的ProcessFunction。LookupFunction共有4個實現(xiàn)類:

  • JdbcRowDataLookupFunction: 專用于查詢JDBC外部數(shù)據(jù)源
  • HBaseRowDataLookupFunction: 專用于查詢HBase外部數(shù)據(jù)源
  • CachingLookupFunction: 也是一個包裝類。加入了緩存功能。如果緩存命中,直接返回緩存中結(jié)果。如果沒有命中,查詢后將結(jié)果放入緩存。上面已經(jīng)分析過。
  • RetryableLookupFunctionDelegator: 包裝類。通過外部的ResultRetryStrategy配置重試策略。

接下來開始分析JdbcRowDataLookupFunction。它的構(gòu)造函數(shù)如下:

public JdbcRowDataLookupFunction(
    JdbcConnectorOptions options,
    int maxRetryTimes,
    String[] fieldNames,
    DataType[] fieldTypes,
    String[] keyNames,
    RowType rowType) {
    checkNotNull(options, "No JdbcOptions supplied.");
    checkNotNull(fieldNames, "No fieldNames supplied.");
    checkNotNull(fieldTypes, "No fieldTypes supplied.");
    checkNotNull(keyNames, "No keyNames supplied.");
    // 創(chuàng)建JDBC連接Provider,用來獲取JDBC連接
    this.connectionProvider = new SimpleJdbcConnectionProvider(options);
    this.keyNames = keyNames;
    List<String> nameList = Arrays.asList(fieldNames);
    // 檢查keyName的內(nèi)容必須在nameList中
    // 返回keyName對應(yīng)的字段名
    DataType[] keyTypes =
        Arrays.stream(keyNames)
        .map(
        s -> {
            checkArgument(
                nameList.contains(s),
                "keyName %s can't find in fieldNames %s.",
                s,
                nameList);
            return fieldTypes[nameList.indexOf(s)];
        })
        .toArray(DataType[]::new);
    // 最大重試次數(shù)
    this.maxRetryTimes = maxRetryTimes;
    // 根據(jù)不同數(shù)據(jù)庫的dialect,創(chuàng)建按照key查找對應(yīng)數(shù)據(jù)的SQL語句
    // JdbcDialect共支持Derby, MySQL, Oracle和Postgres四種數(shù)據(jù)庫
    this.query =
        options.getDialect()
        .getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
    String dbURL = options.getDbURL();
    // 根據(jù)數(shù)據(jù)庫連接URL來判斷是哪個數(shù)據(jù)庫dialect
    JdbcDialect jdbcDialect = JdbcDialectLoader.load(dbURL);
    // 獲取當(dāng)前dialect對應(yīng)的數(shù)據(jù)庫數(shù)據(jù)類型和Flink內(nèi)部數(shù)據(jù)類型轉(zhuǎn)換器
    this.jdbcRowConverter = jdbcDialect.getRowConverter(rowType);
    // 獲取key字段對應(yīng)的類型轉(zhuǎn)換器
    this.lookupKeyRowConverter =
        jdbcDialect.getRowConverter(
        RowType.of(
            Arrays.stream(keyTypes)
            .map(DataType::getLogicalType)
            .toArray(LogicalType[]::new)));
}

上面這段代碼的重點是getSelectFromStatement。

getSelectFromStatement方法創(chuàng)建按照key字段查詢數(shù)據(jù)的select語句。代碼如下所示:

@Override
public String getSelectFromStatement(
    String tableName, String[] selectFields, String[] conditionFields) {
    String selectExpressions =
        // 獲取所有字段名
        Arrays.stream(selectFields)
        // 將他們按照對應(yīng)數(shù)據(jù)庫dialect的要求,加引號引起來
        .map(this::quoteIdentifier)
        // 逗號分隔
        .collect(Collectors.joining(", "));
        // 將conditionFields構(gòu)建為類似`key1` = :key1 AND `key2` = :key2 (以MySQL dialect為例)
        // 冒號+命名是JDBC PreparedStatement的命名參數(shù),例如上面的:key1,:key2
    String fieldExpressions =
        Arrays.stream(conditionFields)
        .map(f -> format("%s = :%s", quoteIdentifier(f), f))
        .collect(Collectors.joining(" AND "));
    // 返回拼裝的select語句
    return "SELECT "
        + selectExpressions
        + " FROM "
        + quoteIdentifier(tableName)
        + (conditionFields.length > 0 ? " WHERE " + fieldExpressions : "");
}

JdbcRowDataLookupFunction初始化的時候會執(zhí)行open方法。該方法開啟數(shù)據(jù)庫連接,同時將上面創(chuàng)建出的查詢SQL字符串轉(zhuǎn)化為PreparedStatement。代碼如下:

@Override
public void open(FunctionContext context) throws Exception {
    try {
        // 創(chuàng)建數(shù)據(jù)庫連接,構(gòu)建PreparedStatement
        establishConnectionAndStatement();
    } catch (SQLException sqe) {
        throw new IllegalArgumentException("open() failed.", sqe);
    } catch (ClassNotFoundException cnfe) {
        throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
    }
}

一切準(zhǔn)備工作完成之后,lookup方法的邏輯看起來就非常明了了。它根據(jù)keyRow查找對應(yīng)的數(shù)據(jù)。代碼如下:

@Override
public Collection<RowData> lookup(RowData keyRow) {
    // 最多重試maxRetryTimes次
    for (int retry = 0; retry <= maxRetryTimes; retry++) {
        try {
            // 清除綁定的參數(shù)
            statement.clearParameters();
            // 將keyRow中的數(shù)據(jù)set到statement中
            statement = lookupKeyRowConverter.toExternal(keyRow, statement);
            // 執(zhí)行查詢
            try (ResultSet resultSet = statement.executeQuery()) {
                ArrayList<RowData> rows = new ArrayList<>();
                while (resultSet.next()) {
                    // 將數(shù)據(jù)逐條轉(zhuǎn)換為Flink內(nèi)部的類型
                    RowData row = jdbcRowConverter.toInternal(resultSet);
                    rows.add(row);
                }
                // 裁剪list大小到真實包含數(shù)據(jù)的條數(shù)
                rows.trimToSize();
                return rows;
            }
        } catch (SQLException e) {
            LOG.error(String.format("JDBC executeBatch error, retry times = %d", retry), e);
            // 如果超出重試次數(shù),報錯返回
            if (retry >= maxRetryTimes) {
                throw new RuntimeException("Execution of JDBC statement failed.", e);
            }

            try {
                // 如果連接已關(guān)閉,創(chuàng)建新的連接
                if (!connectionProvider.isConnectionValid()) {
                    statement.close();
                    connectionProvider.closeConnection();
                    establishConnectionAndStatement();
                }
            } catch (SQLException | ClassNotFoundException exception) {
                LOG.error(
                    "JDBC connection is not valid, and reestablish connection failed",
                    exception);
                throw new RuntimeException("Reestablish JDBC connection failed", exception);
            }

            try {
                // 每次重試間隔時間遞增
                Thread.sleep(1000L * retry);
            } catch (InterruptedException e1) {
                throw new RuntimeException(e1);
            }
        }
    }
    return Collections.emptyList();
}

本博客為作者原創(chuàng),歡迎大家參與討論和批評指正。如需轉(zhuǎn)載請注明出處。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容