大數(shù)據(jù)入門(五) - 分布式計算框架 MapReduce

1 概述

  • MapReduce源自Google的MapReduce論文,論文發(fā)表于2004年12月
  • Hadoop MapReduce可以說是Google MapReduce的一個開源實現(xiàn)
  • MapReduce優(yōu)點在于可以將海量的數(shù)據(jù)進行離線處理,并且MapReduce也易于開發(fā),因為MapReduce框架幫我們封裝好了分布式計算的開發(fā)。而且對硬件設(shè)施要求不高,可以運行在廉價的機器上
  • MapReduce也有缺點,它最主要的缺點就是無法完成實時流式計算,只能離線處理。

MapReduce屬于一種編程模型,用于大規(guī)模數(shù)據(jù)集(大于1TB)的并行運算。
概念"Map(映射)"和"Reduce(歸約)",是它們的主要思想,都是從函數(shù)式編程語言里借來的,還有從矢量編程語言里借來的特性。
它極大地方便了編程人員在不會分布式并行編程的情況下,將自己的程序運行在分布式系統(tǒng)上。
當(dāng)前的軟件實現(xiàn)是指定一個Map(映射)函數(shù),用來把一組鍵值對映射成一組新的鍵值對,指定并發(fā)的Reduce(歸約)函數(shù),用來保證所有映射的鍵值對中的每一個共享相同的鍵組。

在學(xué)習(xí)MapReduce之前我們需要準(zhǔn)備好Hadoop的環(huán)境,也就是需要先安裝好HDFS以及YARN,環(huán)境的搭建方式可以參考我之前的兩篇文章

2 MapReduce編程模型

通過wordcount詞頻統(tǒng)計分析案例入門

在安裝Hadoop時,它就自帶有一個WordCount的案例,這個案例是統(tǒng)計文件中每個單詞出現(xiàn)的次數(shù),也就是詞頻統(tǒng)計,我們在學(xué)習(xí)大數(shù)據(jù)開發(fā)時,一般都以WordCount作為入門。

例如,我現(xiàn)在有一個test.txt,文件內(nèi)容如下:

hello world
hello hadoop
hello MapReduce

現(xiàn)在的需求是統(tǒng)計這個文件中每個單詞出現(xiàn)的次數(shù)。假設(shè)我現(xiàn)在寫了一些代碼實現(xiàn)了這個文件的詞頻統(tǒng)計,統(tǒng)計的結(jié)果如下:

hello 3
world 1
hadoop 1
MapReduce 1

以上這就是一個詞頻統(tǒng)計的例子。

詞頻統(tǒng)計看起來貌似很簡單的樣子,一般不需要多少代碼就能完成了,而且如果對shell腳本比較熟悉的話,甚至一句代碼就能完成這個詞頻統(tǒng)計的功能。確實詞頻統(tǒng)計是不難,但是為什么還要用大數(shù)據(jù)技術(shù)去完成這個詞頻統(tǒng)計的功能呢?這是因為實現(xiàn)小文件的詞頻統(tǒng)計功能或許用簡單的代碼就能完成,但是如果是幾百GB、TB甚至是PB級的大文件還能用簡單的代碼完成嗎?這顯然是不可能的,就算能也需要花費相當(dāng)大的時間成本。

而大數(shù)據(jù)技術(shù)就是要解決這種處理海量數(shù)據(jù)的問題,MapReduce在其中就是充當(dāng)一個分布式并行計算的角色,分布式并行計算能大幅度提高海量數(shù)據(jù)的處理速度,畢竟多個人干活肯定比一個人干活快。

又回到我們上面所說到的詞頻統(tǒng)計的例子,在實際工作中很多場景的開發(fā)都是在WordCount的基礎(chǔ)上進行改造的。例如,要從所有服務(wù)器的訪問日志中統(tǒng)計出被訪問得最多的url以及訪問量最高的IP,這就是一個典型的WordCount應(yīng)用場景,要知道即便是小公司的服務(wù)器訪問日志通常也都是GB級別的。

  • 使用MapReduce執(zhí)行WordCount的流程示意圖


  • 輸入的數(shù)據(jù)集會被拆分為多個塊,然后這些塊都會被放到不同的節(jié)點上進行并行的計算

  • 在Splitting這一環(huán)節(jié)會把單詞按照分割符或者分割規(guī)則進行拆分,拆分完成后就到Mapping

  • 到Mapping這個環(huán)節(jié)后會把相同的單詞通過網(wǎng)絡(luò)進行映射或者說傳輸?shù)酵粋€節(jié)點上

  • 接著這些相同的單詞就會在Shuffling環(huán)節(jié)時進行洗牌也就是合并

  • 合并完成之后就會進入Reducing環(huán)節(jié),這一環(huán)節(jié)就是把所有節(jié)點合并后的單詞再進行一次合并,

  • 也就是會輸出到HDFS文件系統(tǒng)中的某一個文件中

大體來看就是一個拆分又合并的過程,所以MapReduce是分為map和Reduce的。最重要的是,要清楚這一流程都是分布式并行的,每個節(jié)點都不會互相依賴,都是相互獨立的。

3 MapReduce執(zhí)行流程

MapReduce作業(yè)會被拆分成Map和Reduce階段

  • Map階段對應(yīng)的就是一堆的Map Tasks
  • 同樣的Reduce階段也是會對應(yīng)一堆的Reduce Tasks

輸入與輸出

MapReduce框架專門用于<key,value>對,也就是說,框架將作業(yè)的輸入視為一組<key,value>對,并生成一組<key,value>對作為輸出。K,V類型可能是不同的.

鍵和值類型必須可序列化,因此需要實現(xiàn)Writable接口
Hadoop Writable接口是基于DataInput和DataOutput實現(xiàn)的序列化協(xié)議,緊湊(高效使用存儲空間),快速(讀寫數(shù)據(jù)、序列化與反序列化的開銷?。?br> Hadoop中的鍵(key)和值(value)必須是實現(xiàn)了Writable接口的對象(鍵還必須實現(xiàn)WritableComparable,以便進行排序)。

package org.apache.hadoop.io;

import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * <p>Any <code>key</code> or <code>value</code> type in the Hadoop Map-Reduce
 * framework implements this interface.</p>
 * 
 * <p>Implementations typically implement a static <code>read(DataInput)</code>
 * method which constructs a new instance, calls {@link #readFields(DataInput)} 
 * and returns the instance.</p>

 *     public class MyWritable implements Writable {
 *       // Some data     
 *       private int counter;
 *       private long timestamp;
 *       
 *       public void write(DataOutput out) throws IOException {
 *         out.writeInt(counter);
 *         out.writeLong(timestamp);
 *       }
 *       
 *       public void readFields(DataInput in) throws IOException {
 *         counter = in.readInt();
 *         timestamp = in.readLong();
 *       }
 *       
 *       public static MyWritable read(DataInput in) throws IOException {
 *         MyWritable w = new MyWritable();
 *         w.readFields(in);
 *         return w;
 *       }
 *     }
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Writable {
  /** 
   * Serialize the fields of this object to <code>out</code>.
   */
  void write(DataOutput out) throws IOException;

  /** 
   * Deserialize the fields of this object from <code>in</code>.  
   * 
   * <p>For efficiency, implementations should attempt to re-use storage in the 
   * existing object where possible.</p>
   */
  void readFields(DataInput in) throws IOException;
}

Hadoop自身提供了多種具體的Writable類,包含了常見的Java基本類型(boolean、byte、short、int、float、long和double等)和集合類型(BytesWritable、ArrayWritable和MapWritable等)。這些類型都位于org.apache.hadoop.io包中。


定制Writable類

雖然Hadoop內(nèi)建了多種Writable類提供用戶選擇,Hadoop對Java基本類型的包裝Writable類實現(xiàn)的RawComparable接口,使得這些對象不需要反序列化過程,便可以在字節(jié)流層面進行排序,從而大大縮短了比較的時間開銷,但是當(dāng)我們需要更加復(fù)雜的對象時,Hadoop的內(nèi)建Writable類就不能滿足我們的需求了(需要注意的是Hadoop提供的Writable集合類型并沒有實現(xiàn)RawComparable接口,因此也不滿足我們的需要),這時我們就需要定制自己的Writable類,特別將其作為鍵(key)的時候更應(yīng)該如此,以求達到更高效的存儲和快速的比較。

下面的實例展示了如何定制一個Writable類,一個定制的Writable類首先必須實現(xiàn)Writable或者WritableComparable接口,然后為定制的Writable類編寫write(DataOutput out)和readFields(DataInput in)方法,來控制定制的Writable類如何轉(zhuǎn)化為字節(jié)流(write方法)和如何從字節(jié)流轉(zhuǎn)回為Writable對象。

package com.javaedge.hadoop.project;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;

/**
 * This MyWritable class demonstrates how to write a custom Writable class
 *
 * @author sss
 * @date 2019-04-07
 */
public class MyWritable implements Writable {
    private VLongWritable field1;
    private VLongWritable field2;

    public MyWritable() {
        this.set(new VLongWritable(), new VLongWritable());
    }
    
    public MyWritable(VLongWritable fld1, VLongWritable fld2) {
        this.set(fld1, fld2);
    }

    public void set(VLongWritable fld1, VLongWritable fld2) {
        //make sure the smaller field is always put as field1
        if (fld1.get() <= fld2.get()) {
            this.field1 = fld1;
            this.field2 = fld2;
        } else {

            this.field1 = fld2;
            this.field2 = fld1;
        }
    }

    /**
     * How to write and read MyWritable fields from DataOutput and DataInput stream
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {

        field1.write(out);
        field2.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {

        field1.readFields(in);
        field2.readFields(in);
    }

    /**
     * Returns true if <code>o</code> is a MyWritable with the same values.
     */
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MyWritable)) {
            return false;
        }

        MyWritable other = (MyWritable) o;
        return field1.equals(other.field1) && field2.equals(other.field2);

    }

    @Override
    public int hashCode() {

        return field1.hashCode() * 163 + field2.hashCode();
    }

    @Override
    public String toString() {
        return field1.toString() + "\t" + field2.toString();
    }

}

此外,K類型必須實現(xiàn)WritableComparable接口以便于按框架進行排序.

/**
 * A {@link Writable} which is also {@link Comparable}. 
 *
 * <p><code>WritableComparable</code>s can be compared to each other, typically 
 * via <code>Comparator</code>s. Any type which is to be used as a 
 * <code>key</code> in the Hadoop Map-Reduce framework should implement this
 * interface.</p>
 *
 * <p>Note that <code>hashCode()</code> is frequently used in Hadoop to partition
 * keys. It's important that your implementation of hashCode() returns the same 
 * result across different instances of the JVM. Note also that the default 
 * <code>hashCode()</code> implementation in <code>Object</code> does <b>not</b>
 * satisfy this property.</p>
 *  
 * <p>Example:</p>
 * <p><blockquote><pre>
 *     public class MyWritableComparable implements WritableComparable<MyWritableComparable> {
 *       // Some data
 *       private int counter;
 *       private long timestamp;
 *       
 *       public void write(DataOutput out) throws IOException {
 *         out.writeInt(counter);
 *         out.writeLong(timestamp);
 *       }
 *       
 *       public void readFields(DataInput in) throws IOException {
 *         counter = in.readInt();
 *         timestamp = in.readLong();
 *       }
 *       
 *       public int compareTo(MyWritableComparable o) {
 *         int thisValue = this.value;
 *         int thatValue = o.value;
 *         return (thisValue &lt; thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
 *       }
 *
 *       public int hashCode() {
 *         final int prime = 31;
 *         int result = 1;
 *         result = prime * result + counter;
 *         result = prime * result + (int) (timestamp ^ (timestamp &gt;&gt;&gt; 32));
 *         return result
 *       }
 *     }
 * </pre></blockquote></p>
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {
}

MapReduce作業(yè)的輸入和輸出類型:
其實簡單來說這也是一個輸入輸出的流程,要注意的是在MapReduce框架中輸入的數(shù)據(jù)集會被序列化成鍵/值對,map階段完成后會對這些鍵值對進行排序,最后到reduce階段中進行合并輸出,輸出的也是鍵/值對

  • 示意圖


InputFormat:將我們輸入數(shù)據(jù)進行分片(split)

/** 
 *   Split-up the input file(s) into logical {@link InputSplit}s, each of 
 *   which is then assigned to an individual {@link Mapper}.
 *   </li>
 *   <li>
 *   Provide the {@link RecordReader} implementation to be used to glean
 *   input records from the logical <code>InputSplit</code> for processing by 
 *   the {@link Mapper}.
 *   </li>
 * </ol>
 * 
 * <p>The default behavior of file-based {@link InputFormat}s, typically 
 * sub-classes of {@link FileInputFormat}, is to split the 
 * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
 * bytes, of the input files. However, the {@link FileSystem} blocksize of  
 * the input files is treated as an upper bound for input splits. A lower bound 
 * on the split size can be set via 
 * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize">
 * mapreduce.input.fileinputformat.split.minsize</a>.</p>
 * 
 * <p>Clearly, logical splits based on input-size is insufficient for many 
 * applications since record boundaries are to respected. In such cases, the
 * application has to also implement a {@link RecordReader} on whom lies the
 * responsibility to respect record-boundaries and present a record-oriented
 * view of the logical <code>InputSplit</code> to the individual task.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class InputFormat<K, V> {

  /** 
   * Logically split the set of input files for the job.  
   * 
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   *
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
   * also creates the {@link RecordReader} to read the {@link InputSplit}.
   * 
   * @param context job configuration.
   * @return an array of {@link InputSplit}s for the job.
   */
  public abstract List<InputSplit> getSplits(JobContext context);
  
  /**
   * Create a record reader for a given split. The framework will call
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
   * the split is used.
   * @param split the split to be read
   * @param context the information about the task
   * @return a new record reader
   */
  public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;

}

常用以下實例



Split

將數(shù)據(jù)塊交MapReduce作業(yè)來處理,數(shù)據(jù)塊是MapReduce中最小的計算單元

  • 在HDFS中,數(shù)據(jù)塊是最小的存儲單元,默認(rèn)為128M
  • 默認(rèn)情況下,HDFS與MapReduce是一一對應(yīng)的,當(dāng)然我們也可以手動所設(shè)置它們之間的關(guān)系(但是不建議這么做)

OutputFormat

輸出最終的處理結(jié)果

/** 
 * <code>OutputFormat</code> describes the output-specification for a 
 * Map-Reduce job.
 *
 * <p>The Map-Reduce framework relies on the <code>OutputFormat</code> of the
 * job to:<p>
 * <ol>
 *   <li>
 *   Validate the output-specification of the job. For e.g. check that the 
 *   output directory doesn't already exist. 
 *   <li>
 *   Provide the {@link RecordWriter} implementation to be used to write out
 *   the output files of the job. Output files are stored in a 
 *   {@link FileSystem}.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputFormat<K, V> {

  /** 
   * Get the {@link RecordWriter} for the given task.
   *
   * @param context the information about the current task.
   * @return a {@link RecordWriter} to write the output for the job.
   * @throws IOException
   */
  public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context);

  /** 
   * Check for validity of the output-specification for the job.
   *  
   * <p>This is to validate the output specification for the job when it is
   * a job is submitted.  Typically checks that it does not already exist,
   * throwing an exception when it already exists, so that output is not
   * overwritten.</p>
   *
   * @param context information about the job
   */
  public abstract void checkOutputSpecs(JobContext context);

  /**
   * Get the output committer for this output format. This is responsible
   * for ensuring the output is committed correctly.
   * @param context the task context
   * @return an output committer
   */
  public abstract 
  OutputCommitter getOutputCommitter(TaskAttemptContext context);
}

常用實例



我們可以再來看一張圖,假設(shè)我們手動設(shè)置了block與split的對應(yīng)關(guān)系,一個block對應(yīng)兩個split:

  • OutputFormat
    OutputFormt接口決定了在哪里以及怎樣持久化作業(yè)結(jié)果。Hadoop為不同類型的格式提供了一系列的類和接口,實現(xiàn)自定義操作只要繼承其中的某個類或接口即可。你可能已經(jīng)熟悉了默認(rèn)的OutputFormat,也就是TextOutputFormat,它是一種以行分隔,包含制表符界定的鍵值對的文本文件格式。盡管如此,對多數(shù)類型的數(shù)據(jù)而言,如再常見不過的數(shù)字,文本序列化會浪費一些空間,由此帶來的結(jié)果是運行時間更長且資源消耗更多。為了避免文本文件的弊端,Hadoop提供了SequenceFileOutputformat,它將對象表示成二進制形式而不再是文本文件,并將結(jié)果進行壓縮。

3 MapReduce核心概念

  • 假設(shè)我們手動設(shè)置了block與split的對應(yīng)關(guān)系,一個block對應(yīng)兩個split


上圖中一個block對應(yīng)兩個split(默認(rèn)是一對一),一個split則是對應(yīng)一個Map Task。Map Task將數(shù)據(jù)分完組之后到Shuffle,Shuffle完成后就到Reduce上進行輸出,而每一個Reduce Tasks會輸出到一個文件上,上圖中有三個Reduce Tasks,所以會輸出到三個文件上。

3.1 Split

4 MapReduce 1.x 架構(gòu)

4.1 JobTracker(JT)

作業(yè)的管理者,它會將作業(yè)分解成一堆的任務(wù),也就是Task,Task里包含MapTask和ReduceTask.
它會將分解后的任務(wù)分派給TaskTracker運行,它還需要完成作業(yè)的監(jiān)控以及容錯處理(task作業(yè)宕掉,會重啟task)
如果在一定的時間內(nèi),JT沒有收到某個TaskTracker的心跳信息的話,就會判斷該TaskTracker宕機,然后就會將該TaskTracker上運行的任務(wù)指派到其他的TaskTracker

4.2 TaskTracker

任務(wù)的執(zhí)行者,我們的Task(MapTask和ReduceTask)都是在TaskTracker上運行的,TaskTracker可以與JobTracker進行交互
例如執(zhí)行、啟動或停止作業(yè)以及發(fā)送心跳信息給JobTracker等。

4.3 MapTask

自己開發(fā)的Map任務(wù)會交由該Task完成,

  • 它會解析每條記錄的數(shù)據(jù),交給自己的Map方法處理
  • 處理完成會將Map的輸出結(jié)果寫到本地磁盤

不過有些作業(yè)可能只有map沒有reduce,這時候一般會將結(jié)果輸出到HDFS文件系統(tǒng)里。

4.4 ReduceTask

  • 將MapTask輸出的數(shù)據(jù)進行讀取
  • 按照數(shù)據(jù)的規(guī)則進行分組,然后傳給我們自己編寫的reduce方法處理
  • 處理完成后默認(rèn)將輸出結(jié)果寫到HDFS

5 MapReduce 2.x 架構(gòu)

MapReduce2.x架構(gòu)圖如下,可以看到JobTracker和TaskTracker已經(jīng)不復(fù)存在了,取而代之的是ResourceManager和NodeManager
不僅架構(gòu)變了,功能也變了,2.x之后新引入了YARN,在YARN之上我們可以運行不同的計算框架,不再是1.x那樣只能運行MapReduce了:


關(guān)于MapReduce2.x的架構(gòu)之前已經(jīng)在大數(shù)據(jù)入門(四) - 分布式資源調(diào)度——YARN框架
一文中說明過了,這里不再贅述

6 Java 實現(xiàn) wordCount

  • 1.創(chuàng)建一個Maven工程,配置依賴如下


  • 創(chuàng)建一個類,開始編寫我們wordcount的實現(xiàn)代碼:


  • 3.編寫完成之后,在IDEA里通過Maven進行編譯打包



  • 4.把打包好的jar包上傳到服務(wù)器上:


  • 上傳到Hadoop服務(wù)器



  • 全路徑?jīng)]有問題

hadoop fs -ls hdfs://localhost:9000

7 重構(gòu)

8 Combiner編程

9 Partitoner


10 JobHistoryServer

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容