// 1. 讀取數據 (Reading data)
// Sources covered: text/file, collection, Kafka, custom (文本/文件、集合、kafka、自定義)
import java.util.{Properties, Random}
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import scala.collection.immutable
/** A single sensor record flowing through the demo streams.
  *
  * @param num        record / sensor identifier
  * @param standstamp numeric stamp; presumably a timestamp (the custom source in this file
  *                   stores epoch milliseconds here) — confirm intended units
  * @param date       free-form string payload; the in-memory demo data uses "MM-dd"-like
  *                   text, while the custom source stores a simulated reading value
  */
case class SensorReading(
    num: String,
    standstamp: Double,
    date: String
)
/** Demo job showing the different ways a Flink streaming job can obtain its input:
  * an in-memory collection, individual elements, a text file, Kafka, and a custom
  * `SourceFunction` (`Mysourse`).
  *
  * Only the custom source has a sink (`print()`) attached; the other sources are either
  * left unconsumed or shown as commented-out alternatives.
  */
object SourseTest {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    // Read from an in-memory collection.
    // No sink is attached to this stream; append `.print()` to see its output.
    val readings = List(
      SensorReading("1", 12.0, "12-12"),
      SensorReading("2", 11.0, "12-11"),
      SensorReading("3", 10.0, "12-10"),
      SensorReading("4", 13.0, "12-9")
    )
    val collectionStream = environment.fromCollection(readings)

    // Read from individual elements. Keep them a single type: mixing "1" and 3 would infer Any.
    // val elementStream = environment.fromElements("1", "3", "uu")

    // Read lines from a text file.
    // val textStream = environment.readTextFile("path/to/input.txt")

    // Read from Kafka (requires the Flink Kafka connector dependency and its imports).
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
    kafkaProps.setProperty("group.id", "consumer-group")
    // val kafkaStream = environment.addSource(
    //   new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), kafkaProps))

    // Read from a custom SourceFunction; this is the stream that is printed.
    val customStream = environment.addSource(new Mysourse())
    customStream.print()

    environment.execute("myjob")
  }
}
/** A demo source that simulates `sensorCount` sensors whose values follow a random walk.
  *
  * Each iteration nudges every sensor's value by a standard-normal step, waits
  * `intervalMillis` milliseconds, and then emits one [[SensorReading]] per sensor. This
  * repeats until `cancel()` is called.
  *
  * Emitted records are `SensorReading(id, emitTimeMillis, value)`: the sensor id
  * ("source_1" .. "source_N") goes into `num`, and the wall-clock emission time in epoch
  * millis goes into `standstamp`. The current simulated value goes into `date`, formatted
  * via `String.valueOf`.
  *
  * @param sensorCount    number of simulated sensors; defaults to 10
  * @param intervalMillis pause before each batch is emitted, in milliseconds; defaults to 1000
  */
class Mysourse(sensorCount: Int = 10, intervalMillis: Long = 1000L)
    extends SourceFunction[SensorReading] {

  // Loop flag. cancel() runs on a different thread than run(), so the write must be
  // visible to the emitting thread — hence @volatile.
  @volatile var running: Boolean = true

  override def cancel(): Unit = running = false

  private val random = new Random()

  // Current simulated value per sensor, seeded uniformly in [0, 100).
  private var curtuples: immutable.IndexedSeq[(String, Double)] =
    (1 to sensorCount).map(i => (s"source_$i", random.nextDouble() * 100))

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    while (running) {
      // Random walk: shift each value by a Gaussian step.
      curtuples = curtuples.map { case (id, value) => (id, value + random.nextGaussian()) }

      Thread.sleep(intervalMillis)

      // Re-check after the pause so a cancel issued while sleeping emits no further batch.
      if (running) {
        // Taken after the pause so the stamp reflects when the batch is actually emitted.
        val stamp = System.currentTimeMillis()
        curtuples.foreach { case (id, value) =>
          // Explicit toDouble: implicit Long -> Double widening is deprecated (lossy).
          sourceContext.collect(SensorReading(id, stamp.toDouble, String.valueOf(value)))
        }
      }
    }
  }
}