1. 優(yōu)化數(shù)據(jù)結(jié)構(gòu)
2. 修改并行度
1. 改變并行度可以改善數(shù)據(jù)傾斜的原因是因為如果某個task有100個key并且數(shù)據(jù)巨大,那么有可能導(dǎo)致OOM或者任務(wù)運行緩慢;
2. 此時如果把并行度變大,那么可以分解每個task的數(shù)據(jù)量,比如把該task分解給10個task, 那么每個task的數(shù)據(jù)量將變小,從而可以解決OOM或者任務(wù)執(zhí)行慢.
對應(yīng)reduceByKey而言可以傳入并行度參數(shù)也可以自定義partition.
3. 增加并行度:改變計算資源并沒有從根本上解決數(shù)據(jù)傾斜的問題,但是加快了任務(wù)運行的速度.
4. 這是加入有傾斜的key, 加隨機數(shù)前綴,reduceByKey聚合操作可以分而治之,產(chǎn)生的結(jié)果是代前綴的,因此需要map操作去掉前綴,然后在進行reduceByKey操作.
3. 對數(shù)據(jù)做采樣, 對數(shù)據(jù)傾斜的key增加隨機的前綴.
(1) 針對如果傾斜的key比較少:
對與兩個RDD1和RDD2 的join操作, 其中一個RDD, 比如RDD1的數(shù)據(jù)傾斜的key比較少(比如可以通過sample取樣)在三個左右,那么這時候可以把RDD1轉(zhuǎn)換為RDD11(傾斜的key),RDD12(不包含傾斜的key),然后分別和RDD2進行join操作得到的兩個結(jié)果result1,result2再次join產(chǎn)生最終的result.
(2) 針對如果傾斜的key特別多.
如果特別多的key傾斜那么就不需要考慮某一個key了,把所有的key整體考慮即可,需要把整體的數(shù)據(jù)量變大;
比如10億的數(shù)據(jù)變成500億,這時候可以使用flatmap進行擴容,比如
scala> List(1,2,3,4,5)
res0: List[Int] = List(1, 2, 3, 4, 5)
scala> res0.flatMap(x => 1 to x )
res1: List[Int] = List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
但是具體擴容的數(shù)量要依據(jù)機器的各方面的配置.
第一我們要解決的是數(shù)據(jù)能夠均勻的分布到各個節(jié)點,讓集群能夠正常運行起來.
偽代碼:
對一個rdd使用flatmap,另外一個使用random
val rdd1 = RDD1.flatmap{
for(i <- 1 to 10) {
i+"_"+item;
}
}
val rdd2 = RDD2.map{
val random = Random(10)
random+"_"+item;
}
val result = rdd1.join(rdd2);
result.map{
item.split //去掉前綴.
}
4. 局部聚合+全局聚合
5. ETL
6. 盡量不要產(chǎn)生shuffle
(1) 對小批量的數(shù)據(jù)進行廣播.
針對兩個或者多個RDD進行join操作, 如果其中一個RDDD數(shù)據(jù)比較小可以采用broadcast的方式(然后進行map操作,mappartition 批量加載數(shù)據(jù)進行優(yōu)化) 如果數(shù)據(jù)都比較大的話會給GC帶來負(fù)擔(dān).不建議使用.
(2) 大表適合使用廣播
7. tacheyon
8. 復(fù)用RDD.
9. 從數(shù)據(jù)源頭開始考慮.
(1) 可以把key-values 變?yōu)閗ey-subkey-values
(2) 提取聚集,預(yù)操作join, 把傾斜數(shù)據(jù)在上游進行操作.
(3) 把所有values的值進行組拼然后就可以形成一個單一的key-values.
(4) 針對比如大量的key傾斜,比如數(shù)十萬的key傾斜,最簡單的辦法就是從硬件上去調(diào)整,增加cpu, 內(nèi)存.