【Spark 精選】EliminateOuterJoin 優(yōu)化規(guī)則詳解

1.EliminateOuterJoin 優(yōu)化規(guī)則的應(yīng)用場(chǎng)景

問(wèn)題:為啥需要消除外鏈接即 out join
解答:消除 out join 可以提高執(zhí)行效率。因?yàn)?inner join 只保留左表和右表可以關(guān)聯(lián)到的數(shù)據(jù),left join 需要保留左表全表的數(shù)據(jù),right join 需要保留右表全表的數(shù)據(jù),full join 左右表數(shù)據(jù)都需要保留,所以四種 join 在數(shù)據(jù)處理上的效率:inner join > left join = right join > full join

2.EliminateOuterJoin 源碼解析

  private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
    val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
    val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
    val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))

    lazy val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
    lazy val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)

    join.joinType match {
      case RightOuter if leftHasNonNullPredicate => Inner   // 1.right outer類(lèi)型,且join的左表有過(guò)濾操作,則轉(zhuǎn)換為inner類(lèi)型
      case LeftOuter if rightHasNonNullPredicate => Inner   // 2.left outer類(lèi)型,且join的右表有過(guò)濾操作,則轉(zhuǎn)換為inner類(lèi)型
      case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner   // 3.full outer類(lèi)型,且join的左右表都有過(guò)濾操作,則轉(zhuǎn)換為inner類(lèi)型
      case FullOuter if leftHasNonNullPredicate => LeftOuter   // 4.full outer類(lèi)型,且join的左表有過(guò)濾操作,則轉(zhuǎn)換為left outer類(lèi)型
      case FullOuter if rightHasNonNullPredicate => RightOuter   // 5.full outer類(lèi)型,且join的右表有過(guò)濾操作,則轉(zhuǎn)換為right outer類(lèi)型
      case o => o
    }
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // 匹配上Filter并且其子節(jié)點(diǎn)為Join的LogicalPlan
    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _, _)) =>
      val newJoinType = buildNewJoinType(f, j)
      // 如果相等,則不符合優(yōu)化條件
      if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))   // 如果不相等,則改變 JoinType
  }
}
EliminateOuterJoin執(zhí)行流程.JPG

問(wèn)題 1:為啥 left out join 的右邊有過(guò)濾條件,則轉(zhuǎn)換為 inner?
解答left join 的特點(diǎn)是右表沒(méi)有對(duì)應(yīng)的數(shù)據(jù)時(shí)補(bǔ) null。如下所示,現(xiàn)在右表有個(gè)條件 a<1,這說(shuō)明右表為 null 都會(huì)被 a<1 給過(guò)濾掉,此時(shí)和 inner join 是等價(jià)的。

spark-sql> explain extended SELECT* FROM employees LEFT JOIN departments ON employees.dept_id = departments.dept_id where departments.dept_id < 200;

== Parsed Logical Plan ==
'Project [*]
+- 'Filter ('departments.dept_id < 200)
   +- 'Join LeftOuter, ('employees.dept_id = 'departments.dept_id)
      :- 'UnresolvedRelation [employees], [], false
      +- 'UnresolvedRelation [departments], [], false

== Analyzed Logical Plan ==
emp_id: int, emp_name: string, dept_id: int, dept_id: int, dept_name: string, location_id: int
Project [emp_id#102, emp_name#103, dept_id#104, dept_id#105, dept_name#106, location_id#107]
+- Filter (dept_id#105 < 200)
   +- Join LeftOuter, (dept_id#104 = dept_id#105)
      :- SubqueryAlias spark_catalog.tpcds_text_varchar_5.employees
      :  +- HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
      +- SubqueryAlias spark_catalog.tpcds_text_varchar_5.departments
         +- HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]

== Optimized Logical Plan ==
Join Inner, (dept_id#104 = dept_id#105)
:- Filter ((dept_id#104 < 200) AND isnotnull(dept_id#104))
:  +- HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
+- Filter (isnotnull(dept_id#105) AND (dept_id#105 < 200))
   +- HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]

== Physical Plan ==
*(2) BroadcastHashJoin [dept_id#104], [dept_id#105], Inner, BuildLeft, false
:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)),false), [id=#138]
:  +- *(1) Filter ((dept_id#104 < 200) AND isnotnull(dept_id#104))
:     +- Scan hive tpcds_text_varchar_5.employees [emp_id#102, emp_name#103, dept_id#104], HiveTableRelation [`tpcds_text_varchar_5`.`employees`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [emp_id#102, emp_name#103, dept_id#104], Partition Cols: []]
+- *(2) Filter (isnotnull(dept_id#105) AND (dept_id#105 < 200))
   +- Scan hive tpcds_text_varchar_5.departments [dept_id#105, dept_name#106, location_id#107], HiveTableRelation [`tpcds_text_varchar_5`.`departments`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [dept_id#105, dept_name#106, location_id#107], Partition Cols: []]

問(wèn)題 2:為啥規(guī)則 EliminateOuterJoin 需要在謂語(yǔ)下推 PushDownPredicates 之前執(zhí)行?

  // Optimizer 
  def defaultBatches: Seq[Batch] = {
    val operatorOptimizationRuleSet =
      Seq(
        // Operator push down
        PushProjectionThroughUnion,
        ReorderJoin,
        EliminateOuterJoin,   // 消除外鏈接
        PushDownPredicates,   // 謂語(yǔ)下推
        // 省略...
  }      

解答:謂詞下推是指盡量將過(guò)濾條件更貼近數(shù)據(jù)源,使得查詢(xún)過(guò)程可以跳過(guò)無(wú)關(guān)的數(shù)據(jù)。因?yàn)?EliminateOuterJoin 需要根據(jù)原始過(guò)濾條件的位置,進(jìn)行 out join 轉(zhuǎn)換,如果先執(zhí)行謂語(yǔ)下推 PushDownPredicates,會(huì)影響前者,所以消除外鏈接的規(guī)則需要在謂語(yǔ)下推規(guī)則之前執(zhí)行。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容