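1.1 Flink Data Sources
1.1.1 Introduction to Sources
A source is the data input of a program. You add one with StreamExecutionEnvironment.addSource(sourceFunction). Flink ships with many ready-made source implementations, and you can also write your own:
- (1) Implement the SourceFunction interface to build a non-parallel source (parallelism 1).
- (2) Implement the ParallelSourceFunction interface, or extend RichParallelSourceFunction, to build a parallel source.
In most cases the built-in sources are sufficient.
Ways to obtain a source
- (1) File-based
readTextFile(path)
Reads a text file according to the TextInputFormat rules, line by line, and returns each line.
- (2) Socket-based
socketTextStream
Reads data from a socket; elements can be separated by a delimiter.
- (3) Collection-based
fromCollection(Collection)
Creates a data stream from a Java collection; all elements in the collection must have the same type.
- (4) Custom input
addSource lets you read data from third-party data sources.
Flink bundles a set of connectors, and each connector provides the matching source support (for example, Kafka).
Bundled connectors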
Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
1.1.2 Collection Source
StreamingSourceFromCollection.java
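import java.util.ArrayList;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingSourceFromCollection {
    public static void main(String[] args) throws Exception {
        // Step 1: get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Step 2: prepare some sample data
        ArrayList<String> data = new ArrayList<String>();
        data.add("hadoop");
        data.add("spark");
        data.add("flink");
        // Step 3: create the data source
        DataStreamSource<String> dataStream = env.fromCollection(data);
        // Step 4: apply a transformation
        SingleOutputStreamOperator<String> addPreStream = dataStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String word) throws Exception {
                return "mi_" + word;
            }
        });
        // Step 5: handle the result (print it)
        addPreStream.print().setParallelism(1);
        // Step 6: start the job
        env.execute("StreamingSourceFromCollection");
    }
}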

1.1.3 Custom Non-Parallel Source
MyNoParalleSource.java
/**
 * The type parameter Long is the type of the records this source emits.
 *
 * Implementing SourceFunction means the source supports only a parallelism of 1.
 */
public class MyNoParalleSource implements SourceFunction<Long> {
    private long number = 1L;
    // volatile because cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> sct) throws Exception {
        while (isRunning) {
            // Send the record downstream
            sct.collect(number);
            number++;
            // Produce one record per second
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
StreamingDemoWithMyNoPralalleSource.java
public class StreamingDemoWithMyNoPralalleSource {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment (local, with the web UI enabled)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // 2. Get the data source
        DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
        // 3. Process the data
        SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Received: " + value);
                return value;
            }
        }).setParallelism(2);
        SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long number) throws Exception {
                // Keep only the even numbers
                return number % 2 == 0;
            }
        }).setParallelism(2);
        filterDataStream.print().setParallelism(1);
        env.execute("StreamingDemoWithMyNoPralalleSource");
    }
}


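RichParallelSourceFunction supports a parallelism greater than 1. The difference between RichParallelSourceFunction and RichSourceFunction is that the former lets the user choose the parallelism, while the latter has a default parallelism of 1 and does not allow setParallelism() to raise it above 1. Attempting to do so fails with the following error:
Exception in thread "main" java.lang.IllegalArgumentException: The maximum parallelism of non parallel operator must be 1.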
1.1.4 Custom Parallel Source
MyParalleSource.java
/**
* This source supports a parallelism greater than 1
*/
public class MyParalleSource implements ParallelSourceFunction<Long> {
private long number = 1L;
// volatile because cancel() is called from a different thread than run()
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> sct) throws Exception {
while (isRunning){
sct.collect(number);
number++;
// Produce one record per second
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning=false;
}
}
StreamingDemoWithMyPralalleSource.java
public class StreamingDemoWithMyPralalleSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
DataStreamSource<Long> numberStream = env.addSource(new MyParalleSource()).setParallelism(2);
SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("Received: " + value);
return value;
}
});
SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long number) throws Exception {
return number % 2 == 0;
}
});
filterDataStream.print().setParallelism(1);
env.execute("StreamingDemoWithMyPralalleSource");
}
}


1.2 Common Transformation Operations
1.2.1 map and filter
/**
* Data source: 1 2 3 4 5 ... arriving continuously
* map prints each record it receives
* filter keeps only the even numbers
*/
public class MapDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
//flink FlatMap/map -> spark FlatMap/map -> Scala flatmap/Map
SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("Received: " + value);
return value;
}
});
SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long number) throws Exception {
return number % 2 == 0;//true
}
});
filterDataStream.print().setParallelism(1);
env.execute("StreamingDemoWithMyNoPralalleSource");
}
}
1.2.3 union
/**
 * union merges several streams; the new stream contains every element of every input stream.
 * Its one restriction is that all merged streams must have the same type.
 * This example combines union with timeWindowAll.
 */
public class unionDemo {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Get the data sources
        // Note: this source only supports a parallelism of 1
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        // Merge text1 and text2 into one stream
        DataStream<Long> text = text1.union(text2);
        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("Raw record received: " + value);
                return value;
            }
        });
        // Process the data once every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
        // Print the result
        sum.print().setParallelism(1);
        String jobName = unionDemo.class.getSimpleName();
        env.execute(jobName);
    }
}

1.2.4 connect, CoMap and CoFlatMap
/**
 * Similar to union, but connect can join only two streams. The two streams may have
 * different types, and a separate processing method is applied to each of them.
 */
public class ConnectionDemo {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Get the data sources
        // Note: this source only supports a parallelism of 1
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "str_" + value;
            }
        });
        // connect the Long stream with the String stream
        ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);
        SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
            // Handles records from the first stream
            @Override
            public Object map1(Long value) throws Exception {
                return value;
            }

            // Handles records from the second stream
            @Override
            public Object map2(String value) throws Exception {
                return value;
            }
        });
        // Print the result
        result.print().setParallelism(1);
        String jobName = ConnectionDemo.class.getSimpleName();
        env.execute(jobName);
    }
}

1.2.5 split and select
/**
 * Splits one data stream into several streams according to a rule.
 * Use case:
 * In practice a source stream may mix several types of data, each with its own processing rules.
 * Splitting the stream by rule lets every resulting stream use different processing logic.
 * Note: split/select is deprecated in later Flink releases in favor of side outputs.
 */
public class SplitDemo {
    public static void main(String[] args) throws Exception {
        // Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Get the data source
        // Note: this source only supports a parallelism of 1
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);
        // Split the stream by whether the value is odd or even
        SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
            @Override
            public Iterable<String> select(Long value) {
                ArrayList<String> outPut = new ArrayList<>();
                if (value % 2 == 0) {
                    outPut.add("even"); // even numbers
                } else {
                    outPut.add("odd"); // odd numbers
                }
                return outPut;
            }
        });
        // Select one or more of the split streams
        DataStream<Long> evenStream = splitStream.select("even");
        DataStream<Long> oddStream = splitStream.select("odd");
        DataStream<Long> moreStream = splitStream.select("odd", "even");
        // Print the even numbers
        evenStream.print().setParallelism(1);
        // Print the odd numbers
        // oddStream.print().setParallelism(1);
        // Print everything
        // moreStream.print().setParallelism(1);
        String jobName = SplitDemo.class.getSimpleName();
        env.execute(jobName);
    }
}
1.3 Common Sink Operations
1.3.1 print() / printToErr()
Prints the toString() value of every element to standard output or standard error.
1.3.2 writeAsText()
/**
* Data source: 1 2 3 4 5 ... arriving continuously
* map prints each record it receives
* filter keeps only the even numbers, which are then written to a text file
*/
public class WriteTextDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> numberStream = env.addSource(new MyNoParalleSource()).setParallelism(1);
SingleOutputStreamOperator<Long> dataStream = numberStream.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("Received: " + value);
return value;
}
});
SingleOutputStreamOperator<Long> filterDataStream = dataStream.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long number) throws Exception {
return number % 2 == 0;
}
});
filterDataStream.writeAsText("D:\\flinkout\\value.txt").setParallelism(1);
filterDataStream.print();
env.execute("StreamingDemoWithMyNoPralalleSource");
}
}


1.3.3 Sinks Provided by Flink
Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
Google PubSub (source/sink)
1.3.4 Custom Sink
/**
 * Writes data to Redis
 */
public class SinkForRedisDemo {
    public static void main(String[] args) throws Exception {
        // Entry point
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Data source
        DataStreamSource<String> text = env.socketTextStream("bigdata02", 8888, "\n");
        // Redis command to be issued: LPUSH <key> <word>
        // Wrap each string in a Tuple2<String, String> of (key, value)
        DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                // (key, value)
                return new Tuple2<>("f", value);
            }
        });
        // Create the Redis configuration
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("bigdata04").setPort(6379).setPassword("bigdata04").build();
        // Create the Redis sink
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
        l_wordsData.addSink(redisSink);
        env.execute("StreamingDemoToRedis");
    }

    /**
     * Logic for inserting the data into Redis
     */
    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
        // Extracts the Redis key from the incoming record
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        // Extracts the Redis value from the incoming record
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}


1.4 State
1.4.1 State Overview
Apache Flink®: Stateful Computations over Data Streams
Recall the word count example
// Counts word occurrences in real time
public class WordCount {
    public static void main(String[] args) throws Exception {
        // Create the entry point
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Data input
        DataStreamSource<String> myDataStream = env.socketTextStream("bigdata02", 1234);
        // Data processing
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = myDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(new Tuple2<>(word, 1));
                    //out.collect(Tuple2.of(word,1));
                }
            }
        }).keyBy(0)
                .sum(1);
        // Data output
        result.print();
        // Start the application
        env.execute("WordCount");
    }
}
Input
hadoop,hadoop
hadoop
hive,hadoop
Output
4> (hadoop,1)
4> (hadoop,2)
4> (hadoop,3)
1> (hive,1)
4> (hadoop,4)
Notice that the word counts accumulate across records. Without state management there would be no accumulation, which is why Flink has the concept of state.
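To make the accumulation concrete, here is a minimal plain-Java sketch of the bookkeeping that keyBy(0).sum(1) needs. It is a model, not Flink API: the class RunningWordCount and its HashMap are hypothetical stand-ins for the per-key state that Flink manages, and the subtask prefixes such as 4> are not reproduced.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink API): one running counter per word.
class RunningWordCount {
    // Hypothetical stand-in for Flink's keyed state: word -> count so far.
    private final Map<String, Integer> countPerWord = new HashMap<>();

    // Processes one input line and returns the records that would be emitted.
    List<String> processLine(String line) {
        List<String> emitted = new ArrayList<>();
        for (String word : line.split(",")) {
            // merge() adds 1 to the stored count, or stores 1 for a new word.
            int updated = countPerWord.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        RunningWordCount job = new RunningWordCount();
        for (String line : new String[] {"hadoop,hadoop", "hadoop", "hive,hadoop"}) {
            job.processLine(line).forEach(System.out::println);
        }
    }
}
```

If the map were recreated for every record, each word would always be reported with a count of 1; keeping it between records is what produces the running totals.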

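State generally refers to the state of a specific task/operator. State can be recorded, so the data can be recovered after a failure. Flink has two basic kinds of state, Keyed State and Operator State, and both can exist in two forms: raw state and managed state.
Managed state: state managed by the Flink framework. This is the form normally used.
Raw state: the user manages the concrete data structure of the state. When taking a checkpoint, the framework reads and writes the state as byte[] and knows nothing about its internal structure. Managed state is recommended for state on a DataStream; raw state comes into play when implementing a user-defined operator. It is rarely used in day-to-day work, so it is not covered further here.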
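1.4.2 State Types
Operator State (task level)

- Operator state is task-level state: put simply, each task has its own state.
- For example, every parallel instance (task) of the Kafka connector source has to record the partitions of the topic it consumes and their offsets.
Keyed State (per key)

- Keyed state records the state of each key.
- Managed keyed state comes in six types:
- ValueState
- ListState
- MapState
- ReducingState
- AggregatingState
- FoldingState (deprecated in later Flink releases)
Understanding state (figure not reproduced; in the illustrated example the data source is Kafka)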

1.4.3 Keyed State Examples
ValueState
public class CountWindowAverageWithValueState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
/**
* 1. ValueState is a kind of keyed state
* 2. A ValueState holds a single value per key
*
* Idea:
* f0: number of times the current key has appeared
* f1: running sum of the values
* if (f0 == 3) {
* avg = f1 / f0
* }
*/
private ValueState<Tuple2<Long, Long>> countAndSum;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Tuple2<Long, Long>> average = new ValueStateDescriptor<>(
"average",
Types.TUPLE(Types.LONG, Types.LONG)
);
countAndSum = getRuntimeContext().getState(average);
}
@Override
public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Double>> out) throws Exception {
Tuple2<Long, Long> currentState = countAndSum.value();
if (currentState == null) {
currentState = Tuple2.of(0L, 0L);
}
// Count how many times the key has appeared
currentState.f0 += 1;
// Accumulate the sum of the values
currentState.f1 += element.f1;
countAndSum.update(currentState);
if (currentState.f0 ==3){
double avg =(double)currentState.f1/currentState.f0;
out.collect(Tuple2.of(element.f0,avg));
// Clear the state for this key
countAndSum.clear();
}
}
}
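/**
* Requirement: once 3 elements with the same key have been received,
* compute the average of their values, i.e. the average of every
* 3 elements per key in the keyed stream.
*
* Input (key,value): 1,3  1,7  1,5  gives output 1,5.0
* Input (key,value): 2,4  2,2  2,5  gives output 2,3.666...
*
* The key is a Long (for example 1) and the average is a Double (for example 5.0).
*/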
public class TestKeyedStateMain {
public static void main(String[] args) throws Exception{
// Entry point
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Data source
DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));
// Expected output:
//(1,5.0)
//(2,3.6666666666666665)
dataStreamSource
.keyBy(0)
.flatMap(new CountWindowAverageWithValueState()) // flatMap (or map) plus state behaves like a stateful user-defined function
.print();
env.execute("TestStatefulApi");
}
}
Output (subtask prefixes omitted):
(1,5.0)
(2,3.6666666666666665)
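The count-and-sum logic can be checked without a cluster. The following plain-Java sketch is a model rather than Flink code: the class CountWindowAverageModel and its HashMap are hypothetical stand-ins for the ValueState<Tuple2<Long, Long>> used above, with a long[2] playing the role of the (count, sum) tuple.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink API) of the every-3-elements average per key.
class CountWindowAverageModel {
    // Hypothetical stand-in for the keyed ValueState: key -> {count, sum}.
    private final Map<Long, long[]> countAndSum = new HashMap<>();

    // Returns "(key,avg)" once the key has been seen 3 times, otherwise null.
    String process(long key, long value) {
        long[] state = countAndSum.computeIfAbsent(key, k -> new long[2]);
        state[0] += 1;      // occurrences of this key
        state[1] += value;  // running sum of its values
        if (state[0] == 3) {
            double avg = (double) state[1] / state[0];
            countAndSum.remove(key); // same role as clear() on the ValueState
            return "(" + key + "," + avg + ")";
        }
        return null;
    }

    // Feeds all (key, value) pairs through the model and collects the emitted records.
    static List<String> run(long[][] input) {
        CountWindowAverageModel model = new CountWindowAverageModel();
        List<String> emitted = new ArrayList<>();
        for (long[] pair : input) {
            String result = model.process(pair[0], pair[1]);
            if (result != null) {
                emitted.add(result);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[][] input = {{1, 3}, {1, 7}, {2, 4}, {1, 5}, {2, 2}, {2, 5}};
        run(input).forEach(System.out::println);
    }
}
```

Removing the entry after emitting is what makes the average cover each group of three elements instead of all elements seen so far.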

ListState
public class CountWindowAverageWithListState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
/**
* 1,3
* 1,7
* 1,5
*/
private ListState<Tuple2<Long, Long>> elementsByKey;
@Override
public void open(Configuration parameters) throws Exception {
ListStateDescriptor<Tuple2<Long, Long>> average = new ListStateDescriptor<>(
"average",
Types.TUPLE(Types.LONG, Types.LONG)
);
elementsByKey = getRuntimeContext().getListState(average);
}
@Override
public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Double>> out) throws Exception {
Iterable<Tuple2<Long, Long>> currentState = elementsByKey.get();
if (currentState ==null){
elementsByKey.addAll(Collections.emptyList());
}
elementsByKey.add(element);
ArrayList<Tuple2<Long, Long>> allElements = Lists.newArrayList(elementsByKey.get());
if (allElements.size() ==3){
long count =0;
long sum=0;
for(Tuple2<Long,Long> ele:allElements){
count++;
sum +=ele.f1;
}
double avg =(double)sum/count;
out.collect(Tuple2.of(element.f0,avg));
elementsByKey.clear();
}
}
}
public class TestKeyedStateMain {
public static void main(String[] args) throws Exception{
// Entry point
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Data source
DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));
// Expected output:
//(1,5.0)
//(2,3.6666666666666665)
dataStreamSource
.keyBy(0)
.flatMap(new CountWindowAverageWithListState()) // flatMap (or map) plus state behaves like a stateful user-defined function
.print();
env.execute("TestStatefulApi");
}
}
Output (subtask prefixes omitted):
(1,5.0)
(2,3.6666666666666665)

MapState
/**
* MapState<K, V>: keeps one Map per stream key
* put() adds a key-value pair to the state
* values() returns all values stored in the MapState
* clear() clears the state
*/
public class CountWindowAverageWithMapState
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
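// managed keyed state
// 1. MapState: the map key is a unique value; the map value is a value received for the current stream key
// A state declared during development can be thought of as a helper variable.
// Map semantics: writing an existing map key overwrites its value, hence the unique (UUID) map keys used below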
/**
* 1,3
* 1,5
* 1,7
*
*/
private MapState<String, Long> mapState;
@Override
public void open(Configuration parameters) throws Exception {
// Register the state
MapStateDescriptor<String, Long> descriptor =
new MapStateDescriptor<String, Long>(
"average", // name of the state
String.class, Long.class); // types of the map key and the map value
mapState = getRuntimeContext().getMapState(descriptor);
}
/**
* Input for stream key 1: 1,3  1,5  1,7
*
* Each value is stored under a random UUID map key, so the map holds
* entries such as <uuid-a>,3  <uuid-b>,5  <uuid-c>,7 and no value
* overwrites another.
*/
@Override
public void flatMap(Tuple2<Long, Long> element,
Collector<Tuple2<Long, Double>> out) throws Exception {
mapState.put(UUID.randomUUID().toString(), element.f1);
// If the current key has appeared 3 times, compute the average and emit it
List<Long> allElements = Lists.newArrayList(mapState.values());
if (allElements.size() == 3) {
long count = 0;
long sum = 0;
for (Long ele : allElements) {
count++;
sum += ele;
}
double avg = (double) sum / count;
// Emit (key, average)
out.collect(Tuple2.of(element.f0, avg));
// Clear the state
mapState.clear();
}
}
}
public class TestKeyedStateMain {
public static void main(String[] args) throws Exception{
// Entry point
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Data source
DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));
// Expected output:
//(1,5.0)
//(2,3.6666666666666665)
dataStreamSource
.keyBy(0)
.flatMap(new CountWindowAverageWithMapState()) // flatMap (or map) plus state behaves like a stateful user-defined function
.print();
env.execute("TestStatefulApi");
}
}
Output (subtask prefixes omitted):
(1,5.0)
(2,3.6666666666666665)

ReducingState
/**
* ReducingState<T>: keeps one aggregated value per key
* get() returns the current state value
* add() updates the state by folding the new element into it
* clear() clears the state
*/
public class SumFunction
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
// The type parameter is the type of the final accumulated sum
private ReducingState<Long> sumState;
@Override
public void open(Configuration parameters) throws Exception {
// Register the state
ReducingStateDescriptor<Long> descriptor =
new ReducingStateDescriptor<Long>(
"sum", // name of the state
new ReduceFunction<Long>() { // aggregation function
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}, Long.class); // type of the value stored in the state
sumState = getRuntimeContext().getReducingState(descriptor);
}
/**
*
* 3
* 5
* 7
*
* @param element
* @param out
* @throws Exception
*/
@Override
public void flatMap(Tuple2<Long, Long> element,
Collector<Tuple2<Long, Long>> out) throws Exception {
// Add the value to the state
sumState.add(element.f1);
out.collect(Tuple2.of(element.f0, sumState.get()));
}
}
public class TestKeyedStateMain {
public static void main(String[] args) throws Exception{
// Entry point
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Data source
DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));
// Output: the running sum per key after each element
dataStreamSource
.keyBy(0)
.flatMap(new SumFunction()) // running sum
.print();
env.execute("TestStatefulApi");
}
}
Output:

AggregatingState
public class ContainsValueFunction
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, String>> {
/**
* 1, contains:3 and 5
*/
private AggregatingState<Long, String> totalStr; // helper field
@Override
public void open(Configuration parameters) throws Exception {
// Register the state
AggregatingStateDescriptor<Long, String, String> descriptor =
new AggregatingStateDescriptor<Long, String, String>(
"totalStr", // name of the state
// Comparable to a user-defined aggregate function in Spark SQL
new AggregateFunction<Long, String, String>() {
// Initialization: creates the starting accumulator
@Override
public String createAccumulator() {
return "Contains:";
}
@Override
public String add(Long value, String accumulator) {
if ("Contains:".equals(accumulator)) {
return accumulator + value;
}
return accumulator + " and " + value;
}
@Override
public String merge(String a, String b) {
return a + " and " + b;
}
@Override
public String getResult(String accumulator) {
//contains:1
//contains: 1 and 3 and
return accumulator;
}
}, String.class); // type of the accumulator stored in the state
totalStr = getRuntimeContext().getAggregatingState(descriptor);
}
@Override
public void flatMap(Tuple2<Long, Long> element,
Collector<Tuple2<Long, String>> out) throws Exception {
totalStr.add(element.f1);
out.collect(Tuple2.of(element.f0, totalStr.get()));
}
}
public class TestKeyedStateMain {
public static void main(String[] args) throws Exception{
// Entry point
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Data source
DataStreamSource<Tuple2<Long, Long>> dataStreamSource =
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 7L),
Tuple2.of(2L, 4L), Tuple2.of(1L, 5L),Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));
// Output: the accumulated string per key after each element
dataStreamSource
.keyBy(0)
.flatMap(new ContainsValueFunction()) // flatMap (or map) plus state behaves like a stateful user-defined function
.print();
env.execute("TestStatefulApi");
}
}
Output:
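The accumulator logic inside the AggregateFunction can be traced by hand. The sketch below is plain Java rather than Flink code: the class ContainsAggregateModel is hypothetical and simply replays createAccumulator, add and getResult from the listing above for the values of a single key, in arrival order.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink API) of the "Contains:" accumulator for one key.
class ContainsAggregateModel {
    static String createAccumulator() {
        return "Contains:";
    }

    // Appends the value; " and " is used as a separator from the second value on.
    static String add(long value, String accumulator) {
        if ("Contains:".equals(accumulator)) {
            return accumulator + value;
        }
        return accumulator + " and " + value;
    }

    static String getResult(String accumulator) {
        return accumulator;
    }

    // Returns the result that would be emitted after each value of one key.
    static List<String> resultsFor(long... values) {
        List<String> results = new ArrayList<>();
        String accumulator = createAccumulator();
        for (long value : values) {
            accumulator = add(value, accumulator);
            results.add(getResult(accumulator));
        }
        return results;
    }

    public static void main(String[] args) {
        // Key 1 receives the values 3, 7 and 5 in the example input.
        resultsFor(3, 7, 5).forEach(System.out::println);
    }
}
```

For key 1 this yields Contains:3, then Contains:3 and 7, then Contains:3 and 7 and 5. In the Flink job each of these strings is paired with the key before it is emitted.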

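1.5 State Backend
1.5.1 Overview
State backends supported by Flink:
MemoryStateBackend
FsStateBackend
RocksDBStateBackend
1.5.2 MemoryStateBackend
By default, state is kept on the TaskManager's heap, and on checkpoint it is saved to the JobManager's heap.

- Drawbacks:
Can only hold a small amount of state (by default each individual state is limited to 5 MB)
State may be lost
- Advantages:
Convenient for development and testing
1.5.3 FsStateBackend
State is kept on the TaskManager's heap; on checkpoint it is saved to the configured files (HDFS or another file system).

- Drawbacks:
State size is limited by TaskManager memory
- Advantages:
State access is fast
State is not lost
Use for: production; can also handle fairly large state
1.5.4 RocksDBStateBackend
State is stored in RocksDB (a key-value store) and ultimately lives in local files. On checkpoint it is saved to the configured files (HDFS or another file system).

- Drawbacks:
State access is somewhat slower
- Advantages:
Can store very large amounts of state
State is not lost
Use for: production; can store very large amounts of state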
1.5.5 Configuring the State Backend
(1) Per-job setting
Change the code of the job: env.setStateBackend(new FsStateBackend("hdfs://bigdata02:9000/flink/checkpoints"));
or new MemoryStateBackend()
or new RocksDBStateBackend(filebackend, true); (requires an additional dependency)
Additional dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
(2) Global setting
Edit flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://bigdata02:9000/flink/checkpoints
Note: state.backend accepts the following values: jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend)
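1.6 Checkpoint (Fault Tolerance)
1.6.1 Checkpoint Overview
- (1) To make state fault tolerant, Flink needs to checkpoint the state.
- (2) Checkpointing is the core of Flink's fault-tolerance mechanism. According to its configuration, it periodically takes a snapshot of the state of every operator/task in the stream and persists that state. If the Flink program crashes unexpectedly, it can be restarted from one of these snapshots, which corrects the data errors caused by the failure.
- (3) For Flink's checkpoint mechanism to interact with durable storage (for streams and state), two things are required:
A durable source that can replay events for a certain amount of time. Typical examples are persistent message queues (such as Apache Kafka or RabbitMQ) and file systems (such as HDFS, S3 or GFS).
Durable storage for the state, for example a distributed file system (such as HDFS, S3 or GFS).
Figure (not reproduced): taking a snapshot

Figure (not reproduced): restoring from a snapshot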

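1.6.2 Checkpoint Configuration
Checkpointing is disabled by default and has to be enabled before use. Once it is enabled there are two checkpointing modes, Exactly-once and At-least-once. The default mode is Exactly-once, which suits most applications. At-least-once may be used for certain applications that need very low latency (consistently a few milliseconds).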
/**
 * state:
 *   keyed
 *   operator -> checkpoint
 */
public class WordCount4 {
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hostname = parameterTool.get("hostname");
        int port = parameterTool.getInt("port");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 10 s here.
        // For large data volumes, a checkpoint roughly every 5 minutes is suggested;
        // according to these notes, Alibaba gives the same advice.
        env.enableCheckpointing(10000);
        // Pick one state backend; a later setStateBackend call replaces an earlier one.
        // env.setStateBackend(new MemoryStateBackend());
        // env.setStateBackend(new FsStateBackend("hdfs://bigdata02:9000/flink_1/checkpoint"));
        env.setStateBackend(new RocksDBStateBackend("hdfs://bigdata02:9000/flink_2/checkpoint"));
        // setCheckpointingMode: whether duplicates are allowed (EXACTLY_ONCE or AT_LEAST_ONCE)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // setMinPauseBetweenCheckpoints: minimum pause between two checkpoints, in ms
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // setCheckpointTimeout: checkpoint timeout, in ms
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // enableExternalizedCheckpoints: keep the checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // number of restart attempts
                Time.of(10, TimeUnit.SECONDS) // delay between attempts
        ));
        DataStreamSource<String> dataStream = env.socketTextStream(hostname, port);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }).keyBy(0)
                .sum(1);
        result.print();
        env.execute("WordCount check point....");
    }
}
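1.7 Recovering Data (Fault Tolerance)
1.7.1 Restart Strategies Overview (Retries)
Flink supports several restart strategies that control how a job is restarted after a failure. A cluster starts with a default restart strategy, which is used whenever a job does not define one of its own. If a restart strategy is specified when the job is submitted, it overrides the cluster default. The default restart strategy is set in Flink's configuration file flink-conf.yaml, where the restart-strategy parameter selects the strategy to use.
Common restart strategies
- (1) Fixed delay
- (2) Failure rate
- (3) No restart
If checkpointing is not enabled, the no restart strategy is used.
If checkpointing is enabled but no restart strategy is configured, the fixed-delay strategy is used, with a default of Integer.MAX_VALUE restart attempts. A restart strategy configured in flink-conf.yaml applies globally; it can also be set in application code, which overrides the global configuration.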
1.7.2 Restart Strategies
Fixed delay
Option 1: global configuration in flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
Option 2: set in application code
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay between attempts
));
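As a rough illustration of what "3 attempts with a fixed delay" means, here is a plain-Java sketch. It is a simplified model and not Flink's implementation: the class FixedDelayRestartModel and the method runWithRestarts are hypothetical names, and the job is represented by an ordinary Callable.

```java
import java.util.concurrent.Callable;

// Plain-Java model (not Flink API) of a fixed-delay restart policy.
class FixedDelayRestartModel {
    // Runs the job. After a failure it waits delayMillis and restarts, at most
    // maxAttempts times. Returns the number of restarts that were needed, or
    // rethrows the last failure once the restarts are used up.
    static int runWithRestarts(Callable<Void> job, int maxAttempts, long delayMillis)
            throws Exception {
        int restarts = 0;
        while (true) {
            try {
                job.call();
                return restarts;
            } catch (Exception failure) {
                if (restarts >= maxAttempts) {
                    throw failure; // no attempts left: the job fails for good
                }
                restarts++;
                Thread.sleep(delayMillis); // the fixed delay between attempts
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Hypothetical job that fails on its first two runs and then succeeds.
        int restarts = runWithRestarts(() -> {
            if (calls[0]++ < 2) {
                throw new IllegalStateException("simulated failure");
            }
            return null;
        }, 3, 10L);
        System.out.println("restarts needed: " + restarts);
    }
}
```

With 3 attempts, a job that keeps failing is run four times in total (the initial run plus three restarts) before the failure is propagated.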
Failure rate
Option 1: global configuration in flink-conf.yaml
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
Option 2: set in application code
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // maximum number of failures within one interval
Time.of(5, TimeUnit.MINUTES), // interval over which failures are counted
Time.of(10, TimeUnit.SECONDS) // delay between restart attempts
));
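The failure-rate rule can be pictured as a sliding window over failure timestamps. The sketch below is a simplified plain-Java model under that assumption, not Flink's implementation; the class FailureRateModel and its method canRestartAfterFailure are hypothetical names, and the restart delay is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-Java model (not Flink API) of a failure-rate restart policy.
class FailureRateModel {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    // Timestamps of the failures that still fall inside the interval.
    private final Deque<Long> failureTimestamps = new ArrayDeque<>();

    FailureRateModel(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    // Records a failure at nowMillis and reports whether a restart is still allowed.
    boolean canRestartAfterFailure(long nowMillis) {
        failureTimestamps.addLast(nowMillis);
        // Forget failures that are older than the measuring interval.
        while (nowMillis - failureTimestamps.peekFirst() > intervalMillis) {
            failureTimestamps.removeFirst();
        }
        return failureTimestamps.size() <= maxFailuresPerInterval;
    }

    public static void main(String[] args) {
        // 3 failures per 5 minutes, as in the configuration above.
        FailureRateModel model = new FailureRateModel(3, 5 * 60 * 1000L);
        long minute = 60 * 1000L;
        for (long t = 0; t <= 3 * minute; t += minute) {
            System.out.println("failure at minute " + (t / minute)
                    + ", restart allowed: " + model.canRestartAfterFailure(t));
        }
    }
}
```

In this model the fourth failure inside the same 5-minute interval is the one that stops the job, while failures spread further apart never exhaust the budget.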
No restart
Option 1: global configuration in flink-conf.yaml
restart-strategy: none
Option 2: set in application code
env.setRestartStrategy(RestartStrategies.noRestart());
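1.7.3 Retaining Multiple Checkpoints
By default, when checkpointing is enabled Flink keeps only the most recent successfully completed checkpoint, and a failed program can be recovered from that checkpoint. Keeping several checkpoints and choosing which one to recover from is more flexible. For example, if the records of the last 4 hours turn out to have been processed incorrectly, you may want to restore the whole state to the point 4 hours ago. Flink can retain multiple checkpoints: add the following setting to the configuration file conf/flink-conf.yaml to specify the maximum number of checkpoints to keep:
state.checkpoints.num-retained: 20
With this setting in place, you can inspect the checkpoint directories stored on HDFS:
hdfs dfs -ls hdfs://bigdata02:9000/flink/checkpoints
To roll back to a particular checkpoint, you only need to specify the path of that checkpoint.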
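The retention rule behind state.checkpoints.num-retained can be pictured as a bounded queue of completed checkpoints. The following plain-Java sketch is a model of that rule only, not Flink's implementation; the class RetainedCheckpointsModel and its methods are hypothetical names, and checkpoints are reduced to their numeric ids.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model (not Flink API): keep only the newest N completed checkpoints.
class RetainedCheckpointsModel {
    private final int numRetained;
    private final Deque<Long> retainedIds = new ArrayDeque<>();

    RetainedCheckpointsModel(int numRetained) {
        this.numRetained = numRetained;
    }

    // Registers a completed checkpoint and discards the oldest ones beyond the limit.
    void onCheckpointCompleted(long checkpointId) {
        retainedIds.addLast(checkpointId);
        while (retainedIds.size() > numRetained) {
            retainedIds.removeFirst();
        }
    }

    // Checkpoints that are still available to restore from, oldest first.
    List<Long> retained() {
        return new ArrayList<>(retainedIds);
    }

    public static void main(String[] args) {
        RetainedCheckpointsModel model = new RetainedCheckpointsModel(2);
        for (long id = 1; id <= 5; id++) {
            model.onCheckpointCompleted(id);
        }
        System.out.println(model.retained());
    }
}
```

The further back you want to be able to roll back, the larger the retained count has to be relative to the checkpoint interval, which is why the text above chooses 20 rather than the default of 1.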
