Spark Streaming學(xué)習(xí)六七八章筆記

通過詞頻統(tǒng)計功能學(xué)習(xí)Spark-submit的使用:

先打開一個命令窗口輸入nc -lk 9999


然后在另一個窗口,spark的bin文件夾下輸入

./spark-submit --master local[2] \

--class org.apache.spark.examples.streaming.NetworkWordCount \

--name NetworkWordCount \

/home/hadoop/app/spark-2.2.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.2.0.jar hadoop000 9999

在netcat窗口輸入a a a a b b之后再spark窗口的流式輸出會見到詞頻統(tǒng)計的結(jié)果。


sparkStreaming工作原理(粗粒度)

Spark Streaming接收到實時數(shù)據(jù)流,把數(shù)據(jù)按照指定的時間段切成一片片小的數(shù)據(jù)塊,然后把小的數(shù)據(jù)塊傳給Spark Engine處理。


sparkStreaming工作原理(細(xì)粒度)

細(xì)粒度工作原理

首先,spark應(yīng)用程序運行在driver端,driver需要在Executor(電腦)中啟動Receiver接收器,接收數(shù)據(jù)流,并且分模塊接收,可能還會以副本的方式存儲,接收了一個周期之后,Executor會向spark應(yīng)用程序返回接收情況(分塊數(shù)量,副本數(shù)量等等)應(yīng)用程序會將任務(wù)分發(fā)到Executor中。

DStream概念:對DStream進(jìn)行操作,比如map/flatMap,其實底層會被翻譯為對DStream中的每個RDD都做相同的操作,因為一個DStream是由不同批次的RDD所構(gòu)成的。

每一個輸入流Input DStreamings 都要對應(yīng)一個receivers來接收它,Input DStreamings的種類:文件系統(tǒng),socket傳輸,Kafka,F(xiàn)lume。

Output Operation 的種類:print(),saveAsTextFiles保存到文件系統(tǒng),saveAsHadoopFiles等。

實戰(zhàn):spark streaming 處理socket數(shù)據(jù)

object NetworkWorldCount {

? def main(args: Array[String]): Unit = {

? ? val sparkConf = new SparkConf().setMaster("local").setAppName("NetworkWorldCount")

? ? val ssc = new StreamingContext(sparkConf, Seconds(5))

? ? val lines = ssc.socketTextStream("localhost",6789)

? ? val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

? ? result.print()

? ? ssc.start()

? ? ssc.awaitTermination()

? }

}


在另外一個控制臺里輸入

nc -lk 6789

a a a a c c c d d d?

結(jié)果:


spark streaming 處理socket數(shù)據(jù)

實戰(zhàn):spark streaming 處理socket數(shù)據(jù)并寫入mysql數(shù)據(jù)庫

object ForeachRDDApp {

? def main(args: Array[String]): Unit = {

? ? val sparkConf = new SparkConf().setAppName("ForeachRDDApp").setMaster("local[2]")

? ? val ssc = new StreamingContext(sparkConf, Seconds(5))

? ? val lines = ssc.socketTextStream("localhost", 6789)

? ? val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

//前幾行不變。。

? ? result.foreachRDD(rdd => {? ? ? ?//循環(huán)每一個Rdd

? ? ? rdd.foreachPartition(partitionOfRecords => {? //在一個rdd里循環(huán)每一個partition

? ? ? ? val connection = createCOnnection()? ? ?//獲取mysql連接

? ? ? ? partitionOfRecords.foreach(record => {? ? ? ? //在每一個partition里獲取一條記錄

? ? ? ? ? val sql = "insert into wordcount(word, wordcount) values('" + record._1+ "'," + record._2+")"

? ? ? ? ? connection.createStatement().execute(sql)

? ? ? ? })

? ? ? ? connection.close()

? ? ? })

? ? })

? ? ssc.start()

? ? ssc.awaitTermination()

? }

? def createCOnnection() = {

? ? Class.forName("com.mysql.jdbc.Driver")

? ? DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_spark","root","root")

? }

結(jié)果:


結(jié)果


spark streaming從socket接收數(shù)據(jù)后根據(jù)標(biāo)準(zhǔn)過濾數(shù)據(jù)實戰(zhàn)(黑名單例子)




//構(gòu)建黑名單

object TransformApp {

? def main(args: Array[String]): Unit = {

? ? val sparkConf = new SparkConf().setAppName("TransformApp").setMaster("local[2]")

? ? val ssc = new StreamingContext(sparkConf, Seconds(5))

//跟前面一樣

? ? val blacks = List("zs","ls")? ? //構(gòu)建黑名單List

? ? val blackRDD = ssc.sparkContext.parallelize(blacks).map(x=>(x,true))? ? ? ? //將List轉(zhuǎn)成(zs,true)的這種RDD類型

? ? val lines = ssc.socketTextStream("localhost", 6789)? ? ? ? //lines是DSTream類型

? ? val clicklog = lines.map(x => (x.split(",")(1),x)).transform(rdd => {? ? ? ?

//lines是這種類型的數(shù)據(jù)(20160410,zs) 根據(jù)逗號分隔后重整為(zs:20160410,zs),即為(x.split(",")(1),x))的結(jié)果,得到的結(jié)果仍然是RDD類型,transform函數(shù)是將每個Rdd拿出來操作。

? ? ? rdd.leftOuterJoin(blackRDD)? ?//每個rdd都跟blackRDD進(jìn)行l(wèi)eftOuterJoin,得到(zs:[<20160410,zs>,<true>])這種類型的數(shù)據(jù)

? ? ? ? .filter(x=> x._2._2.getOrElse(false) != true)? ? ? ? ?//過濾,將參數(shù)的第二個中的第二個為true的過濾掉。

? ? ? ? .map(x =>x._2._1)? ? ? //重整,將結(jié)構(gòu)變?yōu)閞dd中第二個的第一個,即為<20160410,zs>

? ? })

? ? clicklog.print()

? ? ssc.start()

? ? ssc.awaitTermination()

? }

}


在nc -lk 6789中輸入

20160410,zs

20160410,ls

20160410,ww

20160410,zs

20160410,ls

20160410,ww

20160410,zs

20160410,ls

20160410,ww

控制臺輸出


結(jié)果
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容