1.spark_shuffle_分區(qū)數(shù)
spark_sql aqe 優(yōu)化
SparkSQL中,基于SQL分析或者DSL分析,執(zhí)行Job時,如果產(chǎn)生Shuffle,默認分區(qū)數(shù):200
【實際生產(chǎn)環(huán)境中必須合理設(shè)置shuffle時分區(qū)數(shù)目】
> spark.sql.shuffle.partitions
【代碼中設(shè)置】
spark = SparkSession.builder \
.appName("RetailAnalysis") \
.master("local[4]") \
.config("spark.sql.shuffle.partitions", "4") \
.getOrCreate()
注意:
在Spark 3之前,每個應用都需要通過測試,合理設(shè)置分區(qū)數(shù)目。
【從Spark 3 開始,SparkSQL底層實現(xiàn)自適應調(diào)整性能優(yōu)化】
當Job運行時,自動依據(jù)Shuffle時數(shù)據(jù)量,合理設(shè)置Shuffle時分區(qū)數(shù)。
【官方文檔】
https://spark.apache.org/docs/3.1.2/sql-performance-tuning.html#coalescing-post-shuffle-partitions
【參數(shù)】
參數(shù)1:表示是否啟用自適應調(diào)整機制
spark.sql.adaptive.enabled=true
參數(shù)2:表示Shuffle后分區(qū)數(shù)目
spark.sql.adaptive.coalescePartitions.enabled=true
其他參數(shù):
最小分區(qū)數(shù)目
spark.sql.adaptive.coalescePartitions.minPartitionNum
初始化分區(qū)數(shù)目
spark.sql.adaptive.coalescePartitions.initialPartitionNum
每個分區(qū)數(shù)據(jù)處理數(shù)據(jù)量
spark.sql.adaptive.advisoryPartitionSizeInBytes=64m
2_spark項目調(diào)優(yōu)方案
問題: 問題:Hive、Spark做了哪些優(yōu)化?/ 項目中做了哪些優(yōu)化?
我將項目內(nèi)的spark調(diào)優(yōu)分為兩個
pyspark 開發(fā)調(diào)優(yōu)
1.使用高性能的rdd算子
如 mappartition 和 foreachpartition
2.使用窄依賴算子替代寬依賴的算子
如 降低分區(qū) 使用coalesce代替repartition
3.適當增加和減少分區(qū)
數(shù)據(jù)量大,分區(qū)少,調(diào)大分區(qū):讀取數(shù)據(jù)
數(shù)量量小,分區(qū)多,降低分區(qū):聚合之后
4.適當使用 緩存機制 及時釋放內(nèi)存
RDD或者DataFrame或者TmpView使用多次:先緩存persist
后續(xù)代碼中不再需要時,就釋放緩存
5.盡量使用map端聚合的算子
列如 reducebykey agg..bykey 代替 groupbykey
6.當大表join小表時 使用廣播join
-
Sql優(yōu)化 遵循謂詞下推思想
將需要過濾的數(shù)據(jù)或者無用的數(shù)據(jù)在進行復雜操作之前提前進行過濾
列如
需要過濾:where/having:能用where就不用having ,where是分組前 having 是分組后,where前不能加聚合,having 后邊可以加聚合
row_nubmer實現(xiàn)去重:減少計算的數(shù)據(jù)量
舉例
無用數(shù)據(jù):訂單表1000萬,商品表100萬,需求將兩張表進行join
假設(shè)已經(jīng) 賣出去60w 種商品
- 訂單表:tb_order:oid、pid
- 商品表:tb_goods:pid、pname
分析 商品表內(nèi)的一百萬條數(shù)據(jù) 在訂單表中不一定全有, 只有賣出去的部分才需要join
所以 1_找出所有已經(jīng)賣出的商品單號 表A 60w
select distinct pid from tb_order
2_篩選出賣出商品的詳細信息 表B 60w
select
pid,pname
from tb_goods B join A on A.pid = B.pid
3_step3:用訂單表和B進行關(guān)聯(lián)
select
a.oid,
a.pid,
B.pname
from tb_order a
join B on a.pid = B.pid
-
結(jié)構(gòu)優(yōu)化 文件存儲類型、分區(qū)結(jié)構(gòu)化
分區(qū)表:采用靜態(tài)分區(qū)
select count(*) from table1 where dt = '2021-11-09'; --走分區(qū)裁剪過濾查詢
-- 不支持join中動態(tài)分區(qū)裁剪
select
*
from table1 a join table2 b on a.dt = b.dt
and a.dt = '2021-11-09' and b.dt = '2021-11-09';
- 合適文件類型:parquet、orc【索引】
- 合適壓縮類型:snappy、lz4、lzo
Spark 3.0新特性
-
動態(tài)分區(qū)裁剪(Dynamic Partition Pruning)
默認的分區(qū)裁剪只有在單表查詢過濾時才有效
-
開啟動態(tài)分區(qū)裁剪:自動在Join時對兩邊表的數(shù)據(jù)根據(jù)條件進行查詢過濾,將過濾后的結(jié)果再進行join
spark.sql.optimizer.dynamicPartitionPruning.enabled=true
開啟動態(tài)分區(qū)裁剪之后,過濾條件都會先執(zhí)行 不論是on 還是 where 還是 主副表的條件都會執(zhí)行
-
自適應查詢執(zhí)行(Adaptive Query Execution)
基于CBO優(yōu)化器引擎:實現(xiàn)最小代價的數(shù)據(jù)處理
自動根據(jù)統(tǒng)計信息設(shè)置Reducer【ShuffleRead】的數(shù)量來避免內(nèi)存和I/O資源的浪費
自動選擇更優(yōu)的join策略來提高連接查詢性能
-
自動優(yōu)化join數(shù)據(jù)來避免不平衡查詢造成的數(shù)據(jù)傾斜,將數(shù)據(jù)傾斜的數(shù)據(jù)自動重分區(qū)
spark.sql.adaptive.enabled=true
cbo 將 catalyst 優(yōu)化器優(yōu)化后的邏輯計劃 轉(zhuǎn)換為物理計劃
-
加速器感知調(diào)度(Accelerator-aware Scheduling)(不重要)
- 添加原生的 GPU 調(diào)度支持,該方案填補了 Spark 在 GPU 資源的任務(wù)調(diào)度方面的空白
- 有機地融合了大數(shù)據(jù)處理和 AI 應用,擴展了 Spark 在深度學習、信號處理和各大數(shù)據(jù)應用的應用場景
項目總結(jié)問題
問題1: 數(shù)據(jù)導出不滿足原子性
場景:使用sqoop將報表數(shù)據(jù)導出到mysql中時,有一部分map task 失敗了,導致數(shù)據(jù)只導出了一部分
需求,我們希望如果 要失敗都失敗,mysql里面一點數(shù)據(jù)也沒有,要成功都成功
解決 :在sqoop導入的時候添加一個參數(shù) --staging-table
原理 :sqoop將這張表的數(shù)據(jù)導入到一張臨時表中,如果成功了,就將數(shù)據(jù)從臨時表中放到目標表中,如果失敗了就刪除目標表
問題2 數(shù)據(jù)量不一致的情況
場景 hive表中的數(shù)據(jù)量于數(shù)據(jù)源oracle中的表數(shù)據(jù)量不一致
原因, sqoop在導入hive中 如果以textfile格式存儲的話,會默認將\t\n等特殊字符當作 換行符生成文本
解決 ,不以textfile格式進行存儲 (avro orc格式)
問題3 spark的數(shù)據(jù)傾斜
現(xiàn)象 :部分task 執(zhí)行時間過長
定位:從4040監(jiān)控中看到某個Stage中的Task運行時間遠高于別的Task
rdd.groupByKey.reduceByKey.sortByKey.join
解決
提高Shuffle過程中的并行度:增加分區(qū)個數(shù)
選用帶有分區(qū)內(nèi)聚合的算子:帶有Map端聚合的算子,減少Shuffle Read的數(shù)據(jù)量
將小表數(shù)據(jù)進行廣播,實現(xiàn)廣播Join:類似于Map Join
采樣抽取傾斜的數(shù)據(jù),單獨Join,最后Union合并:類似于Hive中Skew Join
增加隨機前綴:增加隨機的前綴,實現(xiàn)隨機分區(qū)
自定義分區(qū)器:默認的是HashPartition
小表擴大N倍,大表增加1 ~ N的隨機前綴再做Join
問題4小文件問題
- 每個Task會產(chǎn)生一個結(jié)果文件
- Task個數(shù)根據(jù)分區(qū)個數(shù)來決定
- 分區(qū)多,每個分區(qū)的數(shù)據(jù)少
- 調(diào)整分區(qū)個數(shù):coalesce
問題4:ThriftServer資源不足,GC問題
本質(zhì) ThriftServer本身是一個spark 應用 項目中所有SQL計算都是提交給ThriftServer,計算結(jié)果都會返回給 Driver
會導致 Driver 內(nèi)存不足
解決 給Driver 分配更多的內(nèi)存
start-thriftserver.sh \--name sparksql-thrift-server \--master yarn \--deploy-mode client \--driver-memory 1g \--hiveconf hive.server2.thrift.http.port=10001 \--num-executors 3 \--executor-memory 1g \--conf spark.sql.shuffle.partitions=2
問題5:ThriftServer單點故障問題