Spark 的性能分析和調(diào)優(yōu)很有意思,今天再寫一篇。主要話題是 shuffle,當(dāng)然也牽涉一些其他代碼上的小把戲。
以前寫過一篇文章,比較了幾種不同場景的性能優(yōu)化,包括 portal 的性能優(yōu)化,web service 的性能優(yōu)化,還有 Spark job 的性能優(yōu)化。Spark 的性能優(yōu)化有一些特殊的地方,比如實(shí)時性一般不在考慮范圍之內(nèi),通常我們用Spark來處理的數(shù)據(jù),都是要求異步得到結(jié)果的數(shù)據(jù);再比如數(shù)據(jù)量一般都很大,要不然也沒有必要在集群上操縱這么一個大家伙,等等。事實(shí)上,我們都知道沒有銀彈,但是每一種性能優(yōu)化場景都有一些特定的“大 boss”,通常抓住和解決大 boss 以后,能解決其中一大部分問題。比如對于 portal 來說,是頁面靜態(tài)化,對于 web service 來說,是高并發(fā)(當(dāng)然,這兩種可以說并不確切,這只是針對我參與的項(xiàng)目總結(jié)的經(jīng)驗(yàn)而已),而對于Spark來說,這個大 boss 就是 shuffle。
首先要明確什么是 shuffle。Shuffle 指的是從map階段到 reduce 階段轉(zhuǎn)換的時候,即 map的 output 向著 reduce 的 input 映射的時候,并非節(jié)點(diǎn)一一對應(yīng)的,即干 map 工作的 slave A,它的輸出可能要分散跑到 reduce 節(jié)點(diǎn) A、B、C、D …… X、Y、Z 去,就好像shuffle 的字面意思“洗牌”一樣,這些 map 的輸出數(shù)據(jù)要打散然后根據(jù)新的路由算法(比如對key進(jìn)行某種 hash 算法),發(fā)送到不同的 reduce 節(jié)點(diǎn)上去。(下面這幅圖來自《Spark Architecture: Shuffle》)

?
Spark 性能優(yōu)化和 shuffle 搏斗
為什么說 shuffle 是 Spark job 的大 boss,就是因?yàn)?Spark 本身的計(jì)算通常都是在內(nèi)存中完成的,比如這樣一個 map 結(jié)構(gòu)的 RDD:(String, Seq),key 是字符串,value 是一個Seq,如果只是對 value 進(jìn)行一一映射的 map 操作,比如(1)先計(jì)算 Seq 的長度,(2)再把這個長度作為元素添加到 Seq 里面去。這兩步計(jì)算,都可以在 local 完成,而事實(shí)上也是在內(nèi)存中操作完成的,換言之,不需要跑到別的 node 上去拿數(shù)據(jù),因此執(zhí)行的速度是非??斓摹5?,如果對于一個大的 rdd,shuffle 發(fā)生的時候,就會因?yàn)榫W(wǎng)絡(luò)傳輸、數(shù)據(jù)序列化/反序列化產(chǎn)生大量的磁盤 IO 和 CPU 開銷。這個性能上的損失是非常巨大的。
要減少 shuffle 的開銷,主要有兩個思路:
減少 shuffle 次數(shù),盡量不改變 key,把數(shù)據(jù)處理在 local 完成;
減少 shuffle 的數(shù)據(jù)規(guī)模。
先去重,再合并
比如有 A、B 這樣兩個規(guī)模比較大的 RDD,如果各自內(nèi)部有大量重復(fù),那么二者一合并,再去重:
1A.union(B).distinct()
這樣的操作固然正確,但是如果可以先各自去重,再合并,再去重,可以大幅度減小shuffle 的開銷(注意 Spark 的默認(rèn) union 和 Oracle 里面的“union all”很像——不去重):
1A.distinct().union(B.distinct()).distinct()
看起來變復(fù)雜了對不對,但是當(dāng)時我解決這個問題的時候,用第二種方法時間開銷從3個小時減到20分鐘。
如果中間結(jié)果 rdd 如果被調(diào)用多次,可以顯式調(diào)用 cache()和 ersist(),以告知 Spark,保留當(dāng)前 rdd。當(dāng)然,即便不這么做,Spark 依然存放不久前計(jì)算過的結(jié)果(以下來自官方指南):
Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.
數(shù)據(jù)量大,并不一定慢。通常情況下,由于 Spark 的 job 是放到內(nèi)存里面進(jìn)行運(yùn)算的,因此一個復(fù)雜的 map 操作不一定執(zhí)行起來很慢。但是如果牽涉到 shuffle,這里面有網(wǎng)絡(luò)傳輸和序列化的問題,就有可能非常慢。
類似地,還有 filter 等等操作,目的也是要先對大的 RDD 進(jìn)行“瘦身”操作,然后在做其他操作。
mapValues 比 map 好
明確 key 不會變的 map,就用 mapValues 來替代,因?yàn)檫@樣可以保證 Spark 不會 shuffle 你的數(shù)據(jù):
1A.map{case (A, ((B, C), (D, E))) => (A, (B, C, E))}
改成:
1A.mapValues{case ((B, C), (D, E)) => (B, C, E)}
用 broadcast + filter 來代替 join
這種優(yōu)化是一種特定場景的神器,就是拿大的 RDD A 去 join 一個小的 RDD B,比如有這樣兩個 RDD:
A 的結(jié)構(gòu)為(name, age, sex),表示全國人民的 RDD,超大
B的結(jié)果為(age, title),表示“年齡 -> 稱號”的映射,比如60歲有稱號“花甲之年”,70歲則是“古稀之年”,這個 RDD 顯然很小,因?yàn)槿说哪挲g范圍在 0~200 歲之間,而且有的“年齡”還沒有“稱號”
現(xiàn)在我要從全國人民中找出這些有稱號的人來。如果直接寫成:
123A.map{case (name, age, sex) => (age, (name, sex))}.join(B).map{case (age, ((name, sex), title)) => (name, age, sex)}
你就可以想象,執(zhí)行的時候超大的A被打散和分發(fā)到各個節(jié)點(diǎn)去。而且更要命的是,為了恢復(fù)一開始的(name, age, sex)的結(jié)構(gòu),又做了一次map,而這次map一樣導(dǎo)致shuffle。兩次shuffle,太瘋狂了。但是如果這樣寫:
12val b = sc.broadcast(B.collectAsMap)A.filter{case (name, age, sex) => b.values.contains(age)}
一次 shuffle 都沒有,A 老老實(shí)實(shí)待著不動,等著全量的 B 被分發(fā)過來。
另外,在 Spark SQL 里面直接有 BroadcastHashJoin,也是把小的 rdd 廣播出去。
不均勻的 shuffle
在工作中遇到這樣一個問題,需要轉(zhuǎn)換成這樣一個非常巨大的 RDD A,結(jié)構(gòu)是(countryId, product),key 是國家 id,value 是商品的具體信息。當(dāng)時在 shuffle 的時候,這個 hash 算法是根據(jù) key來選擇節(jié)點(diǎn)的,但是事實(shí)上這個 countryId 的分布是極其不均勻的,大部分商品都在美國(countryId=1),于是我們通過 Ganglia 看到,其中一臺 slave 的 CPU 特別高,計(jì)算全部聚集到那一臺去了。
找到原因以后,問題解決就容易了,要么避免這個 shuffle,要么改進(jìn)一下 key,讓它的 shuffle 能夠均勻分布(比如可以拿 countryId+ 商品名稱的 tuple 作 key,甚至生成一個隨機(jī)串)。
明確哪些操作必須在 master 完成
如果想打印一些東西到 stdout 里去:
1A.foreach(println)
想把 RDD 的內(nèi)容逐條打印出來,但是結(jié)果卻沒有出現(xiàn)在 stdout 里面,因?yàn)檫@一步操作被放到 slave 上面去執(zhí)行了。其實(shí)只需要 collect 一下,這些內(nèi)容就被加載到 master 的內(nèi)存中打印了:
1A.collect.foreach(println)
再比如,如果遇到 RDD 操作嵌套的情況,通常考慮優(yōu)化掉,因?yàn)橹挥衜aster才能去理解和執(zhí)行 RDD 的操作,slave 只能處理被分配的 task 而已。比如:
1A.map{case (keyA, valueA) => doSomething(B.lookup(keyA).head, valueA)}
就可以用 join 來代替:
1A.join(B).map{case (key, (valueA, valueB)) => doSomething(valueB, valueA)}
用 reduceByKey 代替 groupByKey
這一條應(yīng)該是比較經(jīng)典的了。reduceByKey 會在當(dāng)前節(jié)點(diǎn)(local)中做 reduce 操作,也就是說,會在 shuffle 前,盡可能地減小數(shù)據(jù)量。而 groupByKey 則不是,它會不做任何處理而直接去 shuffle。當(dāng)然,有一些場景下,功能上二者并不能互相替換。因?yàn)?reduceByKey 要求參與運(yùn)算的 value,并且和輸出的 value 類型要一樣,但是 groupByKey 則沒有這個要求。
有一些類似的 xxxByKey 操作,都比 groupByKey 好,比如 foldByKey 和 aggregateByKey。
另外,還有一條類似的是用 treeReduce 來代替 reduce,主要是用于單個 reduce 操作開銷比較大,可以條件 treeReduce 的深度來控制每次 reduce 的規(guī)模。