本文基于spark源碼2.11
1. 前言
shuffle是spark job中一個(gè)重要的階段,發(fā)生在map和reduce之間,涉及到map到reduce之間的數(shù)據(jù)的移動(dòng),以下面一段wordCount為例:
def main(args:Array[String]){
val sparkConf = new SparkConf().setAppName("Log Query")
val sc = new SparkContext(sparkConf)
val lines = sc.textFile("README.md",3)
val words = lines.flatMap(line => line.split(" "))
val wordOne = words.map(word => (word,1))
val wordCount = wordOne.reduceByKey(_ + _,3)
wordCount.foreach(println)
}
其RDD的轉(zhuǎn)換如下:

上圖中map和flatMap這種轉(zhuǎn)換只會(huì)產(chǎn)生rdd之間的窄依賴,因此對(duì)一個(gè)分區(qū)上進(jìn)行map和flatMap可以如同流水線一樣只在同一臺(tái)的機(jī)器上盡心,不存在多個(gè)節(jié)點(diǎn)之間的數(shù)據(jù)移動(dòng),而reduceByKey這樣的操作,涉及到需要將相同的key做聚合操作。上圖中Stage1中按key做hash 到三個(gè)分區(qū)做reduce操作,對(duì)于Stage1中任意一個(gè)partition而言,其輸入可能存在與上游Stage0中每一個(gè)分區(qū)中,因此需要從上游的每一個(gè)partition所在的機(jī)器上拉取數(shù)據(jù),這個(gè)過(guò)程稱為shuffle。
解釋一下: spark的stage劃分就是以shuffle依賴為界限劃分的,上圖中只存在一次shuffle操作,所以被劃分為兩個(gè)stage
從上圖中可以看出shuffle首先涉及到stage0最后一個(gè)階段需要寫(xiě)出map結(jié)果, 以及stage1從上游stage0中每一個(gè)partition寫(xiě)出的數(shù)據(jù)中讀取屬于當(dāng)前partition的數(shù)據(jù)。
2. Shuffle Write
spark中rdd由多個(gè)partition組成,任務(wù)運(yùn)行作用于partition。spark有兩種類型的task:
- ShuffleMapTask, 負(fù)責(zé)rdd之間的transform,map輸出也就是shuffle write
- ResultTask, job最后階段運(yùn)行的任務(wù),也就是action(上面代碼中foreach就是一個(gè)action,一個(gè)action會(huì)觸發(fā)生成一個(gè)job并提交)操作觸發(fā)生成的task,用來(lái)收集job運(yùn)行的結(jié)果并返回結(jié)果到driver端。
“關(guān)于job的創(chuàng)建,stage的劃分以及task的提交在另一篇文章中介紹(待填坑)”
shuffle write的操作發(fā)生在ShuffleMapTask#runTask中,其代碼如下:
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}
調(diào)用val (rdd, dep) = ser.deserialize(...)獲取任務(wù)運(yùn)行的rdd和shuffle dep,這是在由DAGScheduler序列化然后提交到當(dāng)前任務(wù)運(yùn)行的executor上的。
調(diào)用writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) 獲得shuffle writer,調(diào)用writer.write(rdd.iterator)寫(xiě)出map output。idd.iterator在迭代過(guò)程中,會(huì)往上游一直追溯當(dāng)前rdd依賴的rdd,然后從上至下調(diào)用rdd.compute()完成數(shù)據(jù)計(jì)算并返回iterator迭代轉(zhuǎn)換計(jì)算的結(jié)果。 此處manager在SparkEnv中實(shí)例化微SortShuffleManager,下面是SortShuffleManager#getWriter方法:
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V] = {
numMapsForShuffle.putIfAbsent(
handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
bypassMergeSortHandle,
mapId,
context,
env.conf)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
}
}
”上面提到shuffleManager被實(shí)例化為SortShuffleManager,老版本里還有HashShuffleManager,似乎不用了,這里有一篇兩種方式的性能比較文章SortShuffleManager和HashShuffleManager性能比較“
有三種類型的ShuffleWriter,取決于handle的類型。
- UnsafeShuflleWriter, 不清楚
- BypassMergeSortShuffleWriter, 這個(gè)writer會(huì)根據(jù)reduce的個(gè)數(shù)n(reduceByKey中指定的參數(shù),有partitioner決定)創(chuàng)建n個(gè)臨時(shí)文件,然后計(jì)算iterator每一個(gè)key的hash,放到對(duì)應(yīng)的臨時(shí)文件中,最后合并這些臨時(shí)文件成一個(gè)文件,同時(shí)還是創(chuàng)建一個(gè)索引文件來(lái)記錄每一個(gè)臨時(shí)文件在合并后的文件中偏移。當(dāng)reducer取數(shù)據(jù)時(shí)根據(jù)reducer partitionid就能以及索引文件就能找到對(duì)應(yīng)的數(shù)據(jù)塊。
- SortShuffleWriter, 會(huì)在map做key的aggregate操作,(key,value)會(huì)先在保存在內(nèi)存里,并按照用戶自定義的aggregator做key的聚合操作,并在達(dá)到一定的內(nèi)存大小后,對(duì)內(nèi)存中已有的記錄按(partition,key)做排序,然后保存到磁盤(pán)上的臨時(shí)文件。最終對(duì)生成的文件再做一次merge操作。
2.1 BypassMergeSortShuffleWriter
1. 什么情況下使用
不需要在map端做combine操作,且partitioner產(chǎn)生的分區(qū)數(shù)量(也就是reducer的個(gè)數(shù))小于配置文件中spark.shuffle.sort.bypassMergeThreshold定義的大?。J(rèn)值是200)
2. 如何寫(xiě)出map output
下圖是BypassMergeSortShuffleWriter寫(xiě)出數(shù)據(jù)的方式:

輸入數(shù)據(jù)是(nation,city)的鍵值對(duì),調(diào)用reduceByKey(_ + "," + _,3)。運(yùn)行在在partition-0上的ShuffleMapTask使用BypassMergeSortShuffleWriter#write的過(guò)程如下:
- 根據(jù)reducer的個(gè)數(shù)(partitioner決定)n 創(chuàng)建n個(gè)
DiskBlockObjectWriter,每一個(gè)創(chuàng)建一個(gè)臨時(shí)文件,臨時(shí)文件命名規(guī)則為temp_shuffle_uuid,也就是每一個(gè)臨時(shí)文件放的就是下游一個(gè)reduce的輸入數(shù)據(jù)。 - 迭代訪問(wèn)輸入的數(shù)據(jù)記錄,調(diào)用
partitioner.getPartition(key)計(jì)算出記錄的應(yīng)該落在哪一個(gè)reducer擁有的partition,然后索引到對(duì)應(yīng)的DiskBlockObjectWriter對(duì)象,寫(xiě)出key, value - 創(chuàng)建一個(gè)名為
shuffle_shuffleid_mapid_0.uuid這樣的臨時(shí)且絕對(duì)不會(huì)重復(fù)的文件,然后將1中生成的所有臨時(shí)文件寫(xiě)入到這個(gè)文件中,寫(xiě)出的順序是partitionid從小到大開(kāi)始的(這里之所以使用uuid創(chuàng)建文件,主要是不使用uuid的話可能有另外一個(gè)任務(wù)也寫(xiě)出過(guò)相同的文件,文件名中的0本來(lái)應(yīng)該是reduceid,但是由于合并到只剩一個(gè)文件,就用0就行了)。 - 寫(xiě)出索引文件,索引文件名為
shuffle_shuffleid_mapid_0.index.uuid(使用uuid和3中的原因是一樣的)。由于map的輸出數(shù)據(jù)被合并到一個(gè)文件中,reducer在讀取數(shù)據(jù)時(shí)需要根據(jù)索引文件快速定位到應(yīng)該讀取的數(shù)據(jù)在文件中的偏移和大小。 - 索引文件只順序?qū)懗鰌artition_0 ~ partition_n的偏移的值
- 還需要將3中
shuffle_shuffleid_mapid_0.uuid重命名為``shuffle_shuffleid_mapid_0`, 過(guò)程是驗(yàn)證一下是不是已經(jīng)存在這么一個(gè)文件以及文件的長(zhǎng)度是否等于 1 中所有臨時(shí)文件相加的大小,不是的話就重命名索引文件和數(shù)據(jù)文件(去掉uuid)。否則的話表示先前已經(jīng)有一個(gè)任務(wù)成功寫(xiě)出了數(shù)據(jù),直接刪掉臨時(shí)索引和數(shù)據(jù)文件,返回。
以上就是BypassMergeSortShuffleWriter寫(xiě)數(shù)據(jù)的方式。有如下特點(diǎn):
- map端沒(méi)有按照key做排序,也沒(méi)有按照key做聚合操作, [(China, Beijing),(China,Hefei),(China,Shanghai)]如果在map端聚合的話會(huì)變成(China,“Beijing,Hefei,Shanghai”)。
- 如果有M格mapper,N格reducer,那么會(huì)產(chǎn)生M*N個(gè)臨時(shí)文件,但是最終會(huì)合并生成M個(gè)數(shù)據(jù)文件,M個(gè)索引文件。
2.2 SortShuffleWriter
下面是SortShuffleWrite#write方法
override def write(records: Iterator[Product2[K, V]]): Unit = {
sorter = if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
sorter.insertAll(records)
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try {
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
}
}
}
- 先創(chuàng)建了一個(gè)ExternalSorter,sort.insertAll(records)會(huì)將數(shù)據(jù)寫(xiě)到多個(gè)磁盤(pán)文件中。
- 接下來(lái)和BypassMergeSortShuffleWriter類似,創(chuàng)建一個(gè)名為
shuffle_shuffleid_mapid_0.uuid的這種唯一的臨時(shí)數(shù)據(jù)文件,將 1 中的多個(gè)磁盤(pán)文件合并寫(xiě)出到這個(gè)臨時(shí)數(shù)據(jù)文件中,并寫(xiě)出索引文件,最終的數(shù)據(jù)文件中相同分區(qū)的數(shù)據(jù)一定是連續(xù)分布的,這樣就能根據(jù)索引文件中的偏移值快速定位到對(duì)應(yīng)分區(qū)的數(shù)據(jù)。
由于寫(xiě)數(shù)據(jù)的核心在ExternalSorter#insertAll中,下文會(huì)主要介紹ExternalSorter。
1. 什么情況下使用
ShuffleredRDD#mapSideCombine為true,且定義了aggregate的情況下會(huì)使用SortShuffleWriter。
2. 原理
根據(jù)mapSizeCombine是否為true,SortShuffleWriter在寫(xiě)出map output時(shí)也會(huì)做不同處理,為true時(shí)會(huì)按用戶自定聚合方法按key聚合,并按照(partitionId,key)排序(沒(méi)指定key的排序方法時(shí)就只根據(jù)partitionid排序),然后寫(xiě)出到磁盤(pán)文件;為false時(shí)不會(huì)不會(huì)做聚合操作,只會(huì)進(jìn)行排序然后寫(xiě)出到磁盤(pán)。下文先介紹沒(méi)有聚合,然后介紹有聚合。兩者之間有很多的共同之處,都會(huì)先將數(shù)據(jù)緩存在內(nèi)存當(dāng)中,在達(dá)到一定大小之后刷到磁盤(pán),但是最大的區(qū)別也在此,他們使用了不同的集合緩存數(shù)據(jù)。
2.2.1 ExternalSorter
下面是ExternalSorter的一些重要的成員:
1. private val blockManager = SparkEnv.get.blockManager
寫(xiě)出臨時(shí)文件到磁盤(pán)需要blockManager
2. private var map = new PartitionedAppendOnlyMap[K, C]
private var buffer = new PartitionedPairBuffer[K, C]
下文介紹在map端執(zhí)行聚合操作和不在map聚合是數(shù)據(jù)會(huì)以不同的方式緩存在內(nèi)存中,map就是在map端聚合是數(shù)據(jù)緩存的方式
3. private val keyComparator: Comparator[K]
key的比較方式,在map端聚合時(shí),數(shù)據(jù)排序方式是先按partitionId然后按key排序。不在map聚合時(shí)這個(gè)字段是空,只按partitionId排序
4. private val spills = new ArrayBuffer[SpilledFile]
緩存在內(nèi)存中的數(shù)據(jù)(map或者buffer)在達(dá)到一定大小后就會(huì)寫(xiě)出到磁盤(pán)中,spills保存了所有寫(xiě)出過(guò)的磁盤(pán)文件,后續(xù)會(huì)根據(jù)spills做merge成一個(gè)文件。
2.2.2 不在map端聚合
下面是ExternalSorter#insertAll的源碼:
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
// TODO: stop combining if we find that the reduction factor isn't high
val shouldCombine = aggregator.isDefined
if (shouldCombine) {
...
...
// 此處省略了map做combine的代碼
} else {
// Stick values into our buffer
while (records.hasNext) {
addElementsRead()
val kv = records.next()
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
maybeSpillCollection(usingMap = false)
}
}
}
while循環(huán)獲?。╧ey,value)記錄,然后調(diào)用buffer.insert(...)插入記錄,此處buffer是PartitionedPairBuffer的實(shí)例(PartitionedPairBuffer介紹見(jiàn)附錄4.1)。insert會(huì)將(key,value)轉(zhuǎn)換成((partition_id,key), value)的形式插入,例如("China","Beijing") ->((1, "China"), "Beijing").
maybeSpillCollection則會(huì)根據(jù)具體情況決定是否將buffer中的記錄寫(xiě)出到磁盤(pán)。經(jīng)過(guò)如下調(diào)用鏈路進(jìn)入到寫(xiě)磁盤(pán)操作:
maybeSpillCollection (調(diào)用buffer.estimateSize 估算當(dāng)前buffer大小)
--> mybeSpill (會(huì)嘗試擴(kuò)容)
--> spill (寫(xiě)到磁盤(pán)中)
下面是spill方法
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
spills += spillFile
}
collection.destructiveSortedWritablePartitionedIterator(comparator)做了很多事情,參數(shù)comparator在這種情況下是null。
下面是它的調(diào)用序列:
destructiveSortedWritablePartitionedIterator
-> partitionedDestructiveSortedIterator
-> PartitionedPairBuffer#partitionedDestructiveSortedIterator
進(jìn)入到PartitionedPairBuffer#partitionedDestructiveSortedIterator代碼如下:
override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
: Iterator[((Int, K), V)] = {
val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
iterator
}
此處參數(shù)keyComparator從前面一直傳下來(lái)的,此處是空值,因此comparator使用partitionComparator,也就是只按照buffer數(shù)據(jù)所屬的partitionId排序。
Sort#sort方法對(duì)buffer排序(排序直接在buffer底層數(shù)據(jù)上移動(dòng),也就是說(shuō)會(huì)破壞buffer原有的數(shù)據(jù)順序)之后返回iterator,此時(shí)這個(gè)iterator迭代出來(lái)的數(shù)據(jù)就是按照partitionId排序的數(shù)據(jù),同時(shí)也就意味者相同的partitionId的數(shù)據(jù)一定會(huì)連續(xù)的分布。
回到上面spill方法,spillMemoryIteratorToDisk接收上面提到的iterator作為參數(shù)開(kāi)始輸出磁盤(pán), 這個(gè)方法大體如下:
- 使用batchSizes保存每批量flush的大小,
- elementsPerPartition保存每個(gè)partition,鍵值對(duì)個(gè)數(shù)
- 創(chuàng)建臨時(shí)文件,buffer中記錄批量寫(xiě)出,只寫(xiě)出key,value,partitionId不寫(xiě)
- 返回SpilledFile,里面有blockId,file,elementsPerPartitionbatchSizes這些信息,后續(xù)會(huì)將SpilledFile合并成一個(gè)文件。
和Bypass方式的區(qū)別
兩者在寫(xiě)map out數(shù)據(jù)時(shí)都會(huì)產(chǎn)生多個(gè)臨時(shí)文件,bypass方式產(chǎn)生的每一個(gè)臨時(shí)文件中的數(shù)據(jù)指揮是下游一個(gè)reducer的輸入數(shù)據(jù),后續(xù)合并成同一個(gè)文件時(shí)很簡(jiǎn)單只要逐個(gè)將臨時(shí)文件copy就行,但是sort方式中臨時(shí)文件中的數(shù)據(jù)可能輸入多個(gè)reducer,也就意味著在合并到同一個(gè)文件時(shí),需要考慮將多個(gè)臨時(shí)文件相同的分區(qū)合并好在輸出到最終文件中。關(guān)于sort的文件合并會(huì)在下一節(jié)“map端做聚合”之后。
2.2.3 在map端做聚合
定義聚合方法
reduce轉(zhuǎn)換會(huì)是的兩個(gè)RDD之間存在ShuffleDependency,ShuffleDependency,ShuffleDependency的屬性aggregator: Aggregator定義了按key聚合的方式,Aggregator類如下:
case class Aggregator[K, V, C] (
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {
...}
- K,V分別時(shí)key、value的類型,C是V聚合后的類型。
- createCombiner, 第一個(gè)value轉(zhuǎn)換成聚合后類型。
- mergeValue, 并入的value。
- 合并兩個(gè)已經(jīng)聚合的數(shù)據(jù)。
例如我們將相同key的value(String類型)合并到一個(gè)List中,則定義:
createCombiner: (s String) => List(s) 將string轉(zhuǎn)成List
mergeValue: (c:List[String],v: String) => v::c 將string加到列表
mergeCombiners: (c1:List[String],c2: List[String]) => c1:::c2 合并兩個(gè)列表
write過(guò)程
下圖是一個(gè)map端做聚合的shuffle write過(guò)程:

reduceByKey(_ + "," + _)操作把key相同的所有value用“,”連接起來(lái)。
依然是調(diào)用ExternalSorter#insertAll完成排序,aggregate以及寫(xiě)出到磁盤(pán)的過(guò)程。此時(shí)使用map作為內(nèi)存緩存的數(shù)據(jù)結(jié)構(gòu)。寫(xiě)的流程如下:
- 從輸入iterator中一次讀入(key,value),使用partitioner計(jì)算key的partitionid,調(diào)用map.insert插入數(shù)據(jù),格式為((partitionid,key),value),插入時(shí)就會(huì)對(duì)key相同的做aggregate,形成的內(nèi)存數(shù)據(jù)布局如上圖map(上圖map數(shù)據(jù)已經(jīng)排序了,但是插入時(shí)不會(huì)排序,而是在寫(xiě)出磁盤(pán)時(shí)排序)。
- 當(dāng)map的數(shù)據(jù)達(dá)到一定大小時(shí),使用blockManager創(chuàng)建臨時(shí)文件temp_shuffle_uuid,然后對(duì)map數(shù)據(jù)排序,輸出到臨時(shí)文件。排序時(shí)現(xiàn)按照partitionid排序,然后按照key排序,保證臨時(shí)文件中相同partitionid的數(shù)據(jù)一定是連續(xù)分布的。
- 完成ExternalSorter#insertAll調(diào)用,生成若干臨時(shí)文件,合并這些文件。
源碼解析
源碼基本和不做聚合時(shí)一樣,區(qū)別主要是在用作內(nèi)存緩存的集合buffer和map的區(qū)別。附錄介紹了buffer和map的原理。
3. ShuffleRead
前面RDD轉(zhuǎn)換圖中,RDD#reduceByKey產(chǎn)生了MapPartitionRDD到ShufferedRDD的轉(zhuǎn)換,shuffle read操作發(fā)生在轉(zhuǎn)換ShufferedRDD的compute方法中,下面是ShufferedRDD#compute方法:
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
.read()
.asInstanceOf[Iterator[(K, C)]]
}
通過(guò)shuffleManager.getReader獲得ShuffleReader,返回的是BlockStoreShuffleReader的實(shí)例,參數(shù)[split.index,split.index+1)表示需要從上游stage0 所有task產(chǎn)生的數(shù)據(jù)文件中讀取split.index這一個(gè)分區(qū)的記錄。
下面是BlockStoreShuffleReader#read方法
/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
val wrappedStreams = new ShuffleBlockFetcherIterator(
context,
blockManager.shuffleClient,
blockManager,
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
serializerManager.wrapStream,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
val serializerInstance = dep.serializer.newInstance()
// Create a key/value iterator for each stream
val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) =>
// Note: the asKeyValueIterator below wraps a key/value iterator inside of a
// NextIterator. The NextIterator makes sure that close() is called on the
// underlying InputStream when all records have been read.
serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
}
// Update the context task metrics for each record read.
val readMetrics = context.taskMetrics.createTempShuffleReadMetrics()
val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]](
recordIter.map { record =>
readMetrics.incRecordsRead(1)
record
},
context.taskMetrics().mergeShuffleReadMetrics())
// An interruptible iterator must be used here in order to support task cancellation
val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter)
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
// We are reading values that are already combined
val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
} else {
// We don't know the value type, but also don't care -- the dependency *should*
// have made sure its compatible w/ this aggregator, which will convert the value
// type to the combined type C
val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
}
// Sort the output if there is a sort ordering defined.
dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =>
// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
// the ExternalSorter won't spill to disk.
val sorter =
new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
sorter.insertAll(aggregatedIter)
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
case None =>
aggregatedIter
}
}
}
這是一個(gè)很復(fù)雜的方法,從上游的map output讀區(qū)屬于當(dāng)前分區(qū)的block,層層封裝迭代器,從上面代碼可以看到有如下迭代器:
ShuffleBlockFetcherIterator
其next方法返回類型為(BlockId, InputStream)。當(dāng)前reduce分區(qū)需要從上游map 輸出數(shù)據(jù)中fetch多個(gè)block。這個(gè)迭代器負(fù)責(zé)從上游fetch到blockid中的數(shù)據(jù)(由于write階段數(shù)據(jù)是合并到一個(gè)blockid文件中,所以數(shù)據(jù)是其中一段),然后將從數(shù)據(jù)創(chuàng)建InputStream,并把blockid以及創(chuàng)建的stream返回。顯然如果上游有三個(gè)partition,每個(gè)partition的輸出數(shù)據(jù)文件中有一段是當(dāng)前的輸入,那這個(gè)迭代器三次就結(jié)束了。val recordIter = wrappedStreams.flatMap { ...}
1 中迭代器產(chǎn)生(BlockId,InputStream),但是作為read 而言spark最終需要的讀出一個(gè)個(gè)(key,value),在 1 的iterator上做一次flatMap將(BlockId,InputStream)轉(zhuǎn)換成(key,value)。
先是調(diào)用serializerInstance.deserializeStream(wrappedStream)使用自定義的序列化方式包裝一下1中的輸入流,這樣就能正常讀出反序列化后的對(duì)象;然后調(diào)用asKeyValueIterator轉(zhuǎn)換成NextIterator,其next方法就反序列化后的流中讀出(key,value)。val metricIter = CompletionIterator...
這個(gè)迭代器包裝2中迭代器,next方法也只是包裝了2中的迭代器,但是多了一個(gè)度量的功能,統(tǒng)計(jì)讀入多少(key,value)。InterruptibleIterator, 這個(gè)迭代器使得任務(wù)取消是優(yōu)雅的停止讀入數(shù)據(jù)。
val aggregatedIter: Iterator[Product2[K, C]] = if ...
從前面shuffle write的過(guò)程可以知道,即便每一個(gè)分區(qū)任務(wù)寫(xiě)出時(shí)做了value的聚合,在reducer端的任務(wù)里,由于有多個(gè)分區(qū)的數(shù)據(jù),因此依然還要需要對(duì)每個(gè)分區(qū)里的相同的key做value的聚合。
這個(gè)iterator就是完成這個(gè)功能。
首先,會(huì)從4 中迭代器中一個(gè)個(gè)讀入數(shù)據(jù),緩存在內(nèi)存中(map緩存,因?yàn)橐鼍酆希?,并且在必要時(shí)spill到磁盤(pán)(spill之前會(huì)按key排序)。這個(gè)過(guò)程和shuffle write中在map端聚合時(shí)操作差不多。
然后, 假設(shè)上一部產(chǎn)生了多個(gè)spill文件,那么每一個(gè)spill文件必然時(shí)按key排序的,再對(duì)這個(gè)spill文件做歸并,歸并時(shí)key相同的進(jìn)行聚合。
最后, 迭代器的next返回key以及聚合后的value。dep.keyOrdering match {...
5中相同key的所有value都按照用戶自定義的聚合方法聚合在一起了,但是iterator輸出是按key的hash值排序輸出的,用戶可能自定義了自己的排序方法。這里又使用了ExternalSorter,按照自定義排序方式排序(根據(jù)前面External介紹,可能又會(huì)有spill磁盤(pán)的操作。。。),返回的iterator按照用戶自定義排序返回聚合后的key。
至此shuffle read算是完成。
3.1 Shuffle Read源碼解析
層層包裝的iterator中,比較復(fù)雜的在兩個(gè)地方:
- 上面1中 ShuffleBlockFetcherIterator,從上游依賴的rdd讀區(qū)分區(qū)數(shù)據(jù)。
- 上面5中aggregatedIter,對(duì)讀取到的各個(gè)分區(qū)數(shù)據(jù)做reducer端的aggregate
這里只介紹上面2處。
3.1.1 ShuffleBlockFetchIterator
下面是BlockStoreShuffleReader#read創(chuàng)建該iterator時(shí)的代碼:
val wrappedStreams = new ShuffleBlockFetcherIterator(
context,
blockManager.shuffleClient,
blockManager,
mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
serializerManager.wrapStream,
// Note: we use getSizeAsMb when no suffix is provided for backwards compatibility
SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,
SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
- blockManager.shuffleClient, 上NettyBlockTranseferService的實(shí)例,這在《Spark初始化》文章中介紹過(guò),用來(lái)傳輸datablock。NettyBlockTransferService可以參考《Spark 數(shù)據(jù)傳輸》
- mapOutputTracker.getXXX返回executorId到BlockId的映射,表示當(dāng)前partition需要讀取的上游的的block的blockid,以及blockid所屬的executor。
- serializerManager.wrapStream, 反序列化流,上有數(shù)據(jù)被包裝成輸入流之后,再使用反序列化流包裝之后讀出對(duì)象。
創(chuàng)建ShuffleBlockFetchIterator時(shí)會(huì)調(diào)用它的initialize方法,該方法如下:
private[this] def initialize(): Unit = {
// Add a task completion callback (called in both success case and failure case) to cleanup.
context.addTaskCompletionListener(_ => cleanup())
// Split local and remote blocks.
val remoteRequests = splitLocalRemoteBlocks()
// Add the remote requests into our queue in a random order
fetchRequests ++= Utils.randomize(remoteRequests)
assert ((0 == reqsInFlight) == (0 == bytesInFlight),
"expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight +
", expected bytesInFlight = 0 but found bytesInFlight = " + bytesInFlight)
// Send out initial requests for blocks, up to our maxBytesInFlight
fetchUpToMaxBytes()
val numFetches = remoteRequests.size - fetchRequests.size
logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))
// Get Local Blocks
fetchLocalBlocks()
logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime))
}
- splitLocalRemoteBlocks, 根據(jù)executorId區(qū)分出在本地的的block和遠(yuǎn)程的block,然后構(gòu)建出FetchRequest(每一個(gè)request可能包含多個(gè)block,但是block都是屬于一個(gè)executor)。
- fetchUpToMaxBytes和fetchLocalBlocks,從本地或者遠(yuǎn)程datablock,數(shù)據(jù)放在buffer中,包裝好buffer放到其成員results(一個(gè)阻塞隊(duì)列)中。
作為iterator,它的next方法每次從results中取出一個(gè),從數(shù)據(jù)buffer中創(chuàng)建出InputStream,使用wrapStream包裝InputStream返回。
3.1.2 aggregatedIter
用來(lái)將上游各個(gè)partition中的數(shù)據(jù)在reducer再聚合的,
調(diào)用dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)創(chuàng)建aggregatedIter,下面是combineCombinersByKey方法:
def combineCombinersByKey(
iter: Iterator[_ <: Product2[K, C]],
context: TaskContext): Iterator[(K, C)] = {
val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
combiners.insertAll(iter)
updateMetrics(context, combiners)
combiners.iterator
}
調(diào)用ExternalAppendOnlyMap#insertAll將輸入數(shù)據(jù),這個(gè)類和PartitionedAppendOnlyMap原理十分類似,實(shí)際上它內(nèi)部使用
@volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
這個(gè)成員來(lái)緩存數(shù)據(jù),插入數(shù)據(jù)同時(shí)會(huì)合并key相同的value,在內(nèi)存不夠時(shí),會(huì)保存到磁盤(pán)上,返回的iterator則會(huì)迭代磁盤(pán)中的文件合并的結(jié)果,可以參考附錄4.2節(jié)。
關(guān)于ExternalAppendOnlyMap#iterator的介紹見(jiàn)附錄4.3 ExternalAppendOnlyMap
4. 附錄
4.1 PartitionedPairBuffer
數(shù)據(jù)存放格式
2.2.2節(jié)中說(shuō)到當(dāng)不在Map 端做聚合時(shí),ExternalSorter使用buffer作為內(nèi)存緩存數(shù)據(jù)時(shí)的數(shù)據(jù)結(jié)構(gòu),調(diào)用buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])插入數(shù)據(jù)記錄。插入數(shù)據(jù)時(shí)將(key,value)轉(zhuǎn)換成((partition-id,key), value)的形式插入。
下面PartitionedPairBuffer的核心屬性:
private var capacity = initialCapacity
private var curSize = 0
private var data = new Array[AnyRef](2 * initialCapacity)
- data是一個(gè)數(shù)據(jù),就是PartitionedPairBuffer底層用來(lái)存儲(chǔ)數(shù)據(jù),其初始長(zhǎng)度是0。
下面是PartitionedPairBuffer的insert方法
def insert(partition: Int, key: K, value: V): Unit = {
if (curSize == capacity) {
growArray()
}
data(2 * curSize) = (partition, key.asInstanceOf[AnyRef])
data(2 * curSize + 1) = value.asInstanceOf[AnyRef]
curSize += 1
afterUpdate()
}
依次插入key,和value。因此PartitionedPairBuffer中數(shù)據(jù)排列的方式
_______________________________________________________
| key1 | value1 | key2 | value2 | ... | keyN | valueN |
_______________________________________________________
數(shù)據(jù)是連續(xù)分布的。
數(shù)據(jù)排序
ExternalSorter使用buffer的size達(dá)到一定大小后會(huì)將buffer中數(shù)據(jù)spill到磁盤(pán),在此之前需要對(duì)亂序的data數(shù)據(jù)排序。
PartitionedPairBuffer#partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
方法對(duì)data數(shù)據(jù)中的數(shù)據(jù)進(jìn)行排序,按照key排序,參數(shù)keyComparator定義key的比較方式。
在ExternalSorter中,data數(shù)組中key是(partition-id,key)。keyComparator取partition-id比較大小排序。這樣就保證相同的partition-id連續(xù)分布在寫(xiě)到磁盤(pán)中的文件中。
排序所用的算法為timsort(優(yōu)化后的歸并排序),參考timsort wiki
4.2 PartitionedAppendOnlyMap
2.2.3 節(jié)中介紹當(dāng)shuffle write對(duì)寫(xiě)出的數(shù)據(jù)做map端聚合時(shí),用來(lái)做內(nèi)存緩存數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)式map。
數(shù)據(jù)存放格式
PartitionedAppendOnlyMap類有如下繼承關(guān)系:
AppendOnlyMap
^
|
SizeTrackingAppendOnlyMap WritablePartitionedPairCollection
^ ^
| |
_______________________________
^
|
PartitionedAppendOnlyMap
2.2.3節(jié)中ExternalSorter向map中插入數(shù)據(jù)的代碼如下:
insertAll(...){
...
if (shouldCombine) {
// Combine values in-memory first using our AppendOnlyMap
val mergeValue = aggregator.get.mergeValue
val createCombiner = aggregator.get.createCombiner
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
}
while (records.hasNext) {
addElementsRead()
kv = records.next()
map.changeValue((getPartition(kv._1), kv._1), update)
maybeSpillCollection(usingMap = true)
}
}
...
}
mergeValue,createCombiner即定義在Aggregator中合并value的函數(shù)。
調(diào)用的map.changeValue插入數(shù)據(jù),這個(gè)方法還傳入的參數(shù)update函數(shù),
map調(diào)用changeValue插入數(shù)據(jù)時(shí),會(huì)首先調(diào)用update,update做如下判斷:
1. 若key之前已經(jīng)在map中(hadValue=true),調(diào)用mergeValue合并key相同的value
2. key不存在(hadValue=false),轉(zhuǎn)換value。
所以綜上所述,在map端按key聚合就是在插入數(shù)據(jù)的過(guò)程的完成的。
調(diào)用PartitionedAppednOnlyMap#insert(),會(huì)有下面調(diào)用鏈:
PartitionedAppendOnlyMap#changeValue(key,value)
-> SizeTrackingAppendOnlyMap#changeValue( (partition-id,key), value) 和buffer插入一樣,將key轉(zhuǎn)換(partition-id,key)
->AppendOnlyMap#changeValue( (partition-id,key),value )
底層數(shù)據(jù)結(jié)構(gòu)在AppendOnlyMap中,AppendOnlyMap有如下屬性:
private var data = new Array[AnyRef](2 * capacity)
底層存儲(chǔ)數(shù)據(jù)依然使用data數(shù)組。
下面是AppendOnlyMap#changeValue方法:
def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
assert(!destroyed, destructionMessage)
val k = key.asInstanceOf[AnyRef]
if (k.eq(null)) {
if (!haveNullValue) {
incrementSize()
}
nullValue = updateFunc(haveNullValue, nullValue)
haveNullValue = true
return nullValue
}
var pos = rehash(k.hashCode) & mask
var i = 1
while (true) {
val curKey = data(2 * pos)
if (curKey.eq(null)) {
// curKey是null,表示沒(méi)有插入過(guò)相同的key,不需要合并
// updateFunc就是上面提到的update,合并value的
val newValue = updateFunc(false, null.asInstanceOf[V])
data(2 * pos) = k
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
incrementSize()
return newValue
} else if (k.eq(curKey) || k.equals(curKey)) {
// curKey不是null,表示有插入過(guò)相同的key,需要合并
// updateFunc就是上面提到的update,合并value的
val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
return newValue
} else {
val delta = i
pos = (pos + delta) & mask
i += 1
}
}
null.asInstanceOf[V] // Never reached but needed to keep compiler happy
}
上面代碼中對(duì)于待插入的(key,value),不像buffer中那樣直接放在數(shù)據(jù)尾部,而是調(diào)用pos = rehash(...)確定插入的位置,因此其底層的數(shù)據(jù)可能是下面這樣的:
______________________________________________________________________
| key1 | value1 | | | key2 | value2 | ... | keyN | valueN | |
______________________________________________________________________
使用hash的方式確定位置,意味著數(shù)據(jù)不是連續(xù)的,存在空槽。
數(shù)據(jù)排序
和buffer排序有點(diǎn)區(qū)別,buffer由于數(shù)據(jù)是連續(xù)分布,沒(méi)有空槽,timsort可以直接在數(shù)組上排序。但是map由于空槽的存在,需要先將數(shù)據(jù)聚攏在一起,然后使用和buffer一樣的排序。
4.3 ExternalAppendOnlyMap
它有如下核心成員:
@volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
private val spilledMaps = new ArrayBuffer[DiskMapIterator]
- currentMap是其內(nèi)部用來(lái)緩存數(shù)據(jù)
- spilledMaps,currentMap的size達(dá)到一定大小之后,會(huì)將數(shù)據(jù)寫(xiě)到磁盤(pán),這個(gè)里面保存了用來(lái)迭代返回磁盤(pán)文件中(key,value)。
這里主要介紹ExternalAppendOnlyMap#iterator。下面是iterator方法:
override def iterator: Iterator[(K, C)] = {
if (currentMap == null) {
throw new IllegalStateException(
"ExternalAppendOnlyMap.iterator is destructive and should only be called once.")
}
if (spilledMaps.isEmpty) {
CompletionIterator[(K, C), Iterator[(K, C)]](
destructiveIterator(currentMap.iterator), freeCurrentMap())
} else {
new ExternalIterator()
}
}
- spilledMap.isEmpty表示內(nèi)存夠用,沒(méi)有spill到磁盤(pán),這個(gè)時(shí)候比較好辦不需要再將磁盤(pán)文件合并的,直接在底層存儲(chǔ)結(jié)構(gòu)currentMap上迭代就行了。
- 否則,需要合并磁盤(pán)文件,創(chuàng)建ExternalIterator用來(lái)合并文件。
ExternalIterator
對(duì)spill到磁盤(pán)文件做外部歸并的。
它有如下成員:
private val mergeHeap = new mutable.PriorityQueue[StreamBuffer]
// Input streams are derived both from the in-memory map and spilled maps on disk
// The in-memory map is sorted in place, while the spilled maps are already in sorted order
private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator(
currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap())
private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
- inputStreams, sortedMap是當(dāng)前內(nèi)存currentMap的迭代器,spilledMaps是磁盤(pán)文件的迭代器,將這些迭代器轉(zhuǎn)換成BufferedIterator(可以預(yù)讀下一個(gè)數(shù)據(jù),而移動(dòng)迭代器)。
- mergeHeap,小根堆。
ExternalIterator實(shí)例化話調(diào)用如下方法:
inputStreams.foreach { it =>
val kcPairs = new ArrayBuffer[(K, C)]
readNextHashCode(it, kcPairs)
if (kcPairs.length > 0) {
mergeHeap.enqueue(new StreamBuffer(it, kcPairs))
}
- readNextHashCode,連續(xù)讀區(qū)it迭代器中相同的key的所有記錄,碰到不同key時(shí)停止
- mergeHeap.enqueue,將1中的所有(key,value)包裝方入小根堆中。StreamBuffer重寫(xiě)了comparaTo方法,按照key的hash值排序。key小的就在堆頂端。
- foreach,對(duì)每一個(gè)待歸并的文件,每次取出其靠前的key相同的連續(xù)記錄,放到小根堆
接下來(lái)是其next方法:
override def next(): (K, C) = {
if (mergeHeap.isEmpty) {
throw new NoSuchElementException
}
// Select a key from the StreamBuffer that holds the lowest key hash
//從堆中取出key hash最小的(key, value)序列
val minBuffer = mergeHeap.dequeue()
val minPairs = minBuffer.pairs
val minHash = minBuffer.minKeyHash
// 從(key,value)序列中取下第一個(gè)(key,value)記錄
val minPair = removeFromBuffer(minPairs, 0)
val minKey = minPair._1
var minCombiner = minPair._2
assert(hashKey(minPair) == minHash)
val mergedBuffers = ArrayBuffer[StreamBuffer](minBuffer)
// 判斷堆中當(dāng)前key hash最小的和剛剛?cè)〕鰜?lái)的第一個(gè)記錄hash是
//不是一樣,是一樣則有可能是同一個(gè)key,但也可能不是同一個(gè)
// key,因?yàn)樵趇nputStreams.foreach中是使用hashcode判斷key
// 相等的,和reducer端則是使用==判斷。
while (mergeHeap.nonEmpty && mergeHeap.head.minKeyHash == minHash) {
val newBuffer = mergeHeap.dequeue()
// 可能需要合并,newBuffer中存放的是key的hashCode相等
// 的序列,但是key1==minKey不一定成立,所以可能只會(huì)合并
// minBuffer和newBuffer中的一部分?jǐn)?shù)據(jù)
minCombiner = mergeIfKeyExists(minKey, minCombiner, newBuffer)
mergedBuffers += newBuffer
}
// 前面說(shuō)到buffer中數(shù)據(jù)可能只會(huì)有一部分合并,對(duì)于沒(méi)有合并的
// 還需要重新添加到堆中,等待下一輪合并
mergedBuffers.foreach { buffer =>
if (buffer.isEmpty) {
// minBuffer和newBuffer全部合并了,那可以從迭代器中讀區(qū)下.
// 一批key的hashcode一樣的連續(xù)記錄了
readNextHashCode(buffer.iterator, buffer.pairs)
}
if (!buffer.isEmpty) {
mergeHeap.enqueue(buffer)
}
}
(minKey, minCombiner)
}