1 Overview
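- MapReduce originates from Google's MapReduce paper, published in December 2004.
- Hadoop MapReduce is essentially an open-source implementation of Google MapReduce.
- Its main strength is offline (batch) processing of massive data sets. It is also easy to develop for, because the framework encapsulates the distributed-computing plumbing for us, and its hardware requirements are modest: it runs on inexpensive commodity machines.
- Its main weakness is that it cannot do real-time stream processing; it only handles offline batch jobs.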
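MapReduce is a programming model for parallel computation over large data sets (larger than 1 TB).
Its central ideas, "Map" and "Reduce", are borrowed from functional programming languages, along with some features borrowed from vector programming languages.
It makes it far easier for programmers with no experience in distributed parallel programming to run their programs on a distributed system.
In current implementations you specify a Map function, which maps a set of key-value pairs to a new set of intermediate key-value pairs, and a Reduce function that runs concurrently and merges all intermediate values that share the same key.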
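To make the functional-programming origin of the two ideas concrete, here is a minimal single-machine sketch using Java streams. It has nothing to do with Hadoop itself; the class and method names are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

class FunctionalMapReduce {
    // "Map": transform every element independently of the others.
    static List<Integer> lengths(List<String> words) {
        return words.stream().map(String::length).collect(Collectors.toList());
    }

    // "Reduce": fold all mapped values into a single result.
    static int totalLength(List<String> words) {
        return words.stream().map(String::length).reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<String> words = List.of("hello", "world", "hadoop");
        System.out.println(lengths(words));      // [5, 5, 6]
        System.out.println(totalLength(words));  // 16
    }
}
```

Because each map call looks at one element only, the map step can be spread across machines; MapReduce builds on exactly that property.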
Before learning MapReduce you need a working Hadoop environment, that is, HDFS and YARN must already be installed. For the setup, see my two previous articles.
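2 MapReduce Programming Model
Getting started with the WordCount word-frequency example
Hadoop ships with a WordCount example that counts how many times each word appears in a file, i.e. word-frequency counting. WordCount is the usual starting point when learning big-data development.
For example, suppose I have a file test.txt with the following content: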
hello world
hello hadoop
hello MapReduce
The requirement is to count the occurrences of each word in this file. Suppose I have written some code that does so; the result would be:
hello 3
world 1
hadoop 1
MapReduce 1
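That is all there is to a word-frequency count.
Word counting looks simple: it rarely takes much code, and if you are comfortable with shell scripting a single command line will do. So why bring in big-data technology? Because simple code works for small files, but not for files of hundreds of GB, TB, or even PB. On a single machine that is impractical, and even where it is possible it takes a very long time.
Big-data technology exists to solve this kind of massive-data problem. MapReduce plays the role of the distributed parallel computing engine, and distributed parallel computation greatly speeds up the processing of massive data sets; many hands make light work.
Back to the word-count example: in real work, many jobs are adaptations of WordCount. For example, finding the most-visited URLs and the IPs with the most requests across all server access logs is a typical WordCount use case, and even a small company's access logs are usually GB-sized.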
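Figure: flow of a WordCount job executed with MapReduce (Splitting, Mapping, Shuffling, Reducing)
The input data set is divided into multiple chunks, which are placed on different nodes and processed in parallel.
In the Splitting stage the input is divided into splits (in this example, one line each), which are then handed to Mapping.
In the Mapping stage each line is broken into words by a delimiter or splitting rule, and every word is emitted as a (word, 1) pair.
In the Shuffling stage the pairs are transferred over the network so that identical words end up on the same node, where they are grouped together.
In the Reducing stage the grouped values of each word are summed, and the per-word totals are merged into the final result,
which is written to a file in HDFS.
Overall this is a process of splitting and then merging, which is why MapReduce has a Map phase and a Reduce phase. Most importantly, the whole flow is distributed and parallel: the tasks within a phase do not depend on one another and run independently.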
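The stages described above can be imitated on a single machine. The sketch below is only a simulation in plain Java, not Hadoop code; the class and method names are my own, and real MapReduce runs each step as distributed tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WordCountFlowSketch {
    // Map step: emit a (word, 1) pair for every word on one input line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(Map.entry(word, 1));
            }
        }
        return pairs;
    }

    // Shuffle step: group all emitted values by key (sorted by key here).
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce step: sum the values collected for one key.
    static int reduce(List<Integer> counts) {
        int sum = 0;
        for (int c : counts) {
            sum += c;
        }
        return sum;
    }

    // Run the three steps in order over lines that stand in for input splits.
    static Map<String, Integer> wordCount(List<String> lines) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String line : lines) {
            emitted.addAll(map(line));
        }
        Map<String, Integer> result = new TreeMap<>();
        shuffle(emitted).forEach((word, counts) -> result.put(word, reduce(counts)));
        return result;
    }

    public static void main(String[] args) {
        // prints {MapReduce=1, hadoop=1, hello=3, world=1}
        System.out.println(wordCount(List.of("hello world", "hello hadoop", "hello MapReduce")));
    }
}
```

The keys come out in sorted order because a TreeMap is used for grouping, which loosely mirrors the fact that the framework sorts keys before reducing.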
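3 MapReduce Execution Flow
A MapReduce job is divided into a Map phase and a Reduce phase
- The Map phase corresponds to a set of Map Tasks
- Likewise, the Reduce phase corresponds to a set of Reduce Tasks
Input and output
The MapReduce framework operates exclusively on <key,value> pairs: it views the input to the job as a set of <key,value> pairs and produces a set of <key,value> pairs as the output. The key and value types of input and output may differ.
Key and value classes must be serializable, so they need to implement the Writable interface.
Hadoop's Writable interface is a serialization protocol built on DataInput and DataOutput. It is compact (uses storage efficiently) and fast (low overhead for reading, writing, serializing, and deserializing).
Keys and values in Hadoop must be objects that implement Writable (keys must additionally implement WritableComparable so that they can be sorted).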
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* <p>Any <code>key</code> or <code>value</code> type in the Hadoop Map-Reduce
* framework implements this interface.</p>
*
* <p>Implementations typically implement a static <code>read(DataInput)</code>
* method which constructs a new instance, calls {@link #readFields(DataInput)}
* and returns the instance.</p>
* public class MyWritable implements Writable {
* // Some data
* private int counter;
* private long timestamp;
*
* public void write(DataOutput out) throws IOException {
* out.writeInt(counter);
* out.writeLong(timestamp);
* }
*
* public void readFields(DataInput in) throws IOException {
* counter = in.readInt();
* timestamp = in.readLong();
* }
*
* public static MyWritable read(DataInput in) throws IOException {
* MyWritable w = new MyWritable();
* w.readFields(in);
* return w;
* }
* }
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Writable {
/**
* Serialize the fields of this object to <code>out</code>.
*/
void write(DataOutput out) throws IOException;
/**
* Deserialize the fields of this object from <code>in</code>.
*
* <p>For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.</p>
*/
void readFields(DataInput in) throws IOException;
}
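Hadoop provides many concrete Writable classes, including wrappers for the common Java primitive types (boolean, byte, short, int, float, long, double, and so on) and container types (BytesWritable, ArrayWritable, MapWritable, and others). They all live in the org.apache.hadoop.io package.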

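Custom Writable classes
Hadoop has many built-in Writable classes to choose from. Its wrappers for Java primitive types come with RawComparator implementations, so those objects can be sorted at the byte-stream level without being deserialized, which greatly reduces the cost of comparison. When we need more complex objects, however, the built-in classes no longer suffice (note that Hadoop's Writable collection types do not provide a RawComparator either, so they do not meet this need). In that case we need to write our own Writable class, especially when it is to be used as a key, to get compact storage and fast comparison.
The example below shows how to write a custom Writable class. It must first implement Writable or WritableComparable, and then provide write(DataOutput out) and readFields(DataInput in), which control how the object is converted to a byte stream (write) and how it is restored from a byte stream (readFields).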
package com.javaedge.hadoop.project;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;

/**
 * This MyWritable class demonstrates how to write a custom Writable class
 *
 * @author sss
 * @date 2019-04-07
 */
public class MyWritable implements Writable {

    private VLongWritable field1;
    private VLongWritable field2;

    // The framework needs a no-argument constructor to create instances before readFields
    public MyWritable() {
        this.set(new VLongWritable(), new VLongWritable());
    }

    public MyWritable(VLongWritable fld1, VLongWritable fld2) {
        this.set(fld1, fld2);
    }

    public void set(VLongWritable fld1, VLongWritable fld2) {
        // make sure the smaller field is always put as field1
        if (fld1.get() <= fld2.get()) {
            this.field1 = fld1;
            this.field2 = fld2;
        } else {
            this.field1 = fld2;
            this.field2 = fld1;
        }
    }

    /**
     * Serializes both fields to the stream, field1 first.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        field1.write(out);
        field2.write(out);
    }

    /**
     * Deserializes both fields from the stream, in the same order as write.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        field1.readFields(in);
        field2.readFields(in);
    }

    /**
     * Returns true if <code>o</code> is a MyWritable with the same values.
     */
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MyWritable)) {
            return false;
        }
        MyWritable other = (MyWritable) o;
        return field1.equals(other.field1) && field2.equals(other.field2);
    }

    @Override
    public int hashCode() {
        return field1.hashCode() * 163 + field2.hashCode();
    }

    @Override
    public String toString() {
        return field1.toString() + "\t" + field2.toString();
    }
}
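To see what write and readFields amount to at the byte level, here is a small round-trip sketch that uses only java.io. PairRecord is a hypothetical stand-in that follows the same contract without needing Hadoop on the classpath, and it writes fixed-width longs rather than the variable-length encoding of VLongWritable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in that mirrors the Writable contract (not a Hadoop class).
class PairRecord {
    long first;
    long second;

    PairRecord() { }

    PairRecord(long first, long second) {
        this.first = first;
        this.second = second;
    }

    // Serialize the fields in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeLong(first);
        out.writeLong(second);
    }

    // Deserialize the fields in the same order, reusing this object.
    void readFields(DataInput in) throws IOException {
        first = in.readLong();
        second = in.readLong();
    }

    static byte[] toBytes(PairRecord record) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            record.write(new DataOutputStream(buffer));
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static PairRecord fromBytes(byte[] bytes) {
        try {
            PairRecord record = new PairRecord();
            record.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
            return record;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = toBytes(new PairRecord(7L, 42L));
        PairRecord copy = fromBytes(bytes);
        // prints 16 bytes -> 7, 42
        System.out.println(bytes.length + " bytes -> " + copy.first + ", " + copy.second);
    }
}
```

The important point is that readFields must read the fields in exactly the order in which write wrote them; there are no field names or type tags in the stream to fall back on.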
In addition, key classes must implement the WritableComparable interface so that the framework can sort them.
/**
* A {@link Writable} which is also {@link Comparable}.
*
* <p><code>WritableComparable</code>s can be compared to each other, typically
* via <code>Comparator</code>s. Any type which is to be used as a
* <code>key</code> in the Hadoop Map-Reduce framework should implement this
* interface.</p>
*
* <p>Note that <code>hashCode()</code> is frequently used in Hadoop to partition
* keys. It's important that your implementation of hashCode() returns the same
* result across different instances of the JVM. Note also that the default
* <code>hashCode()</code> implementation in <code>Object</code> does <b>not</b>
* satisfy this property.</p>
*
* <p>Example:</p>
* <p><blockquote><pre>
* public class MyWritableComparable implements WritableComparable<MyWritableComparable> {
* // Some data
* private int counter;
* private long timestamp;
*
* public void write(DataOutput out) throws IOException {
* out.writeInt(counter);
* out.writeLong(timestamp);
* }
*
* public void readFields(DataInput in) throws IOException {
* counter = in.readInt();
* timestamp = in.readLong();
* }
*
* public int compareTo(MyWritableComparable o) {
* int thisValue = this.value;
* int thatValue = o.value;
* return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
* }
*
* public int hashCode() {
* final int prime = 31;
* int result = 1;
* result = prime * result + counter;
* result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
* return result
* }
* }
* </pre></blockquote></p>
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
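The two requirements that the Javadoc above places on a key, a total order and a hashCode that is stable across JVMs, can be sketched in plain Java. EventKey is a hypothetical type of my own; in a real job it would also implement WritableComparable and provide write and readFields.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical key type, shown without the Hadoop interfaces so it runs standalone.
class EventKey implements Comparable<EventKey> {
    final int counter;
    final long timestamp;

    EventKey(int counter, long timestamp) {
        this.counter = counter;
        this.timestamp = timestamp;
    }

    // Order by counter first, then by timestamp.
    @Override
    public int compareTo(EventKey other) {
        int byCounter = Integer.compare(counter, other.counter);
        return byCounter != 0 ? byCounter : Long.compare(timestamp, other.timestamp);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof EventKey)) {
            return false;
        }
        EventKey other = (EventKey) o;
        return counter == other.counter && timestamp == other.timestamp;
    }

    // Derived only from field values, so it is identical in every JVM.
    @Override
    public int hashCode() {
        return 31 * (31 + counter) + Long.hashCode(timestamp);
    }

    @Override
    public String toString() {
        return counter + "@" + timestamp;
    }

    // Return a sorted copy, the way keys are ordered before reaching a reducer.
    static List<EventKey> sorted(List<EventKey> keys) {
        List<EventKey> copy = new ArrayList<>(keys);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<EventKey> keys = List.of(new EventKey(2, 10L), new EventKey(1, 30L), new EventKey(1, 20L));
        System.out.println(sorted(keys)); // [1@20, 1@30, 2@10]
    }
}
```

Using Integer.compare and Long.compare avoids the overflow that a subtraction-based compareTo can run into.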
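Input and output types of a MapReduce job:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
Put simply, this is an input/output pipeline. Note that the framework presents the input data set as key/value pairs; after the map phase these pairs are sorted, and the reduce phase merges them and writes the output, again as key/value pairs.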

Figure: schematic of the job's input and output flow
InputFormat: divides the input data into input splits (split)
/**
* Split-up the input file(s) into logical {@link InputSplit}s, each of
* which is then assigned to an individual {@link Mapper}.
* </li>
* <li>
* Provide the {@link RecordReader} implementation to be used to glean
* input records from the logical <code>InputSplit</code> for processing by
* the {@link Mapper}.
* </li>
* </ol>
*
* <p>The default behavior of file-based {@link InputFormat}s, typically
* sub-classes of {@link FileInputFormat}, is to split the
* input into <i>logical</i> {@link InputSplit}s based on the total size, in
* bytes, of the input files. However, the {@link FileSystem} blocksize of
* the input files is treated as an upper bound for input splits. A lower bound
* on the split size can be set via
* <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize">
* mapreduce.input.fileinputformat.split.minsize</a>.</p>
*
* <p>Clearly, logical splits based on input-size is insufficient for many
* applications since record boundaries are to respected. In such cases, the
* application has to also implement a {@link RecordReader} on whom lies the
* responsibility to respect record-boundaries and present a record-oriented
* view of the logical <code>InputSplit</code> to the individual task.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class InputFormat<K, V> {
/**
* Logically split the set of input files for the job.
*
* <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
* for processing.</p>
*
* <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
* input files are not physically split into chunks. For e.g. a split could
* be <i><input-file-path, start, offset></i> tuple. The InputFormat
* also creates the {@link RecordReader} to read the {@link InputSplit}.
*
* @param context job configuration.
* @return an array of {@link InputSplit}s for the job.
*/
public abstract List<InputSplit> getSplits(JobContext context);
/**
* Create a record reader for a given split. The framework will call
* {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
* the split is used.
* @param split the split to be read
* @param context the information about the task
* @return a new record reader
*/
public abstract
RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context
) throws IOException,
InterruptedException;
}
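Commonly used implementations include TextInputFormat (the default), KeyValueTextInputFormat, and SequenceFileInputFormat.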


Split
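A split is the chunk of data handed to a MapReduce job for processing; it is the smallest unit of computation in MapReduce
- In HDFS, the block is the smallest unit of storage, 128 MB by default
- By default, HDFS blocks and MapReduce splits correspond one-to-one. The relationship can be changed manually, although that is not recommended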
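As a rough illustration of how the split size relates to the block size, the sketch below follows the logic that, as far as I recall, FileInputFormat uses: the block size is clamped between a minimum and a maximum split size, and the last split may be up to 10% larger than the rest. The class and method names are my own, and the exact behaviour may differ between Hadoop versions.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSizeSketch {
    // Assumed slop factor: the final split may be up to 10% larger than splitSize.
    static final double SPLIT_SLOP = 1.1;

    // Clamp the block size between the configured minimum and maximum split size.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Return the length of each logical split for a file of the given length.
    static List<Long> splitLengths(long fileLength, long splitSize) {
        List<Long> lengths = new ArrayList<>();
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            lengths.add(splitSize);
            remaining -= splitSize;
        }
        if (remaining > 0) {
            lengths.add(remaining); // whatever is left becomes the last split
        }
        return lengths;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // With default-like min/max settings the split size equals the block size.
        long splitSize = computeSplitSize(128 * mb, 1L, Long.MAX_VALUE);
        System.out.println(splitLengths(300 * mb, splitSize).size()); // 3 (128 MB, 128 MB, 44 MB)
        System.out.println(splitLengths(130 * mb, splitSize).size()); // 1 (130 MB fits within the slop)
    }
}
```

With the minimum and maximum left at their extremes, one split corresponds to one block, which is the default one-to-one relationship described above; raising the minimum or lowering the maximum is what changes it.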
OutputFormat
Writes out the final processing results
/**
* <code>OutputFormat</code> describes the output-specification for a
* Map-Reduce job.
*
* <p>The Map-Reduce framework relies on the <code>OutputFormat</code> of the
* job to:<p>
* <ol>
* <li>
* Validate the output-specification of the job. For e.g. check that the
* output directory doesn't already exist.
* <li>
* Provide the {@link RecordWriter} implementation to be used to write out
* the output files of the job. Output files are stored in a
* {@link FileSystem}.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputFormat<K, V> {
/**
* Get the {@link RecordWriter} for the given task.
*
* @param context the information about the current task.
* @return a {@link RecordWriter} to write the output for the job.
* @throws IOException
*/
public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context);
/**
* Check for validity of the output-specification for the job.
*
* <p>This is to validate the output specification for the job when it is
* a job is submitted. Typically checks that it does not already exist,
* throwing an exception when it already exists, so that output is not
* overwritten.</p>
*
* @param context information about the job
*/
public abstract void checkOutputSpecs(JobContext context);
/**
* Get the output committer for this output format. This is responsible
* for ensuring the output is committed correctly.
* @param context the task context
* @return an output committer
*/
public abstract
OutputCommitter getOutputCommitter(TaskAttemptContext context);
}
Commonly used implementations include TextOutputFormat (the default) and SequenceFileOutputFormat.


- OutputFormat
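The OutputFormat determines where and how a job's results are persisted. Hadoop provides a range of classes and interfaces for different formats, and custom behaviour can be implemented by extending one of them. You may already be familiar with the default OutputFormat, TextOutputFormat, a line-oriented text format in which each line holds a tab-separated key-value pair. For many kinds of data, though, such as plain numbers, text serialization wastes space, which means longer run times and higher resource consumption. To avoid these drawbacks Hadoop provides SequenceFileOutputFormat, which writes objects in binary form rather than as text and can optionally compress the result.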
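The space cost of text serialization is easy to check with plain java.io. The sketch below compares the size of one int written as decimal text with the same int written in fixed-width binary; it does not use any Hadoop output format, and the class name is my own.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class TextVsBinarySize {
    // Size of the value when written as decimal text, as a text format would store it.
    static int textBytes(int value) {
        return Integer.toString(value).getBytes(StandardCharsets.UTF_8).length;
    }

    // Size of the same value when written as a fixed-width binary int.
    static int binaryBytes(int value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            new DataOutputStream(buffer).writeInt(value);
            return buffer.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int value = 1234567890;
        System.out.println(textBytes(value));   // 10
        System.out.println(binaryBytes(value)); // 4
    }
}
```

For small numbers text can actually be shorter (the value 7 is one byte as text), so the saving depends on the data; the text form also has to be parsed again on every read.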
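3 MapReduce Core Concepts
Figure: the block-to-split mapping has been set manually so that one block corresponds to two splits
In the figure above one block corresponds to two splits (the default is one-to-one), and each split corresponds to one Map Task. After the Map Tasks have grouped the data it goes through Shuffle, and after Shuffle it is passed to Reduce for output. Each Reduce Task writes to its own file; the figure has three Reduce Tasks, so three files are written.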
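How a key ends up at one particular Reduce Task, and therefore in one particular output file, can be sketched as follows. The formula mirrors what I understand Hadoop's default HashPartitioner to do, and part-r-NNNNN is the usual naming of reducer output files; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

class PartitionSketch {
    // Hash partitioning: clear the sign bit, then take the remainder.
    static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Conventional name of the output file written by a given reduce task.
    static String outputFileFor(int partition) {
        return String.format(Locale.ROOT, "part-r-%05d", partition);
    }

    // Map each output file name to the keys that would be written into it.
    static Map<String, List<String>> route(List<String> keys, int numReduceTasks) {
        Map<String, List<String>> filesToKeys = new TreeMap<>();
        for (String key : keys) {
            String file = outputFileFor(partitionFor(key, numReduceTasks));
            filesToKeys.computeIfAbsent(file, f -> new ArrayList<>()).add(key);
        }
        return filesToKeys;
    }

    public static void main(String[] args) {
        // Three reduce tasks, as in the figure described above.
        System.out.println(route(List.of("hello", "world", "hadoop", "MapReduce"), 3));
    }
}
```

Every occurrence of the same key lands in the same partition, which is what lets a single Reduce Task see all the values of that key.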
3.1 Split

4 MapReduce 1.x Architecture

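4.1 JobTracker (JT)
The JobTracker manages jobs. It breaks each job down into a set of tasks, which are either MapTasks or ReduceTasks.
It dispatches these tasks to TaskTrackers to run, and it is also responsible for job monitoring and fault tolerance (if a task fails, it is restarted).
If the JT receives no heartbeat from a TaskTracker within a certain period, it considers that TaskTracker down and reassigns the tasks that were running on it to other TaskTrackers.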
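The heartbeat rule just described can be sketched in a few lines of plain Java. This is a toy model, not JobTracker code: the class, its methods, and the 10-second timeout in the example are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class HeartbeatMonitorSketch {
    private final long timeoutMillis;
    private final Map<String, Long> lastHeartbeat = new TreeMap<>();
    private final Map<String, List<String>> tasksByTracker = new HashMap<>();

    HeartbeatMonitorSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void assign(String tracker, String task) {
        tasksByTracker.computeIfAbsent(tracker, t -> new ArrayList<>()).add(task);
    }

    void heartbeat(String tracker, long nowMillis) {
        lastHeartbeat.put(tracker, nowMillis);
    }

    // Declare silent trackers dead and move their tasks to the first live tracker.
    // If no tracker is left alive the tasks are only returned; a real scheduler would queue them.
    List<String> reassignFromDeadTrackers(long nowMillis) {
        List<String> dead = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastHeartbeat.entrySet()) {
            if (nowMillis - e.getValue() > timeoutMillis) {
                dead.add(e.getKey());
            }
        }
        List<String> moved = new ArrayList<>();
        for (String tracker : dead) {
            lastHeartbeat.remove(tracker);
            List<String> orphaned = tasksByTracker.remove(tracker);
            if (orphaned != null) {
                moved.addAll(orphaned);
            }
        }
        if (!moved.isEmpty() && !lastHeartbeat.isEmpty()) {
            String target = lastHeartbeat.keySet().iterator().next();
            for (String task : moved) {
                assign(target, task);
            }
        }
        return moved;
    }

    List<String> tasksOf(String tracker) {
        return tasksByTracker.getOrDefault(tracker, List.of());
    }

    public static void main(String[] args) {
        HeartbeatMonitorSketch jt = new HeartbeatMonitorSketch(10_000L);
        jt.heartbeat("tt1", 0L);
        jt.heartbeat("tt2", 0L);
        jt.assign("tt1", "map-0");
        jt.assign("tt2", "map-1");
        jt.heartbeat("tt1", 9_000L); // tt2 stays silent
        System.out.println(jt.reassignFromDeadTrackers(12_000L)); // [map-1]
        System.out.println(jt.tasksOf("tt1"));                    // [map-0, map-1]
    }
}
```

Timestamps are passed in explicitly rather than read from the system clock, which keeps the sketch deterministic and easy to test.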
4.2 TaskTracker
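The TaskTracker executes tasks: our Tasks (MapTask and ReduceTask) all run on TaskTrackers. A TaskTracker interacts with the JobTracker,
for example by running, launching, or stopping tasks on its instructions and by sending it heartbeat messages.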
4.3 MapTask
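The Map logic we write is carried out by this task:
- It parses each record and passes it to our map method
- When processing finishes, it writes the map output to the local disk
Some jobs have only a map phase and no reduce phase; in that case the results are usually written straight to HDFS.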
4.4 ReduceTask
- Reads the data output by the MapTasks
- Groups the data by key and passes each group to the reduce method we wrote
- When processing finishes, writes the output to HDFS by default
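The grouping step works because the fetched map output is sorted by key first, so all values of one key sit next to each other. The sketch below imitates that in plain Java by summing each run of equal keys; it is a single-machine illustration with made-up names, not ReduceTask code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReduceSideGroupingSketch {
    // Walk key-sorted (key, value) pairs and sum each run of equal keys.
    static Map<String, Integer> reduceSorted(List<Map.Entry<String, Integer>> sortedPairs) {
        Map<String, Integer> output = new LinkedHashMap<>();
        String currentKey = null;
        int sum = 0;
        for (Map.Entry<String, Integer> pair : sortedPairs) {
            if (currentKey != null && !currentKey.equals(pair.getKey())) {
                output.put(currentKey, sum); // the previous group is complete
                sum = 0;
            }
            currentKey = pair.getKey();
            sum += pair.getValue();
        }
        if (currentKey != null) {
            output.put(currentKey, sum); // flush the last group
        }
        return output;
    }

    public static void main(String[] args) {
        // Pairs as they might arrive from several map tasks, in no particular order.
        List<Map.Entry<String, Integer>> fetched = new ArrayList<>(List.of(
                Map.entry("world", 1), Map.entry("hello", 1),
                Map.entry("hello", 1), Map.entry("hadoop", 1), Map.entry("hello", 1)));
        fetched.sort(Map.Entry.comparingByKey()); // stands in for the merge/sort phase
        System.out.println(reduceSorted(fetched)); // {hadoop=1, hello=3, world=1}
    }
}
```

Only one running sum is held in memory at a time, which is why sorting before reducing scales to key groups that would not fit in a hash map.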
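5 MapReduce 2.x Architecture
In the MapReduce 2.x architecture, JobTracker and TaskTracker no longer exist; they are replaced by ResourceManager and NodeManager.
Not only the architecture changed but also the capabilities: 2.x introduced YARN, on top of which different computing frameworks can run, rather than MapReduce only as in 1.x.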

The MapReduce 2.x architecture was already covered in the article "Big Data Primer (4) - Distributed Resource Scheduling: the YARN Framework",
so it is not repeated here.
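6 Implementing WordCount in Java
1. Create a Maven project and configure its dependencies
2. Create a class and write the WordCount implementation
3. Once the code is written, compile and package it with Maven from within IDEA
4. Upload the packaged jar to the Hadoop server
The full HDFS path resolves without problems: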
hadoop fs -ls hdfs://localhost:9000


7 Refactoring

8 Combiner Programming

9 Partitioner











