spark編程指南
可以使容器并行化
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
擴(kuò)展數(shù)據(jù)集
- 可以直接使用文本文件
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
- 或者其他分布式文件系統(tǒng)的文件
RDD操作
分為transformations和actions
- transformations
只記錄操作,不產(chǎn)生計算,返回的也是RDD - actions
進(jìn)行實際的計算,得到具體的值
基本用法
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
想緩存的話:
lineLengths.persist()
spark中傳遞函數(shù)
推薦2種
- 匿名函數(shù)
- 全局單例對象中的靜態(tài)函數(shù)
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
注意:同樣可以傳遞類方法到spark
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
如這個例子,在對一個實例ins調(diào)用doStuff方法時,由于其引用了func1,所以需要將ins對象傳遞給spark集群,這回帶來一些問題。
解決辦法是使用一個變量顯示的捕獲field,而不是傳遞整個對象:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
理解閉包
捕獲外部變量的閉包將導(dǎo)致錯誤的結(jié)果。
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
例如這個例子,counter將被復(fù)制一份發(fā)送給各個executor,它們是獨(dú)立的進(jìn)程,不會共享變量。所以并不會得到預(yù)期的結(jié)果。所以在有共享變量的需求時,需要使用Accumulator。
打印RDD的值
嘗試使用rdd.foreach(println)時,并不會在driver端產(chǎn)生預(yù)期輸出,而是輸出在每個executor的stdout。正確的方法是先求值到driver端,然后打?。?/p>
rdd.collect().foreach(println)
但是數(shù)據(jù)量大的情況下,這也是大多數(shù)情況,內(nèi)存是不夠用的。所以你應(yīng)該限制輸出的數(shù)量:
rdd.take(100).foreach(println)
鍵值對
RDD的少數(shù)操作只針對鍵值對,例如通過key進(jìn)行分組聚合等操作。
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
注意:使用自定義對象作key時,要實現(xiàn)equal()方法和合適的hashcode()方法