Spark-自定義Receiver

可以繼承Receiver類來實(shí)現(xiàn)自定義采集器,需要實(shí)現(xiàn)方法如下:

  • onStart:接收器啟動(dòng)方法
  • onStop:接收器停止方法

同時(shí)還要指定Receiver的緩存等級

  1. 代碼實(shí)現(xiàn)

    class MyReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    
      var socket: Socket = _
    
      override def onStart(): Unit = {
        new Thread(new Runnable {
          override def run(): Unit = {
            receive()
          }
        }).start()
      }
    
      override def onStop(): Unit = {
        if(socket != null) {
          socket.close()
          socket = null
        }
      }
    
      def receive() {
        socket = new Socket(host, port)
        val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
        var line: String = null
        while ((line = reader.readLine()) != null) {
          this.store(line)
        }
      }
    
    }
    
  2. 使用自定義接收器

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    
    object MyReceiverDemo {
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setMaster("local[4]").setAppName("MyReceiverDemo")
        val streamingContext = new StreamingContext(conf, Seconds(5))
    
        // 使用StreamingContext對象的receiverStream方法,指定自定義Receiver接收數(shù)據(jù)
        val receiverDStream = streamingContext.receiverStream(new MyReceiver("192.168.0.100", 9999))
    
        val flatMapDStream = receiverDStream.flatMap(_.split(" "))
    
        val mapDStream = flatMapDStream.map((_, 1))
    
        val reduceByKeyDStream = mapDStream.reduceByKey(_ + _)
    
        reduceByKeyDStream.print()
    
        streamingContext.start()
        streamingContext.awaitTermination()
      }
    }
    
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容