通過詞頻統(tǒng)計功能學(xué)習(xí)Spark-submit的使用:
先打開一個命令窗口輸入nc -lk 9999
然后在另一個窗口,spark的bin文件夾下輸入
./spark-submit --master local[2] \
--class org.apache.spark.examples.streaming.NetworkWordCount \
--name NetworkWordCount \
/home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.2.0.jar hadoop000 9999
在netcat窗口輸入a a a a b b之后再spark窗口的流式輸出會見到詞頻統(tǒng)計的結(jié)果。
sparkStreaming工作原理(粗粒度)
Spark Streaming接收到實時數(shù)據(jù)流,把數(shù)據(jù)按照指定的時間段切成一片片小的數(shù)據(jù)塊,然后把小的數(shù)據(jù)塊傳給Spark Engine處理。

sparkStreaming工作原理(細(xì)粒度)

首先,spark應(yīng)用程序運行在driver端,driver需要在Executor(電腦)中啟動Receiver接收器,接收數(shù)據(jù)流,并且分模塊接收,可能還會以副本的方式存儲,接收了一個周期之后,Executor會向spark應(yīng)用程序返回接收情況(分塊數(shù)量,副本數(shù)量等等)應(yīng)用程序會將任務(wù)分發(fā)到Executor中。
DStream概念:對DStream進(jìn)行操作,比如map/flatMap,其實底層會被翻譯為對DStream中的每個RDD都做相同的操作,因為一個DStream是由不同批次的RDD所構(gòu)成的。
每一個輸入流Input DStreamings 都要對應(yīng)一個receivers來接收它,Input DStreamings的種類:文件系統(tǒng),socket傳輸,Kafka,F(xiàn)lume。
Output Operation 的種類:print(),saveAsTextFiles保存到文件系統(tǒng),saveAsHadoopFiles等。
實戰(zhàn):spark streaming 處理socket數(shù)據(jù)
object NetworkWorldCount {
? def main(args: Array[String]): Unit = {
? ? val sparkConf = new SparkConf().setMaster("local").setAppName("NetworkWorldCount")
? ? val ssc = new StreamingContext(sparkConf, Seconds(5))
? ? val lines = ssc.socketTextStream("localhost",6789)
? ? val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
? ? result.print()
? ? ssc.start()
? ? ssc.awaitTermination()
? }
}
在另外一個控制臺里輸入
nc -lk 6789
a a a a c c c d d d?
結(jié)果:

實戰(zhàn):spark streaming 處理socket數(shù)據(jù)并寫入mysql數(shù)據(jù)庫
object ForeachRDDApp {
? def main(args: Array[String]): Unit = {
? ? val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")
? ? val ssc = new StreamingContext(sparkConf, Seconds(5))
? ? val lines = ssc.socketTextStream("localhost", 6789)
? ? val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
//前幾行不變。。
? ? result.foreachRDD(rdd => {? ? ? ?//循環(huán)每一個Rdd
? ? ? rdd.foreachPartition(partitionOfRecords => {? //在一個rdd里循環(huán)每一個partition
? ? ? ? val connection = createCOnnection()? ? ?//獲取mysql連接
? ? ? ? partitionOfRecords.foreach(record => {? ? ? ? //在每一個partition里獲取一條記錄
? ? ? ? ? val sql = "insert into wordcount(word, wordcount) values('" + record._1+ "'," + record._2+")"
? ? ? ? ? connection.createStatement().execute(sql)
? ? ? ? })
? ? ? ? connection.close()
? ? ? })
? ? })
? ? ssc.start()
? ? ssc.awaitTermination()
? }
? def createCOnnection() = {
? ? Class.forName("com.mysql.jdbc.Driver")
? ? DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_spark","root","root")
? }
結(jié)果:

spark streaming從socket接收數(shù)據(jù)后根據(jù)標(biāo)準(zhǔn)過濾數(shù)據(jù)實戰(zhàn)(黑名單例子)
//構(gòu)建黑名單
object TransformApp {
? def main(args: Array[String]): Unit = {
? ? val sparkConf = new SparkConf().setAppName("TransformApp").setMaster("local[2]")
? ? val ssc = new StreamingContext(sparkConf, Seconds(5))
//跟前面一樣
? ? val blacks = List("zs","ls")? ? //構(gòu)建黑名單List
? ? val blackRDD = ssc.sparkContext.parallelize(blacks).map(x=>(x,true))? ? ? ? //將List轉(zhuǎn)成(zs,true)的這種RDD類型
? ? val lines = ssc.socketTextStream("localhost", 6789)? ? ? ? //lines是DSTream類型
? ? val clicklog = lines.map(x => (x.split(",")(1),x)).transform(rdd => {? ? ? ?
//lines是這種類型的數(shù)據(jù)(20160410,zs) 根據(jù)逗號分隔后重整為(zs:20160410,zs),即為(x.split(",")(1),x))的結(jié)果,得到的結(jié)果仍然是RDD類型,transform函數(shù)是將每個Rdd拿出來操作。
? ? ? rdd.leftOuterJoin(blackRDD)? ?//每個rdd都跟blackRDD進(jìn)行l(wèi)eftOuterJoin,得到(zs:[<20160410,zs>,<true>])這種類型的數(shù)據(jù)
? ? ? ? .filter(x=> x._2._2.getOrElse(false) != true)? ? ? ? ?//過濾,將參數(shù)的第二個中的第二個為true的過濾掉。
? ? ? ? .map(x =>x._2._1)? ? ? //重整,將結(jié)構(gòu)變?yōu)閞dd中第二個的第一個,即為<20160410,zs>
? ? })
? ? clicklog.print()
? ? ssc.start()
? ? ssc.awaitTermination()
? }
}
在nc -lk 6789中輸入
20160410,zs
20160410,ls
20160410,ww
20160410,zs
20160410,ls
20160410,ww
20160410,zs
20160410,ls
20160410,ww
控制臺輸出
