Spark Sql 源碼剖析(二): TreeNode

零、前置知識(shí) Scala Product trait

// 所有 products 的基trait,至少包含 [[scala.Product1]] 至 [[scala.Product22]] 及 [[scala.Tuple1]] 至 [[scala.Tuple22]]
trait Product extends Any with Equals {
  // 第 n 個(gè)元素,從0開(kāi)始
  def productElement(n: Int): Any

  // product size
  def productArity: Int

  // product 遍及所有元素的迭代器
  def productIterator: Iterator[Any] = new scala.collection.AbstractIterator[Any] {
    private var c: Int = 0
    private val cmax = productArity
    def hasNext = c < cmax
    def next() = { val result = productElement(c); c += 1; result }
  }
}

一、CurrentOrigin

使用 object CurrentOrigin 為 TreeNodes 提供一個(gè)可以查找上下文的地方,比如當(dāng)前正在解析哪行 code。

// Origin 表示第幾行第幾列
case class Origin(
  line: Option[Int] = None,
  startPosition: Option[Int] = None)

object CurrentOrigin 主要包含一個(gè) private val value = new ThreadLocal[Origin]() ,目前 CurrentOrigin 僅在 parser 中使用,在 visit 每個(gè)節(jié)點(diǎn)的時(shí)候都會(huì)使用,記錄當(dāng)前 parse 的節(jié)點(diǎn)是哪行哪列

另外,從 value 是 ThreadLocal 類(lèi)型可以看出,在 Spark SQL 中,parse sql 時(shí)都是在單獨(dú)的 thread 里進(jìn)行的(不同的 sql 不同的 thread)

二、重要方法

2.1、children: Seq[BaseType](由子類(lèi)實(shí)現(xiàn))

返回該節(jié)點(diǎn)的 seq of children,children 是不可變的。有三種情況:

  • LeafNode:無(wú) children
  • UnaryNode:包含一個(gè) child
  • BinaryNode:包含 left、right 兩個(gè) child

2.2、find(f: BaseType => Boolean): Option[BaseType]

查找第一個(gè)符合 f 條件(比如某個(gè)類(lèi)型的)的 TreeNode,先序遍歷。

2.3、foreach(f: BaseType => Unit): Unit

  def foreach(f: BaseType => Unit): Unit = {
    f(this)
    children.foreach(_.foreach(f))
  }

將函數(shù) f 遞歸應(yīng)用于節(jié)點(diǎn)及其子節(jié)點(diǎn)

2.4、foreachUp(f: BaseType => Unit): Unit

與 foreach 不同的是,foreach 先應(yīng)用于 parent,再應(yīng)用與 child;而 foreachUp 是先應(yīng)用于 child 再應(yīng)用與 parent

2.5、map[A](f: BaseType => A): Seq[A]

  def map[A](f: BaseType => A): Seq[A] = {
    val ret = new collection.mutable.ArrayBuffer[A]()
    foreach(ret += f(_))
    ret
  }

調(diào)用 foreach,foreach 中應(yīng)用的函數(shù)是 ret += f(_) ,最終返回一個(gè) seq,包含將 f 通過(guò) foreach 方式應(yīng)用于所有節(jié)點(diǎn)并 add 到 ret。其中 f 本身是 BaseType => A 類(lèi)型

2.6、flatMap[A](f: BaseType => TraversableOnce[A]): Seq[A]

原理與 map 一致,只是 f 變成了 BaseType => TraversableOnce[A]

2.5、collect[B](pf: PartialFunction[BaseType, B]): Seq[B]

  def collect[B](pf: PartialFunction[BaseType, B]): Seq[B] = {
    val ret = new collection.mutable.ArrayBuffer[B]()
    val lifted = pf.lift
    foreach(node => lifted(node).foreach(ret.+=))
    ret
  }

PartialFunction#lift:將 partial func 轉(zhuǎn)換為一個(gè)返回 Option 結(jié)果的函數(shù)。將 pf 函數(shù)應(yīng)用于符合 pf 定義的節(jié)點(diǎn)(即 pf.lift(node)返回的 Option 不是 None )并都 add 到 ret = new collection.mutable.ArrayBuffer[B] 以 Seq 形式返回

2.6、collectLeaves(): Seq[BaseType]

以 Seq 的形式返回 tree 的所有葉子節(jié)點(diǎn)

def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B]:注意,因?yàn)榭赡軟](méi)有符合 pf 定義的節(jié)點(diǎn),所有返回的 Option 可能是 None

2.7、mapProductIterator[B: ClassTag](f: Any => B): Array[B]

相當(dāng)于 productIterator.map(f).toArray ,即對(duì)于 productIterator 每個(gè)元素執(zhí)行 f 然后將 ret 組成一個(gè) arr 返回

注意:TreeNode 沒(méi)有實(shí)現(xiàn) Product 相關(guān)方法,都由其子類(lèi)自行實(shí)現(xiàn)

2.8、withNewChildren

使用 new children 替換并返回該節(jié)點(diǎn)的拷貝。該方法會(huì)對(duì) productElement 每個(gè)元素進(jìn)行模式匹配,根據(jù)節(jié)點(diǎn)類(lèi)型及一定規(guī)則進(jìn)行替換。

2.9、transform(rule: PartialFunction[BaseType, BaseType]): BaseType

調(diào)用 transformDown

2.10、transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType

rule: PartialFunction[BaseType, BaseType]

  def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
    val afterRule = CurrentOrigin.withOrigin(origin) {
        // 如果 this 是 BaseType 或其子類(lèi),則對(duì) this 應(yīng)用 rule 再返回應(yīng)用 rule 后的結(jié)果,否則返回 this 
      rule.applyOrElse(this, identity[BaseType])
    }

    // Check if unchanged and then possibly return old copy to avoid gc churn.
    if (this fastEquals afterRule) {
        // 如果應(yīng)用了 rule 后節(jié)點(diǎn)無(wú)變化,則遞歸將 rule 應(yīng)用于 children
      mapChildren(_.transformDown(rule))
    } else {
        // 如果應(yīng)用了 rule 后節(jié)點(diǎn)有變化,則本節(jié)點(diǎn)換成變化后的節(jié)點(diǎn)(children 不變),再將 rule 遞歸應(yīng)用于子節(jié)點(diǎn)。也就是從根節(jié)點(diǎn)往下來(lái)應(yīng)用 rule 替換節(jié)點(diǎn)
      afterRule.mapChildren(_.transformDown(rule))
    }
  }

2.11、mapChildren(f: BaseType => BaseType): BaseType

返回 f 應(yīng)用于所有子節(jié)點(diǎn)(非遞歸,一般將遞歸操作放在調(diào)用該函數(shù)的地方)后該節(jié)點(diǎn)的 copy。其內(nèi)部的原理是調(diào)用 mapProductIterator,對(duì)每一個(gè) productElement(i) 進(jìn)行各種模式匹配,若能匹配上某個(gè)再根據(jù)一定規(guī)則進(jìn)行轉(zhuǎn)換,核心匹配轉(zhuǎn)換如下:

case arg: TreeNode[_] if containsChild(arg) =>
  val newChild = f(arg.asInstanceOf[BaseType])
  if (!(newChild fastEquals arg)) {
    changed = true
    newChild
  } else {
    arg
  }
case Some(arg: TreeNode[_]) if containsChild(arg) =>
  val newChild = f(arg.asInstanceOf[BaseType])
  if (!(newChild fastEquals arg)) {
    changed = true
    Some(newChild)
  } else {
    Some(arg)
  }
case m: Map[_, _] => m.mapValues {
  case arg: TreeNode[_] if containsChild(arg) =>
    val newChild = f(arg.asInstanceOf[BaseType])
    if (!(newChild fastEquals arg)) {
      changed = true
      newChild
    } else {
      arg
    }
  case other => other
}.view.force // `mapValues` is lazy and we need to force it to materialize
case d: DataType => d // Avoid unpacking Structs
case args: Traversable[_] => args.map {
  case arg: TreeNode[_] if containsChild(arg) =>
    val newChild = f(arg.asInstanceOf[BaseType])
    if (!(newChild fastEquals arg)) {
      changed = true
      newChild
    } else {
      arg
    }
  case tuple@(arg1: TreeNode[_], arg2: TreeNode[_]) =>
    val newChild1 = if (containsChild(arg1)) {
      f(arg1.asInstanceOf[BaseType])
    } else {
      arg1.asInstanceOf[BaseType]
    }

    val newChild2 = if (containsChild(arg2)) {
      f(arg2.asInstanceOf[BaseType])
    } else {
      arg2.asInstanceOf[BaseType]
    }

    if (!(newChild1 fastEquals arg1) || !(newChild2 fastEquals arg2)) {
      changed = true
      (newChild1, newChild2)
    } else {
      tuple
    }
  case other => other
}
case nonChild: AnyRef => nonChild
case null => null

以上都是適用于有 children 的 node,如果是 children 為 null 的 node 直接返回

2.12、makeCopy(newArgs: Array[AnyRef]): BaseType

反射生成節(jié)點(diǎn)副本

2.13、nodeName: String

返回該類(lèi)型 TreeNode 的 name,默認(rèn)為 class name;注意,會(huì)移除物理操作的 Exec$ 前綴

2.14、innerChildren: Seq[TreeNode[_]]

所有應(yīng)該以該節(jié)點(diǎn)內(nèi)嵌套樹(shù)表示的 nodes,比如,可以被用來(lái)表示 sub-queries

2.15、 allChildren: Set[TreeNode[_]]

(children ++ innerChildren).toSet[TreeNode[_]]

2.16、node string 相關(guān)

  • 用一行表示該節(jié)點(diǎn)
  • 一行更細(xì)致的
  • 帶 suffix 的
  • tree 形狀的
  • tree 形狀帶 num 的
  • to json
  • pretty json 等 json 相關(guān)的

2.17、apply(number: Int): TreeNode[_]

主要用于交互式 debug,返回該 tree 指定下標(biāo)的節(jié)點(diǎn),num 可以在 numberedTreeString 找到。最終調(diào)用的

private def getNodeNumbered(number: MutableInt): Option[TreeNode[_]] = {
    if (number.i < 0) {
      None
    } else if (number.i == 0) {
      // 返回根節(jié)點(diǎn)
      Some(this)
    } else {
      number.i -= 1
      // 注意,此遍歷順序必須與numberedTreeString相同
      innerChildren.map(_.getNodeNumbered(number)).find(_ != None).getOrElse {
        children.map(_.getNodeNumbered(number)).find(_ != None).flatten
      }
    }
  }

我的博客即將搬運(yùn)同步至騰訊云+社區(qū),邀請(qǐng)大家一同入駐:https://cloud.tencent.com/developer/support-plan?invite_code=x2lzoxh4s5hi

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容