1 Spark SQL性能調(diào)優(yōu)
通過(guò)緩存數(shù)據(jù)、調(diào)優(yōu)參數(shù)、增加并行度提升性能P94
1)緩存數(shù)據(jù)
構(gòu)建一個(gè)內(nèi)存中的列格式緩存表,Spark SQL僅掃描需要的列,并自動(dòng)調(diào)整壓縮比,使內(nèi)存使用率和GC壓力最小
2)調(diào)優(yōu)參數(shù)
優(yōu)化選項(xiàng)配置參數(shù)
配置可分區(qū)的數(shù)目 shuffle.partitions
配置將需要執(zhí)行的sort溢出到磁盤上,否則在每個(gè)分區(qū)的內(nèi)存中
3)增加并行度
合理設(shè)置并行度提升文件加載效率和并行執(zhí)行效率
Spark的數(shù)據(jù)采用內(nèi)存列式存儲(chǔ),實(shí)際執(zhí)行查詢階段效率較高,相對(duì)而言,數(shù)據(jù)加載階段耗時(shí)較長(zhǎng),對(duì)于如何提升數(shù)據(jù)加載效率,并行加載數(shù)據(jù)是一個(gè)優(yōu)化方向。
2 Spark Streaming性能調(diào)優(yōu)
重點(diǎn)需要考慮以下兩件事情:
有效使用集群資源,減少每批次數(shù)據(jù)的處理時(shí)間;
設(shè)置合理的窗口大小,從而使數(shù)據(jù)盡可能得到處理(即數(shù)據(jù)處理和數(shù)據(jù)接收節(jié)奏一致)
優(yōu)化運(yùn)行時(shí)間
優(yōu)化運(yùn)行時(shí)間可以降低每個(gè)批次數(shù)據(jù)的處理時(shí)間,主要包括:
提升數(shù)據(jù)接收并行度
- 通過(guò)提升Recerver的并發(fā)度:通過(guò)創(chuàng)建多個(gè)Dstream并配置從數(shù)據(jù)源接受不同分區(qū)的數(shù)據(jù)流,從而實(shí)現(xiàn)接受多個(gè)數(shù)據(jù)流;
- 調(diào)整Receiver的RDD數(shù)據(jù)分區(qū)時(shí)間間隔:修改blockInterval參數(shù),調(diào)整Receiver的blocking interval,對(duì)于大多數(shù)的Receiver,接受到的數(shù)據(jù)合并成大的數(shù)據(jù)塊,然后存儲(chǔ)在Spark的內(nèi)存中;推薦block interval最小值是50毫秒
提升數(shù)據(jù)處理的并行度
- 任務(wù)執(zhí)行階段并行度不高,則會(huì)造成集群資源利用率低下;確保均衡地使用整個(gè)集群的資源,而不是把任務(wù)集中在幾個(gè)特定的節(jié)點(diǎn)上,對(duì)于包含Shuffle的操作,增加其并行度以確保更充分的使用集群資源
減少序列化和反序列化負(fù)擔(dān)
- 數(shù)據(jù)序列化只要包括兩個(gè)方面:
RDD數(shù)據(jù)序列化:默認(rèn)情況下RDD被保存為序列化字節(jié)數(shù)組來(lái)減少GC停頓
輸入數(shù)據(jù)序列化,將獲取的外部數(shù)據(jù)插入Spark,接收到的數(shù)據(jù)為字節(jié)型,需要反序列化為Spark的序列化格式。因此輸入數(shù)據(jù)的反序列化開(kāi)銷可能會(huì)成為一個(gè)瓶頸
- Spark Streaming默認(rèn)將接收到的數(shù)據(jù)序列化存儲(chǔ),以減少內(nèi)存的使用。序列化和反序列化,需要更多的CPU時(shí)間,更加高效的序列化方式(Kryo)和自定義的序列化接口,可以更高效的使用CPU。
減少任務(wù)提交和分發(fā)開(kāi)銷
- 設(shè)置合適的Batch間隔,減少時(shí)延,可以將任務(wù)序列化(序列化的任務(wù)可以減少任務(wù)的大小,因此減少了發(fā)送到節(jié)點(diǎn)的時(shí)間),也可以使用粗粒度運(yùn)行任務(wù),相比細(xì)粒度有著更低的延遲。
優(yōu)化內(nèi)存使用
- 合理設(shè)置DStream存儲(chǔ)級(jí)別:RDD默認(rèn)是MEMORY_ONLY,DStream默認(rèn)是MEMORY_ONLY_SER,盡管保持?jǐn)?shù)據(jù)的序列化和反序列化會(huì)帶來(lái)更高的開(kāi)銷,但是卻大大減少了GC停頓的情況
- 及時(shí)清理持久化的RDD:會(huì)使用內(nèi)置的內(nèi)存清理策略LRU,設(shè)置自動(dòng)定期清除舊的內(nèi)容,設(shè)置spark.streaming.unpersist屬性啟動(dòng)內(nèi)存清理,減少Spark RDD內(nèi)存的使用,提升GC性能
- 并發(fā)垃圾收集策略:采用不同的GC策略進(jìn)一步減少GC對(duì)Job運(yùn)行的影響,例如:使用并行mark-and-sweep GC能減少GC的突然暫停情況
設(shè)置合適的批次大小
- 處理數(shù)據(jù)的速度要跟的上數(shù)據(jù)流入的速度,如何設(shè)置Batch size和數(shù)據(jù)輸入速度,確保系統(tǒng)能跟得上數(shù)據(jù)輸入速度,可根據(jù)經(jīng)驗(yàn)調(diào)整,查看日志獲取總延遲,進(jìn)行調(diào)整;如果延遲時(shí)間< Batch批處理時(shí)間,則系統(tǒng)穩(wěn)定,如果延遲一直增加,說(shuō)明系統(tǒng)的處理速度跟不上數(shù)據(jù)的輸入速度
Spark Sreaming的容錯(cuò)處理
1)回顧RDD的容錯(cuò)處理

2)Spark streaming的容錯(cuò)處理
- 文件輸入源:
有容錯(cuò)的文件系統(tǒng),HDFS S3
- 基于Recerver的輸入源
Spark1.2后,接受數(shù)據(jù)進(jìn)行容錯(cuò)存儲(chǔ),并提前寫日志(write ahead log),用來(lái)實(shí)現(xiàn)零數(shù)據(jù)丟失。
可靠的接收器
不可靠的接收器
- 輸出操作
所有的數(shù)據(jù)都以RDD操作的血統(tǒng)形式存在,任何重復(fù)計(jì)算都會(huì)得到相同的結(jié)果,這樣一來(lái),所有的DStream轉(zhuǎn)換都確保恰有一次的語(yǔ)義。
3 Spark性能調(diào)優(yōu)
優(yōu)化的目的是
保證大數(shù)據(jù)量下任務(wù)運(yùn)行成功
降低資源消耗
提高計(jì)算性能
1)程序優(yōu)化:
在進(jìn)行shuffle操作時(shí),如reduceByKey、groupByKey,會(huì)劃分新的stage。同一個(gè)stage內(nèi)部使用pipe line進(jìn)行執(zhí)行,效率較高;stage之間進(jìn)行shuffle,效率較低。shuffle會(huì)產(chǎn)生網(wǎng)絡(luò)磁盤IO,故大數(shù)據(jù)量下,應(yīng)進(jìn)行代碼結(jié)構(gòu)優(yōu)化,盡量減少shuffle操作。
設(shè)置緩存cache,根據(jù)程序緩存合適的RDD,設(shè)置緩存級(jí)別
優(yōu)化Partition,重新分區(qū)或者合并小文件,降低并發(fā)量
2)資源配置:
設(shè)置合適的資源參數(shù)(executor的數(shù)量,內(nèi)存大小,并發(fā)數(shù)量等)
查看日志
打印GC日志,或者查看本地的executor的日志,或者查看yarn web UI運(yùn)行的日志
內(nèi)存優(yōu)化/GC優(yōu)化:
可以減少整個(gè)堆內(nèi)存的大??;可以讓年輕代的對(duì)象盡快進(jìn)入年老代,增加年老代的內(nèi)存;可以讓年老代更頻繁的進(jìn)行父gc。
3)其他優(yōu)化
壓縮處理、序列化、設(shè)置共享變量等
1)輸入采用大文件
將數(shù)量眾多的小文件合并成大一些的文件,對(duì)小文件做預(yù)處理
2)IZO壓縮處理
3)Cache壓縮
RDD Cache本身的目的是追求速度,減少重算步驟。但往往會(huì)對(duì)內(nèi)存造成負(fù)擔(dān),因此緩存壓縮也是性能優(yōu)化、減小負(fù)擔(dān)的一部分。下面主要介紹Spark中各種配置對(duì)壓縮的影響。
1、利用spark.rdd.compress壓縮(默認(rèn)是不壓縮的)
2、利用spark.io.compression.codec壓縮(默認(rèn)采用Snappy壓縮)
4)序列化數(shù)據(jù)
Spark提供了一下兩種序列化類庫(kù):
1、Java序列化
Java序列化: Java的ObjectOutputStream框架作為Spark序列化默認(rèn)的序列化方法,只需要實(shí)現(xiàn)java.io.Serializable接口就可以直接使用。Java序列化很靈活,但是速度很慢,同時(shí)序列化的格式也很大。
2、Kryo序列化
Spark也可以支持Kryo的序列化庫(kù)(version 2), Kryo序列化能夠更快地序列化數(shù)據(jù),而且比Java序列化更加高效,通常序列化速度為Java序列化的10倍。但是Kryo序列化并不支持所有的Serializable類型,并且使用Kryo序列化,需要注冊(cè)后才能使用。
5)緩存
spark.executor.memory:決定了每個(gè)Executor可用內(nèi)存的大小 ;
spark.storage memoryFraction:則決定了在這部分內(nèi)存中有多少可以用于Memory Store管理RDD Cache數(shù)據(jù);
剩下的內(nèi)存:用來(lái)保證任務(wù)運(yùn)行時(shí)各種其他內(nèi)存空間的需要。
spark.executor.memory默認(rèn)值為0.6,官方文檔建議這個(gè)比值不要超過(guò)JVM Old Gen區(qū)域的比值。這也很容易理解,因?yàn)镽DD Cache數(shù)據(jù)通常都是長(zhǎng)期駐留內(nèi)存的,理論上也就是說(shuō)最終會(huì)被轉(zhuǎn)移到Old Gen區(qū)域(如果該RDD還沒(méi)有被刪除),如果這部分?jǐn)?shù)據(jù)允許的尺寸太大,勢(shì)必把Old Gen區(qū)域占滿,造成頻繁的Full GC.
如何調(diào)整這個(gè)比值,取決于你的應(yīng)用對(duì)數(shù)據(jù)的使用模式和數(shù)據(jù)的規(guī)模,粗略地來(lái)說(shuō),如果頻繁發(fā)生Full GC,可以考慮降低這個(gè)比值,這樣RDD Cache可用的內(nèi)存空間減少(剩下的部分Cache數(shù)據(jù)就需要通過(guò)Disk Store寫到磁盤上),會(huì)帶來(lái)一定的性能損失,但是這會(huì)騰出更多的內(nèi)存空間用于執(zhí)行任務(wù),減少Full GC發(fā)生的次數(shù),反而可能改善程序運(yùn)行的整體性能。
當(dāng)發(fā)現(xiàn)JVM的垃圾收集經(jīng)常消耗較高或耗盡內(nèi)存時(shí),以防任務(wù)運(yùn)行緩慢,可以設(shè)置此值降低內(nèi)存消耗。如果想改變此值為50%,可以通過(guò)在SparkConf中設(shè)置conf.set("spark storage.memoryFraction", "0.5")完成。結(jié)合使用序列化緩存,使用一個(gè)較小的緩存應(yīng)足以減輕大多數(shù)垃圾收集的問(wèn)題。
6)共享變量
Spark支持以下兩種類型的共享變量:
- 廣播變量:可以在內(nèi)存的所有節(jié)點(diǎn)中被訪問(wèn),用于緩存變量(只讀)。
- 累加器:只能用來(lái)做加法的變量,如計(jì)數(shù)和求和。
在任務(wù)和驅(qū)動(dòng)程序之間共享變量(如靜態(tài)查找表),可以極大減少每個(gè)序列化任務(wù)的大小以及在集群中啟動(dòng)一個(gè)Job的代價(jià)。
7)流水線優(yōu)化
寬依賴和窄依賴的根本區(qū)別是操作是否存在Shuffle操作。
窄依賴指父RDD的每一個(gè)分區(qū)最多被一個(gè)子RDD的分區(qū)所用,表現(xiàn)為一個(gè)父RDD的分區(qū)對(duì)應(yīng)于一個(gè)子RDD的分區(qū),以及兩個(gè)父RDD的分區(qū)對(duì)應(yīng)于一個(gè)子RDD的分區(qū)。(一個(gè)或者多個(gè)父分區(qū)對(duì)應(yīng)一個(gè)子分區(qū))
寬依賴指子RDD的分區(qū)依賴于父RDD的所有分區(qū),這是因?yàn)镾huffle類操作。(一個(gè)父分區(qū)對(duì)應(yīng)多個(gè)子分區(qū))
窄依賴對(duì)優(yōu)化很有利。邏輯上,每個(gè)RDD的算子都是一個(gè)Fork/Join (此Join非上文的 Join算子,而是指同步多個(gè)并行任務(wù)的屏障Barrier):把計(jì)算Fork到每個(gè)分區(qū),算完后Join,然后Fork/Join下一個(gè)RDD的算子。如果直接翻譯到物理實(shí)現(xiàn),是很不經(jīng)濟(jì)的:
一是每一個(gè) RDD (即使是中間結(jié)果)都需要物化到內(nèi)存或存儲(chǔ)中,費(fèi)時(shí)費(fèi)空間;
二是Join作為全局的屏障Barrier,是很昂貴的,會(huì)被最慢的那個(gè)節(jié)點(diǎn)拖死。如果子RDD的分區(qū)到父RDD的分區(qū)是窄依賴,就可以實(shí)施經(jīng)典的Fusion優(yōu)化,把兩個(gè)Fork/Join合為一個(gè);
如果連續(xù)的變換算子序列都是窄依賴,就可以把很多個(gè)Fork/Join并為一個(gè),不但減少了大量的全局屏障Barrier,而且無(wú)需物化很多中間結(jié)果RDD,這將極大地提升性能。
Boy-20180717