spark寫入動態(tài)分區(qū)表小文件過多的問題

spark寫入動態(tài)分區(qū)表小文件過多的問題

在大數(shù)據(jù)中使用動態(tài)分區(qū)表時總是要考慮寫入小文件過多的問題,這個問題分為兩種情況。
1.由于分區(qū)鍵太多導(dǎo)致的小文件問題
2.由于寫入文件太多導(dǎo)致的小文件問題

第一種偏向于業(yè)務(wù)問題或者設(shè)計問題,我們主要討論的是第二種情況。

我們假設(shè)一種場景
sparksql 默認(rèn)分區(qū)數(shù)200 分區(qū)表一共有500個分區(qū) 這樣保存下來需要存200*500 =100000個文件。
如果每個分區(qū)中的數(shù)據(jù)量確實(shí)很大當(dāng)然沒有問題。但是實(shí)際更可能遇到的情況是小部分分區(qū)數(shù)據(jù)量比較大,其他的大部分分區(qū)數(shù)據(jù)量較小。最好的辦法是我們期望可以通過配置一個參數(shù)用來控制輸出文件大小,比如每256m生成一個文件。這樣數(shù)據(jù)量較小的分區(qū)只會生成一個文件,數(shù)據(jù)量大的分區(qū)也不會出現(xiàn)單個文件過大,導(dǎo)致查詢時并行度不足的問題。

如果是用hive,我們可以通過配置以下參數(shù)來達(dá)到目的

set hive.exec.reducers.bytes.per.reducer=67108864;  --設(shè)置每個reducer處理大約64MB數(shù)據(jù)。
set hive.merge.mapfiles=true;  --在Map任務(wù)完成后合并小文件。
set hive.merge.mapredfiles=true; --在Reduce任務(wù)完成后合并小文件。
set hive.merge.smallfiles.avgsize=16000000; --當(dāng)輸出文件的平均大小小于該值時,會啟動一個獨(dú)立的MapReduce任務(wù)進(jìn)行文件merge。
set hive.merge.size.per.task=256*1000*1000; --設(shè)置當(dāng)輸出文件大小小于這個值時,觸發(fā)文件合并。

如果用spark3.0以上版本,我們可以通過配置sparksql的partiton大小來達(dá)到目的

set spark.sql.adaptive.enabled=true  --開啟spark自適應(yīng)優(yōu)化
set spark.sql.adaptive.coalescePartitions.enabled=true --Spark 會根據(jù)目標(biāo)大小(由 指定) spark.sql.adaptive.advisoryPartitionSizeInBytes 合并連續(xù)的隨機(jī)分區(qū),以避免過多的小任務(wù)。
set spark.sql.adaptive.coalescePartitions.minPartitionNum=5  --合并后的最小隨機(jī)分區(qū)數(shù)。如果未設(shè)置,則默認(rèn)值為 Spark 群集的默認(rèn)并行度。此配置僅在同時啟用和 spark.sql.adaptive.coalescePartitions.enabled 啟用時 spark.sql.adaptive.enabled 才有效。
set spark.sql.adaptive.coalescePartitions.initialPartitionNum=200 --合并前的初始隨機(jī)分區(qū)數(shù)。如果未設(shè)置,則等于 spark.sql.shuffle.partitions 。此配置僅在同時啟用和 spark.sql.adaptive.coalescePartitions.enabled 啟用時 spark.sql.adaptive.enabled 才有效。
set spark.sql.adaptive.advisoryPartitionSizeInBytes=64*1024*1024 --自適應(yīng)優(yōu)化期間隨機(jī)分區(qū)的建議大?。ㄒ宰止?jié)為單位)(當(dāng)為 true 時 spark.sql.adaptive.enabled )。當(dāng) Spark 合并小的隨機(jī)分區(qū)或拆分傾斜的隨機(jī)分區(qū)時,它就會生效。

如果使用的是spark2.x,沒有直接的參數(shù),一個比較方便的方案是在寫入前使用coalesce減小寫入分區(qū),之后再通過限制輸出文件行數(shù)的方法防止輸出過大的文件

df.coalesce(5)  //這里設(shè)置了輸出時的文件數(shù),也同時是寫入文件的并行度,需要根據(jù)能接受的小文件數(shù)量酌情配置
  .write
  .option("maxRecordsPerFile",500000)  //設(shè)置了每個文件的最大記錄數(shù),如果過會另起一個文件寫。也可以設(shè)置全局參數(shù)指定spark.sql.files.maxRecordsPerFile
  .mode("append")
  .saveAsTable("existing_hive_table")
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容