Flink 旁路輸出(分流)

官方文檔-關(guān)于分流

最好大家還是看官方文檔,我只是當(dāng)一個搬運工

一、背景

在一些業(yè)務(wù)場景中,一個流中可能有多種類型的數(shù)據(jù),比如訂單:有線上訂單,有線下訂單。當(dāng)需要將不同類型的數(shù)據(jù)進(jìn)行分別處理,比如 寫入到不同的數(shù)據(jù)表或者join 不同的其他流時,這個時候使用分流就比較合適。

二、官方常用的幾種方法

三、示范

本文只詳細(xì)介紹最常用 process 分流 ,道理都是相通的

直接上代碼偽碼,大家主要要理解,而不是直接復(fù)制代碼

//這是訂單source,最原始的流
val orderSource =  這是你構(gòu)建source 的方法

  //創(chuàng)建線上訂單 tag
    val onlineOrderTag = new OutputTag[JSONObject]("onlineOrder")
   //創(chuàng)建線下訂單 tag
    val offlineOrderTag = new OutputTag[JSONObject]("offlineOrder")

   // 這個sideOutStream 就是分流之后的流對象
   val sideOutStream = orderSource
      .filter(new PaymentFilter)  // 這里是一個過濾邏輯,如果你沒有可以不過濾
       // 這個process 就是分流的操作了
      .process(new ProcessFunction[String, JSONObject] {
        override def processElement(orderString: String, ctx: ProcessFunction[String, JSONObject]#Context, out: Collector[JSONObject]): Unit = {
          val outOrder = JSON.parseObject(orderString)
        
          //通過收銀員信息判斷是否屬于線下訂單
          if (!outOrder.containsKey("cashier_id") || StringUtils.isBlank(outOrder.getString("cashier_id"))) {
            ctx.output(onlineOrderTag, outOrder)
          } else {
            ctx.output(offlineOrderTag, outOrder)
          }
        }
      }
      )


val onlineStream = sideOutStream.getSideOutput(onlineOrderTag)

val offlineStream = sideOutStream.getSideOutput(offlineOrderTag)

// 流已經(jīng)分好了,后面是sink 還是 去干其他的,就看你的業(yè)務(wù)邏輯了
onlineStream.addSink()

offlineStream.addSink()

生產(chǎn)實踐

下圖是真實生產(chǎn)的一個DAG圖

內(nèi)部使用了分流, join ,自定義剔除器 等滿足業(yè)務(wù)需求

后面會更新 join 和 自定義剔除器 trigger 等 實戰(zhàn)場景,感興趣的朋友可以加個關(guān)注喲

實際運用DAG
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 寫在前面的話 代碼中的# > 表示的是輸出結(jié)果 輸入 使用input()函數(shù) 用法 注意input函數(shù)輸出的均是字...
    FlyingLittlePG閱讀 3,240評論 0 9
  • MySQL邏輯架構(gòu) 下面是一幅MySQL各組件之間如何協(xié)同工作的架構(gòu)圖,有助于我們深入理解MySQL服務(wù)器。 如圖...
    騎小豬看流星閱讀 4,915評論 2 135
  • 漸變的面目拼圖要我怎么拼? 我是疲乏了還是投降了? 不是不允許自己墜落, 我沒有滴水不進(jìn)的保護(hù)膜。 就是害怕變得面...
    悶熱當(dāng)乘涼閱讀 4,505評論 0 13
  • 感覺自己有點神經(jīng)衰弱,總是覺得手機(jī)響了;屋外有人走過;每次媽媽不聲不響的進(jìn)房間突然跟我說話,我都會被嚇得半死!一整...
    章魚的擁抱閱讀 2,416評論 4 5
  • 夜鶯2517閱讀 128,218評論 1 9

友情鏈接更多精彩內(nèi)容