翻譯自Spark官網(wǎng)文檔:https://spark.apache.org/docs/2.1.0/tuning.html
前言
由于大多數(shù)Spark計(jì)算的內(nèi)存使用特性,集群中的任何資源都可能成為Spark計(jì)算程序中的瓶頸:CPU,網(wǎng)絡(luò)帶寬或是內(nèi)存。大多數(shù)情況下,如果內(nèi)存可以容納數(shù)據(jù)量,那么瓶頸就會是網(wǎng)絡(luò)帶寬,但有時,用戶也需要去做一點(diǎn)調(diào)優(yōu)的工作,例如以序列化的格式存儲RDD,來減少內(nèi)存使用。本文主要關(guān)注兩個主題:數(shù)據(jù)序列化,對網(wǎng)絡(luò)性能和內(nèi)存使用來說很重要,和內(nèi)存調(diào)優(yōu)。同時也會討論一些較小的主題。
一、數(shù)據(jù)序列化
序列化在分布式應(yīng)用中起到很重要的作用。那些會讓對象序列化過程緩慢,或是會消耗大量字節(jié)存儲的序列化格式會大大降低計(jì)算速率。通常這會用戶在優(yōu)化Spark應(yīng)用程序中的第一件事。Spark旨在在便利(允許您使用您的操作中的任何Java類型)和性能之間實(shí)現(xiàn)平衡。它提供了下面兩種序列化庫:
(1)Java serialization:Spark默認(rèn)使用Java的ObjectOutputStream框架來序列化對象,可以對任何實(shí)現(xiàn)了java.io.Serializable的任何類進(jìn)行序列化。用戶也可以通過繼承來實(shí)現(xiàn)更緊密的序列化性能控制。
(2)Kryo serialization:Spark也可以使用Kryo庫(version 2)來實(shí)現(xiàn)更快的對象序列化。Kryo比Java序列化更快、數(shù)據(jù)格式更緊湊,但不支持所有的Serializable類型。用戶如果希望使用Kryo來獲取更好的性能,需要先去注冊應(yīng)用程序中會使用到的類。
用戶可以在初始化任務(wù)時通過設(shè)定SparkConf中的conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")來切換序列化框架為Kryo。這里的序列化配置不僅可以對worker節(jié)點(diǎn)之間的shuffle數(shù)據(jù)起作用,還可以在將RDD序列化到disk上時起作用。Kryo不是默認(rèn)序列化選擇的唯一原因它要求了用戶的注冊行為,但是我們建議在所有網(wǎng)絡(luò)密集型應(yīng)用程序中使用它。從Spark2.0.0開始,我們在傳輸簡單類型或是字符串類型的Shuffle RDD時會默認(rèn)使用Kryo序列化。
Spark自動對許多在Twitter chill庫中的AllScalaRegistrar被覆蓋的常用的Scala類注冊了Kryo。
注冊用戶自身的類到kryo時,可以使用registerKryoClasses方法:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
Kryo的文檔https://github.com/EsotericSoftware/kryo描述了更多進(jìn)階的注冊選項(xiàng),例如增加用戶序列化代碼等等。
如果用戶的對象很大,也需要去增加spark.kryoserializer.buffer配置項(xiàng)。這個值需要達(dá)到足以保存你將要序列化的最大的對象。
最后,如果你不注冊用戶類,kryo也可以工作,但是它將會存儲每個對象的全類名,會造成存儲空間的浪費(fèi)。
二、內(nèi)存調(diào)優(yōu)
在對內(nèi)存的使用進(jìn)行調(diào)優(yōu)時有三個考慮點(diǎn):用戶對象的內(nèi)存使用量(用戶可能希望整個數(shù)據(jù)集都保存在內(nèi)存中),訪問這些對象的開銷和垃圾回收的開銷(如果用戶的對象周轉(zhuǎn)率很高)。
默認(rèn)情況下,java對象的訪問是很快的,但很容易就會消耗比字段中原始數(shù)據(jù)多2-5倍的空間。這是以下幾個原因?qū)е碌模?/p>
(1)每個不同的Java對象都有一個“object header”,這個頭部大概會占用16bytes的空間并且會包含指向類的指針等信息。對于一個數(shù)據(jù)量很小的對象(例如一個Int對象),它會比數(shù)據(jù)占用的空間更大。
(2)Java字符串比原始字符串?dāng)?shù)據(jù)多了大約40個字節(jié)的開銷(因?yàn)樗鼈兪且訡hars數(shù)據(jù)的形式存儲的,并且保存了一些例如length的額外信息),并且由于字符串內(nèi)部的UTF-16編碼,會將它存儲為兩個bytes。所以一個有10個character的字符串會很容易消耗60bytes。
(3)常用的集合類,例如HashMap和LinkedList,使用鏈?zhǔn)綌?shù)據(jù)結(jié)構(gòu),它對于每個entry(例如Map.Entry)會有一個"wrapper"對象。這個對象不僅包含頭部信息,還包含了一個指向列表中下一個對象的指針(通常會占用8bytes)。
(4)原始類型的集合通常將它們存儲為“boxed”對象,如java .lang. integer
本章會以Spark的內(nèi)存管理機(jī)制的概述開始,然后討論用戶能在應(yīng)用程序中采用的更有效的內(nèi)存策略。特別地,我們還會討論如何確定你的對象的內(nèi)存使用量,以及如何通過改變數(shù)據(jù)結(jié)構(gòu)或是在序列化格式中進(jìn)行排序來對內(nèi)存使用進(jìn)行改進(jìn)。最后我們會討論Spark的內(nèi)存調(diào)優(yōu)和java的垃圾回收器。
2.1 內(nèi)存管理概述
Spark的內(nèi)存使用基本上可以分為兩大類:執(zhí)行內(nèi)存和存儲內(nèi)存。執(zhí)行內(nèi)存指的是在shuffle,join,和aggregation計(jì)算中使用的內(nèi)存,存儲內(nèi)存指的是集群中緩存和傳播內(nèi)部數(shù)據(jù)使用的內(nèi)存。在Spark中,執(zhí)行和存儲共享一個統(tǒng)一的區(qū)域M。當(dāng)沒有執(zhí)行內(nèi)存使用時,存儲可以獲得全部的可用內(nèi)存,反之亦然。執(zhí)行在必要的時候可能會驅(qū)逐內(nèi)存,但只有在總存儲內(nèi)存使用量地域某個閾值R時才會觸發(fā)。用另一句話來說,R描述在統(tǒng)一內(nèi)存M中一定不會被驅(qū)逐的緩存block子集。由于實(shí)現(xiàn)的復(fù)雜性,存儲不會進(jìn)行內(nèi)存驅(qū)逐。
這種設(shè)計(jì)方案確保了幾個令人滿意的特性。首先,不使用緩存的應(yīng)用可以使用全部內(nèi)存來用于執(zhí)行,從而消除不必要的磁盤溢出。其次,使用緩存的應(yīng)用程序可以保留最小的不受驅(qū)逐的數(shù)據(jù)庫存儲空間R。最后,這種方法為各種工作負(fù)載提供了合理的開箱即用性能,不需要用戶了解內(nèi)存如何內(nèi)部劃分的專門知識。
盡管有兩個相關(guān)的配置,但是通常用戶不需要對它們進(jìn)行調(diào)整,因?yàn)槟J(rèn)值適用于大多數(shù)工作負(fù)載:
spark.memory.fraction 代表整體JVM堆內(nèi)存中M的百分比(默認(rèn)0.6)。剩余的空間(40%)是為用戶數(shù)據(jù)結(jié)構(gòu)、Spark內(nèi)部metadata預(yù)留的,并在稀疏使用和異常大記錄的情況下避免OOM錯誤。
spark.memory.storageFraction 代表M中R的百分比(默認(rèn)0.5)。R是M中提供給緩存數(shù)據(jù)塊避免受到執(zhí)行驅(qū)逐的存儲空間。
spark.memory.fraction的值應(yīng)該設(shè)置為可以適配JVM的老年代或終身代的使用。具體可以參考下面的GC章節(jié)。
2.2 內(nèi)存消耗確定
評估數(shù)據(jù)集所需的內(nèi)存消耗的最好方法是創(chuàng)建一個RDD,放到內(nèi)存里,并且通過web UI來查看存儲使用量。這個頁面會告訴你這個RDD占用了多少內(nèi)存。
估算某一個特定對象的內(nèi)存消耗,可以使用SizeEstimator的estimate方法,這對于嘗試不同的數(shù)據(jù)布局來減少內(nèi)存使用,以及確定一個廣播變量將占用每個執(zhí)行器堆的空間量是很有用的。
2.3 數(shù)據(jù)結(jié)構(gòu)調(diào)優(yōu)
減少內(nèi)存消耗的首選方法是避免使用會增加開銷的java特性,例如基于指針的數(shù)據(jù)結(jié)構(gòu)和包裝器對象。下面是集中解決方法:
將數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)為更傾向于數(shù)組結(jié)構(gòu)和基本類型,而不是標(biāo)準(zhǔn)的Java或是Scala集合類(例如. HashMap)。fastutil庫提供了與java標(biāo)準(zhǔn)庫兼容的原始類型的集合。
盡可能避免包含需要小對象和指針的嵌套結(jié)構(gòu)
考慮使用數(shù)字ID或是枚舉對象而不是字符串key
如果你的RAM小于32GB,設(shè)置JVM參數(shù) -XX:+UseCompressedOops 來讓指針變?yōu)?個字節(jié)而不是8個字節(jié)。可以將這個配置加載spark-env.sh中
2.4 序列化RDD存儲
當(dāng)盡管進(jìn)行了調(diào)優(yōu),但你的對象仍然太大,無法有效存儲時,一個更簡單的方法是使用序列化的格式來存儲它們以此來減少內(nèi)存的使用,使用RDD persistance API來設(shè)置序列化的存儲級別,例如MEMORY_ONLY_SER。Spark將RDD的每一個分區(qū)作為一個大的字節(jié)數(shù)組進(jìn)行存儲。以序列化格式存儲數(shù)據(jù)的唯一缺點(diǎn)是訪問速度較慢,因?yàn)椴坏貌辉谑褂弥蟹葱蛄谢恳粋€對象。如果您想以序列化的形式緩存數(shù)據(jù),那么我們強(qiáng)烈建議使用Kryo,因?yàn)樗菾ava序列化(當(dāng)然也要比原始Java對象)小得多。
2.5 垃圾回收調(diào)優(yōu)
當(dāng)你的程序中存儲的RDD有大量的替換和變更時,JVM垃圾回收可能會造成問題。它在只讀取一次RDD并在其上運(yùn)行許多操作的程序中通常不會造成問題。當(dāng)Java需要將舊對象驅(qū)逐出去來為新對象騰出空間時,它需要跟蹤所有的Java對象來找到未引用的對象。這里需要記住的要點(diǎn)是,垃圾收集的成本與Java對象的數(shù)量成正比,因此使用較少對象的數(shù)據(jù)結(jié)構(gòu)(例如使用int的數(shù)組而不是LinkedList)會極大地減少消耗。一個更好的方法是以序列化的形式持久化對象,如上所述:每個RDD的分區(qū)只會有一個對象(一個字節(jié)數(shù)組)。在嘗試其他技術(shù)之前,首先要嘗試的是使用序列化的緩存。
由于任務(wù)的工作內(nèi)存(運(yùn)行任務(wù)所需的空間量)和在節(jié)點(diǎn)上緩存的RDDs之間的干擾, GC也可能是一個問題。我們將討論如何控制分配給RDD緩存的空間以減輕這個問題。
2.5.1 測量GC的影響
GC調(diào)優(yōu)的第一步是收集GC發(fā)生頻率和GC時間的統(tǒng)計(jì)??梢酝ㄟ^增加 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 的Java選項(xiàng)來實(shí)現(xiàn)。http://spark.apache.org/docs/latest/configuration.html#Dynamically-Loading-Spark-Properties中詳細(xì)描述了將Java參數(shù)傳遞給Spark Job的方法。下次Spark應(yīng)用程序運(yùn)行時,就可以看到Woker節(jié)點(diǎn)的log會打印出GC信息。注意這些log是在集群中的workder節(jié)點(diǎn),而不是driver程序中。
2.5.2 GC調(diào)優(yōu)
為了進(jìn)一步優(yōu)化垃圾收集,我們首先需要了解JVM中關(guān)于內(nèi)存管理的一些基本信息:
Java對內(nèi)存被分為兩個區(qū)域,新生代和老年代。新生代是為了保存壽命較短的對象,而老年代是為了保持壽命更長的對象。
新生代被進(jìn)一步劃分為三個區(qū)域: Eden,Survivor1,Survivor2
垃圾收集過程的簡化描述:當(dāng)Eden區(qū)使用占滿時,一個minor GC會在Eden中發(fā)生,仍然存活的對象會從Eden和Survivor1區(qū)域中復(fù)制到Survivor2。如果一個對象存活的時間夠久或是Survivor2區(qū)域空間占滿時,它會移動到老年代。最后當(dāng)老年空間接近占滿時,會觸發(fā)full GC。
Spark中的GC調(diào)優(yōu)的目的是為了確保只有長期存在RDD會存儲在老年代中,新生代有足夠大的空間來存儲短期對象。這有助于在任務(wù)執(zhí)行期間避免收集臨時對象造成的full GC。下面是一些可用步驟:
通過收集GC狀態(tài)來檢查是否有太多GC。如果在一個任務(wù)完成之前觸發(fā)了好幾次full GC,意味著任務(wù)執(zhí)行的可用內(nèi)存不足。
如果有許多minor GC但是沒有太多major GC,可以為Eden分配更多內(nèi)存。可以通過估計(jì)任務(wù)的來村來設(shè)置Eden的大小。如果Eden的大小被設(shè)定為E,可以通過-Xmn=4/3*E來設(shè)置新生代的大小。(4 / 3的比例是為了Survivor使用的空間)
在打印出來的GC狀態(tài)中,如果老年代接近占滿,可以通過減低spark.memory.fraction來減少用于緩存的內(nèi)存。緩存較少的隊(duì)相比減慢任務(wù)執(zhí)行速率要好。另外,也可以考慮減少新生代的大小。這意味著降低-Xmn的設(shè)置。或者嘗試獲取JVM的NewRatio參數(shù),許多JVM默認(rèn)設(shè)置為2,意味著老年代占據(jù)了2/3的堆內(nèi)存。它應(yīng)該足夠大,一直未這個比例超過了spark.memory.fraction、
通過設(shè)置-XX:+UseG1GC來使用G1GC垃圾回收器。在某些情況,垃圾收集是一個瓶頸,它可以提高性能。注意,在堆內(nèi)存夠大時,需要通過-XX:G1HeapRegionSize來增大G1區(qū)域大小。
如果你的任務(wù)是從HDFS中讀取數(shù)據(jù),可以使用從HDFS讀取的數(shù)據(jù)塊的大小來估計(jì)任務(wù)所使用的內(nèi)存數(shù)量。注意,解壓縮塊的大小通常是塊大小的2-3倍,因此,如果我們希望獲得3-4個任務(wù)空間,而HDFS的塊大小是128MB,我們可以估計(jì)Eden的大小為43128MB。
更改設(shè)置后持續(xù)監(jiān)視GC的頻率和時間
我們的經(jīng)驗(yàn)表明,GC調(diào)優(yōu)的效果取決于您的應(yīng)用程序和可用內(nèi)存的數(shù)量。在網(wǎng)上有更多的調(diào)優(yōu)選項(xiàng),管理頻繁的GC發(fā)生的頻率可以幫助減少開銷。
執(zhí)行器的GC調(diào)整標(biāo)志可以通過設(shè)置作業(yè)配置中的"spark.executor.extraJavaOptions"來指定。
三、其他
3.1 并行級別
除非每一個操作的并行度都設(shè)置的足夠高,要不然集群不會被充分利用。Spark自動根據(jù)文件的大小設(shè)定了運(yùn)行在其上的map任務(wù)的數(shù)量(也可以通過SparkContext.textFile參數(shù)來控制),并且對于分布式的reduce操作,例如groupBykey和reduceByKey,它會使用父RDD中最大的分區(qū)數(shù)量。你可以將并行度作為一個次級參數(shù)床底,或是設(shè)置在配置文件spark.default.parallelism來改變默認(rèn)配置。通常情況下,我們推薦為集群中的每個CPU分配2-3個任務(wù)。
3.2 Reduce任務(wù)的內(nèi)存使用
有些時候,你會因?yàn)閠ask中的數(shù)據(jù)集,例如groupByKey,太大而造成OutOfMemoryError,而不是RDD和內(nèi)存不匹配。Spark的shuffle操作(sortByKey,groupByKey,reduceByKey,join等等)會在每個任務(wù)中創(chuàng)建一個hash table來執(zhí)行g(shù)rouping操作,這個操作經(jīng)常會很大。最簡單的處理方案是增加并行度,讓每個任務(wù)獲取到的數(shù)據(jù)集更小。Spark對于短于200ms的任務(wù)執(zhí)行的很好,因?yàn)樗诙鄠€任務(wù)中重用一個executor JVM,任務(wù)的啟動成本很低,因此,你可以安全地將并行級別增加到您的集群中的核心數(shù)量。
3.3 廣播大變量
使用SparkContext中的廣播特性,你可以極大地減少序列化任務(wù)的大小,和集群中的啟動任務(wù)開銷。如果你的任務(wù)用到了driver中的一個大的對象(例如一個static lookup table),可以考慮將它變?yōu)閺V播變量。Spark將每個任務(wù)的序列化大小打印在主服務(wù)器上,因此您可以查看它來決定您的任務(wù)是否太大;一般來說,大于20kb的任務(wù)很可能是值得優(yōu)化的
3.4 數(shù)據(jù)本地性
數(shù)據(jù)本地性對于Spark任務(wù)的性能有很大的影響。如果數(shù)據(jù)和操作的代碼在一起,那么計(jì)算往往很快。但是由于代碼和數(shù)據(jù)是分離開的,它們中總會有一方要向另一方傳遞。通常,將序列化的代碼從一個地方發(fā)送到另一個地方比傳輸數(shù)據(jù)塊要快,因?yàn)榇a的大小比數(shù)據(jù)要小得多。Spark構(gòu)建了它圍繞數(shù)據(jù)局部性原則的調(diào)度。
數(shù)據(jù)本地性是數(shù)據(jù)和處理它的代碼之間的距離。下面有基于數(shù)據(jù)當(dāng)前維值的幾種本地性設(shè)置。通過選取最短距離來達(dá)成最快的處理速度:
PROCESS_LOCAL 數(shù)據(jù)在運(yùn)行代碼的同一個JVM中。這是最優(yōu)選擇
NODE_LOCAL 數(shù)據(jù)在同一個節(jié)點(diǎn)上。例如可能在同一個節(jié)點(diǎn)上的HDFS上,或是在同一個節(jié)點(diǎn)上的另一個處理器中。這比PROCESS_LOCAL稍微慢一點(diǎn),因?yàn)檫@涉及到進(jìn)程間的數(shù)據(jù)通信
NO_PREF 數(shù)據(jù)可以從任何地方同樣快速地訪問,并且沒有本地偏好
RACK_LOCAL 數(shù)據(jù)位于相同的服務(wù)器機(jī)架上。數(shù)據(jù)在同一個機(jī)架上的另一臺服務(wù)器上,所以需要通過網(wǎng)絡(luò)發(fā)送,通常需要通過一個網(wǎng)關(guān)
ANY 數(shù)據(jù)是在網(wǎng)絡(luò)上的其他地方,而不是在同一個機(jī)架上
Spark希望把所有的任務(wù)都安排在最合適的位置上,但這并不會總是可行的。在沒有任何空閑執(zhí)行機(jī)的情況下,Spark會切換到較低的局部性。有兩種選擇:a. 在同一個服務(wù)器上等待CPU空閑,再提交任務(wù) b. 立即在一個其他執(zhí)行機(jī)上開始執(zhí)行任務(wù),并將數(shù)據(jù)移動過去
Spark通常情況下會等待CPU空閑。一旦等待時間超時,它會開始移動數(shù)據(jù)到較遠(yuǎn)的空閑CPU上。每個級別之間的等待超時可以單獨(dú)配置,也可以在一個參數(shù)中組合在一起。具體配置參考spark.locality。默認(rèn)配置通常效果較好,可以根據(jù)任務(wù)特性來修改這些配置。
四、總結(jié)
本文是針對Spark應(yīng)用程序調(diào)優(yōu)中需要注意的主要問題的一個簡單指南,主要關(guān)注數(shù)據(jù)序列化和內(nèi)存調(diào)優(yōu)。對大多數(shù)應(yīng)用來說,切換到Kryo序列化并persist序列化數(shù)據(jù)可以解決大多數(shù)性能問題。