MapReduce是一個用于處理海量數據的分布式計算框架。
- 這個框架解決了 ? 數據分布式存儲
?作業(yè)調度、
? 容錯、
? 機器間通信等復雜問題
MapReduce的核心思想,分而治之
分:map
? 把復雜的問題分解為若干“簡單的 任務”
合:reduce

上面這幅圖就是mapreduce的工作原理
以詞頻統(tǒng)計為例。
詞頻統(tǒng)計就是統(tǒng)計一個單詞在所有文本中出現(xiàn)的次數,在Hadoop中的事例程序就是wordcount,俗稱hadoop編程的"hello world".因為我們有多個文本,所以可以并行的統(tǒng)計每個文本中單詞出現(xiàn)的個數,然后最后進行合計。
所以這個可以很好地體現(xiàn)map,reduce的過程。
1)首先文檔的數據記錄(如文本中的行,或數據表格中的行)是以“鍵值對”的形式傳入map 函數,然后map函數對這些鍵值對進行處理(如統(tǒng)計詞頻),然后輸出到中間結果。
2)在鍵值對進入reduce進行處理之前,必須等到所有的map函數都做完,所以既為了達到這種同步又提高運行效率,在mapreduce中間的過程引入了barrier(同步障)
在負責同步的同時完成對map的中間結果的統(tǒng)計,包括 a. 對同一個map節(jié)點的相同key的value值進行合并,b. 之后將來自不同map的具有相同的key的鍵值對送到同一個reduce進行處理。
3)在reduce階段,每個reduce節(jié)點得到的是從所有map節(jié)點傳過來的具有相同的key的鍵值對。reduce節(jié)點對這些鍵值進行合并。
4)Combiner 節(jié)點負責完成上面提到的將同一個map中相同的key進行合并,避免重復傳輸,從而減少傳輸中的通信開銷。
5)Partitioner節(jié)點負責將map產生的中間結果進行劃分,確保相同的key到達同一個reduce節(jié)點.
編程模型
? 借鑒函數式的編程方式
? 用戶只需要實現(xiàn)兩個函數接口:
? Map(in_key, in_value)-> (out_key, intermediate_value) list
? Reduce (out_key, intermediate_value list) ->out_value list
兩個重要的進程
-
JobTracker
主進程,負責接收客戶作業(yè)提交,調度任務到作節(jié)點上運行,并提供諸如監(jiān)控工作節(jié)點狀態(tài)及任務進度等 管理功能,一個MapReduce集群有一個jobtracker,一般運行在可靠的硬件上。
? tasktracker是通過周期性的心跳來通知jobtracker其當前的健康狀態(tài),每一次心跳包含了可用的map和 reduce任務數目、占用的數目以及運行中的任務詳細信息。Jobtracker利用一個線程池來同時處理心跳和 客戶請求。 -
TaskTracker
? 由jobtracker指派任務,實例化用戶程序,在本地執(zhí)行任務并周期性地向jobtracker匯報 狀態(tài)。在每一個工 作節(jié)點上永遠只會有一個tasktracker
? JobTracker一直在等待JobClient提交作業(yè)
? TaskTracker每隔3秒向JobTracker發(fā)送心跳詢問有沒有任務可做,如果有,讓
其派發(fā)任務給它執(zhí)行
? Slave主動向master拉生意

wordCount實例代碼(python實現(xiàn))
案例文件結構

run.sh文件
HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar" //python代碼實現(xiàn),引入streaming
#INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
INPUT_FILE_PATH_1="/1.data"
OUTPUT_PATH="/output"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \ //設置通過streaming方式提交
-input $INPUT_FILE_PATH_1 \ //上面定義的,指向/1.data,原始文件(數據源)
-output $OUTPUT_PATH \ //輸出路徑
-mapper "python map_new.py" \ //指定如何執(zhí)行map
-reducer "python red_new.py" \ //指定如何執(zhí)行reducer
-file ./map_new.py \ //通過下面兩個配置文件,把本地的代碼分發(fā)到集群的map和Reducer上去
-file ./red_new.py
a.txt
111
222
333
map_new.py
import sys
for line in sys.stdin:
ss = line.strip().split(' ')
for word in ss:
print '\t'.join([word.strip(), '1'])

cat a.txt | python map_new.py。通過管道的方式,a.txt 是數據源,標準輸入到map。
red_new.py
import sys
cur_word = None
sum = 0
for line in sys.stdin:
ss = line.strip().split('\t')
if len(ss) != 2:
continue
word, cnt = ss
if cur_word == None:
cur_word = word
if cur_word != word:
print '\t'.join([cur_word, str(sum)])
cur_word = word
sum = 0
sum += int(cnt)
print '\t'.join([cur_word, str(sum)])
head -2 The_Man_of_Property.txt | python map_new.py | sort -k1 | python red_new.py > result.local
這段代碼模擬了數據進入map,中間sort之后再進入red,最后輸出result.local

啟動集群。進入hadoop/bin 目錄下
./start-all.sh
然后hadoop fs -out 1.data / 把數據源上傳到HDFS
bash run.sh執(zhí)行streaming方式提交的腳本。
MapReduce計算框架-執(zhí)行流程

File:
文件要存儲在HDFS中,每個文件切分成多個一定大小(默認64M)的Block(默認3個備份)存儲在多個節(jié)點(DataNode)上
文件數據內容:
We are studying at badou.\n
We are studyPinagrtiatitonbeardou.\n
......
InputFormat:
MR框架基礎類之一
? 數據分割(Data Splits)
? S記pli錄t讀取Sp器lit(RSepcliotrdReader)
例子:
數據格定義,如果以“\n”分割每條記錄,以空格區(qū)分一個目標單詞
Shuffle “we are studying at badou.”為一條記錄
“are”“at”等為一個目標單詞
Split:
實際上每個split包含后一個Block中開頭部分的數據(解決記錄跨Block問題)
例子:
比如記錄 “we are studing at badou./n"
跨越存儲在兩個Block中,那么這條記錄屬于前一個Block對應的split
RecordReader:(RR)
每讀取一條記錄,調用一次map函數
例子
比如,記錄“we are studying at badou." 作為參數v,調用map(v)
然后繼續(xù)這個過程,讀取議案一條記錄知道split尾部。
Map:
比如記錄”we are studying at badou"
調用執(zhí)行一次map("we are studying at badou")
在內存中增加數據:
{"we":1}
{"are":1}
......
Shuffle:
Partion,Sort,Spill,Meger,Combiner.......
神奇發(fā)生的地方,性能優(yōu)化大有可為的地方!
Partitioner:
決定數據由哪個Reducer處理,從而分區(qū)
比如采用Hash法。
MemoryBuffer
內存緩沖區(qū),每個map的結果和partition處理的key value結果都保存在緩存中
緩沖區(qū)大小:默認100M
溢寫閾值:100M*0.8 = 80M
緩沖區(qū)中的數據:partition key value 三元組數據
{“1”, “are” : 1}
{“2”, “at” : 1}
{“1”, “we” : 1}
Spill:
內存緩沖區(qū)達到閾值時,溢寫spill線程鎖住這80M 的緩沖區(qū),開始將數據寫出到本地磁盤中,然后釋 放內存。
每次溢寫都生成一個數據文件。 溢出的數據到磁盤前會對數據進行key排序sort, 以及合并combiner
發(fā)送相同Reduce的key數量,會拼接到一起,減少 partition的索引數量。
Sort:
緩沖區(qū)數據按照key進行排序
Combiner:
數據合并,相同的key的數據,value值合并,減少輸 出傳輸量 Combiner函數事實上是reducer函數,滿足 combiner處理不影響{sum,max等}最終reduce的 結果時,可以極大提升性能
{“1”, “are”, 1} {“1”, “are”, 1} {“1”, “we”, 1}==》
{“1”, “are”, 2} {“1”, “we”, 1}
Reducer
多個reduce任務輸入的數據都屬于不同的partition,因此結果數據的key不會重復。
合并reduce的輸出文件即可得到最終的結果。
MapReduce物理配置
? 文件句柄個數 – ulimit
? cpu
– 多核
? 內存
– 8G以上
? 合適的slot
– 單機map、reduce個數
– mapred.tasktracker.map.tasks.maximum(默認2)
– mapreduce.tasktracker.tasks.reduce.maximum(默認2) – 內存限制
– cpu核數-1
– 多機集群分離
? 磁盤情況
– 合適單機多磁盤
– mapred.local.dir和dfs.data.dir
? 確定map任務數時依次優(yōu)先參考如下幾個原則:
– 每個map任務使用的內存不超過800M,盡量在500M以下
– 每個map任務運行時間控制在大約20分鐘,最好1-3分鐘
– 每個map任務處理的最大數據量為一個HDFS塊大小,一個map任務處理的輸入不能跨文件
– map任務總數不超過平臺可用的任務槽位
? 配置加載的問題
– 簡單配置通過提交作業(yè)時-file分發(fā)
– 復雜較大配置
? 傳入hdfs
? map中打開文件讀取
? 建立內存結構
? map個數為split的份數
? 壓縮文件不可切分
? 非壓縮文件和sequence文件可以切分
? dfs.block.size決定block大小
? 確定reduce任務數時依次優(yōu)先參考如下幾個方面:
– 每個reduce任務使用的內存不超過800M,盡量在500M以下
– 每個reduce任務運行時間控制在大約20分鐘,最好1-3分鐘 – 整個reduce階段的輸入數據總量
– 每個reduce任務處理的數據量控制在500MB以內
– map任務數與reduce任務數的乘積
– 輸出數據要求
? reduce個數設置
– mapred.reduce.tasks – 默認為1
? reduce個數太少 – 單次執(zhí)行慢
– 出錯再試成本高
? reduce個數太多 – shuffle開銷大
– 輸出大量小文件