Spark Shuffle(ExternalSorter)

1、Shuffle流程

spark的shuffle過(guò)程如下圖所示,和mapreduce中的類(lèi)似,但在spark2.0及之后的版本中只存在SortShuffleManager而將原來(lái)的HashShuffleManager廢棄掉(但是shuffleWriter的子類(lèi)BypassMergeSortShuffleWriter和已經(jīng)被廢棄掉的HashShuffleWriter類(lèi)似)。這樣,每個(gè)mapTask在shuffle的sort階段只會(huì)生成一個(gè)結(jié)果文件,單個(gè)文件按照partitionId分成多個(gè)region。reducer階段根據(jù)partitionId來(lái)fetch對(duì)應(yīng)的region數(shù)據(jù)。
整個(gè)shuffle過(guò)程分為兩個(gè)階段,write(核心)和read階段,其中write階段比較重要的實(shí)現(xiàn)類(lèi)為ExternalSorter(后面會(huì)重點(diǎn)分析該類(lèi))。

shuffle

2、Shuffle Write

  • BypassMergeSortShuffleWriter -
    這種方式是對(duì)partition(對(duì)應(yīng)的reduce)數(shù)量較少且不需要map-side aggregation的shuffle優(yōu)化,將每個(gè)partition的數(shù)據(jù)直接寫(xiě)到對(duì)應(yīng)的文件,在所有數(shù)據(jù)都寫(xiě)入完成后進(jìn)行一次合并,下面是部分代碼:
[BypassMergeSortShuffleWriter]->write
public void write(Iterator<Product2<K, V>> records) throws IOException {

                                    ...

    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    /**
      為每個(gè)partition創(chuàng)建一個(gè)DiskWriter用于寫(xiě)臨時(shí)文件
    **/
    for (int i = 0; i < numPartitions; i++) {
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
        blockManager.diskBlockManager().createTempShuffleBlock();
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      partitionWriters[i] =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
    }
                        ...
    /**
      對(duì)每個(gè)record用對(duì)應(yīng)的writer進(jìn)行文件寫(xiě)入操作
    **/
    while (records.hasNext()) {
      final Product2<K, V> record = records.next();
      final K key = record._1();
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }
    //flush
    for (DiskBlockObjectWriter writer : partitionWriters) {
      writer.commitAndClose();
    }
    /**
        構(gòu)造最終的輸出文件實(shí)例,其中文件名為(reduceId為0):
        "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
         文件所在的local文件夾是根據(jù)該文件名的hash值確定。
        1、如果運(yùn)行在yarn上,yarn在啟動(dòng)的時(shí)候會(huì)根據(jù)配置項(xiàng)'LOCAL_DIRS'在本地創(chuàng)建
        文件夾
    **/
    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    //在實(shí)際結(jié)果文件名后加上uuid用于標(biāo)識(shí)文件正在寫(xiě)入,結(jié)束后重命名
    File tmp = Utils.tempFileWith(output);
    try {
      //合并每個(gè)partition對(duì)應(yīng)的文件到一個(gè)文件中
      partitionLengths = writePartitionedFile(tmp);
      //將每個(gè)partition的offset寫(xiě)入index文件方便reduce端fetch數(shù)據(jù)
      shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
    } finally {
      if (tmp.exists() && !tmp.delete()) {
        logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
      }
    }
    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), 
partitionLengths);
  }
  • UnsafeShuffleWriter(詳見(jiàn)project tungsten)

該writer可將數(shù)據(jù)序列化后寫(xiě)入到堆外內(nèi)存,只需要按照partitionid對(duì)地址進(jìn)行排序,整個(gè)過(guò)程不涉及反序列化。
條件
1、使用的序列化類(lèi)需要支持object relocation.目前只能使用kryoSerializer
2、不需要map side aggregate即不能定義aggregator
3、partition數(shù)量不能大于支持的上限(2^24)
內(nèi)存模型:
每條數(shù)據(jù)地址由一個(gè)64位的指針確定,其構(gòu)成為:[24 bit partition number][13 bit memory page number][27 bit offset in page]
在內(nèi)存為非8字節(jié)對(duì)齊的情況下,每個(gè)page的容量為227bits=128Mb,page總數(shù)為213,因此每個(gè)task可操作內(nèi)存總量為:227*213bits=1Tb,在內(nèi)存按字節(jié)對(duì)齊的情況下允許每個(gè)page的size有1g(即128*8,實(shí)際64位系統(tǒng)的內(nèi)存都是8字節(jié)對(duì)齊的)的容量,數(shù)據(jù)存放在off heap上。在地址中加入partitionID 是為了排序階段只需要對(duì)record的地址排序。

數(shù)據(jù)存儲(chǔ)格式:

4、Shuffle過(guò)程中涉及到的幾個(gè)參數(shù)

  • spark.shuffle.sort.bypassMergeThreshold
    當(dāng)partition的數(shù)量小于該值并且不需要進(jìn)行map-side aggregation時(shí)使用BypassMergeSortShuffleWriter來(lái)進(jìn)行shuffle的write操作,默認(rèn)值為200.
    [SortShuffleWriter]->shouldBypassMergeSort
  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      false
    } else {
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
}```
- *spark.shuffle.compress*、*spark.shuffle.file.buffer*
**[DiskBlockObjectWriter]->open**

def open(): DiskBlockObjectWriter = {
...
/**
'spark.shuffle.compress'-該參數(shù)決定是否對(duì)寫(xiě)入文件的序列化數(shù)據(jù)進(jìn)行壓縮。
'spark.shuffle.file.buffer'-設(shè)置buffer stream的buffersize,每writey
一個(gè)byte時(shí)會(huì)檢查當(dāng)前buffer容量,容量滿的時(shí)候則會(huì)flush到磁盤(pán)。該參數(shù)值在代碼中
會(huì)乘以1024轉(zhuǎn)換為字節(jié)長(zhǎng)度。默認(rèn)值為'32k',該值太大可能導(dǎo)致內(nèi)存溢出。
**/
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
...
}```

  • spark.file.transferTo
    決定在使用BypassMergeWriter過(guò)程中,最后對(duì)文件進(jìn)行合并時(shí)是否使用NIO方式進(jìn)行file stream的copy。默認(rèn)為true,在為false的情況下合并文件效率比較低(創(chuàng)建一個(gè)大小為8192的字節(jié)數(shù)組作為buffer,從in stream中讀滿后寫(xiě)入out stream,單線程讀寫(xiě)),版本號(hào)為2.6.32的linux內(nèi)核在使用NIO方式會(huì)產(chǎn)生bug,需要將該參數(shù)設(shè)置為false。

  • spark.shuffle.spill.numElementsForceSpillThreshold
    在使用UnsafeShuffleWriter時(shí),如果內(nèi)存中的數(shù)據(jù)超過(guò)這個(gè)值則對(duì)當(dāng)前內(nèi)存數(shù)據(jù)進(jìn)行排序并寫(xiě)入磁盤(pán)臨時(shí)文件。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容