spark寫入動態(tài)分區(qū)表小文件過多的問題
在大數(shù)據(jù)中使用動態(tài)分區(qū)表時總是要考慮寫入小文件過多的問題,這個問題分為兩種情況。
1.由于分區(qū)鍵太多導(dǎo)致的小文件問題
2.由于寫入文件太多導(dǎo)致的小文件問題
第一種偏向于業(yè)務(wù)問題或者設(shè)計問題,我們主要討論的是第二種情況。
我們假設(shè)一種場景
sparksql 默認(rèn)分區(qū)數(shù)200 分區(qū)表一共有500個分區(qū) 這樣保存下來需要存200*500 =100000個文件。
如果每個分區(qū)中的數(shù)據(jù)量確實(shí)很大當(dāng)然沒有問題。但是實(shí)際更可能遇到的情況是小部分分區(qū)數(shù)據(jù)量比較大,其他的大部分分區(qū)數(shù)據(jù)量較小。最好的辦法是我們期望可以通過配置一個參數(shù)用來控制輸出文件大小,比如每256m生成一個文件。這樣數(shù)據(jù)量較小的分區(qū)只會生成一個文件,數(shù)據(jù)量大的分區(qū)也不會出現(xiàn)單個文件過大,導(dǎo)致查詢時并行度不足的問題。
如果是用hive,我們可以通過配置以下參數(shù)來達(dá)到目的
set hive.exec.reducers.bytes.per.reducer=67108864; --設(shè)置每個reducer處理大約64MB數(shù)據(jù)。
set hive.merge.mapfiles=true; --在Map任務(wù)完成后合并小文件。
set hive.merge.mapredfiles=true; --在Reduce任務(wù)完成后合并小文件。
set hive.merge.smallfiles.avgsize=16000000; --當(dāng)輸出文件的平均大小小于該值時,會啟動一個獨(dú)立的MapReduce任務(wù)進(jìn)行文件merge。
set hive.merge.size.per.task=256*1000*1000; --設(shè)置當(dāng)輸出文件大小小于這個值時,觸發(fā)文件合并。
如果用spark3.0以上版本,我們可以通過配置sparksql的partiton大小來達(dá)到目的
set spark.sql.adaptive.enabled=true --開啟spark自適應(yīng)優(yōu)化
set spark.sql.adaptive.coalescePartitions.enabled=true --Spark 會根據(jù)目標(biāo)大小(由 指定) spark.sql.adaptive.advisoryPartitionSizeInBytes 合并連續(xù)的隨機(jī)分區(qū),以避免過多的小任務(wù)。
set spark.sql.adaptive.coalescePartitions.minPartitionNum=5 --合并后的最小隨機(jī)分區(qū)數(shù)。如果未設(shè)置,則默認(rèn)值為 Spark 群集的默認(rèn)并行度。此配置僅在同時啟用和 spark.sql.adaptive.coalescePartitions.enabled 啟用時 spark.sql.adaptive.enabled 才有效。
set spark.sql.adaptive.coalescePartitions.initialPartitionNum=200 --合并前的初始隨機(jī)分區(qū)數(shù)。如果未設(shè)置,則等于 spark.sql.shuffle.partitions 。此配置僅在同時啟用和 spark.sql.adaptive.coalescePartitions.enabled 啟用時 spark.sql.adaptive.enabled 才有效。
set spark.sql.adaptive.advisoryPartitionSizeInBytes=64*1024*1024 --自適應(yīng)優(yōu)化期間隨機(jī)分區(qū)的建議大?。ㄒ宰止?jié)為單位)(當(dāng)為 true 時 spark.sql.adaptive.enabled )。當(dāng) Spark 合并小的隨機(jī)分區(qū)或拆分傾斜的隨機(jī)分區(qū)時,它就會生效。
如果使用的是spark2.x,沒有直接的參數(shù),一個比較方便的方案是在寫入前使用coalesce減小寫入分區(qū),之后再通過限制輸出文件行數(shù)的方法防止輸出過大的文件
df.coalesce(5) //這里設(shè)置了輸出時的文件數(shù),也同時是寫入文件的并行度,需要根據(jù)能接受的小文件數(shù)量酌情配置
.write
.option("maxRecordsPerFile",500000) //設(shè)置了每個文件的最大記錄數(shù),如果過會另起一個文件寫。也可以設(shè)置全局參數(shù)指定spark.sql.files.maxRecordsPerFile
.mode("append")
.saveAsTable("existing_hive_table")