2020-10-16-Flink-3(讀取數(shù)據(jù)的方式)

1.讀取數(shù)據(jù)
文本/文件 集合 kafka 自定義

import java.util.{Properties, Random}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import scala.collection.immutable

case class SensorReading(num: String, standstamp: Double, date: String)

object SourseTest {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    var listSourse = List(
      SensorReading("1", 12.0, "12-12"),
      SensorReading("2", 11.0, "12-11"),
      SensorReading("3", 10.0, "12-10"),
      SensorReading("4", 13.0, "12-9")
    )
    //讀集合
    val value = environment.fromCollection(listSourse)
    //讀文件
    //val value1 = environment.fromElements("1", 3, "uu")
    //讀文本
    var path = ""
    // environment.readTextFile(path)
    //從kafka中讀取
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    // val value = environment.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
    //自定義sourse
    val value1 = environment.addSource(new Mysourse())
    value1.print()
    environment.execute("myjob")
  }
}
class Mysourse() extends SourceFunction[SensorReading] {
  //定義一個標簽
  var running: Boolean = true

  override def cancel(): Unit = running = false

  //假設有10個SensorReading進來
  private val random = new Random()
  private var curtuples: immutable.IndexedSeq[(String, Double)] = 1.to(10).map(i => ("source_" + i, random.nextDouble() * 100))

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    while (running) {
      //循環(huán)獲取
      curtuples = curtuples.map(data => (data._1, data._2 + random.nextGaussian()))
      //獲取時間
      val stamp = System.currentTimeMillis()
      //設置sleep
      Thread.sleep(1000)
      curtuples.foreach(data => sourceContext.collect(SensorReading(data._1, stamp, String.valueOf(data._2))))
    }
  }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容