
A transformation operator is an interface function that operates on RDDs: it turns one or more RDDs into a new RDD.
When computing with Spark, once a creation operator has produced an RDD, the key part of algorithm design and programming is to apply transformation operators to the RDD built from the raw data, step by step, until the desired result is obtained.
Transformation operators fall into two groups: (1) operators that transform Value-type RDDs, and (2) operators that transform Key/Value-type RDDs. Within each group, some operators transform a single RDD and others transform two RDDs.
Transforming a single Value-type RDD
- map
- filter
- distinct
- flatMap
- sample
- union
- intersection
- groupByKey
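The operators listed above were covered in earlier articles, so they are not demonstrated again here. Note that union and intersection take a second RDD as input, and groupByKey applies to Key/Value-type RDDs.
coalesce: repartitioning
Repartitions the current RDD and produces a new RDD stored in the number of partitions given by the numPartitions parameter. When the shuffle parameter is true, a shuffle is performed during the transformation; otherwise no shuffle takes place.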
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]
Note:
With shuffle = true, you can actually coalesce to a larger number of partitions. This is useful if you have a small number of partitions, say 100, potentially with a few partitions being abnormally large. Calling coalesce(1000, shuffle = true) will result in 1000 partitions with the data distributed using a hash partitioner. The optional partition coalescer passed in must be serializable.
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6,7,8), 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> rdd.partitions
res13: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@7b0, org.apache.spark.rdd.ParallelCollectionPartition@7b1, org.apache.spark.rdd.ParallelCollectionPartition@7b2, org.apache.spark.rdd.ParallelCollectionPartition@7b3)
scala> rdd.partitions.length
res14: Int = 4
scala> rdd.collect
res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)
scala> rdd.glom.collect
res16: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4), Array(5, 6), Array(7, 8))
scala> val newRDD = rdd.coalesce(2, false)
newRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[9] at coalesce at <console>:26
scala> newRDD.partitions.length
res17: Int = 2
scala> newRDD.collect
res18: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8)
scala> newRDD.glom.collect
res19: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8))
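The note above about shuffle = true can be checked with a small sketch. It assumes a spark-shell session in which `sc` is already defined; the variable names are illustrative only. Without a shuffle, asking for more partitions than the RDD already has leaves the partition count unchanged, while shuffle = true allows the count to grow.

```scala
// Assumes a spark-shell session, where `sc` (the SparkContext) is predefined.
val small = sc.parallelize(1 to 8, 2)

// Without a shuffle, coalesce cannot increase the partition count; it stays at 2.
val unchanged = small.coalesce(4, shuffle = false)
println(unchanged.partitions.length)   // 2

// With shuffle = true, the data is redistributed across 4 partitions.
val widened = small.coalesce(4, shuffle = true)
println(widened.partitions.length)     // 4

// The elements are preserved, although their order may change after a shuffle.
println(widened.collect.sorted.mkString(","))   // 1,2,3,4,5,6,7,8
```

In practice, coalesce without a shuffle is the cheaper choice for reducing the number of partitions, and the shuffle variant is the one to reach for when partitions need to be increased or rebalanced.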


pipe: calling shell commands
# Return an RDD created by piping elements to a forked external process.
def pipe(command: String): RDD[String]
Linux provides many shell commands for processing data. With the pipe transformation, such commands can be applied inside Spark to produce a new RDD.
scala> val rdd = sc.parallelize(0 to 7, 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> rdd.glom.collect
res20: Array[Array[Int]] = Array(Array(0, 1), Array(2, 3), Array(4, 5), Array(6, 7))
scala> rdd.pipe("head -n 1").collect // take the first element of each partition to form the new RDD
res21: Array[String] = Array(0, 2, 4, 6)


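sortBy: sorting
Sorts the elements of the original RDD by the key that function f computes for each element. The ascending parameter selects ascending or descending order. The sorted result forms a new RDD whose number of partitions can be set with numPartitions; by default it equals the number of partitions of the original RDD.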
# Return this RDD sorted by the given key function.
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

scala> val rdd = sc.parallelize(List(2,1,4,3),1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24
scala> rdd.glom.collect
res24: Array[Array[Int]] = Array(Array(2, 1, 4, 3))
scala> rdd.sortBy(x=>x, true).collect
res25: Array[Int] = Array(1, 2, 3, 4)
scala> rdd.sortBy(x=>x, false).collect
res26: Array[Int] = Array(4, 3, 2, 1)
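The example above sorts by the element itself. As a further sketch, the key function f can compute any sortable key; here strings are sorted by length. This assumes a spark-shell session with `sc` predefined, and the sample words are made up for illustration (chosen so that no two have the same length).

```scala
// Assumes a spark-shell session, where `sc` (the SparkContext) is predefined.
val words = sc.parallelize(List("spark", "rdd", "transformation", "pipe"), 2)

// Sort by string length, longest first.
val byLength = words.sortBy(w => w.length, ascending = false)
println(byLength.collect.mkString(", "))   // transformation, spark, pipe, rdd
```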
Transforming two Value-type RDDs
cartesian: Cartesian product
Takes another RDD as its argument and returns the Cartesian product of all elements of the two RDDs.
# Return the Cartesian product of this RDD and another one,
# that is, the RDD of all pairs of elements (a, b) where a is in this and b is in other.
def cartesian[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

scala> val rdd1 = sc.parallelize(List("a", "b", "c"),1)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[27] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List(1,2,3), 1)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:24
scala> rdd1.cartesian(rdd2).collect
res27: Array[(String, Int)] = Array((a,1), (a,2), (a,3), (b,1), (b,2), (b,3), (c,1), (c,2), (c,3))
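subtract: set difference
Takes another RDD as its argument and returns the difference between the original RDD and the argument RDD, that is, a new RDD made of the elements that are in the original RDD but not in the argument RDD. The numPartitions parameter sets the number of partitions of the new RDD.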
#Return an RDD with the elements from this that are not in other.
def subtract(other: RDD[T], numPartitions: Int): RDD[T]

scala> val rdd1 = sc.parallelize(0 to 5, 1)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[30] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(0 to 2,1)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:24
scala> rdd1.subtract(rdd2).collect
res28: Array[Int] = Array(3, 4, 5)

union: union
Returns the union of the original RDD and another RDD.
# Return the union of this RDD and another one.
def union(other: RDD[T]): RDD[T]
def ++(other: RDD[T]): RDD[T]
#Return the union of this RDD and another one.
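A minimal sketch of union, assuming a spark-shell session with `sc` predefined (the values are illustrative). Note that union keeps duplicates; distinct can be applied afterwards if set semantics are wanted.

```scala
// Assumes a spark-shell session, where `sc` (the SparkContext) is predefined.
val left = sc.parallelize(List(1, 2, 3), 1)
val right = sc.parallelize(List(3, 4, 5), 1)

// union simply concatenates the two RDDs; `left ++ right` is equivalent.
val all = left.union(right)
println(all.collect.mkString(","))                   // 1,2,3,3,4,5

// Duplicates (here the value 3) are kept unless removed explicitly.
println(all.distinct.collect.sorted.mkString(","))   // 1,2,3,4,5
```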

zip: pairing
Pairs the elements of the original RDD, used as keys, with the elements of another RDD, used as values, in order, and returns a new RDD made of the resulting Key/Value pairs.
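A short sketch of zip, assuming a spark-shell session with `sc` predefined. Both RDDs are created with the same number of partitions and the same number of elements per partition, which zip requires.

```scala
// Assumes a spark-shell session, where `sc` (the SparkContext) is predefined.
val names = sc.parallelize(List("a", "b", "c"), 1)
val numbers = sc.parallelize(List(1, 2, 3), 1)

// Elements are paired by position: first with first, second with second, and so on.
val zipped = names.zip(numbers)
println(zipped.collect.mkString(", "))   // (a,1), (b,2), (c,3)
```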

Transforming Key/Value-type RDDs
Transforming a single Key/Value-type RDD
combineByKey: aggregating by key
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

scala> val pair = sc.parallelize(List(("fruit", "Apple"), ("fruit", "Banana"), ("vegetable", "Cucumber"), ("fruit", "Cherry"), ("vegetable", "Bean"), ("vegetable", "Pepper")),2)
pair: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[41] at parallelize at <console>:24
scala> val combinePair = pair.combineByKey(List(_), (x:List[String], y:String) => y::x, (x:List[String], y:List[String]) => x:::y)
combinePair: org.apache.spark.rdd.RDD[(String, List[String])] = ShuffledRDD[42] at combineByKey at <console>:26
scala> combinePair.collect
res31: Array[(String, List[String])] = Array((fruit,List(Banana, Apple, Cherry)), (vegetable,List(Cucumber, Pepper, Bean)))
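To make the roles of the three functions clearer, here is a hedged sketch that computes a per-key average: createCombiner turns the first value seen for a key into an accumulator, mergeValue folds further values into it within a partition, and mergeCombiners merges accumulators from different partitions. It assumes a spark-shell session with `sc` predefined; the data and names are invented for illustration.

```scala
// Assumes a spark-shell session, where `sc` (the SparkContext) is predefined.
val scores = sc.parallelize(
  List(("math", 90.0), ("art", 70.0), ("math", 80.0), ("art", 100.0), ("math", 70.0)), 2)

// The accumulator is a (sum, count) pair.
val sumCount = scores.combineByKey(
  (v: Double) => (v, 1),                                              // createCombiner
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),        // mergeValue
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2)  // mergeCombiners
)

// Collect into a Map so the result does not depend on the order of keys.
val averages = sumCount.mapValues { case (sum, count) => sum / count }.collect.toMap
println(averages("math"))   // 80.0
println(averages("art"))    // 85.0
```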
flatMapValues: applying flatMap to every value
# Pass each value in the key-value pair RDD through a flatMap function without changing the keys;
# this also retains the original RDD's partitioning.
def flatMapValues[U](f: (V) => TraversableOnce[U]): RDD[(K, U)]

scala> val rdd = sc.parallelize(List("a", "boy"), 1).keyBy(_.length)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[44] at keyBy at <console>:24
scala> rdd.collect
res32: Array[(Int, String)] = Array((1,a), (3,boy))
scala> rdd.flatMapValues(x=>"*" + x + "*").collect
res33: Array[(Int, Char)] = Array((1,*), (1,a), (1,*), (3,*), (3,b), (3,o), (3,y), (3,*))
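In the example above the function returns a String, which Scala treats as a sequence of Char, so every character becomes a separate value under the original key. A perhaps more typical use, sketched below under the assumption of a spark-shell session with `sc` predefined, splits each value into words (the data is illustrative).

```scala
// Assumes a spark-shell session, where `sc` (the SparkContext) is predefined.
val lines = sc.parallelize(List((1, "a b"), (2, "c d e")), 1)

// Each value is split into words; every word is paired with its original key.
val wordsByKey = lines.flatMapValues(v => v.split(" ").toList)
println(wordsByKey.collect.mkString(", "))   // (1,a), (1,b), (2,c), (2,d), (2,e)
```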
keys: extracting keys
Extracts the key of every element in a Key/Value-type RDD; the sequence of all keys forms a new RDD.
# Return an RDD with the keys of each tuple.
def keys: RDD[K]

scala> val pairs = sc.parallelize(List("wills", "aprilchang","kris"),1).keyBy(_.length)
pairs: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[47] at keyBy at <console>:24
scala> pairs.collect
res34: Array[(Int, String)] = Array((5,wills), (10,aprilchang), (4,kris))
scala> pairs.keys.collect
res35: Array[Int] = Array(5, 10, 4)
mapValues: transforming values
Applies the function f to the value of every element in a Key/Value-type RDD, leaving the keys unchanged, and builds a new RDD from the results.
# Pass each value in the key-value pair RDD through a map function without changing the keys;
# this also retains the original RDD's partitioning.
def mapValues[U](f: (V) => U): RDD[(K, U)]
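A brief sketch of mapValues, reusing the keyBy data from the keys example and assuming a spark-shell session with `sc` predefined. Only the values change; the keys stay as they are.

```scala
// Assumes a spark-shell session, where `sc` (the SparkContext) is predefined.
val pairs = sc.parallelize(List("wills", "aprilchang", "kris"), 1).keyBy(_.length)

// Upper-case every value while keeping its key.
val upper = pairs.mapValues(_.toUpperCase)
println(upper.collect.mkString(", "))   // (5,WILLS), (10,APRILCHANG), (4,KRIS)
```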

partitionBy: repartitioning by key
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
#Return a copy of the RDD partitioned using the specified partitioner.

scala> val pairs = sc.parallelize(0 to 9, 2).keyBy(x=>x)
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[51] at keyBy at <console>:24
scala> pairs.collect
res37: Array[(Int, Int)] = Array((0,0), (1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8), (9,9))
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
scala> val partitionPairs = pairs.partitionBy(new HashPartitioner(2)) // partitions the pairs by the hash of each key
partitionPairs: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[52] at partitionBy at <console>:27
scala> partitionPairs.glom.collect
res38: Array[Array[(Int, Int)]] = Array(Array((0,0), (2,2), (4,4), (6,6), (8,8)), Array((1,1), (3,3), (5,5), (7,7), (9,9)))
reduceByKey: reducing by key
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
## Merge the values for each key using an associative and commutative reduce function.
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
## Merge the values for each key using an associative and commutative reduce function.
## This will also perform the merging locally on each mapper before sending results to a reducer,
## similarly to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
## Merge the values for each key using an associative and commutative reduce function.
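The signatures above can be illustrated with a small word-count sketch. It assumes a spark-shell session with `sc` predefined, and the input words are made up. Addition is associative and commutative, as reduceByKey requires of its function.

```scala
// Assumes a spark-shell session, where `sc` (the SparkContext) is predefined.
val words = sc.parallelize(List("a", "b", "a", "c", "b", "a"), 2)

// Pair each word with 1, then sum the counts for each key.
val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

// Collect into a Map so the result does not depend on the order of keys.
val result = counts.collect.toMap
println(result("a"))   // 3
println(result("b"))   // 2
println(result("c"))   // 1

// The second overload also sets the number of partitions of the output.
val counts4 = words.map(w => (w, 1)).reduceByKey(_ + _, 4)
println(counts4.partitions.length)   // 4
```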