Preface
From the previous blog posts, we know that Spark SQL processes a query as follows:
- the sqlText is parsed by the SqlParser into an Unresolved LogicalPlan;
- the analyzer module binds it against the catalog, producing a resolved LogicalPlan;
- the optimizer module optimizes the resolved LogicalPlan, producing an optimized LogicalPlan;
- the SparkPlanner converts the optimized LogicalPlan into a physical plan (SparkPlan);
- prepareForExecution() turns the physical plan into an executable physical plan;
- execute() runs the executable physical plan.
The optimizer module in detail
The optimizer and every module after it run only once an action has been triggered. The optimizer's job is to turn the resolved LogicalPlan into an optimized LogicalPlan.
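The optimizer rewrites the tree using rules distilled from years of accumulated SQL optimization experience, such as predicate pushdown, column pruning and constant folding. It works much like the Analyzer: Optimizer also extends RuleExecutor and defines a large number of optimization Rules, grouped into batches: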
def batches: Seq[Batch] = {
  // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
  // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
  // However, because we also use the analyzer to canonicalized queries (for view definition),
  // we do not eliminate subqueries or compute current time in the analyzer.
  Batch("Finish Analysis", Once,
    EliminateSubqueryAliases,
    EliminateView,
    ReplaceExpressions,
    ComputeCurrentTime,
    GetCurrentDatabase(sessionCatalog),
    RewriteDistinctAggregates,
    ReplaceDeduplicateWithAggregate) ::
  //////////////////////////////////////////////////////////////////////////////////////////
  // Optimizer rules start here
  //////////////////////////////////////////////////////////////////////////////////////////
  // - Do the first call of CombineUnions before starting the major Optimizer rules,
  //   since it can reduce the number of iteration and the other rules could add/move
  //   extra operators between two adjacent Union operators.
  // - Call CombineUnions again in Batch("Operator Optimizations"),
  //   since the other rules might make two separate Unions operators adjacent.
  Batch("Union", Once,
    CombineUnions) ::
  Batch("Pullup Correlated Expressions", Once,
    PullupCorrelatedPredicates) ::
  Batch("Subquery", Once,
    OptimizeSubqueries) ::
  Batch("Replace Operators", fixedPoint,
    ReplaceIntersectWithSemiJoin,
    ReplaceExceptWithAntiJoin,
    ReplaceDistinctWithAggregate) ::  // replace DISTINCT with an Aggregate
  Batch("Aggregate", fixedPoint,
    RemoveLiteralFromGroupExpressions,
    RemoveRepetitionFromGroupExpressions) ::
  Batch("Operator Optimizations", fixedPoint, Seq(
    // Operator push down
    PushProjectionThroughUnion,  // push Project down through Union
    ReorderJoin(conf),
    EliminateOuterJoin(conf),
    PushPredicateThroughJoin,
    PushDownPredicate,           // predicate pushdown
    LimitPushDown(conf),
    ColumnPruning,               // column pruning
    InferFiltersFromConstraints(conf),
    // Operator combine
    CollapseRepartition,
    CollapseProject,
    CollapseWindow,
    CombineFilters,              // merge adjacent Filters
    CombineLimits,               // merge adjacent Limits
    CombineUnions,
    // Constant folding and strength reduction
    NullPropagation(conf),       // null handling
    FoldablePropagation,
    OptimizeIn(conf),            // optimize IN, e.g. replace long literal lists with InSet
    ConstantFolding,             // constant folding: evaluate expressions whose value is already known
    ReorderAssociativeOperator,
    LikeSimplification,          // expression simplification (LIKE patterns)
    BooleanSimplification,
    SimplifyConditionals,
    RemoveDispensableExpressions,
    SimplifyBinaryComparison,
    PruneFilters(conf),
    EliminateSorts,
    SimplifyCasts,
    SimplifyCaseConversionExpressions,
    RewriteCorrelatedScalarSubquery,
    EliminateSerialization,
    RemoveRedundantAliases,
    RemoveRedundantProject,
    SimplifyCreateStructOps,
    SimplifyCreateArrayOps,
    SimplifyCreateMapOps) ++
    extendedOperatorOptimizationRules: _*) ::
  Batch("Check Cartesian Products", Once,
    CheckCartesianProducts(conf)) ::
  Batch("Join Reorder", Once,
    CostBasedJoinReorder(conf)) ::
  Batch("Decimal Optimizations", fixedPoint,  // decimal precision optimization
    DecimalAggregates(conf)) ::
  Batch("Object Expressions Optimization", fixedPoint,
    EliminateMapObjects,
    CombineTypedFilters) ::
  Batch("LocalRelation", fixedPoint,
    ConvertToLocalRelation,
    PropagateEmptyRelation) ::
  Batch("OptimizeCodegen", Once,
    OptimizeCodegen(conf)) ::
  Batch("RewriteSubquery", Once,
    RewritePredicateSubquery,
    CollapseProject) :: Nil
}
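Every batch above pairs a list of rules with a strategy: Once runs the rules a single time, while fixedPoint keeps re-running them until the plan stops changing or an iteration cap is reached. The following is a minimal sketch of that idea as a toy RuleExecutor. All of Expr, Lit, BoolLit, Attr, Add, GreaterThan, And, transformUp, Rule, Batch, Once, FixedPoint and execute are hypothetical stand-ins that only borrow Catalyst-like names, and the two rules are heavily simplified versions of ConstantFolding and BooleanSimplification.

```scala
// Hypothetical toy expression tree, not Catalyst's classes.
sealed trait Expr
case class Lit(v: Int) extends Expr
case class BoolLit(b: Boolean) extends Expr
case class Attr(name: String) extends Expr
case class Add(l: Expr, r: Expr) extends Expr
case class GreaterThan(l: Expr, r: Expr) extends Expr
case class And(l: Expr, r: Expr) extends Expr

// Bottom-up transform: rewrite the children first, then try the rule on the node itself.
def transformUp(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
  val withNewChildren = e match {
    case Add(l, r)         => Add(transformUp(l)(rule), transformUp(r)(rule))
    case GreaterThan(l, r) => GreaterThan(transformUp(l)(rule), transformUp(r)(rule))
    case And(l, r)         => And(transformUp(l)(rule), transformUp(r)(rule))
    case leaf              => leaf
  }
  rule.applyOrElse(withNewChildren, identity[Expr])
}

type Rule = Expr => Expr

// Simplified constant folding: evaluate operators whose inputs are all literals.
val constantFolding: Rule = e => transformUp(e) {
  case Add(Lit(a), Lit(b))         => Lit(a + b)
  case GreaterThan(Lit(a), Lit(b)) => BoolLit(a > b)
}

// Simplified boolean simplification: `true AND x` becomes `x`.
val booleanSimplification: Rule = e => transformUp(e) {
  case And(BoolLit(true), x) => x
  case And(x, BoolLit(true)) => x
}

sealed trait Strategy { def maxIterations: Int }
case object Once extends Strategy { val maxIterations = 1 }
case class FixedPoint(maxIterations: Int) extends Strategy

case class Batch(name: String, strategy: Strategy, rules: Rule*)

// Runs batches in order; within a batch, applies every rule in sequence and
// repeats until the tree stops changing or the strategy's limit is reached.
def execute(batches: Seq[Batch], plan: Expr): Expr =
  batches.foldLeft(plan) { (start, batch) =>
    var current = start
    var iteration = 0
    var changed = true
    while (changed && iteration < batch.strategy.maxIterations) {
      val next = batch.rules.foldLeft(current)((p, rule) => rule(p))
      changed = next != current
      current = next
      iteration += 1
    }
    current
  }

// (1 + 2) > 2 AND a > 0
val expr = And(GreaterThan(Add(Lit(1), Lit(2)), Lit(2)), GreaterThan(Attr("a"), Lit(0)))
val onceResult = execute(Seq(Batch("once", Once, booleanSimplification, constantFolding)), expr)
val fixedResult = execute(Seq(Batch("fixed", FixedPoint(100), booleanSimplification, constantFolding)), expr)
```

With Once, the `true AND x` simplification never fires, because the `true` only appears after constant folding has already run later in the same pass, so onceResult is still `And(BoolLit(true), GreaterThan(Attr("a"), Lit(0)))`. The fixedPoint batch picks it up on the next iteration and ends at `GreaterThan(Attr("a"), Lit(0))`, which shows why repeating a batch to a fixed point makes the result less sensitive to rule order.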
batch的執(zhí)行和analyzer一樣是通過RuleExecutor的execute方法依次遍歷,這里不再解析。這里有部分優(yōu)化的例子