1.EliminateOuterJoin 優(yōu)化規(guī)則的應(yīng)用場(chǎng)景
問(wèn)題:為啥需要消除外鏈接即 out join
解答:消除 out join 可以提高執(zhí)行效率。因?yàn)?inner join 只保留左表和右表可以關(guān)聯(lián)到的數(shù)據(jù),left join 需要保留左表全表的數(shù)據(jù),right join 需要保留右表全表的數(shù)據(jù),full join 左右表數(shù)據(jù)都需要保留,所以四種 join 在數(shù)據(jù)處理上的效率:inner join > left join = right join > full join
2.EliminateOuterJoin 源碼解析
private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
lazy val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
lazy val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
join.joinType match {
case RightOuter if leftHasNonNullPredicate => Inner // 1.right outer類(lèi)型,且join的左表有過(guò)濾操作,則轉(zhuǎn)換為inner類(lèi)型
case LeftOuter if rightHasNonNullPredicate => Inner // 2.left outer類(lèi)型,且join的右表有過(guò)濾操作,則轉(zhuǎn)換為inner類(lèi)型
case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner // 3.full outer類(lèi)型,且join的左右表都有過(guò)濾操作,則轉(zhuǎn)換為inner類(lèi)型
case FullOuter if leftHasNonNullPredicate => LeftOuter // 4.full outer類(lèi)型,且join的左表有過(guò)濾操作,則轉(zhuǎn)換為left outer類(lèi)型
case FullOuter if rightHasNonNullPredicate => RightOuter // 5.full outer類(lèi)型,且join的右表有過(guò)濾操作,則轉(zhuǎn)換為right outer類(lèi)型
case o => o
}
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// 匹配上Filter并且其子節(jié)點(diǎn)為Join的LogicalPlan
case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _, _)) =>
val newJoinType = buildNewJoinType(f, j)
// 如果相等,則不符合優(yōu)化條件
if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) // 如果不相等,則改變 JoinType
}
}
問(wèn)題 1:為啥 left out join 的右邊有過(guò)濾條件,則轉(zhuǎn)換為 inner?
解答:left join 的特點(diǎn)是右表沒(méi)有對(duì)應(yīng)的數(shù)據(jù)時(shí)補(bǔ) null。如下所示,現(xiàn)在右表有個(gè)條件 a<1,這說(shuō)明右表為 null 都會(huì)被 a<1 給過(guò)濾掉,此時(shí)和 inner join 是等價(jià)的。
spark-sql> explain extended SELECT* FROM employees LEFT JOIN departments ON employees.dept_id = departments.dept_id where departments.dept_id < 200;
== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('departments.dept_id < 200)
+- 'Join LeftOuter, ('employees.dept_id = 'departments.dept_id)
:- 'UnresolvedRelation [employees], [], false
+- 'UnresolvedRelation [departments], [], false
== Analyzed Logical Plan ==
emp_id: int, emp_name: string, dept_id: int, dept_id: int, dept_name: string, location_id: int
Project [emp_id#102, emp_name#103, dept_id#104, dept_id#105, dept_name#106, location_id#107]
+- Filter (dept_id#105 < 200)
+- Join LeftOuter, (dept_id#104 = dept_id#105)
:- SubqueryAlias spark_catalog.tpcds_text_varchar_5.employees
: +- HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
+- SubqueryAlias spark_catalog.tpcds_text_varchar_5.departments
+- HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]
== Optimized Logical Plan ==
Join Inner, (dept_id#104 = dept_id#105)
:- Filter ((dept_id#104 < 200) AND isnotnull(dept_id#104))
: +- HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
+- Filter (isnotnull(dept_id#105) AND (dept_id#105 < 200))
+- HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]
== Physical Plan ==
*(2) BroadcastHashJoin [dept_id#104], [dept_id#105], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)),false), [id=#138]
: +- *(1) Filter ((dept_id#104 < 200) AND isnotnull(dept_id#104))
: +- Scan hive tpcds_text_varchar_5.employees [emp_id#102, emp_name#103, dept_id#104], HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
+- *(2) Filter (isnotnull(dept_id#105) AND (dept_id#105 < 200))
+- Scan hive tpcds_text_varchar_5.departments [dept_id#105, dept_name#106, location_id#107], HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]
問(wèn)題 2:為啥規(guī)則 EliminateOuterJoin 需要在謂語(yǔ)下推 PushDownPredicates 之前執(zhí)行?
// Optimizer
def defaultBatches: Seq[Batch] = {
val operatorOptimizationRuleSet =
Seq(
// Operator push down
PushProjectionThroughUnion,
ReorderJoin,
EliminateOuterJoin, // 消除外鏈接
PushDownPredicates, // 謂語(yǔ)下推
// 省略...
}
解答:謂詞下推是指盡量將過(guò)濾條件更貼近數(shù)據(jù)源,使得查詢(xún)過(guò)程可以跳過(guò)無(wú)關(guān)的數(shù)據(jù)。因?yàn)?EliminateOuterJoin 需要根據(jù)原始過(guò)濾條件的位置,進(jìn)行 out join 轉(zhuǎn)換,如果先執(zhí)行謂語(yǔ)下推 PushDownPredicates,會(huì)影響前者,所以消除外鏈接的規(guī)則需要在謂語(yǔ)下推規(guī)則之前執(zhí)行。