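Apache Flink: The Data Source Operator (Source)

Preface

Flink can ingest data from a wide variety of sources and build a DataStream from it for further transformation. The place the input comes from is generally called the data source, and the operator that reads it is the source operator. The source is therefore the input end of the whole processing program.

The general way to add a source in Flink code is to call the addSource() method of the execution environment: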

DataStream<String> stream = env.addSource(...);

The method takes a single argument, a "source function" object that must implement the SourceFunction interface, and returns a DataStreamSource. DataStreamSource extends SingleOutputStreamOperator, which in turn extends DataStream. Reading data is therefore itself an operator, and what it produces is a data stream (DataStream).

Flink ships with many ready-made source implementations, and many external connectors also provide their own source functions, so in most cases these are enough for practical needs.

Sources already implemented by Flink: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/overview/

Preparation

To make this easier to follow, we first set up a concrete scenario. A visit to a website can be abstracted as a triple (user name, URL visited, timestamp of the visit), so we create an Event class and wrap each user action in an Event object. Event has the following fields.

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Event {

    private String user;

    private String url;

    private Long timestamp;
    
}

Add the required Maven dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.15.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.15.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.15.0</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.36</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.36</version>
    </dependency>

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.17.2</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.24</version>
    </dependency>
</dependencies>

1. Reading from a Collection

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest {

    public static void main(String[] args) throws Exception {
        readCollection();
    }

    /**
     * Reads data from a collection.
     * @throws Exception if the job fails
     */
    private static void readCollection() throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        List<Event> list = new ArrayList<>();
        list.add(new Event("Mary", "./home", 1000L));
        list.add(new Event("Bob", "./cart", 2000L));
        // Read data from the collection
        DataStreamSource<Event> dataStream = env.fromCollection(list);

        dataStream.print();

        env.execute();
    }
}

2. Reading from Elements

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest {

    public static void main(String[] args) throws Exception {
        readElement();
    }

    /**
     * Reads data from individual elements.
     * @throws Exception if the job fails
     */
    private static void readElement() throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from the listed elements
        DataStreamSource<Event> dataStream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        dataStream.print();

        env.execute();
    }
}

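3. Reading from a File

In real applications the data is of course not written directly in the code. Usually we read it from some storage medium, and reading a log file is one of the more common approaches. It is also the most common way to read input in batch processing.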

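Notes:

  • The argument can be a directory or a file;
  • The path can be relative or absolute;
  • A relative path is resolved against the system property user.dir: in IDEA this is the
    project root directory, and in standalone mode it is the root directory of the cluster node;
  • You can also read from an HDFS directory using a path of the form hdfs://.... Because
    Flink does not bundle the Hadoop dependencies, they must be added to the pom (see section 4).

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest {

    public static void main(String[] args) throws Exception {
        readFile();
    }

    /**
     * Reads data from a file.
     * @throws Exception if the job fails
     */
    private static void readFile() throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the file line by line
        DataStreamSource<String> dataStream = env.readTextFile("src/main/resources/clicks.txt");

        dataStream.print();

        env.execute();
    }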
}
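
The note about relative paths can be checked with plain Java before a job is submitted. The sketch below uses only the JDK and resolves a relative path against user.dir. RelativePathDemo and resolveAgainstUserDir are illustrative names, not Flink APIs, and the file does not need to exist for the resolution to work.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

class RelativePathDemo {

    /** Resolves a relative path against the "user.dir" system property. */
    static Path resolveAgainstUserDir(String relativePath) {
        Path workingDir = Paths.get(System.getProperty("user.dir"));
        return workingDir.resolve(relativePath).normalize();
    }

    public static void main(String[] args) {
        // Same relative path as in the readTextFile example
        String relative = "src/main/resources/clicks.txt";
        System.out.println("user.dir      = " + System.getProperty("user.dir"));
        System.out.println("resolved path = " + resolveAgainstUserDir(relative));
    }
}
```

If the printed path is not where clicks.txt actually lives, the job is being started from a different working directory than expected, and an absolute path is the simpler choice.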

4. Reading from HDFS

Add the dependency

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.3</version>
</dependency>

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest {

    public static void main(String[] args) throws Exception {
        readHdfs();
    }

    /**
     * Reads data from HDFS.
     * @throws Exception if the job fails
     */
    private static void readHdfs() throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read the file at the given HDFS path
        DataStreamSource<String> hdfsSource = env.readTextFile("hdfs://192.168.111.188:8020/input/README.txt");

        // Print the contents of the HDFS file
        hdfsSource.print();

        env.execute();
    }
}

5. Reading from a Socket

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest {

    public static void main(String[] args) throws Exception {
        readSocket();
    }

    /**
     * Reads data from a socket.
     * @throws Exception if the job fails
     */
    private static void readSocket() throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read a text stream by connecting to the given host and port.
        // Start `nc -lk 7777` on the Linux host first, then type lines there to send them.
        DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.111.188", 7777);

        dataStreamSource.print();

        env.execute();
    }
}
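
In the socket example, `nc -lk 7777` is the listening side and the Flink job connects to it as a client. The JDK-only sketch below plays both roles in one process to make that direction explicit. SocketLineDemo and sendAndReceive are hypothetical names, and the sketch makes no claim about how socketTextStream is implemented internally.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SocketLineDemo {

    /**
     * Opens a throwaway server on a free local port (the role `nc -lk` plays),
     * sends the given lines, and reads them back line by line as a client.
     */
    static List<String> sendAndReceive(List<String> lines) {
        try (ServerSocket server = new ServerSocket(0)) {
            // Server side: accept one connection, write every line, then close
            Thread sender = new Thread(() -> {
                try (Socket accepted = server.accept();
                     PrintWriter out = new PrintWriter(
                             new OutputStreamWriter(accepted.getOutputStream(), StandardCharsets.UTF_8), true)) {
                    for (String line : lines) {
                        out.println(line);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            sender.start();

            // Client side: connect and read until the server closes the stream
            List<String> received = new ArrayList<>();
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
                 BufferedReader in = new BufferedReader(
                         new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    received.add(line);
                }
            }
            sender.join();
            return received;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendAndReceive(Arrays.asList("Mary,./home,1000", "Bob,./cart,2000")));
    }
}
```

The practical consequence for the Flink job is that the listener must already be running when the job starts; otherwise the connection attempt fails.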

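6. Reading from Kafka

As a distributed message queue, Kafka is a high-throughput, easily scalable messaging system, and the way a message queue delivers data matches stream processing exactly. Kafka and Flink are therefore a natural pair and are widely used together for streaming data. In many real-time stream processing applications today, Kafka collects and transports the data while Flink performs the analysis and computation, and this architecture has become the first choice of many companies.

Flink officially provides the flink-connector-kafka connector, which implements a consumer, FlinkKafkaConsumer, for us. It is a SourceFunction that reads data from Kafka. (In Flink 1.15 FlinkKafkaConsumer is marked deprecated in favor of KafkaSource, but it still works as shown here.)

So to use Kafka as a data source we only need to add the Kafka connector dependency. Flink provides a universal Kafka connector that tracks the latest version of the Kafka client. The current version supports only Kafka 0.10.0 and later, so choose the connector version according to the Kafka version you have installed. The dependencies we need are as follows.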

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.15.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.12</artifactId>
    <version>1.15.0</version>
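</dependency>

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SourceTest {

    public static void main(String[] args) throws Exception {
        readKafka();
    }

    /**
     * Reads data from Kafka.
     * @throws Exception if the job fails
     */
    private static void readKafka() throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.111.188:9092");
        properties.setProperty("group.id", "consumer-group");
        // The connector reads raw bytes and applies the schema passed below,
        // so these two deserializer settings are not what decodes the records
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        // Consume the "test" topic and decode each record value as a string
        DataStreamSource<String> kafkaDataStream = env.addSource(new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties));

        kafkaDataStream.print();

        env.execute();
    }
}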

Creating a FlinkKafkaConsumer requires three arguments:

  • The first argument, topic, defines which topics to read from. It can be a single topic, a
    list of topics, or a regular expression matching all the topics to read. When reading from
    multiple topics, the Kafka connector handles the partitions of all of them and puts their
    data into a single stream.
  • The second argument is a DeserializationSchema or a KafkaDeserializationSchema (called
    KeyedDeserializationSchema in older Flink releases). Kafka messages are stored as raw
    bytes, so they must be deserialized into Java or Scala objects. The SimpleStringSchema
    used in the code above is a built-in DeserializationSchema that simply deserializes the
    byte array into a string. Both schema types are public interfaces, so we can also
    implement custom deserialization logic.
  • The third argument is a Properties object that sets properties of the Kafka client.
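
To make the second argument more concrete, here is a JDK-only sketch of the decoding step a schema performs. bytesToString does what a UTF-8 string schema amounts to, and bytesToEvent is a hypothetical custom step that assumes each message is a comma-separated "user,url,timestamp" line; that message format, the ClickEvent stand-in class, and all method names are assumptions made for illustration. In a real job the same logic would live in the deserialize method of a custom schema.

```java
import java.nio.charset.StandardCharsets;

class EventBytesDemo {

    /** Stand-in for the article's Event POJO, written without Lombok so the sketch compiles alone. */
    static final class ClickEvent {
        final String user;
        final String url;
        final long timestamp;

        ClickEvent(String user, String url, long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }
    }

    /** Interprets the raw message bytes as UTF-8 text. */
    static String bytesToString(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    /** Parses an assumed "user,url,timestamp" message into an event object. */
    static ClickEvent bytesToEvent(byte[] message) {
        String[] fields = bytesToString(message).trim().split(",");
        if (fields.length != 3) {
            throw new IllegalArgumentException("expected 3 comma-separated fields, got " + fields.length);
        }
        return new ClickEvent(fields[0].trim(), fields[1].trim(), Long.parseLong(fields[2].trim()));
    }

    public static void main(String[] args) {
        byte[] raw = "Mary,./home,1000".getBytes(StandardCharsets.UTF_8);
        ClickEvent event = bytesToEvent(raw);
        System.out.println(event.user + " " + event.url + " " + event.timestamp);
    }
}
```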

7. Reading from Pulsar

As data volumes keep growing, processing data as event streams becomes essential. Apache Flink unifies batch and stream processing in one compute engine with a consistent programming interface. Apache Pulsar (together with Apache BookKeeper) unifies data as "streams". In Pulsar, data is stored as a single copy and accessed either as a stream (through the pub-sub interface) or as segments (for batch processing). Pulsar addresses the data-silo problem that companies run into when they use different storage and messaging solutions.

Flink can read from and write to Pulsar brokers directly in real-time streaming fashion, and it can also read Pulsar's underlying offline storage in bulk, reading and writing BookKeeper content in batches. Support for both batch and streaming makes Pulsar and Flink a natural fit. Used together, the two open-source technologies can form a unified data architecture for real-time, data-driven businesses.

To integrate Pulsar with Flink and give users stronger development capabilities, StreamNative developed and open-sourced the Pulsar Flink Connector. After several rounds of refinement it was merged into the Flink code repository and has been released with Flink 1.14.0 and later.

Pulsar Flink Connector provides elastic data processing based on Apache Pulsar and Apache Flink and lets Apache Flink read and write data in Apache Pulsar. With it, companies can focus on business logic without worrying about storage.

Add the dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-pulsar</artifactId>
    <version>1.15.0</version>
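</dependency>

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.pulsar.client.api.SubscriptionType;

public class SourceTest {

    public static void main(String[] args) throws Exception {
        readPulsar();
    }

    /**
     * Reads data from Pulsar.
     * @throws Exception if the job fails
     */
    private static void readPulsar() throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Connection settings for the Pulsar source
        String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
        String adminUrl = "http://192.168.23.111:8080,192.168.23.112:8080,192.168.23.113:8080";
        String topic = "persistent://my-tenant/my-ns/my-partitioned-topic";

        // Build the source: start from the earliest message and decode values as strings
        PulsarSource<String> pulsarSource = PulsarSource.builder()
                .setServiceUrl(serviceUrl)
                .setAdminUrl(adminUrl)
                .setStartCursor(StartCursor.earliest())
                .setTopics(topic)
                .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
                .setSubscriptionName("my-subscription")
                .setSubscriptionType(SubscriptionType.Exclusive)
                .build();
        DataStreamSource<String> streamSource = env.fromSource(pulsarSource, WatermarkStrategy.noWatermarks(), "Pulsar Source");

        streamSource.print();

        env.execute();
    }
}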

8. Custom Sources

In most cases the sources above are sufficient. There are always exceptions, though: what if the data we want to read comes from an external system for which Flink has neither a built-in method nor a connector?

Then we have to implement a SourceFunction ourselves.

Flink provides several source interfaces, and implementing one of them gives a custom data source. They differ in capability as follows:

  • SourceFunction: non-parallel source (parallelism must be 1)
  • ParallelSourceFunction: parallel source (parallelism can be >= 1)
  • RichSourceFunction: rich non-parallel source (parallelism must be 1)
  • RichParallelSourceFunction: rich parallel source (parallelism can be >= 1)

Next we create a custom data source that implements the SourceFunction interface. Two key methods must be overridden: run() and cancel().

  • run(): uses the runtime context object (SourceContext) to send data downstream;
  • cancel(): flips a flag so that the loop in run() exits, which stops the source.
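
The run()/cancel() contract can be tried out without Flink. The sketch below is a JDK-only toy model: LoopingSource is a hypothetical class whose run() loops on a flag and whose cancel() is called from another thread. The flag is declared volatile so that the write made by the cancelling thread is visible to the looping thread, and the 10 ms sleep is an arbitrary choice.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CancelFlagDemo {

    /** Toy source that emits until cancel() flips the flag. */
    static final class LoopingSource {
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean running = true;
        final AtomicInteger emitted = new AtomicInteger();

        void run() {
            while (running) {
                emitted.incrementAndGet(); // stands in for sourceContext.collect(...)
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        void cancel() {
            running = false;
        }
    }

    /** Runs the source on a worker thread, cancels it after waitMillis, and returns the emitted count. */
    static int runThenCancel(long waitMillis) {
        LoopingSource source = new LoopingSource();
        Thread worker = new Thread(source::run);
        worker.start();
        try {
            Thread.sleep(waitMillis);
            source.cancel();
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (worker.isAlive()) {
            throw new IllegalStateException("source did not stop after cancel()");
        }
        return source.emitted.get();
    }

    public static void main(String[] args) {
        System.out.println("emitted before cancel: " + runThenCancel(100));
    }
}
```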

8.1 SourceFunction: a Single-Parallelism Source

Implementation

import java.util.Random;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class ClickSource implements SourceFunction<Event> {

    // Loop flag; volatile because cancel() is called from a different thread than run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        // Random generator for the test data
        Random random = new Random();
        // Candidate values for each field
        String[] users = {"Mary", "Alice", "Bobo", "lucy"};
        String[] urls = {"./home", "./cart", "./prod", "./order"};

        // Emit one random event per second until cancelled
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long timestamp = System.currentTimeMillis();
            sourceContext.collect(new Event(user, url, timestamp));

            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Usage

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceCustomTest {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> customDataStream = env.addSource(new ClickSource());

        customDataStream.print();

        env.execute();
    }
}

Note: a source defined through the SourceFunction interface can only have parallelism 1. Setting a parallelism greater than 1 on it throws an exception.

8.2 ParallelSourceFunction: a Multi-Parallelism Source

Implementation

import java.util.Random;

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

public class ParallelClickSource implements ParallelSourceFunction<Event> {

    // Loop flag; volatile because cancel() is called from a different thread than run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        // Random generator for the test data
        Random random = new Random();
        // Candidate values for each field
        String[] users = {"Mary", "Alice", "Bobo", "lucy"};
        String[] urls = {"./home", "./cart", "./prod", "./order"};

        // Emit one random event per second until cancelled
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long timestamp = System.currentTimeMillis();
            sourceContext.collect(new Event(user, url, timestamp));

            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

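Usage

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceCustomTest {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // A parallel source may run with parallelism greater than 1
        DataStreamSource<Event> parallelDataStream = env.addSource(new ParallelClickSource()).setParallelism(2);

        parallelDataStream.print();

        env.execute();
    }
}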

RichSourceFunction and RichParallelSourceFunction additionally provide the open() and close() methods for managing external connections, as well as runtime methods such as getRuntimeContext().
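
The order in which these hooks are expected to fire is open(), then run(), then close(). The JDK-only sketch below is a toy model of that ordering and not Flink's runtime: RichLikeSource and drive are hypothetical names, and calling close() in a finally block is this sketch's own choice to show that cleanup should happen even when run() fails.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleDemo {

    /** Hypothetical mini-interface mirroring the open/run/close hooks of a rich source. */
    interface RichLikeSource {
        void open();
        void run();
        void close();
    }

    /** Calls the hooks in order; close() runs even if run() throws. */
    static void drive(RichLikeSource source) {
        source.open();
        try {
            source.run();
        } finally {
            source.close();
        }
    }

    /** Drives a recording source and returns the names of the hooks in call order. */
    static List<String> recordCalls() {
        final List<String> calls = new ArrayList<>();
        drive(new RichLikeSource() {
            @Override
            public void open() {
                calls.add("open");   // e.g. open a database connection
            }

            @Override
            public void run() {
                calls.add("run");    // e.g. emit records
            }

            @Override
            public void close() {
                calls.add("close");  // e.g. release the connection
            }
        });
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(recordCalls());
    }
}
```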

8.3 RichSourceFunction: a Rich Non-Parallel Source

Implementation

import java.util.Random;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class ClickRichSource extends RichSourceFunction<Event> {

    // Loop flag; volatile because cancel() is called from a different thread than run()
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Print the task name and the index of this subtask
        RuntimeContext runtimeContext = getRuntimeContext();
        String taskName = runtimeContext.getTaskName();
        int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
        System.out.println(taskName + "-" + indexOfThisSubtask);
    }

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        // Random generator for the test data
        Random random = new Random();
        // Candidate values for each field
        String[] users = {"Mary", "Alice", "Bobo", "lucy"};
        String[] urls = {"./home", "./cart", "./prod", "./order"};

        // Emit one random event per second until cancelled
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long timestamp = System.currentTimeMillis();
            Event event = new Event(user, url, timestamp);
            System.out.println(event.toString());
            sourceContext.collect(event);

            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        System.out.println("Closing resources.....");
    }
}

Usage

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceCustomTest {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> richDataStream = env.addSource(new ClickRichSource());

        richDataStream.print();

        env.execute();
    }
}

8.4 RichParallelSourceFunction: a Rich Parallel Source

Implementation

import java.util.Random;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class RichParallelClickSource extends RichParallelSourceFunction<Event> {

    // Loop flag; volatile because cancel() is called from a different thread than run()
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Print the task name and the index of this subtask
        RuntimeContext runtimeContext = getRuntimeContext();
        String taskName = runtimeContext.getTaskName();
        int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
        System.out.println(taskName + "-" + indexOfThisSubtask);
    }

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        // Random generator for the test data
        Random random = new Random();
        // Candidate values for each field
        String[] users = {"Mary", "Alice", "Bobo", "lucy"};
        String[] urls = {"./home", "./cart", "./prod", "./order"};

        // Emit one random event per second until cancelled
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long timestamp = System.currentTimeMillis();
            sourceContext.collect(new Event(user, url, timestamp));

            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        System.out.println("Closing resources.....");
    }
}

Usage

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceCustomTest {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> richParallelDataStream = env.addSource(new RichParallelClickSource()).setParallelism(2);

        richParallelDataStream.print();

        env.execute();
    }
}

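8.5 Loading Data from MySQL in Real Time

Implementation

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Student is assumed to be a POJO with a (String id, String name, Integer age) constructor
public class MySQLSource extends RichParallelSourceFunction<Student> {

    // Loop flag; volatile because cancel() is called from a different thread than run()
    private volatile boolean flag = true;
    private Connection conn;
    private PreparedStatement statement;
    private ResultSet resultSet;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Open the connection once per subtask and prepare the polling query
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "123456");
        String sql = "select id, name, age from t_student";
        statement = conn.prepareStatement(sql);
    }

    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        // Re-run the query every 3 seconds; every parallel subtask reads the whole table
        while (flag) {
            resultSet = statement.executeQuery();
            while (resultSet.next()) {
                String id = resultSet.getString("id");
                String name = resultSet.getString("name");
                Integer age = resultSet.getInt("age");
                ctx.collect(new Student(id, name, age));
            }
            Thread.sleep(3000);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        // Release JDBC resources in reverse order of creation
        if (resultSet != null) resultSet.close();
        if (statement != null) statement.close();
        if (conn != null) conn.close();
    }
}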

Usage

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceCustomTest {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStreamSource<Student> parallelDataStream = env.addSource(new MySQLSource()).setParallelism(4);

        parallelDataStream.print();

        env.execute();
    }
}
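
Because MySQLSource is a parallel source, each of the four subtasks runs the same query, so every row is emitted four times per polling round. One common way to avoid that is to let each subtask keep only the rows whose key maps to its own index, using the values that getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks() return inside the source. The JDK-only sketch below shows just the assignment rule. It assumes integer ids, and SubtaskSplitDemo and idsForSubtask are hypothetical names that are not part of the article's code.

```java
import java.util.ArrayList;
import java.util.List;

class SubtaskSplitDemo {

    /** Returns the ids that the subtask with the given index would keep. */
    static List<Integer> idsForSubtask(List<Integer> allIds, int subtaskIndex, int parallelism) {
        List<Integer> mine = new ArrayList<>();
        for (int id : allIds) {
            // floorMod keeps the result in [0, parallelism) even for negative ids
            if (Math.floorMod(id, parallelism) == subtaskIndex) {
                mine.add(id);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            ids.add(i);
        }
        // Each id appears in the output of exactly one of the four subtasks
        for (int subtask = 0; subtask < 4; subtask++) {
            System.out.println("subtask " + subtask + " -> " + idsForSubtask(ids, subtask, 4));
        }
    }
}
```

The same rule could instead be pushed into the SQL query so that each subtask fetches only its share, or the source could simply be run with parallelism 1 if the table is small.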

