rdd的概念以及生成rdd的兩種方式


什么是RDD
RDD(Resilient Distributed Dataset)叫做分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,
它代表一個(gè)不可變、可分區(qū)、里面的元素可并行計(jì)算的集合。
RDD具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò)、位置感知性調(diào)度和可伸縮性。
RDD允許用戶(hù)在執(zhí)行多個(gè)查詢(xún)時(shí)顯式地將工作集緩存在內(nèi)存中,
后續(xù)的查詢(xún)能夠重用工作集,這極大地提升了查詢(xún)速度。

RDD的屬性
1、一組分片(Partition),即數(shù)據(jù)集的基本組成單位。
對(duì)于RDD來(lái)說(shuō),每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決定并行計(jì)算的粒度。
用戶(hù)可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒(méi)有指定,那么就會(huì)采用默認(rèn)值。
默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。

2、一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)。
Spark中RDD的計(jì)算是以分片為單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。
compute函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合,不需要保存每次計(jì)算的結(jié)果。

3、RDD之間的依賴(lài)關(guān)系。
RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類(lèi)似于流水線(xiàn)一樣的前后依賴(lài)關(guān)系。
在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過(guò)這個(gè)依賴(lài)關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算。

4、一個(gè)Partitioner,即RDD的分片函數(shù)。
當(dāng)前Spark中實(shí)現(xiàn)了兩種類(lèi)型的分片函數(shù),一個(gè)是基于哈希的HashPartitioner,
另外一個(gè)是基于范圍的RangePartitioner。
只有對(duì)于key-value的RDD,才會(huì)有Partitioner,
非key-value的RDD的Parititioner的值是None。
Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。

5、一個(gè)列表,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)。
對(duì)于一個(gè)HDFS文件來(lái)說(shuō),這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置。
按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置。

生成rdd的兩種方式

#cd training/spark/bin
# ./spark-shell --master spark://hadoop21:7077 --executor-memory 512m --total-executor-cores 2
scala> val rdd1 = sc.textFile("hdfs://192.168.56.21:9000/wc")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.56.21:9000/wc MapPartitionsRDD[1] at textFile at <console>:24

scala> rdd1.collect
res0: Array[String] = Array(hello tom, hello jerry, hello tom, hello kitty, hello tom, hello jerry, hello tom, hello jerry, hello lilei, hello hanmeimei, hello tom, hello tom, hello jerry, hello tom, hello tom, hello jerry, hello lilei, hello hanmeimei, hello tom, hello tom, hello jerry, hello tom)

scala> val rdd2 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> rdd2.collect
res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容