Hadoop5-Mapreduce shuffle及優(yōu)化

Hadoop-Mapreduce shuffle及優(yōu)化

轉(zhuǎn)載

MapReduce簡介

在Hadoop MapReduce中,框架會確保reduce收到的輸入數(shù)據(jù)是根據(jù)key排序過的。數(shù)據(jù)從Mapper輸出到Reducer接收,是一個很復(fù)雜的過程,框架處理了所有問題,并提供了很多配置項(xiàng)及擴(kuò)展點(diǎn)。一個MapReduce的大致數(shù)據(jù)流如下圖:

這里寫圖片描述

更詳細(xì)的MapReduce介紹參考Hadoop MapReduce原理與實(shí)例

Mapper的輸出排序、然后傳送到Reducer的過程,稱為shuffle。本文詳細(xì)地解析shuffle過程,深入理解這個過程對于MapReduce調(diào)優(yōu)至關(guān)重要,某種程度上說,shuffle過程是MapReduce的核心內(nèi)容。

Mapper端

當(dāng)map函數(shù)通過context.write()開始輸出數(shù)據(jù)時(shí),不是單純地將數(shù)據(jù)寫入到磁盤。為了性能,map輸出的數(shù)據(jù)會寫入到緩沖區(qū),并進(jìn)行預(yù)排序的一些工作,整個過程如下圖:

這里寫圖片描述

環(huán)形Buffer數(shù)據(jù)結(jié)構(gòu)

每一個map任務(wù)有一個環(huán)形Buffer,map將輸出寫入到這個Buffer。環(huán)形Buffer是內(nèi)存中的一種首尾相連的數(shù)據(jù)結(jié)構(gòu),專門用來存儲Key-Value格式的數(shù)據(jù):

這里寫圖片描述

Hadoop中,環(huán)形緩沖其實(shí)就是一個字節(jié)數(shù)組:

// MapTask.java
private byte[] kvbuffer;  // main output buffer

kvbuffer = new byte[maxMemUsage - recordCapacity]; 1234

kvbuffer包含數(shù)據(jù)區(qū)和索引區(qū),這兩個區(qū)是相鄰不重疊的區(qū)域,用一個分界點(diǎn)來標(biāo)識。分界點(diǎn)不是永恒不變的,每次Spill之后都會更新一次。初始分界點(diǎn)為0,數(shù)據(jù)存儲方向?yàn)橄蛏显鲩L,索引存儲方向向下:

這里寫圖片描述

bufferindex一直往上增長,例如最初為0,寫入一個int類型的key之后變?yōu)?,寫入一個int類型的value之后變成8。

索引是對key-value在kvbuffer中的索引,是個四元組,占用四個Int長度,包括:

  • value的起始位置
  • key的起始位置
  • partition值
  • value的長度
private static final int VALSTART = 0;    // val offset in acct
private static final int KEYSTART = 1;    // key offset in acct
private static final int PARTITION = 2;   // partition offset in acct
private static final int VALLEN = 3;      // length of value
private static final int NMETA = 4;       // num meta ints
private static final int METASIZE = NMETA * 4; // size in bytes
 // write accounting info
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));1234567891011

kvmeta的存放指針kvindex每次都是向下跳四個“格子”,然后再向上一個格子一個格子地填充四元組的數(shù)據(jù)。比如kvindex初始位置是-4,當(dāng)?shù)谝粋€key-value寫完之后,(kvindex+0)的位置存放value的起始位置、(kvindex+1)的位置存放key的起始位置、(kvindex+2)的位置存放partition的值、(kvindex+3)的位置存放value的長度,然后kvindex跳到-8位置。

緩沖區(qū)的大小默認(rèn)為100M,但是可以通過mapreduce.task.io.sort.mb這個屬性來配置。

Spill

map將輸出不斷寫入到這個緩沖區(qū)中,當(dāng)緩沖區(qū)使用量達(dá)到一定比例之后,一個后臺線程開始把緩沖區(qū)的數(shù)據(jù)寫入磁盤,這個寫入的過程叫spill。開始spill的Buffer比例默認(rèn)為0.80,可以通過mapreduce.map.sort.spill.percent配置。在后臺線程寫入的同時(shí),map繼續(xù)將輸出寫入這個環(huán)形緩沖,如果緩沖池寫滿了,map會阻塞直到spill過程完成,而不會覆蓋緩沖池中的已有的數(shù)據(jù)。

在寫入之前,后臺線程把數(shù)據(jù)按照他們將送往的reducer進(jìn)行劃分,通過調(diào)用PartitionergetPartition()方法就能知道該輸出要送往哪個Reducer。默認(rèn)的Partitioner使用Hash算法來分區(qū),即通過key.hashCode() mode R來計(jì)算,R為Reducer的個數(shù)。getPartition返回Partition事實(shí)上是個整數(shù),例如有10個Reducer,則返回0-9的整數(shù),每個Reducer會對應(yīng)到一個Partition。map輸出的鍵值對,與partition一起存在緩沖中(即前面提到的kvmeta中)。假設(shè)作業(yè)有2個reduce任務(wù),則數(shù)據(jù)在內(nèi)存中被劃分為reduce1和reduce2:

這里寫圖片描述

并且針對每部分?jǐn)?shù)據(jù),使用快速排序算法(QuickSort)對key排序。

如果設(shè)置了Combiner,則在排序的結(jié)果上運(yùn)行combine。

排序后的數(shù)據(jù)被寫入到mapreduce.cluster.local.dir配置的目錄中的其中一個,使用round robin fashion的方式輪流。注意寫入的是本地文件目錄,而不是HDFS。Spill文件名像sipll0.out,spill1.out等。

不同Partition的數(shù)據(jù)都放在同一個文件,通過索引來區(qū)分partition的邊界和起始位置。索引是一個三元組結(jié)構(gòu),包括起始位置、數(shù)據(jù)長度、壓縮后的數(shù)據(jù)長度,對應(yīng)IndexRecord類:

public class IndexRecord {
  public long startOffset;
  public long rawLength;
  public long partLength;

  public IndexRecord() { }

  public IndexRecord(long startOffset, long rawLength, long partLength) {
    this.startOffset = startOffset;
    this.rawLength = rawLength;
    this.partLength = partLength;
  }
}12345678910111213

每個mapper也有對應(yīng)的一個索引環(huán)形Buffer,默認(rèn)為1KB,可以通過mapreduce.task.index.cache.limit.bytes來配置,索引如果足夠小則存在內(nèi)存中,如果內(nèi)存放不下,需要寫入磁盤。
Spill文件索引名稱類似這樣 spill110.out.index, spill111.out.index。

Spill文件的索引事實(shí)上是 org.apache.hadoop.mapred.SpillRecord的一個數(shù)組,每個Map任務(wù)(源碼中的MapTask.java類)維護(hù)一個這樣的列表:

final ArrayList<SpillRecord> indexCacheList = new ArrayList<SpillRecord>();1

創(chuàng)建一個SpillRecord時(shí),會分配(Number_Of_Reducers * 24)Bytes緩沖:

public SpillRecord(int numPartitions) {
  buf = ByteBuffer.allocate(
      numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
  entries = buf.asLongBuffer();
}12345

numPartitions是Partition的個數(shù),其實(shí)也就是Reducer的個數(shù):

public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;

// ---

partitions = jobContext.getNumReduceTasks();
final SpillRecord spillRec = new SpillRecord(partitions);123456

默認(rèn)的索引緩沖為1KB,即10241024 Bytes,假設(shè)有2個Reducer,則每個Spill文件的索引大小為224=48 Bytes,當(dāng)Spill文件超過21845.3時(shí),索引文件就需要寫入磁盤。

索引及spill文件如下圖示意:

這里寫圖片描述

Spill的過程至少需要運(yùn)行一次,因?yàn)镸apper的輸出結(jié)果必須要寫入磁盤,供Reducer進(jìn)一步處理。

合并Spill文件

在整個map任務(wù)中,一旦緩沖達(dá)到設(shè)定的閾值,就會觸發(fā)spill操作,寫入spill文件到磁盤,因此最后可能有多個spill文件。在map任務(wù)結(jié)束之前,這些文件會根據(jù)情況合并到一個大的分區(qū)的、排序的文件中,排序是在內(nèi)存排序的基礎(chǔ)上進(jìn)行全局排序。下圖是合并過程的簡單示意:

這里寫圖片描述

相對應(yīng)的索引文件也會被合并,以便在Reducer請求對應(yīng)Partition的數(shù)據(jù)的時(shí)候能夠快速讀取。

另外,如果spill文件數(shù)量大于mapreduce.map.combiner.minspills配置的數(shù),則在合并文件寫入之前,會再次運(yùn)行combiner。如果spill文件數(shù)量太少,運(yùn)行combiner的收益可能小于調(diào)用的代價(jià)。

mapreduce.task.io.sort.factor屬性配置每次最多合并多少個文件,默認(rèn)為10,即一次最多合并10個spill文件。最后,多輪合并之后,所有的輸出文件被合并為唯一一個大文件,以及相應(yīng)的索引文件(可能只在內(nèi)存中存在)。

壓縮

在數(shù)據(jù)量大的時(shí)候,對map輸出進(jìn)行壓縮通常是個好主意。要啟用壓縮,將mapreduce.map.output.compress設(shè)為true,并使用mapreduce.map.output.compress.codec設(shè)置使用的壓縮算法。

通過HTTP暴露輸出結(jié)果

map輸出數(shù)據(jù)完成之后,通過運(yùn)行一個HTTP Server暴露出來,供reduce端獲取。用來相應(yīng)reduce數(shù)據(jù)請求的線程數(shù)量可以配置,默認(rèn)情況下為機(jī)器內(nèi)核數(shù)量的兩倍,如需自己配置,通過mapreduce.shuffle.max.threads屬性來配置,注意該配置是針對NodeManager配置的,而不是每個作業(yè)配置。

同時(shí),Map任務(wù)完成后,也會通知Application Master,以便Reducer能夠及時(shí)來拉取數(shù)據(jù)。

通過緩沖、劃分(partition)、排序、combiner、合并、壓縮等過程之后,map端的工作就算完畢:

這里寫圖片描述

Reducer端

各個map任務(wù)運(yùn)行完之后,輸出寫入運(yùn)行任務(wù)的機(jī)器磁盤中。Reducer需要從各map任務(wù)中提取自己的那一部分?jǐn)?shù)據(jù)(對應(yīng)的partition)。每個map任務(wù)的完成時(shí)間可能是不一樣的,reduce任務(wù)在map任務(wù)結(jié)束之后會盡快取走輸出結(jié)果,這個階段叫copy。
Reducer是如何知道要去哪些機(jī)器去數(shù)據(jù)呢?一旦map任務(wù)完成之后,就會通過常規(guī)心跳通知應(yīng)用程序的Application Master。reduce的一個線程會周期性地向master詢問,直到提取完所有數(shù)據(jù)(如何知道提取完?)。

數(shù)據(jù)被reduce提走之后,map機(jī)器不會立刻刪除數(shù)據(jù),這是為了預(yù)防reduce任務(wù)失敗需要重做。因此map輸出數(shù)據(jù)是在整個作業(yè)完成之后才被刪除掉的。

reduce維護(hù)幾個copier線程,并行地從map任務(wù)機(jī)器提取數(shù)據(jù)。默認(rèn)情況下有5個copy線程,可以通過mapreduce.reduce.shuffle.parallelcopies配置。

如果map輸出的數(shù)據(jù)足夠小,則會被拷貝到reduce任務(wù)的JVM內(nèi)存中。mapreduce.reduce.shuffle.input.buffer.percent配置JVM堆內(nèi)存的多少比例可以用于存放map任務(wù)的輸出結(jié)果。如果數(shù)據(jù)太大容不下,則被拷貝到reduce的機(jī)器磁盤上。

內(nèi)存中合并

當(dāng)緩沖中數(shù)據(jù)達(dá)到配置的閾值時(shí),這些數(shù)據(jù)在內(nèi)存中被合并、寫入機(jī)器磁盤。閾值有2種配置方式:

  • 配置內(nèi)存比例: 前面提到reduce JVM堆內(nèi)存的一部分用于存放來自map任務(wù)的輸入,在這基礎(chǔ)之上配置一個開始合并數(shù)據(jù)的比例。假設(shè)用于存放map輸出的內(nèi)存為500M,mapreduce.reduce.shuffle.merger.percent配置為0.80,則當(dāng)內(nèi)存中的數(shù)據(jù)達(dá)到400M的時(shí)候,會觸發(fā)合并寫入。
  • 配置map輸出數(shù)量: 通過mapreduce.reduce.merge.inmem.threshold配置。

在合并的過程中,會對被合并的文件做全局的排序。如果作業(yè)配置了Combiner,則會運(yùn)行combine函數(shù),減少寫入磁盤的數(shù)據(jù)量。

Copy過程中磁盤合并

在copy過來的數(shù)據(jù)不斷寫入磁盤的過程中,一個后臺線程會把這些文件合并為更大的、有序的文件。如果map的輸出結(jié)果進(jìn)行了壓縮,則在合并過程中,需要在內(nèi)存中解壓后才能給進(jìn)行合并。這里的合并只是為了減少最終合并的工作量,也就是在map輸出還在拷貝時(shí),就開始進(jìn)行一部分合并工作。合并的過程一樣會進(jìn)行全局排序。

最終磁盤中合并

當(dāng)所有map輸出都拷貝完畢之后,所有數(shù)據(jù)被最后合并成一個排序的文件,作為reduce任務(wù)的輸入。這個合并過程是一輪一輪進(jìn)行的,最后一輪的合并結(jié)果直接推送給reduce作為輸入,節(jié)省了磁盤操作的一個來回。最后(所以map輸出都拷貝到reduce之后)進(jìn)行合并的map輸出可能來自合并后寫入磁盤的文件,也可能來及內(nèi)存緩沖,在最后寫入內(nèi)存的map輸出可能沒有達(dá)到閾值觸發(fā)合并,所以還留在內(nèi)存中。

每一輪合并并不一定合并平均數(shù)量的文件數(shù),指導(dǎo)原則是使用整個合并過程中寫入磁盤的數(shù)據(jù)量最小,為了達(dá)到這個目的,則需要最終的一輪合并中合并盡可能多的數(shù)據(jù),因?yàn)樽詈笠惠喌臄?shù)據(jù)直接作為reduce的輸入,無需寫入磁盤再讀出。因此我們讓最終的一輪合并的文件數(shù)達(dá)到最大,即合并因子的值,通過mapreduce.task.io.sort.factor來配置。

假設(shè)現(xiàn)在有50個map輸出文件,合并因子配置為10,則需要5輪的合并。最終的一輪確保合并10個文件,其中包括4個來自前4輪的合并結(jié)果,因此原始的50個中,再留出6個給最終一輪。所以最后的5輪合并可能情況如下:

這里寫圖片描述

前4輪合并后的數(shù)據(jù)都是寫入到磁盤中的,注意到最后的2格顏色不一樣,是為了標(biāo)明這些數(shù)據(jù)可能直接來自于內(nèi)存。

MemToMem合并

除了內(nèi)存中合并和磁盤中合并外,Hadoop還定義了一種MemToMem合并,這種合并將內(nèi)存中的map輸出合并,然后再寫入內(nèi)存。這種合并默認(rèn)關(guān)閉,可以通過reduce.merge.memtomem.enabled打開,當(dāng)map輸出文件達(dá)到reduce.merge.memtomem.threshold時(shí),觸發(fā)這種合并。

最后一次合并后傳遞給reduce方法

合并后的文件作為輸入傳遞給Reducer,Reducer針對每個key及其排序的數(shù)據(jù)調(diào)用reduce函數(shù)。產(chǎn)生的reduce輸出一般寫入到HDFS,reduce輸出的文件第一個副本寫入到當(dāng)前運(yùn)行reduce的機(jī)器,其他副本選址原則按照常規(guī)的HDFS數(shù)據(jù)寫入原則來進(jìn)行,詳細(xì)信息請參考這里。

通過從map機(jī)器提取結(jié)果,合并,combine之后,傳遞給reduce完成最后工作,整個過程也就差不多完成。最后再感受一下下面這張圖:

這里寫圖片描述

性能調(diào)優(yōu)

如果能夠根據(jù)情況對shuffle過程進(jìn)行調(diào)優(yōu),對于提供MapReduce性能很有幫助。相關(guān)的參數(shù)配置列在后面的表格中。

一個通用的原則是給shuffle過程分配盡可能大的內(nèi)存,當(dāng)然你需要確保map和reduce有足夠的內(nèi)存來運(yùn)行業(yè)務(wù)邏輯。因此在實(shí)現(xiàn)Mapper和Reducer時(shí),應(yīng)該盡量減少內(nèi)存的使用,例如避免在Map中不斷地疊加。

運(yùn)行map和reduce任務(wù)的JVM,內(nèi)存通過mapred.child.java.opts屬性來設(shè)置,盡可能設(shè)大內(nèi)存。容器的內(nèi)存大小通過mapreduce.map.memory.mbmapreduce.reduce.memory.mb來設(shè)置,默認(rèn)都是1024M。

map優(yōu)化

在map端,避免寫入多個spill文件可能達(dá)到最好的性能,一個spill文件是最好的。通過估計(jì)map的輸出大小,設(shè)置合理的mapreduce.task.io.sort.*屬性,使得spill文件數(shù)量最小。例如盡可能調(diào)大mapreduce.task.io.sort.mb。

map端相關(guān)的屬性如下表:

屬性名 值類型 默認(rèn)值 說明
mapreduce.task.io.sort.mb int 100 用于map輸出排序的內(nèi)存大小
mapreduce.map.sort.spill.percent float 0.80 開始spill的緩沖池閾值
mapreduce.task.io.sort.factor int 10 合并文件數(shù)最大值,與reduce共用
mapreduce.map.combine.minspills int 3 運(yùn)行combiner的最低spill文件數(shù)
mapreduce.map.out.compress boolean false 輸出是否壓縮
mapreduce.map.out.compress 類名 DefaultCodec 壓縮算法
mapreduce.shuffle.max.threads int 0 服務(wù)于reduce提取結(jié)果的線程數(shù)量

reduce優(yōu)化

在reduce端,如果能夠讓所有數(shù)據(jù)都保存在內(nèi)存中,可以達(dá)到最佳的性能。通常情況下,內(nèi)存都保留給reduce函數(shù),但是如果reduce函數(shù)對內(nèi)存需求不是很高,將mapreduce.reduce.merge.inmem.threshold(觸發(fā)合并的map輸出文件數(shù))設(shè)為0,mapreduce.reduce.input.buffer.percent(用于保存map輸出文件的堆內(nèi)存比例)設(shè)為1.0,可以達(dá)到很好的性能提升。在2008年的TB級別數(shù)據(jù)排序性能測試中,Hadoop就是通過將reduce的中間數(shù)據(jù)都保存在內(nèi)存中勝利的。

reduce端相關(guān)屬性:

屬性名 值類型 默認(rèn)值 說明
mapreduce.reduce.shuffle.parallelcopies int 5 提取map輸出的copier線程數(shù)
mapreduce.reduce.shuffle.maxfetchfailures int 10 提取map輸出最大嘗試次數(shù),超出后報(bào)錯
mapreduce.task.io.sort.factor int 10 合并文件數(shù)最大值,與map共用
mapreduce.reduce.shuffle.input.buffer.percent float 0.70 copy階段用于保存map輸出的堆內(nèi)存比例
mapreduce.reduce.shuffle.merge.percent float 0.66 開始spill的緩沖池比例閾值
mapreduce.reduce.shuffle.inmem.threshold int 1000 開始spill的map輸出文件數(shù)閾值,小于等于0表示沒有閾值,此時(shí)只由緩沖池比例來控制
mapreduce.reduce.input.buffer.percent float 0.0 reduce函數(shù)開始運(yùn)行時(shí),內(nèi)存中的map輸出所占的堆內(nèi)存比例不得高于這個值,默認(rèn)情況內(nèi)存都用于reduce函數(shù),也就是map輸出都寫入到磁盤

通用優(yōu)化

Hadoop默認(rèn)使用4KB作為緩沖,這個算是很小的,可以通過io.file.buffer.size來調(diào)高緩沖池大小。

參考

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容