Introduction
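Flink can ingest data from many kinds of sources and build a DataStream from it for further transformation. Where the input comes from is generally called the data source, and the operator that reads it is called the source operator. The source is therefore the input end of the whole processing program.
The general way to add a source in Flink code is to call the execution environment's addSource() method: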
DataStream<String> stream = env.addSource(...);
The method takes an object that implements the SourceFunction interface (a "source function") and returns a DataStreamSource. DataStreamSource extends SingleOutputStreamOperator, which in turn extends DataStream. So reading data is itself an operator, the source, and its result is a data stream (DataStream).
Flink ships many ready-made source implementations, and many external connectors provide the corresponding source functions as well; in most cases these cover practical needs.
Connectors already provided by Flink: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/overview/
Preparation
To make things concrete, let's start from a realistic scenario. A visit to a website can be abstracted as a triple (user name, the URL the user visits, the timestamp of the visit), so we create an Event class and wrap each user action in an Event object. Event has the following fields.
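import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Event {
    private String user;      // user name
    private String url;       // URL the user visited
    private Long timestamp;   // time of the visit, in milliseconds
}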
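If Lombok is not on the classpath, the three annotations can be replaced by hand-written code. The following is a sketch of an equivalent class, not the author's code; its `toString()` mimics the format Lombok's `@Data` generates. Flink serializes a class as a POJO only if it is public, has a public no-argument constructor, and every field is public or exposed through a getter and setter, which is why the no-argument constructor and the accessors matter here.

```java
import java.util.Objects;

// Hand-written equivalent of @Data/@AllArgsConstructor/@NoArgsConstructor (a sketch).
// In a real project declare it as `public class Event` in Event.java:
// Flink only treats public classes as POJOs.
class Event {
    private String user;
    private String url;
    private Long timestamp;

    // No-argument constructor, required by Flink's POJO serializer
    public Event() {}

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    // Getters and setters: Flink reaches private fields through them
    public String getUser() { return user; }
    public void setUser(String user) { this.user = user; }
    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Event)) return false;
        Event other = (Event) o;
        return Objects.equals(user, other.user)
                && Objects.equals(url, other.url)
                && Objects.equals(timestamp, other.timestamp);
    }

    @Override
    public int hashCode() {
        return Objects.hash(user, url, timestamp);
    }

    // Same layout as Lombok's generated toString()
    @Override
    public String toString() {
        return "Event(user=" + user + ", url=" + url + ", timestamp=" + timestamp + ")";
    }
}
```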
Add the required Maven dependencies
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
</dependencies>
1. Reading data from a collection
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class SourceTest {
    public static void main(String[] args) throws Exception {
        readCollection();
    }

    /**
     * Read data from a collection
     * @throws Exception
     */
    private static void readCollection() throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        List<Event> list = new ArrayList<>();
        list.add(new Event("Mary", "./home", 1000L));
        list.add(new Event("Bob", "./cart", 2000L));
        // Read data from the collection
        DataStreamSource<Event> dataStream = env.fromCollection(list);
        dataStream.print();
        env.execute();
    }
}
2. Reading data from elements
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest {
    public static void main(String[] args) throws Exception {
        readElement();
    }

    /**
     * Read data from individual elements
     * @throws Exception
     */
    private static void readElement() throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from the given elements
        DataStreamSource<Event> dataStream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );
        dataStream.print();
        env.execute();
    }
}
3. Reading data from a file
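In real applications, data is of course not hard-coded in the program. It is usually read from a storage medium, and a very common case is reading log files. This is also the most common way of reading data in batch processing.
Notes:
- The argument can be a directory or a file.
- The path can be relative or absolute.
- A relative path is resolved against the system property user.dir: in IntelliJ IDEA this is the project root directory; in standalone mode it is the root directory of the cluster node.
- Files can also be read from HDFS with a path of the form hdfs://.... Because Flink does not ship the Hadoop dependencies, they must be added to the pom (see section 4).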
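A quick way to check where a relative path will point is to resolve it against `user.dir` yourself. The JDK-only sketch below (the class `RelativePathDemo` is hypothetical) does this for the path used in the example; it reflects what you will see when running in the IDE, while on a cluster node `user.dir` may be a different directory, as the notes above point out.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Shows how a relative path such as the one passed to readTextFile()
// resolves against the JVM's user.dir (its working directory).
class RelativePathDemo {
    static Path resolveAgainstUserDir(String relative) {
        Path base = Paths.get(System.getProperty("user.dir"));
        return base.resolve(relative).normalize();
    }

    public static void main(String[] args) {
        String rel = "src/main/resources/clicks.txt";
        System.out.println("user.dir = " + System.getProperty("user.dir"));
        System.out.println("resolved = " + resolveAgainstUserDir(rel));
    }
}
```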
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest {
    public static void main(String[] args) throws Exception {
        readFile();
    }

    /**
     * Read data from a file
     * @throws Exception
     */
    private static void readFile() throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read the file line by line; the relative path is resolved against user.dir
        DataStreamSource<String> dataStream = env.readTextFile("src/main/resources/clicks.txt");
        dataStream.print();
        env.execute();
    }
}
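`readTextFile()` yields one `String` per line, so the lines usually still have to be turned into `Event` objects. The article does not show the contents of clicks.txt; the sketch below assumes, purely for illustration, lines such as `Mary, ./home, 1000`. `ClickLineParser` and its nested `Event` stand-in are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that turns one line of clicks.txt into an Event.
// ASSUMPTION: each line looks like "Mary, ./home, 1000"; the real file format is not shown.
class ClickLineParser {
    // Minimal stand-in for the article's Event class (Lombok omitted)
    static class Event {
        final String user;
        final String url;
        final Long timestamp;

        Event(String user, String url, Long timestamp) {
            this.user = user;
            this.url = url;
            this.timestamp = timestamp;
        }
    }

    // Splits on commas and trims the surrounding spaces of each field
    static Event parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 3) {
            throw new IllegalArgumentException("expected 3 fields: " + line);
        }
        return new Event(fields[0].trim(), fields[1].trim(), Long.parseLong(fields[2].trim()));
    }

    // Parses every non-blank line and skips blank ones
    static List<Event> parseAll(List<String> lines) {
        List<Event> out = new ArrayList<>();
        for (String line : lines) {
            if (!line.trim().isEmpty()) {
                out.add(parse(line));
            }
        }
        return out;
    }
}
```

In the job, the same parsing would be applied to each line with `DataStream#map` after `readTextFile()`, using the real `Event` class instead of the nested stand-in.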
4. Reading data from HDFS
Add the dependency
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.3</version>
</dependency>
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest {
    public static void main(String[] args) throws Exception {
        readHdfs();
    }

    /**
     * Read data from HDFS
     * @throws Exception
     */
    private static void readHdfs() throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read the file at the given HDFS path (NameNode host:port followed by the file path)
        DataStreamSource<String> hdfsSource = env.readTextFile("hdfs://192.168.111.188:8020/input/README.txt");
        // Print each line of the file
        hdfsSource.print();
        env.execute();
    }
}
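The `hdfs://` prefix is what makes Flink hand the path to the Hadoop file system instead of the local one, which is why the Hadoop dependency is needed. The sketch below (the class `HdfsPathInfo` is hypothetical and uses only `java.net.URI`) splits the path from the example into scheme, authority (here the NameNode host and RPC port 8020) and file path, and treats a path without a scheme as a local file, which is Flink's default unless configured otherwise.

```java
import java.net.URI;

// Breaks a file path into the parts that matter when reading from HDFS (a sketch).
class HdfsPathInfo {
    // The scheme selects the file system; no scheme means the local file system by default
    static String scheme(String path) {
        String s = URI.create(path).getScheme();
        return s == null ? "file" : s;
    }

    // For hdfs:// paths this is the NameNode host and RPC port
    static String authority(String path) {
        return URI.create(path).getAuthority();
    }

    // Path of the file inside that file system
    static String filePath(String path) {
        return URI.create(path).getPath();
    }

    public static void main(String[] args) {
        String p = "hdfs://192.168.111.188:8020/input/README.txt";
        System.out.println(scheme(p) + " | " + authority(p) + " | " + filePath(p));
    }
}
```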
5. Reading data from a socket
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest {
    public static void main(String[] args) throws Exception {
        readSocket();
    }

    /**
     * Read data from a socket
     * @throws Exception
     */
    private static void readSocket() throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read a text stream from port 7777 on the Linux host; start `nc -lk 7777`
        // there before submitting the job, then type text into nc
        DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.111.188", 7777);
        dataStreamSource.print();
        env.execute();
    }
}
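`nc` is not always installed (for example on Windows). The hypothetical `LineServer` below is a JDK-only stand-in for `nc -lk 7777`, a sketch rather than a netcat replacement: it listens on a port, forwards each line typed on stdin to the connected client, and ends every line with `\n`, the default delimiter of `socketTextStream(host, port)`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical JDK-only stand-in for `nc -lk 7777`.
class LineServer {
    // Writes each line followed by '\n', the delimiter socketTextStream splits on by default
    static void sendLines(Socket client, List<String> lines) throws IOException {
        PrintWriter out = new PrintWriter(
                new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8));
        for (String line : lines) {
            out.print(line);
            out.print('\n');
        }
        out.flush();
    }

    // Interactive mode: forward stdin lines to the connected client; like -k, wait for a
    // new client when the current one disconnects. Usage: java LineServer 7777
    public static void main(String[] args) throws IOException {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 7777;
        BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
        try (ServerSocket server = new ServerSocket(port)) {
            while (true) {
                try (Socket client = server.accept()) {
                    PrintWriter out = new PrintWriter(
                            new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8));
                    String line;
                    while ((line = stdin.readLine()) != null) {
                        out.print(line);
                        out.print('\n');
                        // checkError() flushes and reports whether the client has gone away
                        if (out.checkError()) break;
                    }
                    if (line == null) return; // stdin closed: stop serving
                }
            }
        }
    }
}
```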
6. Reading data from Kafka
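Kafka is a distributed message queue: a high-throughput, easily scalable messaging system. The way a message queue transports data matches stream processing exactly, so Kafka and Flink are a natural pair and currently the two leading technologies for streaming data. In today's real-time stream-processing applications, an architecture in which Kafka collects and transports the data and Flink analyzes it has become the first choice of many companies.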

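Flink provides the official connector flink-connector-kafka, which implements a consumer, FlinkKafkaConsumer: the SourceFunction used to read data from Kafka. (In Flink 1.15, FlinkKafkaConsumer is deprecated in favor of the newer KafkaSource.)
So to use Kafka as a data source we only need to add the Kafka connector dependency. Flink's official connector is a universal Kafka connector that tracks the latest version of the Kafka client. The current version supports only Kafka 0.10.0 and later; pick the connector dependency version according to the Kafka version you have installed. The dependencies used here are: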
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.12</artifactId>
<version>1.15.0</version>
</dependency>
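import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class SourceTest {
    public static void main(String[] args) throws Exception {
        readKafka();
    }

    /**
     * Read data from Kafka
     * @throws Exception
     */
    private static void readKafka() throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Kafka client configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.111.188:9092");
        properties.setProperty("group.id", "consumer-group");
        // FlinkKafkaConsumer replaces these two with byte-array deserializers and applies
        // the DeserializationSchema passed below (SimpleStringSchema) instead
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Used when the consumer group has no committed offset yet
        properties.setProperty("auto.offset.reset", "latest");
        // Arguments: topic, deserialization schema, Kafka client properties
        DataStreamSource<String> kafkaDataStream = env.addSource(
                new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties));
        kafkaDataStream.print();
        env.execute();
    }
}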
Creating a FlinkKafkaConsumer takes three arguments:
- The first argument, the topic, defines which topics to read from. It can be a single topic, a list of topics, or a regular expression matching all the topics you want to read. When reading from several topics, the Kafka connector consumes the partitions of all of them and puts their data into one stream.
- The second argument is a DeserializationSchema or a KafkaDeserializationSchema (older Flink versions used KeyedDeserializationSchema). Kafka stores messages as raw bytes, so they must be deserialized into Java or Scala objects. The SimpleStringSchema used in the code above is a built-in DeserializationSchema that simply decodes the byte array into a string. These are public interfaces, so we can also implement our own deserialization logic.
- The third argument is a Properties object that sets properties of the Kafka client.
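To make the first two arguments concrete: FlinkKafkaConsumer also has a constructor that takes a `java.util.regex.Pattern` instead of a topic name, and `SimpleStringSchema` decodes each record value as UTF-8 by default. The JDK-only sketch below imitates both behaviors; `KafkaParamsSketch` is a hypothetical name, and matching a whole topic name with `Matcher.matches()` is this sketch's rule, chosen to mirror a pattern subscription.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

class KafkaParamsSketch {
    // Argument 1 as a regex: which existing topics a pattern subscription would cover
    static List<String> matchTopics(Pattern pattern, List<String> topics) {
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            // the whole topic name must match, not just a substring
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    // Argument 2: what SimpleStringSchema does to each record value (bytes -> UTF-8 string)
    static String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}
```

With the real connector the pattern would be passed as `new FlinkKafkaConsumer<>(Pattern.compile("test-.*"), new SimpleStringSchema(), properties)`.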
7. Reading data from Pulsar
As data volumes keep growing, processing data as event streams has become essential. Apache Flink unifies batch and stream processing in one compute engine with a consistent programming interface. Apache Pulsar (together with Apache BookKeeper) unifies data as "streams": data is stored as a single copy and can be accessed either as a stream (through the pub-sub interface) or as segments (for batch processing). Pulsar addresses the data-silo problem companies run into when they use different storage and messaging solutions.
Flink can read from and write to Pulsar brokers in real time as a stream, and it can also read Pulsar's underlying offline storage in batches, reading and writing BookKeeper content batch by batch. Because both support batch and streaming, Pulsar and Flink are natural partners: combined, the two open-source technologies can form a unified data architecture for companies driven by real-time data.
To integrate Pulsar with Flink and give users more powerful development capabilities, StreamNative developed and open-sourced the Pulsar Flink Connector. After several rounds of refinement it was merged into the Flink code base and is released with Flink 1.14.0 and later.
Built on Apache Pulsar and Apache Flink, the Pulsar Flink Connector provides elastic data processing and lets Apache Flink read and write data in Apache Pulsar. With it, companies can concentrate on business logic without worrying about storage.
Add the dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-pulsar</artifactId>
<version>1.15.0</version>
</dependency>
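import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.pulsar.client.api.SubscriptionType;

public class SourceTest {
    public static void main(String[] args) throws Exception {
        readPulsar();
    }

    /**
     * Read data from Pulsar
     * @throws Exception
     */
    private static void readPulsar() throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Add a source that reads from Pulsar
        String serviceUrl = "pulsar://192.168.23.111:6650,192.168.23.112:6650,192.168.23.113:6650";
        String adminUrl = "http://192.168.23.111:8080,192.168.23.112:8080,192.168.23.113:8080";
        String topic = "persistent://my-tenant/my-ns/my-partitioned-topic";
        PulsarSource<String> pulsarSource = PulsarSource.builder()
                .setServiceUrl(serviceUrl)                  // broker address(es) for reading messages
                .setAdminUrl(adminUrl)                      // admin (HTTP) address(es) for topic metadata
                .setStartCursor(StartCursor.earliest())     // start from the earliest available message
                .setTopics(topic)
                .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
                .setSubscriptionName("my-subscription")
                .setSubscriptionType(SubscriptionType.Exclusive)
                .build();
        // PulsarSource uses the new Source API, so it is added with fromSource() rather than addSource()
        DataStreamSource<String> streamSource = env.fromSource(pulsarSource, WatermarkStrategy.noWatermarks(), "Pulsar Source");
        streamSource.print();
        env.execute();
    }
}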
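The `topic` string above uses Pulsar's full topic name format, `{persistent|non-persistent}://tenant/namespace/topic`. The JDK-only sketch below (the class `PulsarTopicName` is hypothetical, not part of the Pulsar client) splits such a name into its parts, expands a bare name like `my-topic` to `persistent://public/default/my-topic`, and shows the `-partition-N` suffix Pulsar uses for the internal topics of a partitioned topic.

```java
// Hypothetical helper that splits a Pulsar topic name into its parts (a sketch).
class PulsarTopicName {
    final String domain;     // "persistent" or "non-persistent"
    final String tenant;
    final String namespace;
    final String localName;

    private PulsarTopicName(String domain, String tenant, String namespace, String localName) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.localName = localName;
    }

    static PulsarTopicName parse(String name) {
        int sep = name.indexOf("://");
        if (sep < 0) {
            String[] parts = name.split("/");
            // "my-topic" -> persistent://public/default/my-topic
            if (parts.length == 1) return new PulsarTopicName("persistent", "public", "default", name);
            // "tenant/namespace/topic" -> persistent://tenant/namespace/topic
            if (parts.length == 3) return new PulsarTopicName("persistent", parts[0], parts[1], parts[2]);
            throw new IllegalArgumentException("unsupported topic name: " + name);
        }
        String domain = name.substring(0, sep);
        if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
            throw new IllegalArgumentException("unknown domain: " + domain);
        }
        String[] parts = name.substring(sep + 3).split("/", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected tenant/namespace/topic: " + name);
        }
        return new PulsarTopicName(domain, parts[0], parts[1], parts[2]);
    }

    // Name of one partition of a partitioned topic
    String partition(int index) {
        return this + "-partition-" + index;
    }

    @Override
    public String toString() {
        return domain + "://" + tenant + "/" + namespace + "/" + localName;
    }
}
```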
8. Custom Source
In most cases the sources above are enough. But there are always exceptions: what if the data we want to read comes from an external system for which Flink has neither a built-in method nor a connector?
Then we have to implement a SourceFunction ourselves.
Flink provides several source interfaces for this purpose; implementing one of them gives you a custom source. The interfaces differ in what they offer:
- SourceFunction: non-parallel source (parallelism must be 1)
- ParallelSourceFunction: parallel source (parallelism can be >= 1)
- RichSourceFunction: rich non-parallel source (parallelism must be 1)
- RichParallelSourceFunction: rich parallel source (parallelism can be >= 1)
Next we create a custom source that implements the SourceFunction interface. Two key methods have to be overridden: run() and cancel().
- run(): sends data downstream through the runtime context object (SourceContext);
- cancel(): stops the source by flipping a flag that ends the loop in run().
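The run()/cancel() contract can be tried out without Flink. In the sketch below, `Context` is a hypothetical stand-in for Flink's `SourceContext` that models only `collect()`, and `RandomUserSource` is a hypothetical source. The flag is `volatile` because Flink calls `cancel()` from a different thread than the one executing `run()`; without it, the loop is not guaranteed to see the update.

```java
import java.util.Random;

class RunCancelSketch {
    // Hypothetical stand-in for SourceFunction.SourceContext; only collect() is modelled
    interface Context<T> {
        void collect(T record);
    }

    static class RandomUserSource {
        // volatile: cancel() is called from a different thread than the one running run()
        private volatile boolean running = true;
        private final long intervalMillis;

        RandomUserSource(long intervalMillis) {
            this.intervalMillis = intervalMillis;
        }

        // Emits random user names until cancel() flips the flag
        void run(Context<String> ctx) {
            Random random = new Random();
            String[] users = {"Mary", "Alice", "Bobo", "lucy"};
            while (running) {
                ctx.collect(users[random.nextInt(users.length)]);
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    // an interrupt is also treated as a stop signal
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        void cancel() {
            running = false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RandomUserSource source = new RandomUserSource(100);
        // run() executes on its own thread, as in a source task
        Thread task = new Thread(() -> source.run(user -> System.out.println("emit " + user)));
        task.start();
        Thread.sleep(350);
        source.cancel(); // cancellation arrives from another thread
        task.join();
        System.out.println("stopped");
    }
}
```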
8.1 SourceFunction: single-parallelism source
Implementation code
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class ClickSource implements SourceFunction<Event> {
    // Flag that keeps run() looping; volatile because cancel() is called from another thread
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        // Generate random data
        Random random = new Random();
        // Candidate values for each field
        String[] users = {"Mary", "Alice", "Bobo", "lucy"};
        String[] urls = {"./home", "./cart", "./prod", "./order"};
        // Keep generating data until cancel() is called
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long timestamp = System.currentTimeMillis();
            sourceContext.collect(new Event(user, url, timestamp));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
Calling code
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceCustomTest {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> customDataStream = env.addSource(new ClickSource());
        customDataStream.print();
        env.execute();
    }
}
Note: a source defined through the SourceFunction interface can only run with parallelism 1; if the source is given a parallelism greater than 1, an exception is thrown.
8.2 ParallelSourceFunction: multi-parallelism source
Implementation code
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

import java.util.Random;

public class ParallelClickSource implements ParallelSourceFunction<Event> {
    // Flag that keeps run() looping; volatile because cancel() is called from another thread
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        // Generate random data
        Random random = new Random();
        // Candidate values for each field
        String[] users = {"Mary", "Alice", "Bobo", "lucy"};
        String[] urls = {"./home", "./cart", "./prod", "./order"};
        // Keep generating data until cancel() is called
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long timestamp = System.currentTimeMillis();
            sourceContext.collect(new Event(user, url, timestamp));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
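Calling code
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceCustomTest {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // The source itself runs with parallelism 2: two instances, each with its own run() loop
        DataStreamSource<Event> parallelDataStream = env.addSource(new ParallelClickSource()).setParallelism(2);
        parallelDataStream.print();
        env.execute();
    }
}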
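With `setParallelism(2)`, Flink creates two `ParallelClickSource` instances and calls `run()` on each, so two independent generators emit events and the source produces roughly twice as many records per second. The JDK-only simulation below (class and method names are hypothetical) uses one thread per "subtask" to show that effect without a Flink cluster.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ParallelInstancesSketch {
    // Each "subtask" runs its own generator, like one source instance per parallel subtask
    static List<String> runParallel(int parallelism, int recordsPerInstance) throws InterruptedException {
        // Thread-safe list standing in for the downstream operator
        List<String> sink = Collections.synchronizedList(new ArrayList<>());
        List<Thread> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            final int subtaskIndex = i;
            Thread t = new Thread(() -> {
                for (int n = 0; n < recordsPerInstance; n++) {
                    sink.add("subtask-" + subtaskIndex + " record-" + n);
                }
            });
            subtasks.add(t);
            t.start();
        }
        // Wait for every simulated subtask to finish
        for (Thread t : subtasks) {
            t.join();
        }
        return sink;
    }

    public static void main(String[] args) throws InterruptedException {
        for (String record : runParallel(2, 3)) {
            System.out.println(record);
        }
    }
}
```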
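RichSourceFunction and RichParallelSourceFunction additionally provide open() and close() methods for setting up and releasing external connections, plus runtime accessors such as getRuntimeContext().
8.3 RichSourceFunction: rich non-parallel source
Implementation code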
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.Random;

public class ClickRichSource extends RichSourceFunction<Event> {
    // Flag that keeps run() looping; volatile because cancel() is called from another thread
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once before run(): print the task name and the index of this subtask
        RuntimeContext runtimeContext = getRuntimeContext();
        String taskName = runtimeContext.getTaskName();
        int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
        System.out.println(taskName + "-" + indexOfThisSubtask);
    }

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        // Generate random data
        Random random = new Random();
        // Candidate values for each field
        String[] users = {"Mary", "Alice", "Bobo", "lucy"};
        String[] urls = {"./home", "./cart", "./prod", "./order"};
        // Keep generating data until cancel() is called
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long timestamp = System.currentTimeMillis();
            Event event = new Event(user, url, timestamp);
            System.out.println(event.toString());
            sourceContext.collect(event);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        // Called once when the source shuts down: release resources here
        System.out.println("Closing resources...");
    }
}
Calling code
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceCustomTest {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> richDataStream = env.addSource(new ClickRichSource());
        richDataStream.print();
        env.execute();
    }
}
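What the rich variants add is a lifecycle around `run()`: `open()` is called once before `run()` (the place to open connections), and `close()` once afterwards (the place to release them). The sketch below models only that ordering with hypothetical types; the `finally` block expresses the intent that resources are released even when `run()` fails, and is a simplification of the real Flink runtime.

```java
// Hypothetical, simplified model of the rich-source lifecycle: open() -> run() -> close().
class RichLifecycleSketch {
    abstract static class RichSource {
        abstract void open();
        abstract void run();
        abstract void close();
    }

    // Open once, run, and close even if run() throws
    static void drive(RichSource source) {
        source.open();
        try {
            source.run();
        } finally {
            source.close();
        }
    }

    public static void main(String[] args) {
        drive(new RichSource() {
            void open() { System.out.println("open: connect to the external system"); }
            void run() { System.out.println("run: emit records"); }
            void close() { System.out.println("close: release resources"); }
        });
    }
}
```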
8.4 RichParallelSourceFunction: rich parallel source
Implementation code
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;

public class RichParallelClickSource extends RichParallelSourceFunction<Event> {
    // Flag that keeps run() looping; volatile because cancel() is called from another thread
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per parallel instance: print the task name and subtask index
        RuntimeContext runtimeContext = getRuntimeContext();
        String taskName = runtimeContext.getTaskName();
        int indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
        System.out.println(taskName + "-" + indexOfThisSubtask);
    }

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        // Generate random data
        Random random = new Random();
        // Candidate values for each field
        String[] users = {"Mary", "Alice", "Bobo", "lucy"};
        String[] urls = {"./home", "./cart", "./prod", "./order"};
        // Keep generating data until cancel() is called
        while (running) {
            String user = users[random.nextInt(users.length)];
            String url = urls[random.nextInt(urls.length)];
            long timestamp = System.currentTimeMillis();
            sourceContext.collect(new Event(user, url, timestamp));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        // Called once per parallel instance when it shuts down
        System.out.println("Closing resources...");
    }
}
Calling code
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceCustomTest {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> richDataStream = env.addSource(new RichParallelClickSource()).setParallelism(2);
        richDataStream.print();
        env.execute();
    }
}
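Because every parallel instance runs the same `run()`, it can help to give each instance its own share of the work. A rich parallel source can call `getRuntimeContext().getIndexOfThisSubtask()` and `getRuntimeContext().getNumberOfParallelSubtasks()`; one common pattern (an assumption here, not something this article does) is to keep only the keys whose value modulo the parallelism equals the subtask index. The JDK-only sketch below shows that rule with hypothetical names; in a real source the two numbers would come from the runtime context.

```java
import java.util.ArrayList;
import java.util.List;

class SubtaskSplitSketch {
    // Keeps only the ids owned by this subtask: id mod parallelism == subtask index
    static List<Integer> ownedBy(List<Integer> ids, int subtaskIndex, int parallelism) {
        List<Integer> owned = new ArrayList<>();
        for (int id : ids) {
            // floorMod keeps the result non-negative for negative ids
            if (Math.floorMod(id, parallelism) == subtaskIndex) {
                owned.add(id);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            ids.add(i);
        }
        int parallelism = 2;
        for (int index = 0; index < parallelism; index++) {
            System.out.println("subtask " + index + " -> " + ownedBy(ids, index, parallelism));
        }
    }
}
```

Every id lands in exactly one subtask, so the instances together cover all keys without duplicates.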
8.5 Loading data from MySQL in real time
Implementation code
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MySQLSource extends RichParallelSourceFunction<Student> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean flag = true;
    private Connection conn;
    private PreparedStatement statement;
    private ResultSet resultSet;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Requires a MySQL JDBC driver on the classpath
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "123456");
        String sql = "select id, name, age from t_student";
        statement = conn.prepareStatement(sql);
    }

    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        // Re-run the full query every 3 seconds and emit every row
        while (flag) {
            resultSet = statement.executeQuery();
            while (resultSet.next()) {
                String id = resultSet.getString("id");
                String name = resultSet.getString("name");
                Integer age = resultSet.getInt("age");
                ctx.collect(new Student(id, name, age));
            }
            Thread.sleep(3000);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        // Release JDBC resources in reverse order of creation
        if (resultSet != null) resultSet.close();
        if (statement != null) statement.close();
        if (conn != null) conn.close();
    }
}
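Calling code
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceCustomTest {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // Each of the 4 parallel instances runs the same query, so every row is emitted 4 times per poll
        DataStreamSource<Student> parallelDataStream = env.addSource(new MySQLSource()).setParallelism(4);
        parallelDataStream.print();
        env.execute();
    }
}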
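`MySQLSource` emits `Student` objects, but the class itself is not shown. A Lombok-free sketch inferred from the `run()` method (`id` and `name` as `String`, `age` as `Integer`) could look like the following; as with `Event`, declare it `public` in its own file so that Flink can treat it as a POJO. A MySQL JDBC driver (for example MySQL Connector/J) also has to be added to the pom, since none of the dependencies listed earlier provide one.

```java
// Sketch of the Student class used by MySQLSource; fields inferred from its run() method.
// In a real project declare it as `public class Student` in Student.java.
class Student {
    private String id;
    private String name;
    private Integer age;

    // No-argument constructor, required by Flink's POJO rules
    public Student() {}

    public Student(String id, String name, Integer age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }

    // Same layout as Lombok's generated toString()
    @Override
    public String toString() {
        return "Student(id=" + id + ", name=" + name + ", age=" + age + ")";
    }
}
```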