Flink keyed stream transformation operators

Input file:
sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718206,32
sensor_1,1547718208,36.2
sensor_1,1547718210,29.7
sensor_1,1547718213,30.9

min operator

[Figure: min operator example]

Input compared with output:

[Figure: input vs. output comparison]
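A plain-Scala simulation, with no Flink dependency, can replay the sample file one record at a time and show what the rolling min emits. This is a sketch under one assumption: min("temperature") overwrites only the temperature of each key's running record and leaves id and timestamp as they were in that key's first record (unlike minBy, which returns a whole record). The names RollingMinSimulation, parse, and rollingMin are hypothetical.

```scala
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object RollingMinSimulation {
  // The sample input file, one "id,timestamp,temperature" line per record
  val lines: Seq[String] = Seq(
    "sensor_1,1547718199,35.8",
    "sensor_6,1547718201,15.4",
    "sensor_7,1547718202,6.7",
    "sensor_10,1547718205,38.1",
    "sensor_1,1547718206,32",
    "sensor_1,1547718208,36.2",
    "sensor_1,1547718210,29.7",
    "sensor_1,1547718213,30.9"
  )

  // Same parsing logic as the map step in the Flink job
  def parse(line: String): SensorReading = {
    val arr = line.split(",")
    SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
  }

  // Simulates keyBy(_.id).min("temperature"): one output per input record.
  // Assumption: only the temperature field of the per-key state changes.
  def rollingMin(input: Seq[SensorReading]): Seq[SensorReading] = {
    val state = scala.collection.mutable.Map.empty[String, SensorReading]
    input.map { r =>
      val updated = state.get(r.id) match {
        case None       => r // first record for a key is emitted unchanged
        case Some(prev) => prev.copy(temperature = math.min(prev.temperature, r.temperature))
      }
      state(r.id) = updated
      updated
    }
  }

  def main(args: Array[String]): Unit =
    rollingMin(lines.map(parse)).foreach(println)
}
```

Under this assumption, every sensor_1 output keeps timestamp 1547718199 while its temperature drops from 35.8 to 32.0 and then to 29.7.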

Reduce operator

// Brings in StreamExecutionEnvironment, DataStream and the implicit TypeInformation
import org.apache.flink.streaming.api.scala._

// One parsed line of the input file
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ReduceTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Parallelism 1 so the output is not interleaved across parallel subtasks
    env.setParallelism(1)

    val inputPath = "D:\\jacky\\flink\\src\\main\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    // Parse each "id,timestamp,temperature" line into a SensorReading
    val stream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    // Built-in alternative: emits the whole record with the minimum temperature
    // val aggStream: DataStream[SensorReading] = stream.keyBy(_.id).minBy("temperature")

    // Rolling reduce per sensor id: keep the id, take the latest timestamp,
    // and keep the lowest temperature seen so far
    val ans: DataStream[SensorReading] = stream.keyBy(_.id)
      .reduce((acc, next) => {
        SensorReading(acc.id, next.timestamp, acc.temperature.min(next.temperature))
      })

    ans.print()

    env.execute()
  }
}

To rule out any effect of parallelism, the parallelism is first set to 1:

[Figure: setting the parallelism to 1]

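The two figures below show, respectively, the key logic and the output compared against the input file:

[Figure: key logic]
[Figure: output compared with the input file]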
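The rolling behavior of the reduce lambda can be replayed in plain Scala for the five sensor_1 records of the input file. This is a simulation of keyed reduce semantics, not Flink code: the first record of a key is emitted as-is, and each later record is combined with the stored result, which is then emitted and stored again. RollingReduceSimulation, combine, and rollingReduce are hypothetical names.

```scala
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object RollingReduceSimulation {
  // Same combine logic as the reduce lambda in ReduceTest:
  // keep the id, take the newest timestamp, keep the lowest temperature so far
  def combine(acc: SensorReading, next: SensorReading): SensorReading =
    SensorReading(acc.id, next.timestamp, acc.temperature.min(next.temperature))

  // Simulates keyBy(_.id).reduce(f): one output per input record
  def rollingReduce(input: Seq[SensorReading])(
      f: (SensorReading, SensorReading) => SensorReading): Seq[SensorReading] = {
    val state = scala.collection.mutable.Map.empty[String, SensorReading]
    input.map { r =>
      val out = state.get(r.id).map(prev => f(prev, r)).getOrElse(r)
      state(r.id) = out
      out
    }
  }

  // The sensor_1 records from the sample input file, in file order
  val sensor1: Seq[SensorReading] = Seq(
    SensorReading("sensor_1", 1547718199L, 35.8),
    SensorReading("sensor_1", 1547718206L, 32.0),
    SensorReading("sensor_1", 1547718208L, 36.2),
    SensorReading("sensor_1", 1547718210L, 29.7),
    SensorReading("sensor_1", 1547718213L, 30.9)
  )

  def main(args: Array[String]): Unit =
    rollingReduce(sensor1)(combine).foreach(println)
}
```

Here the timestamp advances with every record, while the temperature is the running minimum: 35.8, 32.0, 32.0, 29.7, 29.7.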

Alternatively, the following equivalent form can be used:

[Figure: equivalent form]

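Note that min and the other rolling aggregations (max, sum, minBy, maxBy) are defined only on the KeyedStream class, so keyBy must be called first:

[Figure: min and related operators in the KeyedStream class]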

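In the Flink source, min delegates to an internal aggregate method, and that method is what actually processes each record: it applies the aggregation function as a rolling reduce over each key's records.

[Figure: the aggregate method]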
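Because these aggregations end up applied as a reduce, min and minBy can be modeled as two-argument combine functions. The sketch below is a simplified model, not Flink's actual aggregator class: minField mimics min (copy only the lower temperature into the running record), and minByRecord mimics minBy (keep whichever whole record has the lower temperature, keeping the earlier one on ties). MinVsMinBy, minField, minByRecord, and rolling are hypothetical names.

```scala
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object MinVsMinBy {
  // Models min("temperature"): only the aggregated field changes
  def minField(acc: SensorReading, next: SensorReading): SensorReading =
    acc.copy(temperature = math.min(acc.temperature, next.temperature))

  // Models minBy("temperature"): the whole record with the lower value wins;
  // on a tie the earlier record is kept
  def minByRecord(acc: SensorReading, next: SensorReading): SensorReading =
    if (next.temperature < acc.temperature) next else acc

  // Rolling application for a single key: one output per input record
  def rolling(records: Seq[SensorReading])(
      f: (SensorReading, SensorReading) => SensorReading): Seq[SensorReading] =
    records.tail.scanLeft(records.head)(f)

  // The sensor_1 records from the sample input file, in file order
  val sensor1: Seq[SensorReading] = Seq(
    SensorReading("sensor_1", 1547718199L, 35.8),
    SensorReading("sensor_1", 1547718206L, 32.0),
    SensorReading("sensor_1", 1547718208L, 36.2),
    SensorReading("sensor_1", 1547718210L, 29.7),
    SensorReading("sensor_1", 1547718213L, 30.9)
  )

  def main(args: Array[String]): Unit = {
    println("min:   " + rolling(sensor1)(minField).map(r => (r.timestamp, r.temperature)))
    println("minBy: " + rolling(sensor1)(minByRecord).map(r => (r.timestamp, r.temperature)))
  }
}
```

With this model, min keeps timestamp 1547718199 on every output, while minBy's timestamp moves to 1547718206 and later to 1547718210, because it returns the record that actually holds the minimum.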

split operator

import org.apache.flink.api.common.functions.ReduceFunction
// Brings in StreamExecutionEnvironment, DataStream, SplitStream and the implicit TypeInformation
import org.apache.flink.streaming.api.scala._

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SplitTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputPath = "D:\\jacky\\flink\\src\\main\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    val stream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    // Tag every record with the name(s) of the output(s) it belongs to.
    // Note: split/select is deprecated and was removed in Flink 1.12; side outputs replace it.
    val splitStream: SplitStream[SensorReading] = stream.split(data => {
      if (data.temperature > 30.0) Seq("high") else Seq("low")
    })
    // select picks one or more sub-streams by tag name
    val high: DataStream[SensorReading] = splitStream.select("high")
    val low: DataStream[SensorReading] = splitStream.select("low")
    val all: DataStream[SensorReading] = splitStream.select("high", "low")

    high.print("high")
    low.print("low")
    all.print("all")

    env.execute()
  }
}

// Named-class form of the reduce logic from the Reduce section, usable as
// stream.keyBy(_.id).reduce(new MyReduceFunction); not used by SplitTest
class MyReduceFunction extends ReduceFunction[SensorReading] {
  override def reduce(a: SensorReading, b: SensorReading): SensorReading =
    SensorReading(a.id, b.timestamp, a.temperature.min(b.temperature))
}
[Figures: results of the split example]
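The split/select pair can be modeled in plain Scala as "tag each record, then filter by tag". This is a simulation, not Flink's implementation: selector reuses the split lambda's rule, and select keeps the records whose tags include any requested name. The high> / low> / all> prefixes only imitate the labels passed to print. SplitSelectSimulation, selector, and select are hypothetical names.

```scala
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SplitSelectSimulation {
  // Same rule as the split lambda: each record is tagged with the output name(s) it belongs to
  def selector(r: SensorReading): Seq[String] =
    if (r.temperature > 30.0) Seq("high") else Seq("low")

  // Models select(names*): keep records whose tags include at least one requested name
  def select(input: Seq[SensorReading], names: String*): Seq[SensorReading] =
    input.filter(r => selector(r).exists(tag => names.contains(tag)))

  // The eight records of the sample input file, in file order
  val readings: Seq[SensorReading] = Seq(
    SensorReading("sensor_1", 1547718199L, 35.8),
    SensorReading("sensor_6", 1547718201L, 15.4),
    SensorReading("sensor_7", 1547718202L, 6.7),
    SensorReading("sensor_10", 1547718205L, 38.1),
    SensorReading("sensor_1", 1547718206L, 32.0),
    SensorReading("sensor_1", 1547718208L, 36.2),
    SensorReading("sensor_1", 1547718210L, 29.7),
    SensorReading("sensor_1", 1547718213L, 30.9)
  )

  def main(args: Array[String]): Unit = {
    select(readings, "high").foreach(r => println(s"high> $r"))
    select(readings, "low").foreach(r => println(s"low> $r"))
    select(readings, "high", "low").foreach(r => println(s"all> $r"))
  }
}
```

Five records land in high (35.8, 38.1, 32.0, 36.2, 30.9), three in low (15.4, 6.7, 29.7), and all eight in all. The simulation prints each group separately, whereas in the Flink job the three print sinks receive records as they arrive, so their lines may interleave.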

connect operator ("same bed, different dreams": the two streams are joined, but each keeps its own type)
