得到 Resolved Logical Plan 后,將進(jìn)入優(yōu)化階段。后續(xù)執(zhí)行邏輯如下:
// 如果緩存中有查詢結(jié)果,則直接替換為緩存的結(jié)果,邏輯不復(fù)雜,這里不再展開講了。
lazy val withCachedData: LogicalPlan = {
assertAnalyzed()
assertSupported()
sparkSession.sharedState.cacheManager.useCachedData(analyzed)
}
// 對Logical Plan 優(yōu)化
lazy val optimizedPlan: LogicalPlan = {
sparkSession.sessionState.optimizer.execute(withCachedData)
}
下面看一下Optimizer:
/**
* Abstract class all optimizers should inherit of, contains the standard batches (extending
* Optimizers can override this.
*/
abstract class Optimizer(sessionCatalog: SessionCatalog)
extends RuleExecutor[LogicalPlan] {
看到Optimizer也是繼承自RuleExecutor,我們就開心了,和Analyzer一個套路,也是遍歷tree,并對每個節(jié)點(diǎn)應(yīng)用rule。下面直接看rules就好了:
def batches: Seq[Batch] = {
val operatorOptimizationRuleSet =
Seq(
// Operator push down
PushProjectionThroughUnion,
ReorderJoin,
EliminateOuterJoin,
PushPredicateThroughJoin,
PushDownPredicate,
LimitPushDown,
ColumnPruning,
InferFiltersFromConstraints,
// Operator combine
CollapseRepartition,
CollapseProject,
CollapseWindow,
CombineFilters,
CombineLimits,
CombineUnions,
// Constant folding and strength reduction
NullPropagation,
ConstantPropagation,
FoldablePropagation,
OptimizeIn,
ConstantFolding,
ReorderAssociativeOperator,
LikeSimplification,
BooleanSimplification,
SimplifyConditionals,
RemoveDispensableExpressions,
SimplifyBinaryComparison,
PruneFilters,
EliminateSorts,
SimplifyCasts,
SimplifyCaseConversionExpressions,
RewriteCorrelatedScalarSubquery,
EliminateSerialization,
RemoveRedundantAliases,
RemoveRedundantProject,
SimplifyCreateStructOps,
SimplifyCreateArrayOps,
SimplifyCreateMapOps,
CombineConcats) ++
extendedOperatorOptimizationRules
val operatorOptimizationBatch: Seq[Batch] = {
val rulesWithoutInferFiltersFromConstraints =
operatorOptimizationRuleSet.filterNot(_ == InferFiltersFromConstraints)
Batch("Operator Optimization before Inferring Filters", fixedPoint,
rulesWithoutInferFiltersFromConstraints: _*) ::
Batch("Infer Filters", Once,
InferFiltersFromConstraints) ::
Batch("Operator Optimization after Inferring Filters", fixedPoint,
rulesWithoutInferFiltersFromConstraints: _*) :: Nil
}
(Batch("Eliminate Distinct", Once, EliminateDistinct) ::
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
// However, because we also use the analyzer to canonicalized queries (for view definition),
// we do not eliminate subqueries or compute current time in the analyzer.
Batch("Finish Analysis", Once,
EliminateSubqueryAliases,
EliminateView,
ReplaceExpressions,
ComputeCurrentTime,
GetCurrentDatabase(sessionCatalog),
RewriteDistinctAggregates,
ReplaceDeduplicateWithAggregate) ::
//////////////////////////////////////////////////////////////////////////////////////////
// Optimizer rules start here
//////////////////////////////////////////////////////////////////////////////////////////
// - Do the first call of CombineUnions before starting the major Optimizer rules,
// since it can reduce the number of iteration and the other rules could add/move
// extra operators between two adjacent Union operators.
// - Call CombineUnions again in Batch("Operator Optimizations"),
// since the other rules might make two separate Unions operators adjacent.
Batch("Union", Once,
CombineUnions) ::
Batch("Pullup Correlated Expressions", Once,
PullupCorrelatedPredicates) ::
Batch("Subquery", Once,
OptimizeSubqueries) ::
Batch("Replace Operators", fixedPoint,
ReplaceIntersectWithSemiJoin,
ReplaceExceptWithFilter,
ReplaceExceptWithAntiJoin,
ReplaceDistinctWithAggregate) ::
Batch("Aggregate", fixedPoint,
RemoveLiteralFromGroupExpressions,
RemoveRepetitionFromGroupExpressions) :: Nil ++
operatorOptimizationBatch) :+
Batch("Join Reorder", Once,
CostBasedJoinReorder) :+
Batch("Decimal Optimizations", fixedPoint,
DecimalAggregates) :+
Batch("Object Expressions Optimization", fixedPoint,
EliminateMapObjects,
CombineTypedFilters) :+
Batch("LocalRelation", fixedPoint,
ConvertToLocalRelation,
PropagateEmptyRelation) :+
// The following batch should be executed after batch "Join Reorder" and "LocalRelation".
Batch("Check Cartesian Products", Once,
CheckCartesianProducts) :+
Batch("RewriteSubquery", Once,
RewritePredicateSubquery,
ColumnPruning,
CollapseProject,
RemoveRedundantProject)
}
優(yōu)化的rule很多,需要sql優(yōu)化經(jīng)驗(yàn)才能看懂了。
咱們以sql中最常見的優(yōu)化謂詞下推為例,謂詞下推的介紹可以看這里:https://cloud.tencent.com/developer/article/1005925
執(zhí)行的sql為:"SELECT A1.B FROM A1 JOIN A2 ON A1.B = A2.B WHERE A1.B = 'Andy'"
優(yōu)化前:
`Project [B#6]`
`+- Filter (B#6 = Andy)`
+- Join Inner, (B#6 = B#8)
:- SubqueryAlias a1
: +- Relation[B#6] json
+- SubqueryAlias a2
`+- Relation[B#8] json`
優(yōu)化后:
`Project [B#6]`
`+- Join Inner, (B#6 = B#8)`
:- Filter (isnotnull(B#6) && (B#6 = Andy))
: +- Relation[B#6] json
+- Filter (isnotnull(B#8) && (B#8 = Andy))
`+- Relation[B#8] json`
明顯可以看到Filter的下推優(yōu)化。起作用的rule是PushPredicateThroughJoin和InferFiltersFromConstraints
下面著重看一下PushPredicateThroughJoin的關(guān)鍵代碼:
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// push the where condition down into join filter
// match 這個結(jié)構(gòu)
case f @ Filter(filterCondition, Join(left, right, joinType, joinCondition)) =>
val (leftFilterConditions, rightFilterConditions, commonFilterCondition) =
split(splitConjunctivePredicates(filterCondition), left, right)
joinType match {
// 是 inner join
case _: InnerLike =>
// left下推為Filter
// push down the single side `where` condition into respective sides
val newLeft = leftFilterConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
// right下推為Filter
val newRight = rightFilterConditions.
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
val (newJoinConditions, others) =
commonFilterCondition.partition(canEvaluateWithinJoin)
val newJoinCond = (newJoinConditions ++ joinCondition).reduceLeftOption(And)
// 最終的優(yōu)化結(jié)果
val join = Join(newLeft, newRight, joinType, newJoinCond)
if (others.nonEmpty) {
Filter(others.reduceLeft(And), join)
} else {
join
}
Optimizer就介紹到這里,感興趣大家可以多看看其他的優(yōu)化規(guī)則,對sql肯定有更深刻的理解。