分布式之MapReduce——解讀《MapReduce》


title: 分布式之MapReduce——解讀《MapReduce》
date: 2021-12-22 17:36:29


論文:Dean J, Ghemawat S. MapReduce: Simplified data processing on large clusters[J]. 2004.

前言

MIT6.824分布式課程第一課的課前閱讀要求。

MapReduce是谷歌提出的面向大規(guī)模數(shù)據(jù)的分布式并行計(jì)算模式,給大數(shù)據(jù)并行計(jì)算帶來了革命性影響,MapReduce也是著名的Hadoop中相當(dāng)重要的一部分。

論文解讀

Abstract

MapReduce用于大數(shù)據(jù)的計(jì)算。

用戶可定義一個(gè) map 函數(shù),用于處理一個(gè)鍵值對,并生成一堆中間鍵值對;同時(shí),用戶再定義一個(gè) reduce 函數(shù),它用于合并所有 key 相同的中間鍵值對。

1 Introduction

map 和 reduct 的設(shè)計(jì)理念受 Lisp 等函數(shù)式語言的啟發(fā)。

我們意識(shí)到大部分計(jì)算都可以:先用一個(gè) map 操作去計(jì)算輸入中的每個(gè)邏輯單位,得出一個(gè)包含中間態(tài)鍵值對的集合;然后再用一個(gè) reduce 操作去合并所有 key 相同的 value 。

We realized that most of our computations involved applying a map operation to each logical "record" in our input in order to compute a set of intermediate key/value pairs, and then applying a reduce operation to all the values that shared the same key, in order to combine the derived data appropriately.

容錯(cuò)的主要機(jī)制:重新執(zhí)行。

第2節(jié)介紹基礎(chǔ)的程序模型和幾個(gè)樣例;第3節(jié)介紹在集群計(jì)算環(huán)境下, MapReduce 接口的簡單實(shí)現(xiàn);第4節(jié)介紹幾種我們認(rèn)為有用的程序模型的改良;第5節(jié)展示在各種各樣任務(wù)上的測量;第6節(jié)展示 MapReduce 在谷歌內(nèi)部的使用,包括使用經(jīng)驗(yàn);第7節(jié)討論未來相關(guān)的工作。

2 Programming Model

程序的輸入和輸出都是鍵值對集合。使用 MapReduce 的用戶需要將計(jì)算過程表示成兩個(gè)函數(shù):Map 和 Reduce 。

  • Map 函數(shù)。由用戶自定義,輸入一個(gè)鍵值對,進(jìn)行處理后,輸出一組中間鍵值對。MapReduce 會(huì)將其中 key 相同的鍵值對的 value 合并起來,關(guān)聯(lián)到同一個(gè) key ,并將其傳給 Reduce 函數(shù)。
  • Reduce 函數(shù)。也由用戶自定義,接收一個(gè)中間鍵值對的 key 和該 key 對應(yīng)的一組 value 。Reduce 會(huì)將這些 value 合并起來,輸出一組新的 value ,不過通常它只輸出零或一個(gè)值。輸入中的一組 value 會(huì)通過一個(gè)迭代器傳遞,防止數(shù)據(jù)量太大而導(dǎo)致內(nèi)存無法容納。

2.1 Example

統(tǒng)計(jì)大量文檔中各單詞出現(xiàn)的次數(shù):

map(String key, String value):
  // key: document name
  // value: document contents
  for each word w in value:
    EmitIntermediate(w, "1");

reduce(String key, Iterator values):
  // key: a word
  // values: a list of counts
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result));

2.2 Types

map    (k1,v1)       → list(k2,v2)
reduce (k2,list(v2)) → list(v2)

map函數(shù)輸入的鍵值對(k1,v1) 和輸出的中間鍵值對(k2,v2)屬于不同域,比如在上述示例中:對于map函數(shù),(k1,v1)是文檔名和文檔內(nèi)容,而(k2,v2)則是單詞和出現(xiàn)次數(shù),;而對于reduce函數(shù),輸入的v2是單詞出現(xiàn)次數(shù),輸出也是單詞出現(xiàn)次數(shù),屬于同一域。

2.3 More Examples

  • Distributed Grep:map函數(shù)輸出被匹配的行,reduce函數(shù)則只是拷貝輸入到輸出。
  • Count of URL Access Frequency:map輸入請求日志并輸出<URL, 1>,reduce求和URL相同的鍵值對的值并輸出<URL, total count>。
  • Reverse Web-Link Graph:map函數(shù)輸出<target, source>,target是指向目標(biāo)URL的鏈接,source是鏈接所在頁面的名稱;reduce函數(shù)將所有target相同的source集合起來,輸出<target, list(source)>。
  • Term-Vector per Host
  • Inverted Index
  • Distributed Sort

3 Implementation

本節(jié)所介紹的MapReduce實(shí)現(xiàn)將基于大規(guī)模計(jì)算機(jī)集群。

3.1 Execution Overview

Map函數(shù)分布在多個(gè)機(jī)器上,輸入數(shù)據(jù)將被自動(dòng)分為M片,每一片可被不同機(jī)器并行地處理。中間鍵值對通過一個(gè)分區(qū)函數(shù)(partitioning function)分為R片,R和分區(qū)函數(shù)由用戶自定義。

MapReduce調(diào)用過程如下:

image.png
  1. MapReduce庫先將輸入文件分為M片,每片通常16MB到64MB。然后它將喚醒程序?qū)?yīng)的集群。
  2. 集群存在一個(gè)特殊的節(jié)點(diǎn)master,其余都為worker??偣灿蠱個(gè)Map任務(wù)和R個(gè)Reduce任務(wù),由master分配給相應(yīng)的worker。
  3. 負(fù)責(zé)Map任務(wù)的worker,從對應(yīng)的片段中讀取輸入數(shù)據(jù)(M個(gè)Map worker與輸入的M個(gè)片段一一對應(yīng))。它從輸入中解析出鍵值對,然后將每一個(gè)傳遞給用戶定義的Map函數(shù)。Map函數(shù)處理后輸出中間鍵值對,并緩存在內(nèi)存中。
  4. 緩存的中間鍵值對會(huì)周期性地寫入磁盤中,并被分區(qū)函數(shù)(比如hash(key) mod R,保證不同map輸出的相同key都在同一個(gè)序號的分區(qū)中)分為R個(gè)分區(qū)(對應(yīng)R個(gè)Reduce任務(wù),比如第i個(gè)分區(qū)對應(yīng)第i個(gè)Reduce)。這些緩存的中間鍵值對的位置會(huì)被告知給master,master會(huì)將位置傳達(dá)給Reduce worker。
  5. 當(dāng)Reduce worker被master告知中間鍵值對位置后,它會(huì)通過遠(yuǎn)程調(diào)用讀取對應(yīng)Map worker磁盤中的數(shù)據(jù)(第i個(gè)Reduce讀取所有Map中第i個(gè)分區(qū)的數(shù)據(jù))。當(dāng)Reduce worker讀取了所有的中間鍵值對,它會(huì)根據(jù)key進(jìn)行排序 ,因此相同key的中間鍵值對會(huì)組合在一起。如果數(shù)據(jù)量太大無法放入內(nèi)存,則需進(jìn)行外部排序。
  6. Reduce worker會(huì)遍歷排好序的中間鍵值對,將每一個(gè)唯一的key和對應(yīng)的一組value傳遞給用戶定義的Reduce函數(shù),輸出結(jié)果將追加到當(dāng)前reduce分區(qū)對應(yīng)的最終文件。
  7. 當(dāng)所有map和reduce任務(wù)完成,master將喚醒并返回用戶程序。

當(dāng)MapReduce成功執(zhí)行完成,結(jié)果將會(huì)存放在R個(gè)輸出文件中(對應(yīng)Reduce任務(wù)個(gè)數(shù))。用戶并不需要合并這R個(gè)文件,它們通常會(huì)作為下一個(gè)MapReduce任務(wù)的輸入或者應(yīng)用到下一個(gè)分布式任務(wù)。

3.2 Master Data Structures

master節(jié)點(diǎn)保存每個(gè)map和reduce任務(wù)的狀態(tài)、每態(tài)計(jì)算機(jī)的標(biāo)識(shí)、已完成的map worker的輸出數(shù)據(jù)的位置和大小。

3.3 Fault Tolerance

Worker Failure

master定期ping每一個(gè)worker,如果某個(gè)worker一段時(shí)間內(nèi)沒有響應(yīng),則將其標(biāo)記為failed失敗。

對于失敗worker中的map任務(wù),不管任務(wù)已完成還是在進(jìn)行中,都將被重置為idle初始狀態(tài),并分配給另一個(gè)worker。 因?yàn)?,失敗worker中map任務(wù)的輸出被保存在失敗worker電腦的本地磁盤。

對于失敗worker中的reduce任務(wù),若任務(wù)已完成則不需要被重新執(zhí)行,因?yàn)樗鼈兊妮敵霰槐4嬖谌治募到y(tǒng)。

當(dāng)一個(gè)map任務(wù)先被worker A執(zhí)行再被worker B執(zhí)行(A出現(xiàn)異常)時(shí),所有從A讀取輸入數(shù)據(jù)的正在執(zhí)行的reduce任務(wù)將被通知重新執(zhí)行。

Master Failure

將master的相關(guān)數(shù)據(jù)周期性地寫入checkpoint檢查點(diǎn),如果master失效,一個(gè)新的副本將從最近一次的checkpoint啟動(dòng)。然而,master只有一個(gè)節(jié)點(diǎn),是不可能失效的(?)。如果master失效,將中止MapReduce計(jì)算。客戶端可檢查此條件,并重試MapReduce計(jì)算。

It is easy to make the master write periodic checkpoints of the master data structures described above. If the master task dies, a new copy can be started from the last checkpointed state. However, given that there is only a single master, its failure is unlikely; therefore our current implementation aborts the MapReduce computation if the master fails. Clients can check for this condition and retry the MapReduce operation if they desire.

Semantics in the Presence of Failures

map和reduce任務(wù)的提交都是原子操作。

每一個(gè)正在運(yùn)行的任務(wù)會(huì)將輸出寫到私有的暫存文件中 ,reduce任務(wù)產(chǎn)生一個(gè)這樣的文件,map任務(wù)產(chǎn)生R個(gè)這樣的文件(每個(gè)對應(yīng)一個(gè)reduce任務(wù)?)。

當(dāng)map任務(wù)完成時(shí),worker會(huì)向master發(fā)送一條消息,包含R個(gè)暫存文件的名字 。如果master已接受過該消息,則忽略,否則將消息數(shù)據(jù)記錄到本地。

當(dāng)reduce任務(wù)完成時(shí),worker會(huì)原子地將其輸出的暫存文件重命名為最終輸出文件 。如果相同的reduce任務(wù)被多臺(tái)機(jī)器執(zhí)行,多個(gè)重命名操作會(huì)被執(zhí)行在最終輸出文件上,但由于原子性,我們能保證最終文件僅包含一個(gè)reduce任務(wù)所產(chǎn)生的數(shù)據(jù)。

3.4 Locality

master在調(diào)度map任務(wù)時(shí)會(huì)考慮輸入數(shù)據(jù)文件的位置信息。盡量在包含該相關(guān)輸入數(shù)據(jù)的副本的機(jī)器上執(zhí)行map任務(wù)。如果任務(wù)失敗,master會(huì)嘗試在保存輸入數(shù)據(jù)副本的鄰近機(jī)器上執(zhí)行map任務(wù)(比如同一網(wǎng)絡(luò)中)。

3.5 Task Granularity

我們將map任務(wù)劃分成M個(gè),reduce任務(wù)劃分成R個(gè),同時(shí)M和R遠(yuǎn)遠(yuǎn)大于worker的數(shù)量。

master必須執(zhí)行O(M+R)次調(diào)度,并在內(nèi)存中保存O(M*R)個(gè)狀態(tài)。

R通常由用戶定義,因?yàn)樽罱K會(huì)有R個(gè)輸出文件。實(shí)際中,M的值會(huì)保證每份獨(dú)立的輸入數(shù)據(jù)在16MB在64MB之間,R的值則應(yīng)該是worker的倍數(shù)。比如:2000個(gè)worker,M=200000,R=5000。

3.6 Backup Tasks

導(dǎo)致MapReduce總時(shí)間延長的一個(gè)常見原因是存在“落伍者”——一臺(tái)機(jī)器在執(zhí)行最后幾個(gè)map或reduce時(shí),花費(fèi)了比平時(shí)更長的時(shí)間?!奥湮檎摺背霈F(xiàn)的原因有很多,比如磁盤出現(xiàn)了錯(cuò)誤,讀寫速度從30MB/s下降到1MB/s。

我們有一個(gè)常用方法來解決“落伍者”問題,當(dāng)MapReduce任務(wù)快完成時(shí),master將會(huì)備份執(zhí)行 正在運(yùn)行中的剩余任務(wù),只要主任務(wù)或備份任務(wù)完成,任務(wù)就會(huì)被標(biāo)記為已完成。

使用該機(jī)制只會(huì)比不使用多花費(fèi)幾個(gè)百分比的計(jì)算資源,但能顯著減少運(yùn)行大型計(jì)算所需要的時(shí)間。

4 Refinements

4.1 Partitioning Function

reduce任務(wù)/輸出文件數(shù)量R由用戶定義,中間鍵值對被分區(qū)函數(shù)分為R份。一個(gè)默認(rèn)的分區(qū)函數(shù)通常使用HASH,比如:hash(key) mod R,再比如:中間key是URL,且想要域名相同的最終在一個(gè)輸出文件中,分區(qū)函數(shù)可定義為hash(Hostname(urlkey)) mod R。

4.2 Ordering Guarantees

對于一個(gè)給定的分區(qū),中間鍵值對會(huì)根據(jù)key升序排列。這種順序能保證分區(qū)對應(yīng)的輸出文件是有序的。有時(shí)這很有用。

4.3 Combiner Function

在很多情況下,不同map任務(wù)會(huì)產(chǎn)生相同的中間鍵值對。比如在單詞統(tǒng)計(jì)的MapReduce程序中,每個(gè)map任務(wù)都會(huì)產(chǎn)生數(shù)以千計(jì)的<the, 1>鍵值對,所有這些鍵值對會(huì)被發(fā)送給單個(gè)reduce任務(wù)。我們允許用戶定義一個(gè)Combiner函數(shù),在數(shù)據(jù)發(fā)送之前,可以通過該函數(shù)將數(shù)據(jù)進(jìn)行部分合并

Combiner函數(shù)在map任務(wù)運(yùn)行的每臺(tái)機(jī)器上執(zhí)行。通常Combiner函數(shù)和Reduce函數(shù)代碼一樣(?),不同在于Reduce函數(shù)輸出到最終文件,Combiner函數(shù)輸出到中間文件。

部分合并會(huì)明顯提升某類MapReduce操作的速度。

4.4 Input and Output Types

MapReduce支持讀取不同形式的輸入數(shù)據(jù),例如:對于文本數(shù)據(jù),每一行當(dāng)作一對key/value。

用戶可以使用預(yù)定義的輸入格式,也可以實(shí)現(xiàn)reader接口用來支持新的輸入格式。reader函數(shù)不僅可以從文件中讀取數(shù)據(jù),也可以從數(shù)據(jù)庫、內(nèi)存中讀取。

同樣,我們也提供預(yù)定義的輸出格式用于產(chǎn)生輸出,當(dāng)然用戶也可以自定義。

4.5 Side-effects

在某些情況下,用戶為了便利,會(huì)在執(zhí)行map或reduce任務(wù)時(shí)產(chǎn)生額外的輔助文件。我們需要用戶保證這些副作用(side-effects)的原子性和冪等性(idempotent)。

我們不支持一個(gè)任務(wù)產(chǎn)生多個(gè)輸出文件所帶來的兩段原子提交。。。(沒搞懂,原文如下)

We do not provide support for atomic two-phase commits of multiple output fifiles produced by a single task.Therefore, tasks that produce multiple output fifiles withcross-fifile consistency requirements should be deterministic. This restriction has never been an issue in practice.

4.6 Skipping Bad Records

有時(shí),某個(gè)記錄(record)會(huì)導(dǎo)致Map或Reduce函數(shù)崩潰,這些bug會(huì)阻止MapReduce的完成。

通常的方法是去修復(fù)這個(gè)bug,但有時(shí)候是不可修復(fù)的,比如bug在第三方庫中。我們提供了一種可選的執(zhí)行模式 ,MapReduce檢測到某個(gè)記錄會(huì)導(dǎo)致崩潰時(shí),將會(huì)跳過這個(gè)記錄。

具體細(xì)節(jié)是,worker上運(yùn)行一個(gè)捕獲內(nèi)存段異常(segmentation violation)和總線錯(cuò)誤(bus error)的handler。在調(diào)用Map或Reduce函數(shù)之前,MapReduce會(huì)保存參數(shù)的序號。如果程序產(chǎn)生錯(cuò)誤信號(signal),handler就會(huì)向master發(fā)送一個(gè)包含參數(shù)序號的"last gasp"UDP包。如果master發(fā)現(xiàn)這個(gè)記錄出現(xiàn)了不止一次錯(cuò)誤,就會(huì)標(biāo)記這個(gè)記錄,并讓重新執(zhí)行的Map或Reduce任務(wù)跳過它。

4.7 Local Execution

調(diào)試分布式系統(tǒng)是一個(gè)棘手的問題,為了方便調(diào)試,我們實(shí)現(xiàn)了另一種MapReduce——可以在本地機(jī)器上順序執(zhí)行所有操作。

4.8 Status Information

在master中,運(yùn)行著一個(gè)HTTP服務(wù)器,它向用戶展示一些狀態(tài)頁面。這些狀態(tài)頁面展示計(jì)算的進(jìn)度,比如:how many tasks have been completed, how many are inprogress, bytes of input, bytes of intermediate data, bytes of output, processing rates, etc。頁面還包含每個(gè)任務(wù)產(chǎn)生的錯(cuò)誤和輸出文件的鏈接。用戶可以通過頁面估計(jì)時(shí)間、添加計(jì)算資源、分析調(diào)試等等。

4.9 Counters

MapReduce還提供計(jì)數(shù)器counter用于記錄不同事件發(fā)生的次數(shù),比如已處理單詞的個(gè)數(shù)。

Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
  for each word w in contents:
    if (IsCapitalized(w)):
      uppercase->Increment();
    EmitIntermediate(w, "1");

每隔一段時(shí)間,計(jì)數(shù)器的值會(huì)從不同worker上報(bào)給master。來自已成功完成的map和reduce任務(wù)的counter值,會(huì)被master集合起來,并在MapReduce完成后返回給用戶代碼。同時(shí),counter值也在狀態(tài)頁面展示給用戶。聚合counter值時(shí),master會(huì)剔除重復(fù)執(zhí)行的任務(wù)。

5 Performance

6 Experience

7 Related Work

8 Conclusions

MapReduce成功的原因:

  1. 易于使用(抽象性好)。
  2. 很多問題都能用MapReduce表示。
  3. 已經(jīng)成熟,可以擴(kuò)展到數(shù)千臺(tái)機(jī)器上使用。

這項(xiàng)工作中的啟發(fā):

  1. 通過限制編程模式 可以讓并行分布式計(jì)算和計(jì)算容錯(cuò)更加容易。
  2. 網(wǎng)絡(luò)是一種緊缺資源 。因此我們進(jìn)行了許多優(yōu)化,比如:從本地讀取數(shù)據(jù)、將中間數(shù)據(jù)暫存到本地。
  3. 可以使用冗余執(zhí)行 ,來減少速度比較慢的機(jī)器所帶來的影響,以及處理計(jì)算機(jī)故障和數(shù)據(jù)丟失。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容