MapReduce原理和執(zhí)行過程

MapReduce是一個用于處理海量數據的分布式計算框架。

  • 這個框架解決了 ? 數據分布式存儲
    ?作業(yè)調度、
    ? 容錯、
    ? 機器間通信等復雜問題

MapReduce的核心思想,分而治之

分:map
? 把復雜的問題分解為若干“簡單的 任務”
合:reduce

上面這幅圖就是mapreduce的工作原理

以詞頻統(tǒng)計為例。

詞頻統(tǒng)計就是統(tǒng)計一個單詞在所有文本中出現(xiàn)的次數,在Hadoop中的事例程序就是wordcount,俗稱hadoop編程的"hello world".因為我們有多個文本,所以可以并行的統(tǒng)計每個文本中單詞出現(xiàn)的個數,然后最后進行合計。
所以這個可以很好地體現(xiàn)map,reduce的過程。

1)首先文檔的數據記錄(如文本中的行,或數據表格中的行)是以“鍵值對”的形式傳入map 函數,然后map函數對這些鍵值對進行處理(如統(tǒng)計詞頻),然后輸出到中間結果。

2)在鍵值對進入reduce進行處理之前,必須等到所有的map函數都做完,所以既為了達到這種同步又提高運行效率,在mapreduce中間的過程引入了barrier(同步障)
在負責同步的同時完成對map的中間結果的統(tǒng)計,包括 a. 對同一個map節(jié)點的相同key的value值進行合并,b. 之后將來自不同map的具有相同的key的鍵值對送到同一個reduce進行處理。

3)在reduce階段,每個reduce節(jié)點得到的是從所有map節(jié)點傳過來的具有相同的key的鍵值對。reduce節(jié)點對這些鍵值進行合并。

4)Combiner 節(jié)點負責完成上面提到的將同一個map中相同的key進行合并,避免重復傳輸,從而減少傳輸中的通信開銷。

5)Partitioner節(jié)點負責將map產生的中間結果進行劃分,確保相同的key到達同一個reduce節(jié)點.

編程模型

? 借鑒函數式的編程方式
? 用戶只需要實現(xiàn)兩個函數接口:
? Map(in_key, in_value)-> (out_key, intermediate_value) list
? Reduce (out_key, intermediate_value list) ->out_value list

兩個重要的進程

  • JobTracker
    主進程,負責接收客戶作業(yè)提交,調度任務到作節(jié)點上運行,并提供諸如監(jiān)控工作節(jié)點狀態(tài)及任務進度等 管理功能,一個MapReduce集群有一個jobtracker,一般運行在可靠的硬件上。
    ? tasktracker是通過周期性的心跳來通知jobtracker其當前的健康狀態(tài),每一次心跳包含了可用的map和 reduce任務數目、占用的數目以及運行中的任務詳細信息。Jobtracker利用一個線程池來同時處理心跳和 客戶請求。
  • TaskTracker
    ? 由jobtracker指派任務,實例化用戶程序,在本地執(zhí)行任務并周期性地向jobtracker匯報 狀態(tài)。在每一個工 作節(jié)點上永遠只會有一個tasktracker

? JobTracker一直在等待JobClient提交作業(yè)
? TaskTracker每隔3秒向JobTracker發(fā)送心跳詢問有沒有任務可做,如果有,讓
其派發(fā)任務給它執(zhí)行
? Slave主動向master拉生意

wordCount實例代碼(python實現(xiàn))

案例文件結構

run.sh文件
 HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop"  
 STREAM_JAR_PATH="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"    //python代碼實現(xiàn),引入streaming

#INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
INPUT_FILE_PATH_1="/1.data"
OUTPUT_PATH="/output"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1. 
$HADOOP_CMD jar $STREAM_JAR_PATH \    //設置通過streaming方式提交
    -input $INPUT_FILE_PATH_1 \    //上面定義的,指向/1.data,原始文件(數據源)
    -output $OUTPUT_PATH \     //輸出路徑
    -mapper "python map_new.py" \    //指定如何執(zhí)行map
    -reducer "python red_new.py" \    //指定如何執(zhí)行reducer
    -file ./map_new.py \    //通過下面兩個配置文件,把本地的代碼分發(fā)到集群的map和Reducer上去
    -file ./red_new.py
a.txt
111
222
333
map_new.py
import sys

for line in sys.stdin:
    ss = line.strip().split(' ')
    for word in ss:
            print '\t'.join([word.strip(), '1'])

cat a.txt | python map_new.py。通過管道的方式,a.txt 是數據源,標準輸入到map。

red_new.py
import sys


cur_word = None
sum = 0

for line in sys.stdin:
    ss = line.strip().split('\t')
    if len(ss) != 2:
            continue
    word, cnt = ss

    if cur_word == None:
            cur_word = word

    if cur_word != word:
            print '\t'.join([cur_word, str(sum)])
            cur_word = word
            sum = 0

    sum += int(cnt)

print '\t'.join([cur_word, str(sum)])

head -2 The_Man_of_Property.txt | python map_new.py | sort -k1 | python red_new.py > result.local
這段代碼模擬了數據進入map,中間sort之后再進入red,最后輸出result.local

啟動集群。進入hadoop/bin 目錄下
./start-all.sh

然后hadoop fs -out 1.data / 把數據源上傳到HDFS
bash run.sh執(zhí)行streaming方式提交的腳本。

MapReduce計算框架-執(zhí)行流程

File:

文件要存儲在HDFS中,每個文件切分成多個一定大小(默認64M)的Block(默認3個備份)存儲在多個節(jié)點(DataNode)上

文件數據內容:

We are studying at badou.\n
We are studyPinagrtiatitonbeardou.\n
......

InputFormat:

MR框架基礎類之一
? 數據分割(Data Splits)
? S記pli錄t讀取Sp器lit(RSepcliotrdReader)
例子:
數據格定義,如果以“\n”分割每條記錄,以空格區(qū)分一個目標單詞
Shuffle “we are studying at badou.”為一條記錄
“are”“at”等為一個目標單詞

Split:

實際上每個split包含后一個Block中開頭部分的數據(解決記錄跨Block問題)
例子:
比如記錄 “we are studing at badou./n"
跨越存儲在兩個Block中,那么這條記錄屬于前一個Block對應的split

RecordReader:(RR)

每讀取一條記錄,調用一次map函數
例子
比如,記錄“we are studying at badou." 作為參數v,調用map(v)
然后繼續(xù)這個過程,讀取議案一條記錄知道split尾部。

Map:

比如記錄”we are studying at badou"
調用執(zhí)行一次map("we are studying at badou")
在內存中增加數據:
{"we":1}
{"are":1}
......

Shuffle:

Partion,Sort,Spill,Meger,Combiner.......
神奇發(fā)生的地方,性能優(yōu)化大有可為的地方!

Partitioner:

決定數據由哪個Reducer處理,從而分區(qū)
比如采用Hash法。

MemoryBuffer

內存緩沖區(qū),每個map的結果和partition處理的key value結果都保存在緩存中

緩沖區(qū)大小:默認100M
溢寫閾值:100M*0.8 = 80M

緩沖區(qū)中的數據:partition key value 三元組數據
{“1”, “are” : 1}
{“2”, “at” : 1}
{“1”, “we” : 1}

Spill:

內存緩沖區(qū)達到閾值時,溢寫spill線程鎖住這80M 的緩沖區(qū),開始將數據寫出到本地磁盤中,然后釋 放內存。
每次溢寫都生成一個數據文件。 溢出的數據到磁盤前會對數據進行key排序sort, 以及合并combiner
發(fā)送相同Reduce的key數量,會拼接到一起,減少 partition的索引數量。

Sort:

緩沖區(qū)數據按照key進行排序

Combiner:

數據合并,相同的key的數據,value值合并,減少輸 出傳輸量 Combiner函數事實上是reducer函數,滿足 combiner處理不影響{sum,max等}最終reduce的 結果時,可以極大提升性能
{“1”, “are”, 1} {“1”, “are”, 1} {“1”, “we”, 1}==》
{“1”, “are”, 2} {“1”, “we”, 1}

Reducer

多個reduce任務輸入的數據都屬于不同的partition,因此結果數據的key不會重復。
合并reduce的輸出文件即可得到最終的結果。

MapReduce物理配置

? 文件句柄個數 – ulimit
? cpu

– 多核

? 內存

– 8G以上

? 合適的slot

– 單機map、reduce個數
– mapred.tasktracker.map.tasks.maximum(默認2)
– mapreduce.tasktracker.tasks.reduce.maximum(默認2) – 內存限制
– cpu核數-1
– 多機集群分離

? 磁盤情況

– 合適單機多磁盤
– mapred.local.dir和dfs.data.dir

? 確定map任務數時依次優(yōu)先參考如下幾個原則:

– 每個map任務使用的內存不超過800M,盡量在500M以下
– 每個map任務運行時間控制在大約20分鐘,最好1-3分鐘
– 每個map任務處理的最大數據量為一個HDFS塊大小,一個map任務處理的輸入不能跨文件
– map任務總數不超過平臺可用的任務槽位

? 配置加載的問題

– 簡單配置通過提交作業(yè)時-file分發(fā)
– 復雜較大配置
? 傳入hdfs
? map中打開文件讀取
? 建立內存結構

? map個數為split的份數
? 壓縮文件不可切分
? 非壓縮文件和sequence文件可以切分
? dfs.block.size決定block大小
? 確定reduce任務數時依次優(yōu)先參考如下幾個方面:

– 每個reduce任務使用的內存不超過800M,盡量在500M以下
– 每個reduce任務運行時間控制在大約20分鐘,最好1-3分鐘 – 整個reduce階段的輸入數據總量
– 每個reduce任務處理的數據量控制在500MB以內
– map任務數與reduce任務數的乘積
– 輸出數據要求

? reduce個數設置

– mapred.reduce.tasks – 默認為1

? reduce個數太少 – 單次執(zhí)行慢

– 出錯再試成本高

? reduce個數太多 – shuffle開銷大

– 輸出大量小文件

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • 努力生活,別把年齡當回事 妍初 1 晚高峰,在地鐵一號線轉二號線水泄不通的電梯上,接到小葉的電話。周圍人聲鼎沸,...
    妍初0閱讀 689評論 0 0
  • 那天,閃著藍色光芒的流星劃過天際,一條優(yōu)美的拋物線橫于萬里晴空;街道上、天橋上、陽臺上,人們欣賞它的絢麗。 “...
    九月紅葉閱讀 271評論 0 0
  • 我有夢想。我希望把自己的足跡留在世界上的每一個無人問津的角落,描述出我所見到的高尚與卑鄙、冷漠與熱情、智慧與愚蠢、...
    說啥看心情閱讀 1,734評論 25 32

友情鏈接更多精彩內容