Spark數(shù)據(jù)傾斜的處理方案

0x01 數(shù)據(jù)傾斜發(fā)生的原理

Spark在進(jìn)行Shuffle的時(shí)候,必須將各個(gè)節(jié)點(diǎn)上相同的key拉取到某個(gè)節(jié)點(diǎn)上的一個(gè)task來(lái)進(jìn)行處理,比如按照key進(jìn)行聚合或join等操作。此時(shí)如果某個(gè)key對(duì)應(yīng)的數(shù)據(jù)量特別大的話,就會(huì)發(fā)生數(shù)據(jù)傾斜。

0x02 如何確定是數(shù)據(jù)傾斜問(wèn)題(如何定位發(fā)生數(shù)據(jù)傾斜的)

數(shù)據(jù)傾斜發(fā)生時(shí),Spark作業(yè)看起來(lái)運(yùn)行得非常緩慢,甚至可能因?yàn)槟硞€(gè)task處理的數(shù)據(jù)量過(guò)大導(dǎo)致內(nèi)存溢出。

1.數(shù)據(jù)傾斜問(wèn)題的排查步驟

1). 查看Spark UI,在stage頁(yè)面中可以看到每一個(gè)Stage的運(yùn)行時(shí)間、Shuffle Read數(shù)據(jù)量和Shuffle Write數(shù)據(jù)量。

stage信息.png

2). 根據(jù)運(yùn)行時(shí)間、Shuffle read的數(shù)據(jù)量,選擇一個(gè)認(rèn)為可能會(huì)發(fā)生數(shù)據(jù)傾斜的stage,查看該Stage的detail頁(yè)面。
3). 在detail頁(yè)面可以查看所有executor和該Stage所有Task的運(yùn)行時(shí)間、Shuffle read數(shù)據(jù)量。觀察是否有些Task的shuffle read的數(shù)據(jù)量比其他task的shuffle read的數(shù)據(jù)量明顯大很多。如果是則說(shuō)明有很多數(shù)據(jù)跑到了這個(gè)Task上,也就是說(shuō)這個(gè)Task所在的Stage發(fā)生了數(shù)據(jù)傾斜。
4). 從第3步定位到發(fā)生數(shù)據(jù)傾斜的Stage,然后根據(jù)spark切分Stage的邏輯,找到該Stage的代碼,根據(jù)代碼邏輯進(jìn)一步分析數(shù)據(jù)傾斜的原因。

task信息.png

0x03 數(shù)據(jù)傾斜的解決方案

1. 過(guò)濾掉少數(shù)導(dǎo)致傾斜的key

如果導(dǎo)致傾斜的key對(duì)計(jì)算結(jié)果影響不大的話(例如為null的key), 可以直接過(guò)濾掉這些key。這樣就可以避免數(shù)據(jù)傾斜。
缺點(diǎn):適用場(chǎng)景不多,大多數(shù)情況下,導(dǎo)致數(shù)據(jù)傾斜的key需要參與計(jì)算,是不能過(guò)濾的。

2. 提高shuffle操作的并行度【推薦】

原理:> 在對(duì)RDD執(zhí)行Shuffle算子時(shí),給Shuffle算子增加下游分區(qū)數(shù),如reduceByKey(1000),這樣可以增加Shuffle read task的數(shù)量,可以讓原本分配給一個(gè)task的多個(gè)key分配給多個(gè)task,從而讓每個(gè)task處理比原來(lái)更少的數(shù)據(jù)。
優(yōu)點(diǎn)
實(shí)現(xiàn)簡(jiǎn)單,可以解決多個(gè)數(shù)據(jù)量不多的key分配到一個(gè)task導(dǎo)致的數(shù)據(jù)傾斜問(wèn)題。
缺點(diǎn)
不能解決單個(gè)key數(shù)據(jù)量大導(dǎo)致的數(shù)據(jù)傾斜問(wèn)題。
結(jié)論:發(fā)現(xiàn)數(shù)據(jù)傾斜時(shí),這種方案只能當(dāng)做第一種手段,嘗試使用最簡(jiǎn)單的方法緩解數(shù)據(jù)傾斜,但是不能從根本上解決數(shù)據(jù)傾斜問(wèn)題(無(wú)法解決單個(gè)key的數(shù)據(jù)傾斜問(wèn)題)。

3. 兩階段聚合(局部聚合+全局聚合)【推薦】

原理:將原本相同的key通過(guò)附加隨機(jī)前綴的方式,變成多個(gè)不同的key,就可以讓原本被一個(gè)task處理的數(shù)據(jù)分散到多個(gè)task上去做局部聚合,進(jìn)而解決單個(gè)task數(shù)據(jù)量過(guò)大的問(wèn)題。接著去掉隨機(jī)前綴,再次進(jìn)行全局聚合,就可以得到最終的結(jié)果。
優(yōu)點(diǎn)
對(duì)于聚合類(lèi)的shuffle操作,可以解決單個(gè)key數(shù)據(jù)量大導(dǎo)致數(shù)據(jù)傾斜的問(wèn)題。
缺點(diǎn)
只能適用于聚合類(lèi)的shuffle操作,不能用于join類(lèi)操作。

4. 將reduce join轉(zhuǎn)為mep join【推薦】

原理:消除shuffle過(guò)程
普通join會(huì)走shuffle過(guò)程,將相同key的數(shù)據(jù)拉取到一個(gè)Shuffle read task中再進(jìn)行join,這就是reduce join,但如果一個(gè)RDD比較小,則可以采用廣播小RDD全量數(shù)據(jù)+map算子來(lái)實(shí)現(xiàn),也就是map join,此時(shí)不會(huì)發(fā)生Shuffle過(guò)程,也就不會(huì)發(fā)生數(shù)據(jù)傾斜。
優(yōu)點(diǎn)
對(duì)于處理大表join小表的數(shù)據(jù)傾斜問(wèn)題,效果非常好,可以消除Shuffle過(guò)程,也就根本不會(huì)發(fā)生數(shù)據(jù)傾斜。
缺點(diǎn)
只能適用于大表join小表的場(chǎng)景,另外,需要將小表進(jìn)行廣播,所以比較消耗內(nèi)存,driver和每個(gè)executor內(nèi)存都會(huì)駐留一份小表的全量數(shù)據(jù)。如果廣播的小表數(shù)據(jù)量比較大,也有可能發(fā)生內(nèi)存溢出——所以在使用map join的作業(yè)中需要增大Driver的分配內(nèi)存

5. 分離傾斜key并單獨(dú)進(jìn)行join操作【推薦】

原理:在兩個(gè)大表A,B進(jìn)行join的時(shí)候,其中A表的某些key數(shù)據(jù)量比較大,而B(niǎo)表的這些key數(shù)據(jù)量比較小(不可能同時(shí)存在兩個(gè)表這些key的數(shù)據(jù)量都很大,那樣會(huì)形成笛卡爾積),從兩個(gè)表中分離出這些傾斜key的數(shù)據(jù),此時(shí),兩個(gè)表分離出來(lái)的數(shù)據(jù)分別形成了一張大表A1和一張小表B1,A和B剩下的數(shù)據(jù)形成數(shù)據(jù)均勻的A2和B2,然后A1和B1使用map join,A2和B2使用普通join,因?yàn)锳2和B2的數(shù)據(jù)分布比較均勻所以不會(huì)再造成數(shù)據(jù)傾斜。
優(yōu)點(diǎn):
可以解決兩個(gè)大表join時(shí)的數(shù)據(jù)傾斜問(wèn)題。
缺點(diǎn):
如果A表的傾斜key很多的話,導(dǎo)致B表對(duì)應(yīng)key的數(shù)據(jù)量也很大,最后也不能使用map join,不過(guò)可以將A表的傾斜key拆分成n份,分別進(jìn)行處理。

參考:
https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容