Hadoop-Mapreduce shuffle及優(yōu)化
MapReduce簡介
在Hadoop MapReduce中,框架會確保reduce收到的輸入數(shù)據(jù)是根據(jù)key排序過的。數(shù)據(jù)從Mapper輸出到Reducer接收,是一個很復(fù)雜的過程,框架處理了所有問題,并提供了很多配置項(xiàng)及擴(kuò)展點(diǎn)。一個MapReduce的大致數(shù)據(jù)流如下圖:
更詳細(xì)的MapReduce介紹參考Hadoop MapReduce原理與實(shí)例。
Mapper的輸出排序、然后傳送到Reducer的過程,稱為shuffle。本文詳細(xì)地解析shuffle過程,深入理解這個過程對于MapReduce調(diào)優(yōu)至關(guān)重要,某種程度上說,shuffle過程是MapReduce的核心內(nèi)容。
Mapper端
當(dāng)map函數(shù)通過context.write()開始輸出數(shù)據(jù)時(shí),不是單純地將數(shù)據(jù)寫入到磁盤。為了性能,map輸出的數(shù)據(jù)會寫入到緩沖區(qū),并進(jìn)行預(yù)排序的一些工作,整個過程如下圖:
環(huán)形Buffer數(shù)據(jù)結(jié)構(gòu)
每一個map任務(wù)有一個環(huán)形Buffer,map將輸出寫入到這個Buffer。環(huán)形Buffer是內(nèi)存中的一種首尾相連的數(shù)據(jù)結(jié)構(gòu),專門用來存儲Key-Value格式的數(shù)據(jù):
Hadoop中,環(huán)形緩沖其實(shí)就是一個字節(jié)數(shù)組:
// MapTask.java
private byte[] kvbuffer; // main output buffer
kvbuffer = new byte[maxMemUsage - recordCapacity]; 1234
kvbuffer包含數(shù)據(jù)區(qū)和索引區(qū),這兩個區(qū)是相鄰不重疊的區(qū)域,用一個分界點(diǎn)來標(biāo)識。分界點(diǎn)不是永恒不變的,每次Spill之后都會更新一次。初始分界點(diǎn)為0,數(shù)據(jù)存儲方向?yàn)橄蛏显鲩L,索引存儲方向向下:
bufferindex一直往上增長,例如最初為0,寫入一個int類型的key之后變?yōu)?,寫入一個int類型的value之后變成8。
索引是對key-value在kvbuffer中的索引,是個四元組,占用四個Int長度,包括:
- value的起始位置
- key的起始位置
- partition值
- value的長度
private static final int VALSTART = 0; // val offset in acct
private static final int KEYSTART = 1; // key offset in acct
private static final int PARTITION = 2; // partition offset in acct
private static final int VALLEN = 3; // length of value
private static final int NMETA = 4; // num meta ints
private static final int METASIZE = NMETA * 4; // size in bytes
// write accounting info
kvmeta.put(kvindex + PARTITION, partition);
kvmeta.put(kvindex + KEYSTART, keystart);
kvmeta.put(kvindex + VALSTART, valstart);
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));1234567891011
kvmeta的存放指針kvindex每次都是向下跳四個“格子”,然后再向上一個格子一個格子地填充四元組的數(shù)據(jù)。比如kvindex初始位置是-4,當(dāng)?shù)谝粋€key-value寫完之后,(kvindex+0)的位置存放value的起始位置、(kvindex+1)的位置存放key的起始位置、(kvindex+2)的位置存放partition的值、(kvindex+3)的位置存放value的長度,然后kvindex跳到-8位置。
緩沖區(qū)的大小默認(rèn)為100M,但是可以通過mapreduce.task.io.sort.mb這個屬性來配置。
Spill
map將輸出不斷寫入到這個緩沖區(qū)中,當(dāng)緩沖區(qū)使用量達(dá)到一定比例之后,一個后臺線程開始把緩沖區(qū)的數(shù)據(jù)寫入磁盤,這個寫入的過程叫spill。開始spill的Buffer比例默認(rèn)為0.80,可以通過mapreduce.map.sort.spill.percent配置。在后臺線程寫入的同時(shí),map繼續(xù)將輸出寫入這個環(huán)形緩沖,如果緩沖池寫滿了,map會阻塞直到spill過程完成,而不會覆蓋緩沖池中的已有的數(shù)據(jù)。
在寫入之前,后臺線程把數(shù)據(jù)按照他們將送往的reducer進(jìn)行劃分,通過調(diào)用Partitioner的getPartition()方法就能知道該輸出要送往哪個Reducer。默認(rèn)的Partitioner使用Hash算法來分區(qū),即通過key.hashCode() mode R來計(jì)算,R為Reducer的個數(shù)。getPartition返回Partition事實(shí)上是個整數(shù),例如有10個Reducer,則返回0-9的整數(shù),每個Reducer會對應(yīng)到一個Partition。map輸出的鍵值對,與partition一起存在緩沖中(即前面提到的kvmeta中)。假設(shè)作業(yè)有2個reduce任務(wù),則數(shù)據(jù)在內(nèi)存中被劃分為reduce1和reduce2:
并且針對每部分?jǐn)?shù)據(jù),使用快速排序算法(QuickSort)對key排序。
如果設(shè)置了Combiner,則在排序的結(jié)果上運(yùn)行combine。
排序后的數(shù)據(jù)被寫入到mapreduce.cluster.local.dir配置的目錄中的其中一個,使用round robin fashion的方式輪流。注意寫入的是本地文件目錄,而不是HDFS。Spill文件名像sipll0.out,spill1.out等。
不同Partition的數(shù)據(jù)都放在同一個文件,通過索引來區(qū)分partition的邊界和起始位置。索引是一個三元組結(jié)構(gòu),包括起始位置、數(shù)據(jù)長度、壓縮后的數(shù)據(jù)長度,對應(yīng)IndexRecord類:
public class IndexRecord {
public long startOffset;
public long rawLength;
public long partLength;
public IndexRecord() { }
public IndexRecord(long startOffset, long rawLength, long partLength) {
this.startOffset = startOffset;
this.rawLength = rawLength;
this.partLength = partLength;
}
}12345678910111213
每個mapper也有對應(yīng)的一個索引環(huán)形Buffer,默認(rèn)為1KB,可以通過mapreduce.task.index.cache.limit.bytes來配置,索引如果足夠小則存在內(nèi)存中,如果內(nèi)存放不下,需要寫入磁盤。
Spill文件索引名稱類似這樣 spill110.out.index, spill111.out.index。
Spill文件的索引事實(shí)上是 org.apache.hadoop.mapred.SpillRecord的一個數(shù)組,每個Map任務(wù)(源碼中的MapTask.java類)維護(hù)一個這樣的列表:
final ArrayList<SpillRecord> indexCacheList = new ArrayList<SpillRecord>();1
創(chuàng)建一個SpillRecord時(shí),會分配(Number_Of_Reducers * 24)Bytes緩沖:
public SpillRecord(int numPartitions) {
buf = ByteBuffer.allocate(
numPartitions * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
entries = buf.asLongBuffer();
}12345
numPartitions是Partition的個數(shù),其實(shí)也就是Reducer的個數(shù):
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
// ---
partitions = jobContext.getNumReduceTasks();
final SpillRecord spillRec = new SpillRecord(partitions);123456
默認(rèn)的索引緩沖為1KB,即10241024 Bytes,假設(shè)有2個Reducer,則每個Spill文件的索引大小為224=48 Bytes,當(dāng)Spill文件超過21845.3時(shí),索引文件就需要寫入磁盤。
索引及spill文件如下圖示意:
Spill的過程至少需要運(yùn)行一次,因?yàn)镸apper的輸出結(jié)果必須要寫入磁盤,供Reducer進(jìn)一步處理。
合并Spill文件
在整個map任務(wù)中,一旦緩沖達(dá)到設(shè)定的閾值,就會觸發(fā)spill操作,寫入spill文件到磁盤,因此最后可能有多個spill文件。在map任務(wù)結(jié)束之前,這些文件會根據(jù)情況合并到一個大的分區(qū)的、排序的文件中,排序是在內(nèi)存排序的基礎(chǔ)上進(jìn)行全局排序。下圖是合并過程的簡單示意:
相對應(yīng)的索引文件也會被合并,以便在Reducer請求對應(yīng)Partition的數(shù)據(jù)的時(shí)候能夠快速讀取。
另外,如果spill文件數(shù)量大于mapreduce.map.combiner.minspills配置的數(shù),則在合并文件寫入之前,會再次運(yùn)行combiner。如果spill文件數(shù)量太少,運(yùn)行combiner的收益可能小于調(diào)用的代價(jià)。
mapreduce.task.io.sort.factor屬性配置每次最多合并多少個文件,默認(rèn)為10,即一次最多合并10個spill文件。最后,多輪合并之后,所有的輸出文件被合并為唯一一個大文件,以及相應(yīng)的索引文件(可能只在內(nèi)存中存在)。
壓縮
在數(shù)據(jù)量大的時(shí)候,對map輸出進(jìn)行壓縮通常是個好主意。要啟用壓縮,將mapreduce.map.output.compress設(shè)為true,并使用mapreduce.map.output.compress.codec設(shè)置使用的壓縮算法。
通過HTTP暴露輸出結(jié)果
map輸出數(shù)據(jù)完成之后,通過運(yùn)行一個HTTP Server暴露出來,供reduce端獲取。用來相應(yīng)reduce數(shù)據(jù)請求的線程數(shù)量可以配置,默認(rèn)情況下為機(jī)器內(nèi)核數(shù)量的兩倍,如需自己配置,通過mapreduce.shuffle.max.threads屬性來配置,注意該配置是針對NodeManager配置的,而不是每個作業(yè)配置。
同時(shí),Map任務(wù)完成后,也會通知Application Master,以便Reducer能夠及時(shí)來拉取數(shù)據(jù)。
通過緩沖、劃分(partition)、排序、combiner、合并、壓縮等過程之后,map端的工作就算完畢:
Reducer端
各個map任務(wù)運(yùn)行完之后,輸出寫入運(yùn)行任務(wù)的機(jī)器磁盤中。Reducer需要從各map任務(wù)中提取自己的那一部分?jǐn)?shù)據(jù)(對應(yīng)的partition)。每個map任務(wù)的完成時(shí)間可能是不一樣的,reduce任務(wù)在map任務(wù)結(jié)束之后會盡快取走輸出結(jié)果,這個階段叫copy。
Reducer是如何知道要去哪些機(jī)器去數(shù)據(jù)呢?一旦map任務(wù)完成之后,就會通過常規(guī)心跳通知應(yīng)用程序的Application Master。reduce的一個線程會周期性地向master詢問,直到提取完所有數(shù)據(jù)(如何知道提取完?)。
數(shù)據(jù)被reduce提走之后,map機(jī)器不會立刻刪除數(shù)據(jù),這是為了預(yù)防reduce任務(wù)失敗需要重做。因此map輸出數(shù)據(jù)是在整個作業(yè)完成之后才被刪除掉的。
reduce維護(hù)幾個copier線程,并行地從map任務(wù)機(jī)器提取數(shù)據(jù)。默認(rèn)情況下有5個copy線程,可以通過mapreduce.reduce.shuffle.parallelcopies配置。
如果map輸出的數(shù)據(jù)足夠小,則會被拷貝到reduce任務(wù)的JVM內(nèi)存中。mapreduce.reduce.shuffle.input.buffer.percent配置JVM堆內(nèi)存的多少比例可以用于存放map任務(wù)的輸出結(jié)果。如果數(shù)據(jù)太大容不下,則被拷貝到reduce的機(jī)器磁盤上。
內(nèi)存中合并
當(dāng)緩沖中數(shù)據(jù)達(dá)到配置的閾值時(shí),這些數(shù)據(jù)在內(nèi)存中被合并、寫入機(jī)器磁盤。閾值有2種配置方式:
- 配置內(nèi)存比例: 前面提到reduce JVM堆內(nèi)存的一部分用于存放來自map任務(wù)的輸入,在這基礎(chǔ)之上配置一個開始合并數(shù)據(jù)的比例。假設(shè)用于存放map輸出的內(nèi)存為500M,
mapreduce.reduce.shuffle.merger.percent配置為0.80,則當(dāng)內(nèi)存中的數(shù)據(jù)達(dá)到400M的時(shí)候,會觸發(fā)合并寫入。 - 配置map輸出數(shù)量: 通過
mapreduce.reduce.merge.inmem.threshold配置。
在合并的過程中,會對被合并的文件做全局的排序。如果作業(yè)配置了Combiner,則會運(yùn)行combine函數(shù),減少寫入磁盤的數(shù)據(jù)量。
Copy過程中磁盤合并
在copy過來的數(shù)據(jù)不斷寫入磁盤的過程中,一個后臺線程會把這些文件合并為更大的、有序的文件。如果map的輸出結(jié)果進(jìn)行了壓縮,則在合并過程中,需要在內(nèi)存中解壓后才能給進(jìn)行合并。這里的合并只是為了減少最終合并的工作量,也就是在map輸出還在拷貝時(shí),就開始進(jìn)行一部分合并工作。合并的過程一樣會進(jìn)行全局排序。
最終磁盤中合并
當(dāng)所有map輸出都拷貝完畢之后,所有數(shù)據(jù)被最后合并成一個排序的文件,作為reduce任務(wù)的輸入。這個合并過程是一輪一輪進(jìn)行的,最后一輪的合并結(jié)果直接推送給reduce作為輸入,節(jié)省了磁盤操作的一個來回。最后(所以map輸出都拷貝到reduce之后)進(jìn)行合并的map輸出可能來自合并后寫入磁盤的文件,也可能來及內(nèi)存緩沖,在最后寫入內(nèi)存的map輸出可能沒有達(dá)到閾值觸發(fā)合并,所以還留在內(nèi)存中。
每一輪合并并不一定合并平均數(shù)量的文件數(shù),指導(dǎo)原則是使用整個合并過程中寫入磁盤的數(shù)據(jù)量最小,為了達(dá)到這個目的,則需要最終的一輪合并中合并盡可能多的數(shù)據(jù),因?yàn)樽詈笠惠喌臄?shù)據(jù)直接作為reduce的輸入,無需寫入磁盤再讀出。因此我們讓最終的一輪合并的文件數(shù)達(dá)到最大,即合并因子的值,通過mapreduce.task.io.sort.factor來配置。
假設(shè)現(xiàn)在有50個map輸出文件,合并因子配置為10,則需要5輪的合并。最終的一輪確保合并10個文件,其中包括4個來自前4輪的合并結(jié)果,因此原始的50個中,再留出6個給最終一輪。所以最后的5輪合并可能情況如下:
前4輪合并后的數(shù)據(jù)都是寫入到磁盤中的,注意到最后的2格顏色不一樣,是為了標(biāo)明這些數(shù)據(jù)可能直接來自于內(nèi)存。
MemToMem合并
除了內(nèi)存中合并和磁盤中合并外,Hadoop還定義了一種MemToMem合并,這種合并將內(nèi)存中的map輸出合并,然后再寫入內(nèi)存。這種合并默認(rèn)關(guān)閉,可以通過reduce.merge.memtomem.enabled打開,當(dāng)map輸出文件達(dá)到reduce.merge.memtomem.threshold時(shí),觸發(fā)這種合并。
最后一次合并后傳遞給reduce方法
合并后的文件作為輸入傳遞給Reducer,Reducer針對每個key及其排序的數(shù)據(jù)調(diào)用reduce函數(shù)。產(chǎn)生的reduce輸出一般寫入到HDFS,reduce輸出的文件第一個副本寫入到當(dāng)前運(yùn)行reduce的機(jī)器,其他副本選址原則按照常規(guī)的HDFS數(shù)據(jù)寫入原則來進(jìn)行,詳細(xì)信息請參考這里。
通過從map機(jī)器提取結(jié)果,合并,combine之后,傳遞給reduce完成最后工作,整個過程也就差不多完成。最后再感受一下下面這張圖:
性能調(diào)優(yōu)
如果能夠根據(jù)情況對shuffle過程進(jìn)行調(diào)優(yōu),對于提供MapReduce性能很有幫助。相關(guān)的參數(shù)配置列在后面的表格中。
一個通用的原則是給shuffle過程分配盡可能大的內(nèi)存,當(dāng)然你需要確保map和reduce有足夠的內(nèi)存來運(yùn)行業(yè)務(wù)邏輯。因此在實(shí)現(xiàn)Mapper和Reducer時(shí),應(yīng)該盡量減少內(nèi)存的使用,例如避免在Map中不斷地疊加。
運(yùn)行map和reduce任務(wù)的JVM,內(nèi)存通過mapred.child.java.opts屬性來設(shè)置,盡可能設(shè)大內(nèi)存。容器的內(nèi)存大小通過mapreduce.map.memory.mb和mapreduce.reduce.memory.mb來設(shè)置,默認(rèn)都是1024M。
map優(yōu)化
在map端,避免寫入多個spill文件可能達(dá)到最好的性能,一個spill文件是最好的。通過估計(jì)map的輸出大小,設(shè)置合理的mapreduce.task.io.sort.*屬性,使得spill文件數(shù)量最小。例如盡可能調(diào)大mapreduce.task.io.sort.mb。
map端相關(guān)的屬性如下表:
| 屬性名 | 值類型 | 默認(rèn)值 | 說明 |
|---|---|---|---|
| mapreduce.task.io.sort.mb | int | 100 | 用于map輸出排序的內(nèi)存大小 |
| mapreduce.map.sort.spill.percent | float | 0.80 | 開始spill的緩沖池閾值 |
| mapreduce.task.io.sort.factor | int | 10 | 合并文件數(shù)最大值,與reduce共用 |
| mapreduce.map.combine.minspills | int | 3 | 運(yùn)行combiner的最低spill文件數(shù) |
| mapreduce.map.out.compress | boolean | false | 輸出是否壓縮 |
| mapreduce.map.out.compress | 類名 | DefaultCodec | 壓縮算法 |
| mapreduce.shuffle.max.threads | int | 0 | 服務(wù)于reduce提取結(jié)果的線程數(shù)量 |
reduce優(yōu)化
在reduce端,如果能夠讓所有數(shù)據(jù)都保存在內(nèi)存中,可以達(dá)到最佳的性能。通常情況下,內(nèi)存都保留給reduce函數(shù),但是如果reduce函數(shù)對內(nèi)存需求不是很高,將mapreduce.reduce.merge.inmem.threshold(觸發(fā)合并的map輸出文件數(shù))設(shè)為0,mapreduce.reduce.input.buffer.percent(用于保存map輸出文件的堆內(nèi)存比例)設(shè)為1.0,可以達(dá)到很好的性能提升。在2008年的TB級別數(shù)據(jù)排序性能測試中,Hadoop就是通過將reduce的中間數(shù)據(jù)都保存在內(nèi)存中勝利的。
reduce端相關(guān)屬性:
| 屬性名 | 值類型 | 默認(rèn)值 | 說明 |
|---|---|---|---|
| mapreduce.reduce.shuffle.parallelcopies | int | 5 | 提取map輸出的copier線程數(shù) |
| mapreduce.reduce.shuffle.maxfetchfailures | int | 10 | 提取map輸出最大嘗試次數(shù),超出后報(bào)錯 |
| mapreduce.task.io.sort.factor | int | 10 | 合并文件數(shù)最大值,與map共用 |
| mapreduce.reduce.shuffle.input.buffer.percent | float | 0.70 | copy階段用于保存map輸出的堆內(nèi)存比例 |
| mapreduce.reduce.shuffle.merge.percent | float | 0.66 | 開始spill的緩沖池比例閾值 |
| mapreduce.reduce.shuffle.inmem.threshold | int | 1000 | 開始spill的map輸出文件數(shù)閾值,小于等于0表示沒有閾值,此時(shí)只由緩沖池比例來控制 |
| mapreduce.reduce.input.buffer.percent | float | 0.0 | reduce函數(shù)開始運(yùn)行時(shí),內(nèi)存中的map輸出所占的堆內(nèi)存比例不得高于這個值,默認(rèn)情況內(nèi)存都用于reduce函數(shù),也就是map輸出都寫入到磁盤 |
通用優(yōu)化
Hadoop默認(rèn)使用4KB作為緩沖,這個算是很小的,可以通過io.file.buffer.size來調(diào)高緩沖池大小。