Hadoop Map/Reduce執(zhí)行流程詳解

一個(gè)Map/Reduce 作業(yè)(job) 通常會(huì)把輸入的數(shù)據(jù)(input file)切分為若干獨(dú)立的數(shù)據(jù)塊(splits),然后由 map任務(wù)(task)以完全并行的方式處理它們。Map/Reduce框架會(huì)對(duì)map的輸出做一個(gè) Shuffle 操作,Shuffle 操作的后的結(jié)果會(huì)輸入給reduce任務(wù)。整個(gè)Map/Reduce框架負(fù)責(zé)任務(wù)的調(diào)度和監(jiān)控,以及重新執(zhí)行已經(jīng)失敗的任務(wù)。

請(qǐng)點(diǎn)擊此處輸入圖片描述

Map/Reduce計(jì)算集群由一個(gè)單獨(dú)的JobTracker(master) 和每個(gè)集群節(jié)點(diǎn)一個(gè) TaskTracker(slave)共同組成。JobTracker負(fù)責(zé)調(diào)度構(gòu)成一個(gè)作業(yè)的所有任務(wù),這些任務(wù)會(huì)被分派到不同的TaskTracker上去執(zhí)行,JobTracker會(huì)監(jiān)控它們的執(zhí)行、重新執(zhí)行已經(jīng)失敗的任務(wù)。而TaskTracker僅負(fù)責(zé)執(zhí)行由JobTracker指派的任務(wù)。

請(qǐng)點(diǎn)擊此處輸入圖片描述

本文將按照map/reduce執(zhí)行流程中各個(gè)任務(wù)的時(shí)間順序詳細(xì)敘述map/reduce的各個(gè)任務(wù)模塊,包括:輸入分片(input split)、map階段、combiner階段、shuffle階段和reduce階段。下圖是一個(gè)不錯(cuò)的執(zhí)行流程圖:

請(qǐng)點(diǎn)擊此處輸入圖片描述

作業(yè)的提交與監(jiān)控

JobClient是用戶提交的作業(yè)與JobTracker交互的主要接口。

請(qǐng)點(diǎn)擊此處輸入圖片描述

JobClient提交作業(yè)的過(guò)程如下:

(1) map/reduce程序通過(guò)runJob()方法新建一個(gè)JobClient實(shí)例;

(2) 向JobTracker請(qǐng)求一個(gè)新jobID,通過(guò)JobTracker的getNewJobId()獲取;

(3) 檢查作業(yè)輸入輸出說(shuō)明。如果沒(méi)有指定輸出目錄或者輸出目錄已經(jīng)存在,作業(yè)將不會(huì)被提交,map/reduce程序; 輸入作業(yè)劃分split,如果劃分無(wú)法計(jì)算(如:輸入路徑不存在),作業(yè)將不會(huì)被提交,錯(cuò)誤返回給map/reduce程序。

(4) 將運(yùn)行作業(yè)所需要的資源(作業(yè)的jar文件、配置文件、計(jì)算所得的輸入劃分)復(fù)制到一個(gè)以作業(yè)ID命名的目錄中;

(5) 通過(guò)調(diào)用JobTracker的submitJob()方法,告訴JobTracker作業(yè)準(zhǔn)備提交;

(6) JobTracker將提交的作業(yè)放到一個(gè)內(nèi)部隊(duì)列中,交由作業(yè)調(diào)度器進(jìn)行調(diào)度,并對(duì)其進(jìn)行初始化。

(7) 創(chuàng)建Map任務(wù)、Reduce任務(wù):一個(gè)split對(duì)應(yīng)一個(gè)map,有多少split就有多少map; Reduce任務(wù)的數(shù)量由JobConf的mapred.reduce.tasks屬性決定

(8) TaskTracker執(zhí)行一個(gè)簡(jiǎn)單的循環(huán),定期發(fā)送心跳(heartbeat)給JobTracker

Input files

Input file是map/reduce任務(wù)的原始數(shù)據(jù),一般存儲(chǔ)在HDFS上。應(yīng)用程序至少應(yīng)該指明輸入/輸出的位置(路徑),并通過(guò)實(shí)現(xiàn)合適的接口或抽象類提供map和reduce函數(shù)。再加上其他作業(yè)的參數(shù),就構(gòu)成了作業(yè)配置(job configuration)。然后,Hadoop的 job client提交作業(yè)(jar包/可執(zhí)行程序等)和配置信息給JobTracker,后者負(fù)責(zé)分發(fā)這些軟件和配置信息給slave、調(diào)度任務(wù)并監(jiān)控它們的執(zhí)行,同時(shí)提供狀態(tài)和診斷信息給job-client。

InputFormat

InputFormat為Map/Reduce作業(yè)輸入的細(xì)節(jié)規(guī)范。Map/Reduce框架根據(jù)作業(yè)的InputFormat來(lái):

(1) 檢查作業(yè)輸入的正確性,如格式等。

(2) 把輸入文件切分成多個(gè)邏輯InputSplit實(shí)例, 一個(gè)InputSplit將會(huì)被分配給一個(gè)獨(dú)立的Map任務(wù)。

(3) 提供RecordReader實(shí)現(xiàn),這個(gè)RecordReader從邏輯InputSplit中獲得輸入記錄(”K-V對(duì)”),這些記錄將由Map任務(wù)處理。

InputFormat有如下幾種:

請(qǐng)點(diǎn)擊此處輸入圖片描述

TextInputFormat:

TextInputFormat是默認(rèn)的INputFormat,輸入文件中的每一行就是一個(gè)記錄,Key是這一行的byte offset,而value是這一行的內(nèi)容。如果一個(gè)作業(yè)的Inputformat是TextInputFormat,并且框架檢測(cè)到輸入文件的后綴是.gz和.lzo,就會(huì)使用對(duì)應(yīng)的CompressionCodec自動(dòng)解壓縮這些文件。但是需要注意,上述帶后綴的壓縮文件不會(huì)被切分,并且整個(gè)壓縮文件會(huì)分給一個(gè)mapper來(lái)處理。

KeyValueTextInputFormat

輸入文件中每一行就是一個(gè)記錄,第一個(gè)分隔符字符切分每行。在分隔符字符之前的內(nèi)容為Key,在之后的為Value。分隔符變量通過(guò)key.value.separator.in.input.line變量設(shè)置,默認(rèn)為(\t)字符。

NLineInputFormat

與TextInputFormat一樣,但每個(gè)數(shù)據(jù)塊必須保證有且只有N行,mapred.line.input.format.linespermap屬性,默認(rèn)為1。

SequenceFileInputFormat

一個(gè)用來(lái)讀取字符流數(shù)據(jù)的InputFormat,為用戶自定義的。字符流數(shù)據(jù)是Hadoop自定義的壓縮的二進(jìn)制數(shù)據(jù)格式。它用來(lái)優(yōu)化從一個(gè)MapReduce任務(wù)的輸出到另一個(gè)MapReduce任務(wù)的輸入之間的數(shù)據(jù)傳輸過(guò)程。

InputSplits

InputSplit是一個(gè)單獨(dú)的Map任務(wù)需要處理的數(shù)據(jù)塊。一般的InputSplit是字節(jié)樣式輸入,然后由RecordReader處理并轉(zhuǎn)化成記錄樣式。通常一個(gè)split就是一個(gè)block,這樣做的好處是使得Map任務(wù)可以在存儲(chǔ)有當(dāng)前數(shù)據(jù)的節(jié)點(diǎn)上運(yùn)行本地的任務(wù),而不需要通過(guò)網(wǎng)絡(luò)進(jìn)行跨節(jié)點(diǎn)的任務(wù)調(diào)度。

可以通過(guò)設(shè)置mapred.min.split.size,mapred.max.split.size,block.size來(lái)控制拆分的大小。如果mapred.min.split.size大于block size,則會(huì)將兩個(gè)block合成到一個(gè)split,這樣有部分block數(shù)據(jù)需要通過(guò)網(wǎng)絡(luò)讀取;如果mapred.max.split.size小于block size,則會(huì)將一個(gè)block拆成多個(gè)split,增加了Map任務(wù)數(shù)。

假設(shè)splitSize是默認(rèn)的64M,現(xiàn)在輸入包含3個(gè)文件,這3個(gè)文件的大小分別為10M,64M,100M,那么這3個(gè)文件會(huì)被分割為:

1 2輸入文件大小? ? ? ? ? ? ? ? 10M? ? 64M? ? 100M 分割后的InputSplit大小? ? ? 10M? ? 64M? ? 64M,36M

在Map任務(wù)開(kāi)始前,會(huì)先獲取文件在HDFS上的路徑和block信息,然后根據(jù)splitSize對(duì)文件進(jìn)行切分(splitSize = computeSplitSize(blockSize, minSize, maxSize) ),默認(rèn)splitSize 就等于blockSize的默認(rèn)值(64m)。

Mapper

Map是一類將輸入記錄集轉(zhuǎn)換為中間格式記錄集的獨(dú)立任務(wù),主要是讀取InputSplit的每一個(gè)Key,Value對(duì)并進(jìn)行處理

請(qǐng)點(diǎn)擊此處輸入圖片描述

確定map任務(wù)數(shù)量

Map/Reduce框架為每一個(gè)InputSplit產(chǎn)生一個(gè)map任務(wù),而每個(gè)InputSplit是由該作業(yè)的InputFormat產(chǎn)生的,默認(rèn)一個(gè)InputSplit大小就等于blockSize的默認(rèn)值。因此,maps的數(shù)量通常取決于輸入大小,也即輸入文件的block數(shù)。 因此,假如輸入數(shù)據(jù)有10TB,而block大小為64M,則需要164,000個(gè)map。map正常的并行規(guī)模大致是每個(gè)節(jié)點(diǎn)(node)大約10到100個(gè)map,對(duì)于CPU 消耗較小的map任務(wù)可以設(shè)到300個(gè)左右。

因?yàn)閱?dòng)任務(wù)也需要時(shí)間,所以在一個(gè)較大的作業(yè)中,最好每個(gè)map任務(wù)的執(zhí)行時(shí)間不要少于1分鐘,這樣可以讓啟動(dòng)任務(wù)的開(kāi)銷占比盡可能的低。對(duì)于那種有大量小文件輸入的的作業(yè)來(lái)說(shuō),一個(gè)map處理多個(gè)文件會(huì)更有效率。如果輸入的是打文件,那么一種提高效率的方式是增加block的大小(比如512M),每個(gè)map還是處理一個(gè)完整的HDFS的block。

當(dāng)在map處理的block比較大的時(shí)候,確保有足夠的內(nèi)存作為排序緩沖區(qū)是非常重要的,這可以加速map端的排序過(guò)程。假如大多數(shù)的map輸出都能在排序緩沖區(qū)中處理的話應(yīng)用的性能會(huì)有極大的提升。這需要運(yùn)行map過(guò)程的JVM具有更大的堆。

網(wǎng)格模式:確保map的大小,使得所有的map輸出可以在排序緩沖區(qū)中通過(guò)一次排序來(lái)完成操作。

合適的map數(shù)量有以下好處:

(1) 減少了調(diào)度的負(fù)擔(dān);更少的map意味著任務(wù)調(diào)度更簡(jiǎn)單,集群中可用的空閑槽更多。

(2) 有足夠的內(nèi)存將map輸出容納在排序緩存中,這使map端更有效率;

(3) 減少了需要shuffle map輸出的尋址次數(shù),每個(gè)map產(chǎn)生的輸出可用于每一個(gè)reduce,因此尋址數(shù)就是map個(gè)數(shù)乘以reduce個(gè)數(shù);

(4) 每個(gè)shuffle 的片段更大,這減少了建立連接的相對(duì)開(kāi)銷,所謂相對(duì)開(kāi)銷是指相對(duì)于在網(wǎng)絡(luò)中傳輸數(shù)據(jù)的過(guò)程。

(5) 這使reduce端合并map輸出的過(guò)程更高效,因?yàn)楹喜⒌拇螖?shù)更少,因?yàn)樾枰喜⒌奈募胃倭恕?/p>

執(zhí)行map任務(wù)

Mapper的實(shí)現(xiàn)者需要重寫(xiě) JobConfigurable.configure(JobConf)方法,這個(gè)方法需要傳遞一個(gè)JobConf參數(shù),目的是完成Mapper的初始化工作。然后,框架為這個(gè)任務(wù)的InputSplit中每個(gè)鍵值對(duì)調(diào)用一次 map(WritableComparable, Writable, OutputCollector, Reporter)操作。這里需要指出很多人的一種錯(cuò)誤認(rèn)識(shí)——“輸入和輸出的鍵值對(duì)類型一致,一一對(duì)應(yīng)”,這種認(rèn)識(shí)是錯(cuò)誤的。輸入輸出鍵值對(duì)的關(guān)系如下:

1 21) 輸出鍵值對(duì)不需要與輸入鍵值對(duì)的類型一致。 2) 一個(gè)給定的輸入鍵值對(duì)可以映射成0個(gè)或多個(gè)輸出鍵值對(duì)。

以 word count為例,輸入不需要是“一行個(gè)單詞”的形式,可以是一行許多個(gè)單詞,輸入一行可以對(duì)應(yīng)多行輸出,如下圖所示:

請(qǐng)點(diǎn)擊此處輸入圖片描述

通過(guò)調(diào)用OutputCollector.collect(WritableComparable,Writable)可以收集map(WritableComparable, Writable, OutputCollector, Reporter)輸出的鍵值對(duì)。應(yīng)用程序可以使用Reporter報(bào)告進(jìn)度,設(shè)定應(yīng)用級(jí)別的狀態(tài)消息,更新Counters(計(jì)數(shù)器),或者僅是表明自己運(yùn)行正常。

Map/Reduce框架隨后會(huì)把與一個(gè)特定key關(guān)聯(lián)的所有中間過(guò)程的值(value)分組并排序這個(gè)分組和排序過(guò)程被稱為Shuffle,然后把它們傳給Reducer以產(chǎn)出最終的結(jié)果。分組的總數(shù)目和一個(gè)作業(yè)的reduce任務(wù)的數(shù)目是一樣的。用戶可以通過(guò)實(shí)現(xiàn)自定義的 Partitioner來(lái)控制哪個(gè)key被分配給哪個(gè) Reducer。 對(duì)于map的輸出,用戶可選擇通過(guò)JobConf.setCombinerClass(Class)指定一個(gè)combiner,它負(fù)責(zé)對(duì)中間過(guò)程的輸出進(jìn)行本地的聚集,這會(huì)有助于降低從Mapper到 Reducer數(shù)據(jù)傳輸量。

請(qǐng)點(diǎn)擊此處輸入圖片描述

這些被排好序的中間過(guò)程的輸出結(jié)果保存的格式是(key-len, key, value-len, value),應(yīng)用程序可以通過(guò)JobConf控制對(duì)這些中間結(jié)果是否進(jìn)行壓縮以及怎么壓縮,使用哪種CompressionCodec。

整個(gè)map的執(zhí)行過(guò)程如下圖所示:

請(qǐng)點(diǎn)擊此處輸入圖片描述

map輸出溢寫(xiě)(spill) && Shuffle

Shuffle

一般把從map任務(wù)輸出到reducer任務(wù)輸入之間的map/reduce框架所做的工作叫做shuffle。這部分也是map/reduce框架最重要的部分。下面將詳細(xì)介紹這個(gè)shuffle中的各個(gè)步驟。

請(qǐng)點(diǎn)擊此處輸入圖片描述

內(nèi)存緩沖區(qū)

Map/Reduce框架為InputSplit中的每個(gè)鍵值對(duì)調(diào)用一次 map(WritableComparable, Writable, OutputCollector, Reporter)操作,調(diào)用一次map()操作后就會(huì)得到一個(gè)新的(key,value)對(duì)。當(dāng)Map程序開(kāi)始產(chǎn)生結(jié)果的時(shí)候,并不是直接寫(xiě)到文件的,而是寫(xiě)到一個(gè)內(nèi)存緩沖區(qū)(環(huán)形內(nèi)存緩沖區(qū))。每個(gè)map任務(wù)都有一個(gè)內(nèi)存緩沖區(qū),存儲(chǔ)著map的輸出結(jié)果,這個(gè)內(nèi)存緩沖區(qū)是有大小限制的,默認(rèn)是100MB(可以通過(guò)屬性io.sort.mb配置)。

當(dāng)map task的輸出結(jié)果很多時(shí),就可能會(huì)超過(guò)100MB內(nèi)存的限制,所以需要在一定條件下將緩沖區(qū)中的數(shù)據(jù)臨時(shí)寫(xiě)入磁盤(pán),然后重新利用這塊緩沖區(qū)。這個(gè)從內(nèi)存往磁盤(pán)寫(xiě)數(shù)據(jù)的過(guò)程被稱為“spill”,中文可譯為溢寫(xiě)。這個(gè)溢寫(xiě)是由單獨(dú)線程來(lái)完成,不影響往緩沖區(qū)寫(xiě)map結(jié)果的線程。

請(qǐng)點(diǎn)擊此處輸入圖片描述

溢寫(xiě)線程啟動(dòng)時(shí)不應(yīng)該阻止map的結(jié)果輸出,所以整個(gè)緩沖區(qū)有個(gè)溢寫(xiě)的比例spill.percent(可以通過(guò)屬性Io.sort.spill.percent配置),這個(gè)比例默認(rèn)是0.8,也就是當(dāng)緩沖區(qū)的數(shù)據(jù)已經(jīng)達(dá)到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫(xiě)線程啟動(dòng),鎖定這80MB的內(nèi)存,執(zhí)行溢寫(xiě)過(guò)程。Map任務(wù)的輸出結(jié)果還可以往剩下的20MB內(nèi)存中寫(xiě),互不影響,但如果緩沖區(qū)滿了,Map任務(wù)則會(huì)被阻塞。那么為什么需要設(shè)置寫(xiě)入比例呢?達(dá)到一定比例后,由于寫(xiě)緩存和讀緩存是可以同時(shí)并行執(zhí)行的,這會(huì)降低把緩存數(shù)據(jù)騰空的時(shí)間,從而提高效率。

分區(qū)

在把map()輸出數(shù)據(jù)寫(xiě)入內(nèi)存緩沖區(qū)之前會(huì)先進(jìn)行Partitioner操作。Partitioner用于劃分鍵值空間(key space)。MapReduce提供Partitioner接口,它的作用就是根據(jù)key或value及reduce的數(shù)量來(lái)決定當(dāng)前的這對(duì)輸出數(shù)據(jù)最終應(yīng)該交由哪個(gè)reduce task處理。默認(rèn)對(duì)key hash后再以reduce task數(shù)量取模。默認(rèn)的取模方式只是為了平均reduce的處理能力,如果用戶自己對(duì)Partitioner有需求,可以訂制并設(shè)置到j(luò)ob上。

1reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks

HashPartitioner是默認(rèn)的 Partitioner。

Partitioner操作得到的分區(qū)元數(shù)據(jù)也會(huì)被存儲(chǔ)到內(nèi)存緩沖區(qū)中。當(dāng)數(shù)據(jù)達(dá)到溢出的條件時(shí),讀取緩存中的數(shù)據(jù)和分區(qū)元數(shù)據(jù),然后把屬與同一分區(qū)的數(shù)據(jù)合并到一起。對(duì)于每一個(gè)分區(qū),都會(huì)在內(nèi)存中根據(jù)map輸出的key進(jìn)行排序(排序是MapReduce模型默認(rèn)的行為,這里的排序也是對(duì)序列化的字節(jié)做的排序),如果配置了Combiner,則排序后執(zhí)行Combiner(Combine之后可以減少寫(xiě)入文件和傳輸?shù)臄?shù)據(jù))。如果配置了壓縮,則最終寫(xiě)入的文件會(huì)先進(jìn)行壓縮,這樣可以減少寫(xiě)入和傳輸?shù)臄?shù)據(jù)。最后實(shí)現(xiàn)溢出的文件內(nèi)是分區(qū)的,且分區(qū)內(nèi)是有序的。

每次溢出的數(shù)據(jù)寫(xiě)入文件時(shí),都按照分區(qū)的數(shù)值從小到大排序,內(nèi)部存儲(chǔ)是以tag的方式區(qū)分不同分區(qū)的數(shù)據(jù);同時(shí)生成一個(gè)索引文件,這個(gè)索引文件記錄分區(qū)的描述信息,包括:起始位置、長(zhǎng)度、以及壓縮長(zhǎng)度,這些信息存儲(chǔ)在IndexRecord結(jié)構(gòu)里面。一個(gè)spill文件中的多個(gè)段的索引數(shù)據(jù)被組織成SpillRecord結(jié)構(gòu),SpillRecord又被加入進(jìn)indexCacheList中。

請(qǐng)點(diǎn)擊此處輸入圖片描述

Combiner

Combiner最主要的好處在于減少了shuffle過(guò)程從map端到reduce端的傳輸數(shù)據(jù)量。

請(qǐng)點(diǎn)擊此處輸入圖片描述

combiner階段是程序員可以選擇的,combiner其實(shí)也是一種reduce操作。Combiner是一個(gè)本地化的reduce操作,它是map運(yùn)算的后續(xù)操作,主要是在map計(jì)算出中間文件前做一個(gè)簡(jiǎn)單的合并重復(fù)key值的操作,例如我們對(duì)文件里的單詞頻率做統(tǒng)計(jì),map計(jì)算時(shí)候如果碰到一個(gè)hadoop的單詞就會(huì)記錄為1,但是這篇文章里hadoop可能會(huì)出現(xiàn)n多次,那么map輸出文件冗余就會(huì)很多,因此在reduce計(jì)算前對(duì)相同的key做一個(gè)合并操作,那么文件會(huì)變小,這樣就提高了寬帶的傳輸效率,畢竟hadoop計(jì)算力寬帶資源往往是計(jì)算的瓶頸也是最為寶貴的資源,但是combiner操作是有風(fēng)險(xiǎn)的,使用它的原則是combiner的輸入不會(huì)影響到reduce計(jì)算的最終輸入,例如:如果計(jì)算只是求總數(shù),最大值,最小值可以使用combiner,但是做平均值計(jì)算使用combiner的話,最終的reduce計(jì)算結(jié)果就會(huì)出錯(cuò)。

Combiner 也有一個(gè)性能損失點(diǎn),因?yàn)樗枰淮晤~外的對(duì)于map輸出的序列化/反序列化過(guò)程。不能通過(guò)聚合將map端的輸出減少到20-30%的話就不適用combiner。

壓縮

Map/Reduce框架為應(yīng)用程序的寫(xiě)入文件操作提供壓縮工具,這些工具可以為map輸出的中間數(shù)據(jù)和作業(yè)最終輸出數(shù)據(jù)(例如reduce的輸出)提供支持。

壓縮中間數(shù)據(jù): 對(duì)map輸出的中間數(shù)據(jù)進(jìn)行合適的壓縮可以減少map到reduce之間的網(wǎng)絡(luò)數(shù)據(jù)傳輸量,從而提高性能。Lzo壓縮格式是一個(gè)壓縮map中間數(shù)據(jù)的合理選擇,它有效利用了CPU。

壓縮應(yīng)用輸出: 使用合適的壓縮格式壓縮輸出數(shù)據(jù)能夠減少應(yīng)用的運(yùn)行時(shí)間。Zlib/Gzip 格式在大多數(shù)情況下都是比較適當(dāng)?shù)倪x擇,因?yàn)樗谳^高壓縮率的情況下壓縮速度也還算可以,bzip2 就慢得多了。

合并臨時(shí)文件

每次spill操作也就是寫(xiě)入磁盤(pán)操作時(shí)候就會(huì)寫(xiě)一個(gè)溢出文件,也就是說(shuō)在做map輸出有幾次spill就會(huì)產(chǎn)生多少個(gè)溢出文件,等map輸出全部做完后,map會(huì)合并這些輸出文件生成最終的正式輸出文件,然后等待reduce任務(wù)來(lái)拉數(shù)據(jù)。將這些溢寫(xiě)文件歸并到一起的過(guò)程叫做Merge。

請(qǐng)點(diǎn)擊此處輸入圖片描述

如果生成的文件太多,可能會(huì)執(zhí)行多次合并,每次最多能合并的文件數(shù)默認(rèn)為10,可以通過(guò)屬性min.num.spills.for.combine配置。 多個(gè)溢出文件合并是,同一個(gè)分區(qū)內(nèi)部也必須再做一次排序,排序算法是多路歸并排序。是否還需要做combine操作,一是看是否設(shè)置了combine,二是看溢出的文件數(shù)是否大于等于3。最終生成的文件格式與單個(gè)溢出文件一致,也是按分區(qū)順序存儲(chǔ),并且有一個(gè)對(duì)應(yīng)的索引文件,記錄每個(gè)分區(qū)數(shù)據(jù)的起始位置,長(zhǎng)度以及壓縮長(zhǎng)度。這個(gè)索引文件名叫做file.out.index。

至此,map端的所有工作都已結(jié)束,最終生成的這個(gè)文件也存放在TaskTracker夠得著的某個(gè)本地目錄內(nèi)。每個(gè)reduce task不斷地通過(guò)RPC從JobTracker那里獲取map task是否完成的信息,如果reduce task得到通知,Reduce就可以開(kāi)始復(fù)制結(jié)果數(shù)據(jù)。

Reduce

簡(jiǎn)單地說(shuō),reduce任務(wù)在執(zhí)行之前的工作就是不斷地拉取每個(gè)map任務(wù)的最終結(jié)果,然后對(duì)從不同地方拉取過(guò)來(lái)的數(shù)據(jù)不斷地做merge,也最終形成一個(gè)文件作為reduce任務(wù)的輸入文件。

請(qǐng)點(diǎn)擊此處輸入圖片描述

reduce的運(yùn)行可以分成copy、merge、reduce三個(gè)階段,下面將具體說(shuō)明這3個(gè)階段的詳細(xì)執(zhí)行流程。

copy

由于job的每一個(gè)map都會(huì)根據(jù)reduce(n)數(shù)將數(shù)據(jù)分成map 輸出結(jié)果分成n個(gè)partition,所以map的中間結(jié)果中是有可能包含每一個(gè)reduce需要處理的部分?jǐn)?shù)據(jù)的。所以,為了優(yōu)化reduce的執(zhí)行時(shí)間,hadoop中是等job的第一個(gè)map結(jié)束后,所有的reduce就開(kāi)始嘗試從完成的map中下載該reduce對(duì)應(yīng)的partition部分?jǐn)?shù)據(jù),因此map和reduce是交叉進(jìn)行的,如下圖所示:

請(qǐng)點(diǎn)擊此處輸入圖片描述

reduce進(jìn)程啟動(dòng)數(shù)據(jù)copy線程(Fetcher),通過(guò)HTTP方式請(qǐng)求map task所在的TaskTracker獲取map task的輸出文件。由于map通常有許多個(gè),所以對(duì)一個(gè)reduce來(lái)說(shuō),下載也可以是并行的從多個(gè)map下載,這個(gè)并行度是可以通過(guò)mapred.reduce.parallel.copies(default 5)調(diào)整。默認(rèn)情況下,每個(gè)只會(huì)有5個(gè)并行的下載線程在從map下數(shù)據(jù),如果一個(gè)時(shí)間段內(nèi)job完成的map有100個(gè)或者更多,那么reduce也最多只能同時(shí)下載5個(gè)map的數(shù)據(jù),所以這個(gè)參數(shù)比較適合map很多并且完成的比較快的job的情況下調(diào)大,有利于reduce更快的獲取屬于自己部分的數(shù)據(jù)。

reduce的每一個(gè)下載線程在下載某個(gè)map數(shù)據(jù)的時(shí)候,有可能因?yàn)槟莻€(gè)map中間結(jié)果所在機(jī)器發(fā)生錯(cuò)誤,或者中間結(jié)果的文件丟失,或者網(wǎng)絡(luò)瞬斷等等情況,這樣reduce的下載就有可能失敗,所以reduce的下載線程并不會(huì)無(wú)休止的等待下去,當(dāng)一定時(shí)間后下載仍然失敗,那么下載線程就會(huì)放棄這次下載,并在隨后嘗試從另外的地方下載(因?yàn)檫@段時(shí)間map可能重跑)。reduce下載線程的這個(gè)最大的下載時(shí)間段是可以通過(guò)mapred.reduce.copy.backoff(default 300秒)調(diào)整的。如果集群環(huán)境的網(wǎng)絡(luò)本身是瓶頸,那么用戶可以通過(guò)調(diào)大這個(gè)參數(shù)來(lái)避免reduce下載線程被誤判為失敗的情況。不過(guò)在網(wǎng)絡(luò)環(huán)境比較好的情況下,沒(méi)有必要調(diào)整。通常來(lái)說(shuō)專業(yè)的集群網(wǎng)絡(luò)不應(yīng)該有太大問(wèn)題,所以這個(gè)參數(shù)需要調(diào)整的情況不多。

merge

這里的merge如map端的merge動(dòng)作類似,只是數(shù)組中存放的是不同map端copy來(lái)的數(shù)值。Copy過(guò)來(lái)的數(shù)據(jù)會(huì)先放入內(nèi)存緩沖區(qū)中,然后當(dāng)使用內(nèi)存達(dá)到一定量的時(shí)候才刷入磁盤(pán)。這里需要強(qiáng)調(diào)的是,merge有三種形式:1)內(nèi)存到內(nèi)存 2)內(nèi)存到磁盤(pán) 3)磁盤(pán)到磁盤(pán)。內(nèi)存到內(nèi)存的merge一般不適用,主要是內(nèi)存到磁盤(pán)和磁盤(pán)到磁盤(pán)的merge。

這里的緩沖區(qū)大小要比map端的更為靈活,它基于JVM的heap size設(shè)置。這個(gè)內(nèi)存大小的控制就不像map一樣可以通過(guò)io.sort.mb來(lái)設(shè)定了,而是通過(guò)另外一個(gè)參數(shù)mapred.job.shuffle.input.buffer.percent(default 0.7)來(lái)設(shè)置, 這個(gè)參數(shù)其實(shí)是一個(gè)百分比,意思是說(shuō),shuffile在reduce內(nèi)存中的數(shù)據(jù)最多使用內(nèi)存量為:0.7 × maxHeap of reduce task。

也就是說(shuō),如果該reduce task的最大heap使用量(通常通過(guò)mapred.child.java.opts來(lái)設(shè)置,比如設(shè)置為-Xmx1024m)的一定比例用來(lái)緩存數(shù)據(jù)。默認(rèn)情況下,reduce會(huì)使用其heapsize的70%來(lái)在內(nèi)存中緩存數(shù)據(jù)。假設(shè)mapred.job.shuffle.input.buffer.percent為0.7,reduce task的max heapsize為1G,那么用來(lái)做下載數(shù)據(jù)緩存的內(nèi)存就為大概700MB左右。這700M的內(nèi)存,跟map端一樣,也不是要等到全部寫(xiě)滿才會(huì)往磁盤(pán)刷的,而是當(dāng)這700M中被使用到了一定的限度(通常是一個(gè)百分比),就會(huì)開(kāi)始往磁盤(pán)刷(刷磁盤(pán)前會(huì)先做sort)。這個(gè)限度閾值也是可以通過(guò)參數(shù)mapred.job.shuffle.merge.percent(default 0.66)來(lái)設(shè)定。與map 端類似,這也是溢寫(xiě)的過(guò)程,這個(gè)過(guò)程中如果你設(shè)置有Combiner,也是會(huì)啟用的,然后在磁盤(pán)中生成了眾多的溢寫(xiě)文件。這種merge方式一直在運(yùn)行,直到?jīng)]有map端的數(shù)據(jù)時(shí)才結(jié)束,然后啟動(dòng)磁盤(pán)到磁盤(pán)的merge方式生成最終的那個(gè)文件。

reducer

當(dāng)reduce將所有的map上對(duì)應(yīng)自己partition的數(shù)據(jù)下載完成后,就會(huì)開(kāi)始真正的reduce計(jì)算階段。當(dāng)reduce task真正進(jìn)入reduce函數(shù)的計(jì)算階段的時(shí)候,有一個(gè)參數(shù)也是可以調(diào)整reduce的計(jì)算行為。也就是mapred.job.reduce.input.buffer.percent(default 0.0)。由于reduce計(jì)算時(shí)肯定也是需要消耗內(nèi)存的,而在讀取reduce需要的數(shù)據(jù)時(shí),同樣是需要內(nèi)存作為buffer,這個(gè)參數(shù)是控制,需要多少的內(nèi)存百分比來(lái)作為reduce讀已經(jīng)sort好的數(shù)據(jù)的buffer百分比。默認(rèn)情況下為0,也就是說(shuō),默認(rèn)情況下,reduce是全部從磁盤(pán)開(kāi)始讀處理數(shù)據(jù)。如果這個(gè)參數(shù)大于0,那么就會(huì)有一定量的數(shù)據(jù)被緩存在內(nèi)存并輸送給reduce,當(dāng)reduce計(jì)算邏輯消耗內(nèi)存很小時(shí),可以分一部分內(nèi)存用來(lái)緩存數(shù)據(jù),反正reduce的內(nèi)存閑著也是閑著。

Reduce在這個(gè)階段,框架為已分組的輸入數(shù)據(jù)中的每個(gè) 對(duì)調(diào)用一次reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。 Reduce任務(wù)的輸出通常是通過(guò)調(diào)用 OutputCollector.collect(WritableComparable, Writable)寫(xiě)入 文件系統(tǒng)的。Reducer的輸出是沒(méi)有排序的。

那么一般需要多少個(gè)Reduce呢?

Reduce的數(shù)目建議是0.95或1.75乘以 ( * mapred.tasktracker.reduce.tasks.maximum)。 用0.95,所有reduce可以在maps一完成時(shí)就立刻啟動(dòng),開(kāi)始傳輸map的輸出結(jié)果。用1.75,速度快的節(jié)點(diǎn)可以在完成第一輪reduce任務(wù)后,可以開(kāi)始第二輪,這樣可以得到比較好的負(fù)載均衡的效果。

reduces的性能很大程度上受shuffle的性能所影響。應(yīng)用配置的reduces數(shù)量是一個(gè)決定性的因素。太多或者太少的reduce都不利于發(fā)揮最佳性能:?太少的reduce會(huì)使得reduce運(yùn)行的節(jié)點(diǎn)處于過(guò)度負(fù)載狀態(tài),在極端情況下我們見(jiàn)過(guò)一個(gè)reduce要處理100g的數(shù)據(jù)。這對(duì)于失敗恢復(fù)有著非常致命的負(fù)面影響,因?yàn)槭〉膔educe對(duì)作業(yè)的影響非常大。太多的reduce對(duì)shuffle過(guò)程有不利影響。在極端情況下會(huì)導(dǎo)致作業(yè)的輸出都是些小文件,這對(duì)NameNode不利,并且會(huì)影響接下來(lái)要處理這些小文件的mapreduce應(yīng)用的性能。在大多數(shù)情況下,應(yīng)用應(yīng)該保證每個(gè)reduce處理1-2g數(shù)據(jù),最多5-10g。

The output files

作業(yè)的輸出OutputFormat 描述Map/Reduce作業(yè)的輸出樣式。Map/Reduce框架根據(jù)作業(yè)的OutputFormat來(lái):

1 21. 檢驗(yàn)作業(yè)的輸出,例如檢查輸出路徑是否已經(jīng)存在。 2. 提供一個(gè)RecordWriter的實(shí)現(xiàn),用來(lái)輸出作業(yè)結(jié)果。 輸出文件保存在FileSystem上。

OutputFormat主要有以下幾種:

請(qǐng)點(diǎn)擊此處輸入圖片描述

TextOutputFormat是默認(rèn)的 OutputFormat。

計(jì)數(shù)器(Counters)

計(jì)數(shù)器(Counters) 展現(xiàn)一些全局性的統(tǒng)計(jì)度量,這些度量由map/reduce框架本身,也可由應(yīng)用來(lái)設(shè)定。應(yīng)用可以自行定義任意的計(jì)數(shù)器并且在map或者reduce方法中更新它們的值。框架會(huì)對(duì)計(jì)數(shù)器的值做全局聚合。 計(jì)數(shù)器適合于追蹤記錄一些量不是很大,但是很重要的全局性信息。不應(yīng)該用于一些粒度過(guò)細(xì)的信息統(tǒng)計(jì)。 使用計(jì)數(shù)器的代價(jià)非常昂貴,因?yàn)樵趹?yīng)用的生命周期內(nèi)JobTracker需要給每一個(gè)map/reduce任務(wù)維護(hù)一組計(jì)數(shù)器(定義了多少個(gè)就維護(hù)多少個(gè))。

Reporter是用于map/reduce應(yīng)用程序報(bào)告進(jìn)度,設(shè)定應(yīng)用級(jí)別的狀態(tài)消息, 更新Counters(計(jì)數(shù)器)的機(jī)制。

Ref

Apache Hadoop: Best Practices and Anti-Patterns

Understanding Hadoop Clusters and the Network

Introduction to MapReduce

http://hadoop.apache.org/docs/r1.0.4/cn/streaming.html

MapReduce:詳解Shuffle過(guò)程

http://hadoop.apache.org/docs/r1.0.4/cn/mapred_tutorial.html

Anatomy of a MapReduce Job

http://pennywong.gitbooks.io/hadoop-notebook/content/mapreduce/introduction.html

https://developer.yahoo.com/hadoop/tutorial/module4.html

https://developer.yahoo.com/hadoop/tutorial/module5.html

長(zhǎng)按掃一掃,關(guān)注我們。每天都有精彩干貨哦!~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 目的這篇教程從用戶的角度出發(fā),全面地介紹了Hadoop Map/Reduce框架的各個(gè)方面。先決條件請(qǐng)先確認(rèn)Had...
    SeanC52111閱讀 1,842評(píng)論 0 1
  • 參考:hadoop 學(xué)習(xí)筆記:mapreduce框架詳解 [toc] 總結(jié) Mapreduce是一個(gè)計(jì)算框架,既然...
    小小少年Boy閱讀 1,224評(píng)論 0 4
  • MapReduce過(guò)程詳解及其性能優(yōu)化 [toc] 轉(zhuǎn)載:MapReduce過(guò)程詳解及其性能優(yōu)化 總結(jié) 詳情 從J...
    小小少年Boy閱讀 7,203評(píng)論 2 18
  • 思考問(wèn)題 MapReduce總結(jié) MapReduce MapReduce的定義MapReduce是一種編程模型, ...
    Sakura_P閱讀 1,025評(píng)論 0 1
  • MapReduce框架結(jié)構(gòu)## MapReduce是一個(gè)用于大規(guī)模數(shù)據(jù)處理的分布式計(jì)算模型MapReduce模型主...
    Bloo_m閱讀 3,956評(píng)論 0 4

友情鏈接更多精彩內(nèi)容