- 需求
有三個(gè)RDD ,分別是 rddA,rddB,rddC.取數(shù)據(jù)1,2,3,4,5并且分成三個(gè)分區(qū),對(duì)輸入的數(shù)據(jù)的每一個(gè)數(shù)據(jù)*2 ,只取大于 6 的數(shù)據(jù).
- 代碼
val rddA = sc.parallelize(List(1, 2, 3, 4, 5),3)
//rddA: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0]
val rddB = rddA.map(_*2)
//rddB: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1]
val rddC = rddB.filter(_>6)
//rddC: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2]
rddC.collect()
//res0: Array[Int] = Array(8, 10)
-
圖解
rdd 血緣關(guān)系圖 -
idea調(diào)試查看
idea查看依賴關(guān)系 -
通過 spark web ui 查看
spark-web-ui 依賴關(guān)系 補(bǔ)充
使用代碼rddC.toDebugString打印依賴關(guān)系
res1: String =
(2) MapPartitionsRDD[2] at filter at <console>:25 []
| MapPartitionsRDD[1] at map at <console>:25 []
| ParallelCollectionRDD[0] at parallelize at <console>:24 []
窄依賴
- 說明:父RDD的每個(gè)分區(qū)只被一個(gè)子RDD分區(qū)使用一次
- 窄依賴有分為兩種:
- 1.一種是一對(duì)一的依賴,即
OneToOneDependency - 2.還有一個(gè)是范圍的依賴
RangeDependency,它僅僅被org.apache.spark.rdd.UnionRDD使用。UnionRDD是把多個(gè)RDD合成一個(gè)RDD,這些RDD是被拼接而成,每個(gè)父RDD的Partition的相對(duì)順序不會(huì)變,只不過每個(gè)父RDD在UnionRDD中的Partition的起始位置不同 - 常見算子
map,filter,union,join,mapPartitions,mapValues -
圖解
窄依賴
寬依賴
- 說明:父RDD的每個(gè)分區(qū)都有可能被多個(gè)子RDD分區(qū)使用,子RDD分區(qū)通常對(duì)應(yīng)父RDD所有分區(qū)
- 常見會(huì)對(duì)應(yīng)
Shuffle的操作.在會(huì)job中產(chǎn)生一個(gè)stage -
groupByKey,join,partitionBy,reduce - 常見算子
-
圖解
寬依賴
wordCountDemo演示
val path = "/user/spark/data/wc.txt"
val lines = sc.textFile(path, 3)
//查看每個(gè)分區(qū)的數(shù)據(jù)
// lines.mapPartitionsWithIndex((n, partition) => {
// partition.map(x => (s"分區(qū)編號(hào)${n}", s"分區(qū)數(shù)據(jù)${x}"))
// }).foreach(println)
val words = lines.flatMap(_.split(","))
val wordPair = words.map(x => (x, 1))
val result = wordPair.reduceByKey(_ + _)
result.collect().foreach(println)
-
圖解
RDDWordCount
如果覺得文章不錯(cuò)的話記得關(guān)注下公號(hào)哦

公眾號(hào)





