Hive、Spark SQL任務(wù)參數(shù)調(diào)優(yōu)

動態(tài)分區(qū)

參數(shù) 說明
hive.exec.dynamic.partition 是否開啟動態(tài)分區(qū),默認(rèn)是false。如果要開啟動態(tài)分區(qū),就設(shè)置為true
hive.exec.dynamic.partition.mode 動態(tài)分區(qū)模式,默認(rèn)是strict。也可以改為nonstrict,表示允許所有的分區(qū)字段都可以使用動態(tài)分區(qū),需要結(jié)合hive.exec.dynamic.partition=true一起使用。不過即使開啟動態(tài)分區(qū),首個分區(qū)字段也必須是靜態(tài)字段
hive.exec.max.dynamic.partitions 可以創(chuàng)建的最大分區(qū)數(shù),如果實際分區(qū)超過了就會報錯

使用案例:

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=5000;

資源申請

參數(shù) 說明
spark.executor.cores 每個executor上的槽位個數(shù)
spark.executor.memory executor的內(nèi)存總量,executor上所有的core共享
yarn中container的內(nèi)存限制為spark.executor.memory+spark.yarn.executor.memoryOverhead<=16G
spark.yarn.executor.memoryOverhead executor堆外內(nèi)存大小,直接由yarn控制,單位是MB
yarn中container的內(nèi)存限制為spark.executor.memory+spark.yarn.executor.memoryOverhead<=16G
spark.driver.memory driver的內(nèi)存總量,主要是存放任務(wù)過程中的shuffle元數(shù)據(jù),以及任務(wù)中collect的數(shù)據(jù)。Broadcast的小表也會先存放在driver中
yarn中container的內(nèi)存限制為spark.driver.memory+spark.yarn.driver.memoryOverhead<=16G
spark.yarn.driver.memoryOverhead driver堆外內(nèi)存大小,直接由yarn控制,單位是MB
yarn中container的內(nèi)存限制為spark.driver.memory+spark.yarn.driver.memoryOverhead<=16G
spark.memory.fraction storage memory+execution memory占總內(nèi)存(java heap-reserved memory)的比例
這里說明下,executor jvm中內(nèi)存分為storage、execution和other內(nèi)存。storage存放緩存RDD數(shù)據(jù),execution存放shuffle過程的中間數(shù)據(jù),other存放用戶定義的數(shù)據(jù)結(jié)構(gòu)或spark內(nèi)部元數(shù)據(jù)。如果用戶自定義數(shù)據(jù)結(jié)構(gòu)較少,可以將該參數(shù)比例適當(dāng)上調(diào)

使用案例:

set spark.executor.cores=2;
set spark.executor.memory=4G;
set spark.yarn.executor.memoryOverhead=1024;
set spark.driver.memory=8G;
set spark.yarn.driver.memoryOverhead=1024;
set spark.memory.fraction=0.7;

Executor動態(tài)申請

參數(shù) 說明
spark.dynamicAllocation.enabled 是否開啟動態(tài)資源分配,強烈建議開啟
spark.dynamicAllocation.maxExecutors 開啟動態(tài)資源分配后,同一時刻可以申請的最大executor數(shù)
spark.dynamicAllocation.minExecutors 開啟動態(tài)資源分配后,同一時刻可以申請的最小executor數(shù)

使用案例:

set spark.dynamicAllocation.enabled=true;
set spark.dynamicAllocation.maxExecutors=1000;
set spark.dynamicAllocation.minExecutors=400;

ORC文件性能優(yōu)化

參數(shù) 說明
spark.sql.orc.filterPushdown ORC謂詞下推,默認(rèn)是關(guān)閉
spark.sql.orc.splits.include.file.footer 開啟后,在split劃分時會使用footer信息
spark.sql.orc.cache.stripe.details.size 設(shè)置每個stripe可以緩存的大小
spark.sql.hive.metatorePartitionPruning 當(dāng)為true,Spark SQL的謂語將被下推到Hive Metastore中,更早的消除不匹配的分區(qū)

使用案例:

set spark.sql.orc.filterPushdown=true;
set spark.sql.orc.splits.include.file.footer=true;
set spark.sql.orc.cache.stripe.details.size=10000;
set spark.sql.hive.metastorePartitionPruning=true;

補充說明:ORC文件存儲格式如下表

ORC文件存儲格式.png
Postscript是文件描述信息,包括file footer和元數(shù)據(jù)長度,文件版本,壓縮格式等
Footer就是文件的元數(shù)據(jù)信息,包括數(shù)據(jù)量、每列的統(tǒng)計信息等
而文件中數(shù)據(jù)主要是分為stripe,每個stripe包括索引數(shù)據(jù)、行數(shù)據(jù)和stripe footer
具體ORC文檔可參考ORC Specification v1

文件輸入輸出

參數(shù) 說明
spark.hadoop.hive.exec.orc.split.strategy 控制在讀取ORC表時生成split的策略:
?1.BI:以文件尾粒度進(jìn)行split劃分
?2.ETL:將文件進(jìn)行切分,多個stripe組成一個split
?3.HYBRID:當(dāng)文件的平均大小大于hadoop最大split值時采用ETL策略,否則采用BI策略
較大的ORC表,可能其footer較大,ETL策略可能導(dǎo)致從HDFS拉取大量數(shù)據(jù)進(jìn)行split,甚至導(dǎo)致driver端OOM,此時建議使用BI策略;較小的尤其有數(shù)據(jù)傾斜的表,建議使用ETL策略
spark.hadoop.mapreduce.input.fileinputformat.split.minsize 讀ORC表時,設(shè)置小文件合并的閾值,低于該值的split會合并在一個task中執(zhí)行
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize 讀ORC表時,設(shè)置一個split的最大閾值,大于該值的split會切分成多個split
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 文件提交到HDFS上的算法:
?1.version=1是按照文件提交
?2.version=2是批量按照目錄進(jìn)行提交,可以極大節(jié)約文件提交到HDFS的時間,減輕NameNode壓力

使用案例:

set spark.hadoop.hive.exec.orc.split.strategy=ETL;
set spark.hadoop.mapreduce.input.fileinputformat.split.minsize=67108864;
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=268435456;
set spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2;

小文件合并

參數(shù) 說明
spark.sql.mergeSmallFileSize 小文件合并閾值,如果生成的文件平均大小低于閾值會額外啟動一輪stage進(jìn)行小文件的合并,默認(rèn)不合并小文件
spark.sql.targetBytesInPartitionWhenMerge 設(shè)置額外的合并job時的map端輸入size
spark.hadoopRDD.targetBytesInPartition 設(shè)置map端輸入的合并文件大小

使用案例

set spark.sql.mergeSmallFileSize=67108864;
set spark.sql.targetBytesInPartitionWhenMerge=67108864;
set spark.hadoopRDD.targetBytesInPartition=67108864;

補充說明:

在決定一個目錄是否需要合并小文件時,會統(tǒng)計目錄下的平均大小,然后和spark.sql.mergeSmallFileSize比較
在合并文件時,一個map task讀取的數(shù)據(jù)量取決于下面三者的較大值:
1、spark.sql.mergeSmallFileSize
2、spark.sql.targetBytesInPartitionWhenMerge
3、spark.hadoopRDD.targetBytesInPartition

Shuffle

參數(shù) 說明
spark.sql.autoBroadcastJoinThreshold 小表join自動開啟廣播機制時小表的閾值,會從Hive Metastore中獲取表統(tǒng)計信息。當(dāng)設(shè)置為-1時會禁用廣播
spark.sql.shuffle.partitions 設(shè)置reduce階段的分區(qū)數(shù).設(shè)置過大可能導(dǎo)致很多reducer同時向一個mapper拉取數(shù)據(jù),導(dǎo)致mapper由于請求壓力過大而掛掉或響應(yīng)緩慢,從而fetch failed
spark.reducer.maxSizeInFlight 同一時刻一個reducer可以同時拉取的數(shù)據(jù)量大小
spark.reducer.maxReqsInFlight 同一時刻一個reducer可以同時產(chǎn)生的請求數(shù)
spark.reducer.maxBlocksInFlightPerAddress 同一時刻一個reducer向同一個上游executor拉取的最多block數(shù)
spark.reducer.maxReqSizeShuffleToMem shufle請求的block超過該閾值就會強制落盤,防止一大堆并發(fā)請求將內(nèi)存占滿
spark.shuffle.io.connectionTimeout shuffle中連接超時時間,超過該時間會fetch failed
spark.shuffle.io.maxRetries shuffle中拉取數(shù)據(jù)的最大重試次數(shù)
spark.shuffle.io.retryWait shuffle重試的等待間隔

使用案例:

set spark.sql.autoBroadcastJoinThreshold=33554432;
set spark.sql.shuffle.partitions=5000;
set spark.reducer.maxSizeInFlight=25165824;
set spark.reducer.maxReqsInFlight=10;
set spark.reducer.maxBlocksInFlightPerAddress=1;
set spark.reducer.maxReqSizeShuffleToMem=536870911;
set spark.shuffle.io.connectionTimeout=120;
set spark.shuffle.io.maxRetries=3;
set spark.shuffle.io.retryWait=5;

Adaptive Execution

參數(shù) 說明
spark.sql.adaptive.enabled 開啟動態(tài)執(zhí)行
spark.sql.adaptive.shuffle.targetPostShuffleInputSize 設(shè)置每個Reducer讀取的目標(biāo)數(shù)據(jù)量,會將低于該值的partition進(jìn)行合并
spark.sql.adaptive.join.enabled 開啟動態(tài)調(diào)整Join
spark.sql.adaptiveBroadcstJoinThreshold 設(shè)置SortMergeJoin轉(zhuǎn)BroadcastJoin的閾值,如果不設(shè)置該參數(shù),該閾值和spark.sql.autoBroadcastJoinThreshold值相等
spark.sql.adaptive.allowAdditionalShuffle 是否允許為了優(yōu)化Join而增加Shuffle,默認(rèn)是false
spark.sql.adaptive.skewedJoin.enabled 開啟自動處理Join時的數(shù)據(jù)傾斜
spark.sql.adaptive.skewedPartitionMaxSplits 控制處理一個傾斜Partition的task個數(shù)上限,默認(rèn)值是5
spark.sql.adaptive.skewedPartitionRowCountThreshold 設(shè)置一個Partition被視為傾斜Partition的行數(shù)下限,行數(shù)低于該值的Partition不會被當(dāng)做傾斜Partition處理
spark.sql.adaptive.skewedPartitionSizeThreshold 設(shè)置一個Partition被視為傾斜Partition的大小下限,大小小于該值的Partition不會被當(dāng)做傾斜Partition處理
spark.sql.adaptive.skewedPartitionFactor 設(shè)置傾斜因子,當(dāng)一個Partition滿足以下兩個條件之一,就會被視為傾斜Partition:
1. 大小大于spark.sql.adaptive.skewedPartitionSizeThreshold的同時大于各Partition大小中位數(shù)與該因子的乘積
2. 行數(shù)大于spark.sql.adaptive.skewedRowCountThreshold的同時大于各Partition行數(shù)中位數(shù)與該因子的乘積

使用案例:

set spark.sql.adaptive.enabled=true;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=268435456;
set spark.sql.adaptive.join.enabled=true;
set spark.sql.adaptiveBroadcastJoinThreshold=33554432;
set spark.sql.adaptive.allowAddititionalShuffle=false;
set spark.sql.adaptive.skewedJoin.enabled=true;
set spark.sql.adaptive.skewedPartitionMaxSplits=100;
set spark.sql.adaptive.skewedPartitionRowCountThreshold=10000000;
set spark.sql.adaptive.skewedPartitionSizeThreshold=536870912;
set spark.sql.adaptive.skewedPartitionFactor=10;

補充說明:

  1. 自動設(shè)置Shuffle Partition個數(shù)功能已經(jīng)發(fā)布,而動態(tài)調(diào)整執(zhí)行計劃和自動處理數(shù)據(jù)傾斜 還不確定是否已經(jīng)發(fā)布(Spark Adaptive Execution調(diào)研
  2. 自動設(shè)置Shuffle Partition功能(spark.sql.adaptive.shuffle.targetPostShuffleInputSize)慎用,可能出現(xiàn)極端情況而導(dǎo)致耗時變長。比如,筆者有次在一個任務(wù)中使用該功能,當(dāng)時讀取表數(shù)據(jù)量是千億級別的,經(jīng)過shuffle處理(broadcast join)后最終的結(jié)果數(shù)據(jù)量在萬級別,大小在幾十kb,因此shuffle后的分區(qū)數(shù)變成了1,但是shuffle前的數(shù)據(jù)量是很大的,因此分區(qū)數(shù)也達(dá)到了筆者設(shè)置的上限5000。也就是說,最后是一個task去拉取5000個task寫好的shuffle文件,雖然這些shuffle文件大多很小,甚至是沒有,但是整個拉取過程依然是非常耗時。而我之所以用這個功能的原因是只是為了小文件合并,完全沒有必要用這個參數(shù),將這個功能參數(shù)注釋掉,換成小文件合并相關(guān)的參數(shù)后,任務(wù)執(zhí)行時長就下降了十幾分鐘
  3. 自動處理數(shù)據(jù)傾斜原理:根據(jù)分區(qū)數(shù)據(jù)大小和條數(shù)來判斷該分區(qū)是否為傾斜分區(qū),如果是傾斜分區(qū),就會將該分區(qū)對應(yīng)task由1個改為多個,而另一部分的相應(yīng)partition數(shù)據(jù)全量傳輸。比如,表a的partition0有傾斜情況,表b的partition0沒有數(shù)據(jù)傾斜情況,此時會將表a的partition0分為多個部分傳到不同的core上處理,同時表b的partition0數(shù)據(jù)也會全量shuffle到這些core上,因此表b的partition0最好不要數(shù)據(jù)量過大(Adaptive Execution讓Spark SQL更高效更智能

推測執(zhí)行

參數(shù) 說明
spark.speculation spark推測執(zhí)行開關(guān),默認(rèn)是true
spark.speculation.interval 開啟推測執(zhí)行后,每隔該值時間會檢測是否有需要推測執(zhí)行的task
spark.speculation.quantile
spark.speculation.multiplier
當(dāng)成功task占總task的比例超過spark.speculation.quantile,統(tǒng)計成功task運行時間中位數(shù)乘以spark.speculation.multiplier得到推測執(zhí)行閾值,當(dāng)在運行的任務(wù)超過這個閾值就會啟動推測執(zhí)行。當(dāng)資源充足時,可以適當(dāng)減小這兩個值

使用案例:

set spark.speculation=true;
set spark.speculation.interval=1000ms;
set spark.speculation.quantile=0.99;
set spark.speculation.multiplier=3;
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容