Flink SQL Custom Sink

1. Background

We are building a Flink SQL platform in-house. This article uses a custom Redis sink as an example to show how to write a custom sink for Flink SQL and how to use it afterwards.
Based on Flink 1.11.

2. Steps

  1. Implement DynamicTableSinkFactory
  2. Implement DynamicTableSink
  3. Create the Redis sink (a RichSinkFunction that writes to Redis)
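Step 1 only works if Flink can find the factory. In Flink 1.11 new-style factories are discovered through Java's `ServiceLoader`, so the factory class also has to be listed in `META-INF/services/org.apache.flink.table.factories.Factory`; the planner then picks the factory whose `factoryIdentifier()` equals the table's `'connector'` option. The stdlib-only sketch below imitates just that matching step. `FactoryLookup` and `SinkFactory` are hypothetical stand-ins, not Flink's API.

```java
import java.util.List;
import java.util.stream.Collectors;

final class FactoryLookup {
    // Hypothetical stand-in for a table factory; only the identifier matters here.
    interface SinkFactory {
        String factoryIdentifier();
    }

    // Returns the single factory whose identifier equals the 'connector' value,
    // failing when nothing matches or when the match is ambiguous.
    static SinkFactory discover(List<SinkFactory> factories, String connector) {
        List<SinkFactory> matches = factories.stream()
                .filter(f -> f.factoryIdentifier().equals(connector))
                .collect(Collectors.toList());
        if (matches.isEmpty()) {
            throw new IllegalStateException("No factory found for connector '" + connector + "'");
        }
        if (matches.size() > 1) {
            throw new IllegalStateException("Multiple factories found for connector '" + connector + "'");
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        List<SinkFactory> factories = List.<SinkFactory>of(() -> "redis", () -> "print");
        System.out.println(discover(factories, "redis").factoryIdentifier()); // prints "redis"
    }
}
```

This is why IDENTIFIER = "redis" in the factory and 'connector' = 'redis' in the DDL have to match exactly.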

3. Custom sink code

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import redis.clients.jedis.JedisCluster;

import java.util.*;

import static org.apache.flink.configuration.ConfigOptions.key;


/**
 * @author shengjk1
 * @date 2020/10/16
 */
public class RedisTableSinkFactory implements DynamicTableSinkFactory {
    
    public static final String IDENTIFIER = "redis";
    
    public static final ConfigOption<String> HOST_PORT = key("hostPort")
            .stringType()
            .noDefaultValue()
            .withDescription("redis host and port");
    
    public static final ConfigOption<String> PASSWORD = key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("redis password");
    
    public static final ConfigOption<Integer> EXPIRE_TIME = key("expireTime")
            .intType()
            .noDefaultValue()
            .withDescription("redis key expire time");
    
    public static final ConfigOption<String> KEY_TYPE = key("keyType")
            .stringType()
            .noDefaultValue()
            .withDescription("redis key type: hash, set, sadd or zadd");
    
    public static final ConfigOption<String> KEY_TEMPLATE = key("keyTemplate")
            .stringType()
            .noDefaultValue()
            .withDescription("redis key template ");
    
    public static final ConfigOption<String> FIELD_TEMPLATE = key("fieldTemplate")
            .stringType()
            .noDefaultValue()
            .withDescription("redis field template ");
    
    
    public static final ConfigOption<String> VALUE_NAMES = key("valueNames")
            .stringType()
            .noDefaultValue()
            .withDescription("redis value name ");
    
    @Override
    // Flink only picks RedisTableSinkFactory when the table's 'connector' option equals IDENTIFIER
    public String factoryIdentifier() {
        return IDENTIFIER;
    }
    
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return new HashSet<>();
    }
    
    @Override
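    // Every custom option used in the WITH clause must be declared here (or in requiredOptions()),
    // otherwise helper.validate() rejects it as unsupported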
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOST_PORT);
        options.add(PASSWORD);
        options.add(EXPIRE_TIME);
        options.add(KEY_TYPE);
        options.add(KEY_TEMPLATE);
        options.add(FIELD_TEMPLATE);
        options.add(VALUE_NAMES);
        return options;
    }
    
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();
        ReadableConfig options = helper.getOptions();
        return new RedisSink(
                context.getCatalogTable().getSchema().toPhysicalRowDataType(),
                options);
    }
    
    
    private static class RedisSink implements DynamicTableSink {
        
        private final DataType type;
        private final ReadableConfig options;
        
        private RedisSink(DataType type, ReadableConfig options) {
            this.type = type;
            this.options = options;
        }
        
        @Override
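        // Accept whatever changelog the planner requests (inserts, updates, deletes);
        // invoke() decides which rows are actually written to Redis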
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return requestedMode;
        }
        
        @Override
        // The runtime entry point: hands the user-defined streaming SinkFunction to Flink,
        // which links the SQL world with the DataStream world
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
            DataStructureConverter converter = context.createDataStructureConverter(type);
            return SinkFunctionProvider.of(new RowDataPrintFunction(converter, options, type));
        }
        
        @Override
        // Required by DynamicTableSink; the planner may copy the sink during planning.
        // This sink holds no mutable state, so a plain copy is enough.
        public DynamicTableSink copy() {
            return new RedisSink(type, options);
        }
        
        @Override
        public String asSummaryString() {
            return "redis";
        }
    }
    
    /**
     Same as a custom sink in the Flink DataStream API, except that this one handles RowData; not covered in detail here.
     */
    private static class RowDataPrintFunction extends RichSinkFunction<RowData> {
        
        private static final long serialVersionUID = 1L;
        
        private final DataStructureConverter converter;
        private final ReadableConfig options;
        private final DataType type;
        private RowType logicalType;
        private HashMap<String, Integer> fields;
        private JedisCluster jedisCluster;
        
        private RowDataPrintFunction(
                DataStructureConverter converter, ReadableConfig options, DataType type) {
            this.converter = converter;
            this.options = options;
            this.type = type;
        }
        
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            logicalType = (RowType) type.getLogicalType();
            fields = new HashMap<>();
            List<RowType.RowField> rowFields = logicalType.getFields();
            int size = rowFields.size();
            for (int i = 0; i < size; i++) {
                fields.put(rowFields.get(i).getName(), i);
            }
            
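            // RedisUtil is the author's own helper (not shown here) that builds a JedisCluster from hostPort;
            // note that the PASSWORD option is not passed to it
            jedisCluster = RedisUtil.getJedisCluster(options.get(HOST_PORT));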
        }

        @Override
        public void close() throws Exception {
            RedisUtil.closeConn(jedisCluster);
        }

        @Override
        /*
        Sample changelog produced by the upstream aggregation: every update of a key
        arrives as -U (old row) followed by +U (new row).
        2> +I(1,30017323,1101)
        2> -U(1,30017323,1101)
        2> +U(2,30017323,1101)
        2> -U(2,30017323,1101)
        2> +U(3,30017323,1101)
        2> -U(3,30017323,1101)
        2> +U(4,30017323,1101)
        3> -U(3,980897,3208)
        3> +U(4,980897,3208)
         */
        public void invoke(RowData rowData, Context context) {
            RowKind rowKind = rowData.getRowKind();
            // Only INSERT and UPDATE_AFTER are written; UPDATE_BEFORE and DELETE are ignored,
            // so Redis holds the latest value of each key
            if (rowKind != RowKind.INSERT && rowKind != RowKind.UPDATE_AFTER) {
                return;
            }
            Row data = (Row) converter.toExternal(rowData);

            String keyTemplate = options.get(KEY_TEMPLATE);
            if (Objects.isNull(keyTemplate) || keyTemplate.trim().isEmpty()) {
                throw new IllegalArgumentException("keyTemplate is null or empty");
            }
            String key = fillTemplate(keyTemplate, data);

            String keyType = options.get(KEY_TYPE);
            String valueNames = options.get(VALUE_NAMES);
            if (Objects.isNull(valueNames) || valueNames.trim().isEmpty()) {
                throw new IllegalArgumentException("valueNames is null or empty");
            }

            if ("hash".equalsIgnoreCase(keyType)) {
                // keyType=hash requires fieldTemplate
                String fieldTemplate = options.get(FIELD_TEMPLATE);
                if (Objects.isNull(fieldTemplate) || fieldTemplate.trim().isEmpty()) {
                    throw new IllegalArgumentException("fieldTemplate is required when keyType=hash");
                }
                String field = fillTemplate(fieldTemplate, data);
                // hash field name = <filled fieldTemplate>_<valueName>, one entry per value name
                Map<String, String> map = new HashMap<>();
                for (String valueName : valueNames.split(",")) {
                    map.put(field + "_" + valueName, fieldValue(data, valueName));
                }
                jedisCluster.hset(key, map);
            } else if ("set".equalsIgnoreCase(keyType)) {
                // Redis SET: plain string value
                jedisCluster.set(key, fieldValue(data, valueNames));
            } else if ("sadd".equalsIgnoreCase(keyType)) {
                jedisCluster.sadd(key, fieldValue(data, valueNames));
            } else if ("zadd".equalsIgnoreCase(keyType)) {
                // NOTE: there is no score option, so this still calls SADD (not ZADD)
                // and the key ends up as a plain Redis set
                jedisCluster.sadd(key, fieldValue(data, valueNames));
            } else {
                throw new IllegalArgumentException("Unsupported keyType: " + keyType);
            }

            Integer expireTime = options.get(EXPIRE_TIME);
            if (Objects.nonNull(expireTime)) {
                // TTL in seconds, refreshed on every write
                jedisCluster.expire(key, expireTime);
            }
        }

        // Replaces every ${fieldName} in the template with that field's value,
        // keeping any text before, between and after the placeholders
        private String fillTemplate(String template, Row data) {
            StringBuilder out = new StringBuilder();
            int pos = 0;
            int start;
            while ((start = template.indexOf("${", pos)) >= 0) {
                int end = template.indexOf('}', start);
                if (end < 0) {
                    throw new IllegalArgumentException("Unclosed ${ in template: " + template);
                }
                out.append(template, pos, start)
                        .append(fieldValue(data, template.substring(start + 2, end)));
                pos = end + 1;
            }
            return out.append(template.substring(pos)).toString();
        }

        // Looks up a column by name and returns its value as a string (null becomes "null")
        private String fieldValue(Row data, String fieldName) {
            Integer index = fields.get(fieldName);
            if (index == null) {
                throw new IllegalArgumentException("Unknown field: " + fieldName);
            }
            return String.valueOf(data.getField(index));
        }
    }
}
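The `${...}` substitution used for keyTemplate and fieldTemplate is the part of invoke() most likely to trip people up. The standalone sketch below (stdlib only; `TemplateFiller.fill` is a hypothetical helper, not part of Flink or Jedis) scans the template for `${name}` placeholders and replaces each one with that field's value, keeping any text before or after the placeholder.

```java
import java.util.Map;

final class TemplateFiller {
    // Replaces every ${field} in the template with the matching value from the row.
    // Text outside the placeholders is copied unchanged.
    static String fill(String template, Map<String, ?> row) {
        StringBuilder out = new StringBuilder();
        int pos = 0;
        while (true) {
            int start = template.indexOf("${", pos);
            if (start < 0) {
                out.append(template.substring(pos));
                return out.toString();
            }
            int end = template.indexOf('}', start + 2);
            if (end < 0) {
                throw new IllegalArgumentException("Unclosed placeholder in: " + template);
            }
            String field = template.substring(start + 2, end);
            if (!row.containsKey(field)) {
                throw new IllegalArgumentException("Unknown field '" + field + "' in: " + template);
            }
            out.append(template, pos, start).append(row.get(field));
            pos = end + 1;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("city_id", 1101L, "courier_id", 30017323L);
        System.out.println(fill("test2_${city_id}", row));           // test2_1101
        System.out.println(fill("${city_id}:${courier_id}:x", row)); // 1101:30017323:x
    }
}
```

Finding the closing `}` explicitly, rather than dropping the last character of each split piece, is what keeps suffixes such as `:x` intact.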

4. Using the Redis Sink

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * @author shengjk1
 * @date 2020/9/25
 */
public class SqlKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, environmentSettings);
        // enable checkpointing
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        configuration.set(
                ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        configuration.set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));
        
        // Kafka source table; 'xxx' stands for the real topic and brokers
        String sql = "CREATE TABLE sourcedata (`id` bigint,`status` int,`city_id` bigint,`courier_id` bigint,info_index int,order_id bigint,tableName String" +
                ") WITH (" +
                "'connector' = 'kafka','topic' = 'xxx'," +
                "'properties.bootstrap.servers' = 'xxx','properties.group.id' = 'testGroup'," +
                "'format' = 'json','scan.startup.mode' = 'earliest-offset')";
        tableEnv.executeSql(sql);
        
        //15017284 distinct
        Table bigtable = tableEnv.sqlQuery("select distinct a.id,a.courier_id,a.status,a.city_id,b.info_index from (select id,status,city_id,courier_id from sourcedata where tableName = 'orders' and status=60)a join (select " +
                " order_id,max(info_index)info_index from sourcedata  where tableName = 'infos'  group by order_id )b on a.id=b.order_id");

        // Redis sink table; 'connector' = 'redis' must match RedisTableSinkFactory.IDENTIFIER
        sql = "CREATE TABLE redis (info_index BIGINT,courier_id BIGINT,city_id BIGINT" +
                ") WITH (" +
                "'connector' = 'redis'," +
                "'hostPort'='xxx'," +
                "'keyType'='hash'," +
                "'keyTemplate'='test2_${city_id}'," +
                "'fieldTemplate'='test2_${courier_id}'," +
                "'valueNames'='info_index,city_id'," +
                "'expireTime'='1000')";
        tableEnv.executeSql(sql);
        
        // SUM over an INT column is INT, so cast it to BIGINT to match both the Redis table
        // and the Tuple3<Long, Long, Long> type used for the debug print below
        Table resultTable = tableEnv.sqlQuery("select cast(sum(info_index) as bigint) info_index,courier_id,city_id from " + bigtable + " group by city_id,courier_id");
        TupleTypeInfo<Tuple3<Long, Long, Long>> tupleType = new TupleTypeInfo<>(
                Types.LONG,
                Types.LONG,
                Types.LONG);
        // Debug output of the retract stream (true = add, false = retract)
        tableEnv.toRetractStream(resultTable, tupleType).print("===== ");
        // In Flink 1.11, executeSql("INSERT INTO ...") submits the Redis job by itself
        tableEnv.executeSql("INSERT INTO redis SELECT info_index,courier_id,city_id FROM " + resultTable);
        // env.execute() submits a separate job containing the DataStream print above
        env.execute("");
    }
}
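Because resultTable comes from a GROUP BY aggregation, the Redis table receives an updating changelog like the sample in the invoke() comment. The stdlib-only sketch below replays that sample to show the effect of writing only +I and +U rows. `ChangelogReplay`, its `Kind` enum and `Change` class are hypothetical stand-ins for Flink's RowKind and RowData, and the sketch tracks only the info_index hash field (the real sink also writes city_id).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ChangelogReplay {
    // Hypothetical mirror of Flink's RowKind values.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical stand-in for one result row (info_index, courier_id, city_id) plus its kind.
    static final class Change {
        final Kind kind;
        final long infoIndex;
        final long courierId;
        final long cityId;

        Change(Kind kind, long infoIndex, long courierId, long cityId) {
            this.kind = kind;
            this.infoIndex = infoIndex;
            this.courierId = courierId;
            this.cityId = cityId;
        }
    }

    // Same rule as invoke(): only INSERT and UPDATE_AFTER rows are written, the rest are skipped.
    // Map key is "<redis key>/<hash field>" following the DDL in the usage example.
    static Map<String, Long> replay(List<Change> changes) {
        Map<String, Long> redis = new LinkedHashMap<>();
        for (Change c : changes) {
            if (c.kind == Kind.INSERT || c.kind == Kind.UPDATE_AFTER) {
                redis.put("test2_" + c.cityId + "/test2_" + c.courierId + "_info_index", c.infoIndex);
            }
        }
        return redis;
    }

    public static void main(String[] args) {
        List<Change> sample = List.of(
                new Change(Kind.INSERT, 1, 30017323, 1101),
                new Change(Kind.UPDATE_BEFORE, 1, 30017323, 1101),
                new Change(Kind.UPDATE_AFTER, 2, 30017323, 1101),
                new Change(Kind.UPDATE_BEFORE, 2, 30017323, 1101),
                new Change(Kind.UPDATE_AFTER, 3, 30017323, 1101),
                new Change(Kind.UPDATE_BEFORE, 3, 30017323, 1101),
                new Change(Kind.UPDATE_AFTER, 4, 30017323, 1101),
                new Change(Kind.UPDATE_BEFORE, 3, 980897, 3208),
                new Change(Kind.UPDATE_AFTER, 4, 980897, 3208));
        replay(sample).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Each hash field ends up with its latest value (4 for both keys here). Since DELETE rows are skipped as well, a key whose group disappears upstream is only removed from Redis once expireTime runs out.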

5. Detailed explanation

CREATE TABLE test (
  `id` BIGINT,
  `url` STRING,
  `day` STRING,
  `pv` BIGINT,
  `uv` BIGINT
) WITH (
  'connector' = 'redis',
  'hostPort' = 'xxx',
  'password' = '',
  'expireTime' = '100',
  'keyType' = 'hash',
  'keyTemplate' = 'test_${id}',
  'fieldTemplate' = '${day}',
  'valueNames' = 'pv,uv'
)

Resulting Redis data, assuming id=1, day=20201016, pv=20, uv=20:
    type: hash
    key: test_1
    fields: 20201016_pv -> 20, 20201016_uv -> 20
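To make the hash layout concrete, the sketch below recomputes this example with the same naming rule as invoke(): the key comes from keyTemplate, and each hash field is the filled fieldTemplate plus "_" plus the value column name. It fills the two simple templates by plain concatenation; `HashLayout` is a hypothetical illustration, not part of the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class HashLayout {
    // One hash entry per value column: "<filled fieldTemplate>_<valueName>" -> value as a string.
    static Map<String, String> fields(String filledFieldTemplate, String valueNames, Map<String, ?> row) {
        Map<String, String> hash = new LinkedHashMap<>();
        for (String name : valueNames.split(",")) {
            hash.put(filledFieldTemplate + "_" + name, String.valueOf(row.get(name)));
        }
        return hash;
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("id", 1L, "day", "20201016", "pv", 20L, "uv", 20L);
        String key = "test_" + row.get("id");          // keyTemplate 'test_${id}'
        String field = String.valueOf(row.get("day")); // fieldTemplate '${day}'
        System.out.println(key + " " + fields(field, "pv,uv", row));
        // prints: test_1 {20201016_pv=20, 20201016_uv=20}
    }
}
```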

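Parameters:
connector      fixed value 'redis' (must match RedisTableSinkFactory.IDENTIFIER)
hostPort       Redis address
password       Redis password (note: the sink code above never passes it to RedisUtil)
expireTime     TTL of the Redis key, in seconds
keyType        type of Redis write: hash, set (plain SET), sadd, or zadd (zadd currently behaves like sadd)
keyTemplate    expression for the Redis key, e.g. test_${id}, where id is a column name of the table
fieldTemplate  required when keyType is hash; same expression rules as keyTemplate
valueNames     column(s) whose values are written; several comma-separated names are only supported when keyType is hash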
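The rules in the parameter list can be checked up front instead of surfacing as exceptions inside invoke(). The stdlib-only sketch below is a hypothetical `RedisOptionsCheck`, not code from the connector: it encodes the constraints described above (allowed keyType values, keyTemplate and valueNames present, fieldTemplate required for hash, several valueNames only for hash) over a plain `Map<String, String>` of WITH options.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

final class RedisOptionsCheck {
    private static final Set<String> KEY_TYPES = Set.of("hash", "set", "sadd", "zadd");

    // Returns the problems found in the WITH options; an empty list means they look usable.
    static List<String> check(Map<String, String> opts) {
        List<String> problems = new ArrayList<>();
        String keyType = opts.getOrDefault("keyType", "").toLowerCase(Locale.ROOT);
        if (!KEY_TYPES.contains(keyType)) {
            problems.add("keyType must be hash, set, sadd or zadd");
        }
        if (isBlank(opts.get("keyTemplate"))) {
            problems.add("keyTemplate is required");
        }
        String valueNames = opts.get("valueNames");
        if (isBlank(valueNames)) {
            problems.add("valueNames is required");
        } else if (!"hash".equals(keyType) && valueNames.contains(",")) {
            problems.add("several valueNames are only supported for keyType=hash");
        }
        if ("hash".equals(keyType) && isBlank(opts.get("fieldTemplate"))) {
            problems.add("fieldTemplate is required for keyType=hash");
        }
        return problems;
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    public static void main(String[] args) {
        Map<String, String> opts = Map.of(
                "keyType", "hash",
                "keyTemplate", "test_${id}",
                "valueNames", "pv,uv");
        System.out.println(check(opts)); // [fieldTemplate is required for keyType=hash]
    }
}
```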

6. How it works

  1. The overall flow goes from CatalogTable to DynamicTableSource and DynamicTableSink; DynamicTableSourceFactory and DynamicTableSinkFactory act as the bridge in between.

  2. A (Source/Sink)Factory is located through connector='xxx' and, in principle, performs three operations:
    1. validate options
    2. configure encoding/decoding formats (if required)
    3. create a parameterized instance of the table connector
    Formats are located the same way, through format='xxx'.

  3. DynamicTableSource / DynamicTableSink
    The official docs describe them as stateful factories that eventually produce the concrete runtime implementation, but whether they actually hold state depends on the specific source or sink.

  4. Generating the runtime logic: the runtime logic is implemented against Flink's core connector interfaces (such as InputFormat or SourceFunction; for Kafka this is FlinkKafkaConsumer). These implementations are wrapped in *Provider objects (e.g. SinkFunctionProvider), which are then executed.

  5. *Provider is the code-level bridge between SQL and the DataStream API.
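Item 2.1, validate options, corresponds to `helper.validate()` in createDynamicTableSink(). To my understanding it checks that every option from requiredOptions() is present and rejects keys declared in neither requiredOptions() nor optionalOptions(). The stdlib-only sketch below imitates that behaviour with plain strings; `OptionValidator` is hypothetical and simplified (Flink's FactoryUtil also handles format options and typed parsing). Note that RedisTableSinkFactory declares every option as optional, so a missing hostPort is not caught at this stage.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class OptionValidator {
    // Simplified imitation of factory option validation:
    // every required key must be present, and no key may be unknown.
    static void validate(Map<String, String> options, Set<String> required, Set<String> optional) {
        Set<String> missing = new TreeSet<>();
        for (String key : required) {
            if (!options.containsKey(key)) {
                missing.add(key);
            }
        }
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required options: " + missing);
        }
        Set<String> unsupported = new TreeSet<>();
        for (String key : options.keySet()) {
            // 'connector' is consumed by the factory lookup itself
            if (!key.equals("connector") && !required.contains(key) && !optional.contains(key)) {
                unsupported.add(key);
            }
        }
        if (!unsupported.isEmpty()) {
            throw new IllegalArgumentException("Unsupported options: " + unsupported);
        }
    }

    public static void main(String[] args) {
        Set<String> optional = Set.of("hostPort", "password", "expireTime", "keyType",
                "keyTemplate", "fieldTemplate", "valueNames");
        try {
            // Keys are matched exactly in this sketch, so 'hostport' counts as unknown.
            validate(Map.of("connector", "redis", "hostport", "xxx"), Set.of(), optional);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Unsupported options: [hostport]
        }
    }
}
```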

7. References

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html
