084-BigData-12MapReduce入門

上一篇:083-BigData-11HDFS目錄結(jié)構(gòu)

一、MapReduce入門

1、MapReduce定義

Mapreduce是一個(gè)分布式運(yùn)算程序的編程框架,是用戶開(kāi)發(fā)“基于hadoop的數(shù)據(jù)分析應(yīng)用”的核心框架。

Mapreduce核心功能是將用戶編寫(xiě)的業(yè)務(wù)邏輯代碼和自帶默認(rèn)組件整合成一個(gè)完整的分布式運(yùn)算程序,并發(fā)運(yùn)行在一個(gè)hadoop集群上。

2、MapReduce優(yōu)點(diǎn)

1)MapReduce 易于編程。它簡(jiǎn)單的實(shí)現(xiàn)一些接口,就可以完成一個(gè)分布式程序,這個(gè)分布式程序可以分布到大量廉價(jià)的PC機(jī)器上運(yùn)行。也就是說(shuō)你寫(xiě)一個(gè)分布式程序,跟寫(xiě)一個(gè)簡(jiǎn)單的串行程序是一模一樣的。就是因?yàn)檫@個(gè)特點(diǎn)使得MapReduce編程變得非常流行。

2)良好的擴(kuò)展性。當(dāng)你的計(jì)算資源不能得到滿足的時(shí)候,你可以通過(guò)簡(jiǎn)單的增加機(jī)器來(lái)擴(kuò)展它的計(jì)算能力。

3)高容錯(cuò)性。MapReduce設(shè)計(jì)的初衷就是使程序能夠部署在廉價(jià)的PC機(jī)器上,這就要求它具有很高的容錯(cuò)性。比如其中一臺(tái)機(jī)器掛了,它可以把上面的計(jì)算任務(wù)轉(zhuǎn)移到另外一個(gè)節(jié)點(diǎn)上運(yùn)行,不至于這個(gè)任務(wù)運(yùn)行失敗,而且這個(gè)過(guò)程不需要人工參與,而完全是由 Hadoop內(nèi)部完成的。

4)適合PB級(jí)以上海量數(shù)據(jù)的離線處理。它適合離線處理而不適合在線處理。比如像毫秒級(jí)別的返回一個(gè)結(jié)果,MapReduce很難做到。

3、MapReduce缺點(diǎn)

MapReduce不擅長(zhǎng)做實(shí)時(shí)計(jì)算、流式計(jì)算、DAG(有向無(wú)環(huán)圖)計(jì)算。

1)實(shí)時(shí)計(jì)算。MapReduce無(wú)法像Mysql一樣,在毫秒或者秒級(jí)內(nèi)返回結(jié)果。

2)流式計(jì)算。流式計(jì)算的輸入數(shù)據(jù)是動(dòng)態(tài)的,而MapReduce的輸入數(shù)據(jù)集是靜態(tài)的,不能動(dòng)態(tài)變化。這是因?yàn)镸apReduce自身的設(shè)計(jì)特點(diǎn)決定了數(shù)據(jù)源必須是靜態(tài)的。

3)DAG(有向無(wú)環(huán)圖)計(jì)算。多個(gè)應(yīng)用程序存在依賴關(guān)系,后一個(gè)應(yīng)用程序的輸入為前一個(gè)的輸出。在這種情況下,MapReduce并不是不能做,而是使用后,每個(gè)MapReduce作業(yè)的輸出結(jié)果都會(huì)寫(xiě)入到磁盤,會(huì)造成大量的磁盤IO,導(dǎo)致性能非常的低下。

4、MapReduce核心思想

image.png

1)分布式的運(yùn)算程序往往需要分成至少2個(gè)階段。

2)第一個(gè)階段的maptask并發(fā)實(shí)例,完全并行運(yùn)行,互不相干。

3)第二個(gè)階段的reduce task并發(fā)實(shí)例互不相干,但是他們的數(shù)據(jù)依賴于上一個(gè)階段的所有maptask并發(fā)實(shí)例的輸出。

4)MapReduce編程模型只能包含一個(gè)map階段和一個(gè)reduce階段,如果用戶的業(yè)務(wù)邏輯非常復(fù)雜,那就只能多個(gè)mapreduce程序,串行運(yùn)行。

5、MapReduce進(jìn)程

一個(gè)完整的mapreduce程序在分布式運(yùn)行時(shí)有三類實(shí)例進(jìn)程:

1)MrAppMaster:負(fù)責(zé)整個(gè)程序的過(guò)程調(diào)度及狀態(tài)協(xié)調(diào)。

2)MapTask:負(fù)責(zé)map階段的整個(gè)數(shù)據(jù)處理流程。

3)ReduceTask:負(fù)責(zé)reduce階段的整個(gè)數(shù)據(jù)處理流程。

6、MapReduce編程規(guī)范

用戶編寫(xiě)的程序分成三個(gè)部分:Mapper,Reducer,Driver(提交運(yùn)行mr程序的客戶端)

1)Mapper階段

(1)用戶自定義的Mapper要繼承自己的父類
(2)Mapper的輸入數(shù)據(jù)是KV對(duì)的形式(KV的類型可自定義)
(3)Mapper中的業(yè)務(wù)邏輯寫(xiě)在map()方法中
(4)Mapper的輸出數(shù)據(jù)是KV對(duì)的形式(KV的類型可自定義)
(5)map()方法(maptask進(jìn)程)對(duì)每一個(gè)<K,V>調(diào)用一次

image.png

2)Reducer階段
(1)用戶自定義的Reducer要繼承自己的父類
(2)Reducer的輸入數(shù)據(jù)類型對(duì)應(yīng)Mapper的輸出數(shù)據(jù)類型,也是KV
(3)Reducer的業(yè)務(wù)邏輯寫(xiě)在reduce()方法中
(4)Reducetask進(jìn)程對(duì)每一組相同k的<k,v>組調(diào)用一次reduce()方法

3)Driver階段

整個(gè)程序需要一個(gè)Drvier來(lái)進(jìn)行提交,提交的是一個(gè)描述了各種必要信息的job對(duì)象

二、Hadoop序列化

1、為什么要序列化?

一般來(lái)說(shuō),“活的”對(duì)象只生存在內(nèi)存里,關(guān)機(jī)斷電就沒(méi)有了。而且“活的”對(duì)象只能由本地的進(jìn)程使用,不能被發(fā)送到網(wǎng)絡(luò)上的另外一臺(tái)計(jì)算機(jī)。 然而序列化可以存儲(chǔ)“活的”對(duì)象,可以將“活的”對(duì)象發(fā)送到遠(yuǎn)程計(jì)算機(jī)。

2、什么是序列化?

序列化就是把內(nèi)存中的對(duì)象,轉(zhuǎn)換成字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)以便于存儲(chǔ)(持久化)和網(wǎng)絡(luò)傳輸。

反序列化就是將收到字節(jié)序列(或其他數(shù)據(jù)傳輸協(xié)議)或者是硬盤的持久化數(shù)據(jù),轉(zhuǎn)換成內(nèi)存中的對(duì)象。

3、為什么不用Java的序列化?

Java的序列化是一個(gè)重量級(jí)序列化框架(Serializable),一個(gè)對(duì)象被序列化后,會(huì)附帶很多額外的信息(各種校驗(yàn)信息,header,繼承體系等),不便于在網(wǎng)絡(luò)中高效傳輸。所以,hadoop自己開(kāi)發(fā)了一套序列化機(jī)制(Writable),精簡(jiǎn)、高效。

4、為什么序列化對(duì)Hadoop很重要?

因?yàn)镠adoop在集群之間進(jìn)行通訊或者RPC調(diào)用的時(shí)候,需要序列化,而且要求序列化要快,且體積要小,占用帶寬要小。所以必須理解Hadoop的序列化機(jī)制。

序列化和反序列化在分布式數(shù)據(jù)處理領(lǐng)域經(jīng)常出現(xiàn):進(jìn)程通信和永久存儲(chǔ)。然而Hadoop中各個(gè)節(jié)點(diǎn)的通信是通過(guò)遠(yuǎn)程調(diào)用(RPC)實(shí)現(xiàn)的,那么RPC序列化要求具有以下特點(diǎn):

1)緊湊:緊湊的格式能讓我們充分利用網(wǎng)絡(luò)帶寬,而帶寬是數(shù)據(jù)中心最稀缺的資

2)快速:進(jìn)程通信形成了分布式系統(tǒng)的骨架,所以需要盡量減少序列化和反序列化的性能開(kāi)銷,這是基本的;

3)可擴(kuò)展:協(xié)議為了滿足新的需求變化,所以控制客戶端和服務(wù)器過(guò)程中,需要直接引進(jìn)相應(yīng)的協(xié)議,這些是新協(xié)議,原序列化方式能支持新的協(xié)議報(bào)文;

4)互操作:能支持不同語(yǔ)言寫(xiě)的客戶端和服務(wù)端進(jìn)行交互;

5、常用數(shù)據(jù)序列化類型

常用的數(shù)據(jù)類型對(duì)應(yīng)的hadoop數(shù)據(jù)序列化類型

image.png

6、自定義bean對(duì)象實(shí)現(xiàn)序列化接口(Writable)

1)自定義bean對(duì)象要想序列化傳輸,必須實(shí)現(xiàn)序列化接口,需要注意以下7項(xiàng)。

(1)必須實(shí)現(xiàn)Writable接口

(2)反序列化時(shí),需要反射調(diào)用空參構(gòu)造函數(shù),所以必須有空參構(gòu)造

public FlowBean() {
        super();
    }

(3)重寫(xiě)序列化方法

@Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

(4)重寫(xiě)反序列化方法

    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

(5)注意反序列化的順序和序列化的順序完全一致

(6)要想把結(jié)果顯示在文件中,需要重寫(xiě)toString(),可用”\t”分開(kāi),方便后續(xù)用。

(7)如果需要將自定義的bean放在key中傳輸,則還需要實(shí)現(xiàn)comparable接口,因?yàn)閙apreduce框中的shuffle過(guò)程一定會(huì)對(duì)key進(jìn)行排序。

@Override
    public int compareTo(FlowBean o) {
        // 倒序排列,從大到小
        return this.sumFlow > o.getSumFlow() ? -1 : 1;
    }

三、MapReduce框架原理

1、MapReduce工作流程

1)流程示意圖

image.png
image.png

2)流程詳解

上面的流程是整個(gè)mapreduce最全工作流程,但是shuffle過(guò)程只是從第7步開(kāi)始到第16步結(jié)束,具體shuffle過(guò)程詳解,如下:

1)maptask收集我們的map()方法輸出的kv對(duì),放到內(nèi)存緩沖區(qū)中
2)從內(nèi)存緩沖區(qū)不斷溢出本地磁盤文件,可能會(huì)溢出多個(gè)文件
3)多個(gè)溢出文件會(huì)被合并成大的溢出文件
4)在溢出過(guò)程中,及合并的過(guò)程中,都要調(diào)用partitioner進(jìn)行分區(qū)和針對(duì)key進(jìn)行排序
5)reducetask根據(jù)自己的分區(qū)號(hào),去各個(gè)maptask機(jī)器上取相應(yīng)的結(jié)果分區(qū)數(shù)據(jù)
6)reducetask會(huì)取到同一個(gè)分區(qū)的來(lái)自不同maptask的結(jié)果文件,reducetask會(huì)將這些文件再進(jìn)行合并(歸并排序)
7)合并成大文件后,shuffle的過(guò)程也就結(jié)束了,后面進(jìn)入reducetask的邏輯運(yùn)算過(guò)程(從文件中取出一個(gè)一個(gè)的鍵值對(duì)group,調(diào)用用戶自定義的reduce()方法)

3)注意

Shuffle中的緩沖區(qū)大小會(huì)影響到mapreduce程序的執(zhí)行效率,原則上說(shuō),緩沖區(qū)越大,磁盤io的次數(shù)越少,執(zhí)行速度就越快。

緩沖區(qū)的大小可以通過(guò)參數(shù)調(diào)整,參數(shù):io.sort.mb 默認(rèn)100M。

2、InputFormat數(shù)據(jù)輸入

  • Job提交流程和切片源碼詳解

1)job提交流程源碼詳解

waitForCompletion()
submit();
// 1建立連接
    connect();  
        // 1)創(chuàng)建提交job的代理
        new Cluster(getConfiguration());
            // (1)判斷是本地yarn還是遠(yuǎn)程
            initialize(jobTrackAddr, conf); 
    // 2 提交job
submitter.submitJobInternal(Job.this, cluster)
    // 1)創(chuàng)建給集群提交數(shù)據(jù)的Stag路徑
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    // 2)獲取jobid ,并創(chuàng)建job路徑
    JobID jobId = submitClient.getNewJobID();
    // 3)拷貝jar包到集群
copyAndConfigureFiles(job, submitJobDir);   
    rUploader.uploadFiles(job, jobSubmitDir);
// 4)計(jì)算切片,生成切片規(guī)劃文件
writeSplits(job, submitJobDir);
    maps = writeNewSplits(job, jobSubmitDir);
        input.getSplits(job);
// 5)向Stag路徑寫(xiě)xml配置文件
writeConf(conf, submitJobFile);
    conf.writeXml(out);
// 6)提交job,返回提交狀態(tài)
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

2)FileInputFormat源碼解析(input.getSplits(job))

(1)找到你數(shù)據(jù)存儲(chǔ)的目錄。
(2)開(kāi)始遍歷處理(規(guī)劃切片)目錄下的每一個(gè)文件
(3)遍歷第一個(gè)文件ss.txt
a)獲取文件大小fs.sizeOf(ss.txt);
b)計(jì)算切片大小computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M
c)默認(rèn)情況下,切片大小=blocksize
d)開(kāi)始切,形成第1個(gè)切片:ss.txt—0:128M 第2個(gè)切片ss.txt—128:256M 第3個(gè)切片ss.txt—256M:300M(每次切片時(shí),都要判斷切完剩下的部分是否大于塊的1.1倍,不大于1.1倍就劃分一塊切片)
e)將切片信息寫(xiě)到一個(gè)切片規(guī)劃文件中
f)整個(gè)切片的核心過(guò)程在getSplit()方法中完成。
g)數(shù)據(jù)切片只是在邏輯上對(duì)輸入數(shù)據(jù)進(jìn)行分片,并不會(huì)再磁盤上將其切分成分片進(jìn)行存儲(chǔ)。InputSplit只記錄了分片的元數(shù)據(jù)信息,比如起始位置、長(zhǎng)度以及所在的節(jié)點(diǎn)列表等。
h)注意:block是HDFS物理上存儲(chǔ)的數(shù)據(jù),切片是對(duì)數(shù)據(jù)邏輯上的劃分。
(4)提交切片規(guī)劃文件到y(tǒng)arn上,yarn上的MrAppMaster就可以根據(jù)切片規(guī)劃文件計(jì)算開(kāi)啟maptask個(gè)數(shù)。

3、FileInputFormat切片機(jī)制

1)FileInputFormat中默認(rèn)的切片機(jī)制:

(1)簡(jiǎn)單地按照文件的內(nèi)容長(zhǎng)度進(jìn)行切片

(2)切片大小,默認(rèn)等于block大小

(3)切片時(shí)不考慮數(shù)據(jù)集整體,而是逐個(gè)針對(duì)每一個(gè)文件單獨(dú)切片

比如待處理數(shù)據(jù)有兩個(gè)文件:

file1.txt    320M
file2.txt    10M

經(jīng)過(guò)FileInputFormat的切片機(jī)制運(yùn)算后,形成的切片信息如下:

file1.txt.split1--  0~128
file1.txt.split2--  128~256
file1.txt.split3--  256~320
file2.txt.split1--  0~10M

2)FileInputFormat切片大小的參數(shù)配置

通過(guò)分析源碼,在FileInputFormat的280行中,計(jì)算切片大小的邏輯:Math.max(minSize, Math.min(maxSize, blockSize));

切片主要由這幾個(gè)值來(lái)運(yùn)算決定

mapreduce.input.fileinputformat.split.minsize=1 默認(rèn)值為1

mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默認(rèn)值Long.MAXValue
因此,默認(rèn)情況下,切片大小=blocksize。

maxsize(切片最大值):參數(shù)如果調(diào)得比blocksize小,則會(huì)讓切片變小,而且就等于配置的這個(gè)參數(shù)的值。

minsize(切片最小值):參數(shù)調(diào)的比blockSize大,則可以讓切片變得比blocksize還大。

3)獲取切片信息API

// 根據(jù)文件類型獲取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 獲取切片的文件名稱
String name = inputSplit.getPath().getName();

4、CombineTextInputFormat切片機(jī)制

關(guān)于大量小文件的優(yōu)化策略

1)默認(rèn)情況下TextInputformat對(duì)任務(wù)的切片機(jī)制是按文件規(guī)劃切片,不管文件多小,都會(huì)是一個(gè)單獨(dú)的切片,都會(huì)交給一個(gè)maptask,這樣如果有大量小文件,就會(huì)產(chǎn)生大量的maptask,處理效率極其低下。

2)優(yōu)化策略

(1)最好的辦法,在數(shù)據(jù)處理系統(tǒng)的最前端(預(yù)處理/采集),將小文件先合并成大文件,再上傳到HDFS做后續(xù)分析。
(2)補(bǔ)救措施:如果已經(jīng)是大量小文件在HDFS中了,可以使用另一種InputFormat來(lái)做切片(CombineTextInputFormat),它的切片邏輯跟TextFileInputFormat不同:它可以將多個(gè)小文件從邏輯上規(guī)劃到一個(gè)切片中,這樣,多個(gè)小文件就可以交給一個(gè)maptask。
(3)優(yōu)先滿足最小切片大小,不超過(guò)最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
舉例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m

3)具體實(shí)現(xiàn)步驟

//  如果不設(shè)置InputFormat,它默認(rèn)用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

5、InputFormat接口實(shí)現(xiàn)類

MapReduce任務(wù)的輸入文件一般是存儲(chǔ)在HDFS里面。輸入的文件格式包括:基于行的日志文件、二進(jìn)制格式文件等。這些文件一般會(huì)很大,達(dá)到數(shù)十GB,甚至更大。那么MapReduce是如何讀取這些數(shù)據(jù)的呢?下面我們首先學(xué)習(xí)InputFormat接口。

InputFormat常見(jiàn)的接口實(shí)現(xiàn)類包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定義InputFormat等。

1)TextInputFormat

TextInputFormat是默認(rèn)的InputFormat。每條記錄是一行輸入。鍵K是LongWritable類型,存儲(chǔ)該行在整個(gè)文件中的字節(jié)偏移量。值是這行的內(nèi)容,不包括任何行終止符(換行符和回車符)。

以下是一個(gè)示例,比如,一個(gè)分片包含了如下4條文本記錄。

Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise

每條記錄表示為以下鍵/值對(duì):

(0,Rich learning form)
(19,Intelligent learning engine)
(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)

很明顯,鍵并不是行號(hào)。一般情況下,很難取得行號(hào),因?yàn)槲募醋止?jié)而不是按行切分為分片。

2)KeyValueTextInputFormat

每一行均為一條記錄,被分隔符分割為key,value??梢酝ㄟ^(guò)在驅(qū)動(dòng)類中設(shè)置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");來(lái)設(shè)定分隔符。默認(rèn)分隔符是tab(\t)。

以下是一個(gè)示例,輸入是一個(gè)包含4條記錄的分片。其中——>表示一個(gè)(水平方向的)制表符。

line1 ——>Rich learning form
line2 ——>Intelligent learning engine
line3 ——>Learning more convenient
line4 ——>From the real demand for more close to the enterprise

每條記錄表示為以下鍵/值對(duì):

(line1,Rich learning form)
(line2,Intelligent learning engine)
(line3,Learning more convenient)
(line4,From the real demand for more close to the enterprise)

此時(shí)的鍵是每行排在制表符之前的Text序列。

3)NLineInputFormat

如果使用NlineInputFormat,代表每個(gè)map進(jìn)程處理的InputSplit不再按block塊去劃分,而是按NlineInputFormat指定的行數(shù)N來(lái)劃分。即輸入文件的總行數(shù)/N=切片數(shù)(20),如果不整除,切片數(shù)=商+1。

以下是一個(gè)示例,仍然以上面的4行輸入為例。

Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise

例如,如果N是2,則每個(gè)輸入分片包含兩行。開(kāi)啟2個(gè)maptask。

(0,Rich learning form)
(19,Intelligent learning engine)

另一個(gè) mapper 則收到后兩行:

(47,Learning more convenient)
(72,From the real demand for more close to the enterprise)

這里的鍵和值與TextInputFormat生成的一樣。

6、自定義InputFormat

1)概述
(1)自定義一個(gè)類繼承FileInputFormat。
(2)改寫(xiě)RecordReader,實(shí)現(xiàn)一次讀取一個(gè)完整文件封裝為KV。
(3)在輸出時(shí)使用SequenceFileOutPutFormat輸出合并文件。

7、并行度決定機(jī)制

1)問(wèn)題引出

maptask的并行度決定map階段的任務(wù)處理并發(fā)度,進(jìn)而影響到整個(gè)job的處理速度。那么,mapTask并行任務(wù)是否越多越好呢?

2)MapTask并行度決定機(jī)制

一個(gè)job的map階段MapTask并行度(個(gè)數(shù)),由客戶端提交job時(shí)的切片個(gè)數(shù)決定。

image.png

8、MapTask工作機(jī)制

image.png

(1)Read階段:Map Task通過(guò)用戶編寫(xiě)的RecordReader,從輸入InputSplit中解析出一個(gè)個(gè)key/value。
(2)Map階段:該節(jié)點(diǎn)主要是將解析出的key/value交給用戶編寫(xiě)map()函數(shù)處理,并產(chǎn)生一系列新的key/value。
(3)Collect收集階段:在用戶編寫(xiě)map()函數(shù)中,當(dāng)數(shù)據(jù)處理完成后,一般會(huì)調(diào)用OutputCollector.collect()輸出結(jié)果。在該函數(shù)內(nèi)部,它會(huì)將生成的key/value分區(qū)(調(diào)用Partitioner),并寫(xiě)入一個(gè)環(huán)形內(nèi)存緩沖區(qū)中。
(4)Spill階段:即“溢寫(xiě)”,當(dāng)環(huán)形緩沖區(qū)滿后,MapReduce會(huì)將數(shù)據(jù)寫(xiě)到本地磁盤上,生成一個(gè)臨時(shí)文件。需要注意的是,將數(shù)據(jù)寫(xiě)入本地磁盤之前,先要對(duì)數(shù)據(jù)進(jìn)行一次本地排序,并在必要時(shí)對(duì)數(shù)據(jù)進(jìn)行合并、壓縮等操作。
溢寫(xiě)階段詳情:
步驟1:利用快速排序算法對(duì)緩存區(qū)內(nèi)的數(shù)據(jù)進(jìn)行排序,排序方式是,先按照分區(qū)編號(hào)partition進(jìn)行排序,然后按照key進(jìn)行排序。這樣,經(jīng)過(guò)排序后,數(shù)據(jù)以分區(qū)為單位聚集在一起,且同一分區(qū)內(nèi)所有數(shù)據(jù)按照key有序。
步驟2:按照分區(qū)編號(hào)由小到大依次將每個(gè)分區(qū)中的數(shù)據(jù)寫(xiě)入任務(wù)工作目錄下的臨時(shí)文件output/spillN.out(N表示當(dāng)前溢寫(xiě)次數(shù))中。如果用戶設(shè)置了Combiner,則寫(xiě)入文件之前,對(duì)每個(gè)分區(qū)中的數(shù)據(jù)進(jìn)行一次聚集操作。
步驟3:將分區(qū)數(shù)據(jù)的元信息寫(xiě)到內(nèi)存索引數(shù)據(jù)結(jié)構(gòu)SpillRecord中,其中每個(gè)分區(qū)的元信息包括在臨時(shí)文件中的偏移量、壓縮前數(shù)據(jù)大小和壓縮后數(shù)據(jù)大小。如果當(dāng)前內(nèi)存索引大小超過(guò)1MB,則將內(nèi)存索引寫(xiě)到文件output/spillN.out.index中。
(5)Combine階段:當(dāng)所有數(shù)據(jù)處理完成后,MapTask對(duì)所有臨時(shí)文件進(jìn)行一次合并,以確保最終只會(huì)生成一個(gè)數(shù)據(jù)文件。
當(dāng)所有數(shù)據(jù)處理完后,MapTask會(huì)將所有臨時(shí)文件合并成一個(gè)大文件,并保存到文件output/file.out中,同時(shí)生成相應(yīng)的索引文件output/file.out.index。
在進(jìn)行文件合并過(guò)程中,MapTask以分區(qū)為單位進(jìn)行合并。對(duì)于某個(gè)分區(qū),它將采用多輪遞歸合并的方式。每輪合并io.sort.factor(默認(rèn)100)個(gè)文件,并將產(chǎn)生的文件重新加入待合并列表中,對(duì)文件排序后,重復(fù)以上過(guò)程,直到最終得到一個(gè)大文件。
讓每個(gè)MapTask最終只生成一個(gè)數(shù)據(jù)文件,可避免同時(shí)打開(kāi)大量文件和同時(shí)讀取大量小文件產(chǎn)生的隨機(jī)讀取帶來(lái)的開(kāi)銷。

9、Shuffle機(jī)制

Mapreduce確保每個(gè)reducer的輸入都是按鍵排序的。系統(tǒng)執(zhí)行排序的過(guò)程(即將map輸出作為輸入傳給reducer)稱為shuffle。

image.png
image.png

10、Partition分區(qū)

0)問(wèn)題引出:要求將統(tǒng)計(jì)結(jié)果按照條件輸出到不同文件中(分區(qū))。比如:將統(tǒng)計(jì)結(jié)果按照手機(jī)歸屬地不同省份輸出到不同文件中(分區(qū))

1)默認(rèn)partition分區(qū)

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

默認(rèn)分區(qū)是根據(jù)key的hashCode對(duì)reduceTasks個(gè)數(shù)取模得到的。用戶沒(méi)法控制哪個(gè)key存儲(chǔ)到哪個(gè)分區(qū)。

&是為了不出現(xiàn)負(fù)數(shù),也可以用 hashcode()%num + num來(lái)代替,但是位運(yùn)算性能比較高。

2)自定義Partitioner步驟
(1)自定義類繼承Partitioner,重寫(xiě)getPartition()方法

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {

// 1 獲取電話號(hào)碼的前三位
        String preNum = key.toString().substring(0, 3);
        
        partition = 4;
        
        // 2 判斷是哪個(gè)省
        if ("136".equals(preNum)) {
            partition = 0;
        }else if ("137".equals(preNum)) {
            partition = 1;
        }else if ("138".equals(preNum)) {
            partition = 2;
        }else if ("139".equals(preNum)) {
            partition = 3;
        }
        return partition;
    }
}

(2)在job驅(qū)動(dòng)中,設(shè)置自定義partitioner:

job.setPartitionerClass(CustomPartitioner.class);

(3)自定義partition后,要根據(jù)自定義partitioner的邏輯設(shè)置相應(yīng)數(shù)量的reduce task

job.setNumReduceTasks(5);

3)注意:
如果reduceTask的數(shù)量> getPartition的結(jié)果數(shù),則會(huì)多產(chǎn)生幾個(gè)空的輸出文件part-r-000xx;
如果1<reduceTask的數(shù)量<getPartition的結(jié)果數(shù),則有一部分分區(qū)數(shù)據(jù)無(wú)處安放,會(huì)Exception;
如果reduceTask的數(shù)量=1,則不管mapTask端輸出多少個(gè)分區(qū)文件,最終結(jié)果都交給這一個(gè)reduceTask,最終也就只會(huì)產(chǎn)生一個(gè)結(jié)果文件 part-r-00000;
例如:假設(shè)自定義分區(qū)數(shù)為5,則
(1)job.setNumReduceTasks(1);會(huì)正常運(yùn)行,只不過(guò)會(huì)產(chǎn)生一個(gè)輸出文件
(2)job.setNumReduceTasks(2);會(huì)報(bào)錯(cuò)
(3)job.setNumReduceTasks(6);大于5,程序會(huì)正常運(yùn)行,會(huì)產(chǎn)生空文件
4)案例實(shí)操
詳見(jiàn)7.2.2 需求2:將統(tǒng)計(jì)結(jié)果按照手機(jī)歸屬地不同省份輸出到不同文件中(Partitioner)
詳見(jiàn)7.1.2 需求2:把單詞按照ASCII碼奇偶分區(qū)(Partitioner)

11、WritableComparable排序

排序是MapReduce框架中最重要的操作之一。Map Task和Reduce Task均會(huì)對(duì)數(shù)據(jù)(按照key)進(jìn)行排序。該操作屬于Hadoop的默認(rèn)行為。任何應(yīng)用程序中的數(shù)據(jù)均會(huì)被排序,而不管邏輯上是否需要。默認(rèn)排序是按照字典順序排序,且實(shí)現(xiàn)該排序的方法是快速排序。

對(duì)于Map Task,它會(huì)將處理的結(jié)果暫時(shí)放到一個(gè)緩沖區(qū)中,當(dāng)緩沖區(qū)使用率達(dá)到一定閾值后,再對(duì)緩沖區(qū)中的數(shù)據(jù)進(jìn)行一次排序,并將這些有序數(shù)據(jù)寫(xiě)到磁盤上,而當(dāng)數(shù)據(jù)處理完畢后,它會(huì)對(duì)磁盤上所有文件進(jìn)行一次合并,以將這些文件合并成一個(gè)大的有序文件。

對(duì)于Reduce Task,它從每個(gè)Map Task上遠(yuǎn)程拷貝相應(yīng)的數(shù)據(jù)文件,如果文件大小超過(guò)一定閾值,則放到磁盤上,否則放到內(nèi)存中。如果磁盤上文件數(shù)目達(dá)到一定閾值,則進(jìn)行一次合并以生成一個(gè)更大文件;如果內(nèi)存中文件大小或者數(shù)目超過(guò)一定閾值,則進(jìn)行一次合并后將數(shù)據(jù)寫(xiě)到磁盤上。當(dāng)所有數(shù)據(jù)拷貝完畢后,Reduce Task統(tǒng)一對(duì)內(nèi)存和磁盤上的所有數(shù)據(jù)進(jìn)行一次合并。

每個(gè)階段的默認(rèn)排序

1)排序的分類:
(1)部分排序:
MapReduce根據(jù)輸入記錄的鍵對(duì)數(shù)據(jù)集排序。保證輸出的每個(gè)文件內(nèi)部排序。
(2)全排序:
如何用Hadoop產(chǎn)生一個(gè)全局排序的文件?最簡(jiǎn)單的方法是使用一個(gè)分區(qū)。但該方法在處理大型文件時(shí)效率極低,因?yàn)橐慌_(tái)機(jī)器必須處理所有輸出文件,從而完全喪失了MapReduce所提供的并行架構(gòu)。
替代方案:首先創(chuàng)建一系列排好序的文件;其次,串聯(lián)這些文件;最后,生成一個(gè)全局排序的文件。主要思路是使用一個(gè)分區(qū)來(lái)描述輸出的全局排序。例如:可以為上述文件創(chuàng)建3個(gè)分區(qū),在第一分區(qū)中,記錄的單詞首字母a-g,第二分區(qū)記錄單詞首字母h-n, 第三分區(qū)記錄單詞首字母o-z。
(3)輔助排序:(GroupingComparator分組)
Mapreduce框架在記錄到達(dá)reducer之前按鍵對(duì)記錄排序,但鍵所對(duì)應(yīng)的值并沒(méi)有被排序。甚至在不同的執(zhí)行輪次中,這些值的排序也不固定,因?yàn)樗鼈儊?lái)自不同的map任務(wù)且這些map任務(wù)在不同輪次中完成時(shí)間各不相同。一般來(lái)說(shuō),大多數(shù)MapReduce程序會(huì)避免讓reduce函數(shù)依賴于值的排序。但是,有時(shí)也需要通過(guò)特定的方法對(duì)鍵進(jìn)行排序和分組等以實(shí)現(xiàn)對(duì)值的排序。
(4)二次排序:
在自定義排序過(guò)程中,如果compareTo中的判斷條件為兩個(gè)即為二次排序。
2)自定義排序WritableComparable
(1)原理分析
bean對(duì)象實(shí)現(xiàn)WritableComparable接口重寫(xiě)compareTo方法,就可以實(shí)現(xiàn)排序

@Override
public int compareTo(FlowBean o) {
    // 倒序排列,從大到小
    return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

12、GroupingComparator分組(輔助排序)

1)對(duì)reduce階段的數(shù)據(jù)根據(jù)某一個(gè)或幾個(gè)字段進(jìn)行分組。

13、Combiner合并

0)在分布式的架構(gòu)中,分布式文件系統(tǒng)HDFS,和分布式運(yùn)算程序編程框架mapreduce。
HDFS:不怕大文件,怕很多小文件
mapreduce :怕數(shù)據(jù)傾斜
那么mapreduce是如果解決多個(gè)小文件的問(wèn)題呢?
mapreduce關(guān)于大量小文件的優(yōu)化策略
(1) 默認(rèn)情況下,TextInputFormat對(duì)任務(wù)的切片機(jī)制是按照文件規(guī)劃切片,不管有多少個(gè)小文件,都會(huì)是單獨(dú)的切片,都會(huì)交給一個(gè)maptask,這樣,如果有大量的小文件
就會(huì)產(chǎn)生大量的maptask,處理效率極端底下
(2)優(yōu)化策略
最好的方法:在數(shù)據(jù)處理的最前端(預(yù)處理、采集),就將小文件合并成大文件,在上傳到HDFS做后續(xù)的分析
補(bǔ)救措施:如果已經(jīng)是大量的小文件在HDFS中了,可以使用另一種inputformat來(lái)做切片(CombineFileInputformat),它的切片邏輯跟TextInputformat
注:CombineTextInputFormat是CombineFileInputformat的子類
不同:
它可以將多個(gè)小文件從邏輯上規(guī)劃到一個(gè)切片中,這樣,多個(gè)小文件就可以交給一個(gè)maptask了
//如果不設(shè)置InputFormat,它默認(rèn)的用的是TextInputFormat.class
/*CombineTextInputFormat為系統(tǒng)自帶的組件類
* setMinInputSplitSize 中的2048是表示n個(gè)小文件之和不能大于2048
* setMaxInputSplitSize 中的4096是 當(dāng)滿足setMinInputSplitSize中的2048情況下 在滿足n+1個(gè)小文件之和不能大于4096
*/
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMinInputSplitSize(job, 2048);
CombineTextInputFormat.setMaxInputSplitSize(job, 4096);
1)輸入數(shù)據(jù):準(zhǔn)備5個(gè)小文件
2)實(shí)現(xiàn)過(guò)程
(1)不做任何處理,運(yùn)行需求1中的wordcount程序,觀察切片個(gè)數(shù)為5
(2)在WordcountDriver中增加如下代碼,運(yùn)行程序,并觀察運(yùn)行的切片個(gè)數(shù)為1

// 如果不設(shè)置InputFormat,它默認(rèn)用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4*1024*1024);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2*1024*1024);// 2m

注:在看number of splits時(shí),和最大值(MaxSplitSize)有關(guān)、總體規(guī)律就是和低于最大值是一片、高于最大值1.5倍+,則為兩片;高于最大值2倍以上則向下取整,比如文件大小65MB,切片最大值為4MB,那么切片為16個(gè).總體來(lái)說(shuō),切片差值不超過(guò)1個(gè),不影響整體性能

6)自定義Combiner實(shí)現(xiàn)步驟:
(1)自定義一個(gè)combiner繼承Reducer,重寫(xiě)reduce方法

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        // 1 匯總操作
        int count = 0;
        for(IntWritable v :values){
            count = v.get();
        }
        // 2 寫(xiě)出
        context.write(key, new IntWritable(count));
    }
}

(2)在job驅(qū)動(dòng)類中設(shè)置:

job.setCombinerClass(WordcountCombiner.class);

13、ReduceTask工作機(jī)制

1)設(shè)置ReduceTask并行度(個(gè)數(shù))

reducetask的并行度同樣影響整個(gè)job的執(zhí)行并發(fā)度和執(zhí)行效率,但與maptask的并發(fā)數(shù)由切片數(shù)決定不同,Reducetask數(shù)量的決定是可以直接手動(dòng)設(shè)置:

//默認(rèn)值是1,手動(dòng)設(shè)置為4
job.setNumReduceTasks(4);

2)注意
(1)reducetask=0 ,表示沒(méi)有reduce階段,輸出文件個(gè)數(shù)和map個(gè)數(shù)一致。
(2)reducetask默認(rèn)值就是1,所以輸出文件個(gè)數(shù)為一個(gè)。
(3)如果數(shù)據(jù)分布不均勻,就有可能在reduce階段產(chǎn)生數(shù)據(jù)傾斜
(4)reducetask數(shù)量并不是任意設(shè)置,還要考慮業(yè)務(wù)邏輯需求,有些情況下,需要計(jì)算全局匯總結(jié)果,就只能有1個(gè)reducetask。
(5)具體多少個(gè)reducetask,需要根據(jù)集群性能而定。
(6)如果分區(qū)數(shù)不是1,但是reducetask為1,是否執(zhí)行分區(qū)過(guò)程。答案是:不執(zhí)行分區(qū)過(guò)程。因?yàn)樵趍aptask的源碼中,執(zhí)行分區(qū)的前提是先判斷reduceNum個(gè)數(shù)是否大于1。不大于1肯定不執(zhí)行。
3)實(shí)驗(yàn):測(cè)試reducetask多少合適。
(1)實(shí)驗(yàn)環(huán)境:1個(gè)master節(jié)點(diǎn),16個(gè)slave節(jié)點(diǎn):CPU:8GHZ,內(nèi)存: 2G
(2)實(shí)驗(yàn)結(jié)論:
表1 改變r(jià)educe task (數(shù)據(jù)量為1GB)

image.png

4)ReduceTask工作機(jī)制

image.png

(1)Copy階段:ReduceTask從各個(gè)MapTask上遠(yuǎn)程拷貝一片數(shù)據(jù),并針對(duì)某一片數(shù)據(jù),如果其大小超過(guò)一定閾值,則寫(xiě)到磁盤上,否則直接放到內(nèi)存中。
(2)Merge階段:在遠(yuǎn)程拷貝數(shù)據(jù)的同時(shí),ReduceTask啟動(dòng)了兩個(gè)后臺(tái)線程對(duì)內(nèi)存和磁盤上的文件進(jìn)行合并,以防止內(nèi)存使用過(guò)多或磁盤上文件過(guò)多。
(3)Sort階段:按照MapReduce語(yǔ)義,用戶編寫(xiě)reduce()函數(shù)輸入數(shù)據(jù)是按key進(jìn)行聚集的一組數(shù)據(jù)。為了將key相同的數(shù)據(jù)聚在一起,Hadoop采用了基于排序的策略。由于各個(gè)MapTask已經(jīng)實(shí)現(xiàn)對(duì)自己的處理結(jié)果進(jìn)行了局部排序,因此,ReduceTask只需對(duì)所有數(shù)據(jù)進(jìn)行一次歸并排序即可。
(4)Reduce階段:reduce()函數(shù)將計(jì)算結(jié)果寫(xiě)到HDFS上。

14、OutputFormat接口實(shí)現(xiàn)類

OutputFormat是MapReduce輸出的基類,所有實(shí)現(xiàn)MapReduce輸出都實(shí)現(xiàn)了 OutputFormat接口。下面我們介紹幾種常見(jiàn)的OutputFormat實(shí)現(xiàn)類。

1)文本輸出TextOutputFormat
默認(rèn)的輸出格式是TextOutputFormat,它把每條記錄寫(xiě)為文本行。它的鍵和值可以是任意類型,因?yàn)門extOutputFormat調(diào)用toString()方法把它們轉(zhuǎn)換為字符串。

2)SequenceFileOutputFormat

SequenceFileOutputFormat將它的輸出寫(xiě)為一個(gè)順序文件。如果輸出需要作為后續(xù) MapReduce任務(wù)的輸入,這便是一種好的輸出格式,因?yàn)樗母袷骄o湊,很容易被壓縮。

3)自定義OutputFormat

根據(jù)用戶需求,自定義實(shí)現(xiàn)輸出。

15、自定義OutputFormat

為了實(shí)現(xiàn)控制最終文件的輸出路徑,可以自定義OutputFormat。

要在一個(gè)mapreduce程序中根據(jù)數(shù)據(jù)的不同輸出兩類結(jié)果到不同目錄,這類靈活的輸出需求可以通過(guò)自定義outputformat來(lái)實(shí)現(xiàn)。

1)自定義OutputFormat步驟
(1)自定義一個(gè)類繼承FileOutputFormat。
(2)改寫(xiě)recordwriter,具體改寫(xiě)輸出數(shù)據(jù)的方法write()。
2)實(shí)操案例:?

Join多種應(yīng)用

16、Map join(Distributedcache分布式緩存)

1)使用場(chǎng)景:一張表十分小、一張表很大。
2)解決方案
在map端緩存多張表,提前處理業(yè)務(wù)邏輯,這樣增加map端業(yè)務(wù),減少reduce端數(shù)據(jù)的壓力,盡可能的減少數(shù)據(jù)傾斜。
3)具體辦法:采用distributedcache
(1)在mapper的setup階段,將文件讀取到緩存集合中。
(2)在驅(qū)動(dòng)函數(shù)中加載緩存。
job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt"));// 緩存普通文件到task運(yùn)行節(jié)點(diǎn)
4)實(shí)操案例:

17、Reduce join

1)原理:
Map端的主要工作:為來(lái)自不同表(文件)的key/value對(duì)打標(biāo)簽以區(qū)別不同來(lái)源的記錄。然后用連接字段作為key,其余部分和新加的標(biāo)志作為value,最后進(jìn)行輸出。
Reduce端的主要工作:在reduce端以連接字段作為key的分組已經(jīng)完成,我們只需要在每一個(gè)分組當(dāng)中將那些來(lái)源于不同文件的記錄(在map階段已經(jīng)打標(biāo)志)分開(kāi),最后進(jìn)行合并就ok了。
2)該方法的缺點(diǎn)
這種方式的缺點(diǎn)很明顯就是會(huì)造成map和reduce端也就是shuffle階段出現(xiàn)大量的數(shù)據(jù)傳輸,效率很低。
3)案例實(shí)操

18、數(shù)據(jù)清洗(ETL)

3.8 數(shù)據(jù)清洗(ETL)
1)概述
在運(yùn)行核心業(yè)務(wù)Mapreduce程序之前,往往要先對(duì)數(shù)據(jù)進(jìn)行清洗,清理掉不符合用戶要求的數(shù)據(jù)。清理的過(guò)程往往只需要運(yùn)行mapper程序,不需要運(yùn)行reduce程序。
2)實(shí)操案例

19、計(jì)數(shù)器應(yīng)用

Hadoop為每個(gè)作業(yè)維護(hù)若干內(nèi)置計(jì)數(shù)器,以描述多項(xiàng)指標(biāo)。例如,某些計(jì)數(shù)器記錄已處理的字節(jié)數(shù)和記錄數(shù),使用戶可監(jiān)控已處理的輸入數(shù)據(jù)量和已產(chǎn)生的輸出數(shù)據(jù)量。
1)API
(1)采用枚舉的方式統(tǒng)計(jì)計(jì)數(shù)
enum MyCounter{MALFORORMED,NORMAL}
//對(duì)枚舉定義的自定義計(jì)數(shù)器加1
context.getCounter(MyCounter.MALFORORMED).increment(1);
(2)采用計(jì)數(shù)器組、計(jì)數(shù)器名稱的方式統(tǒng)計(jì)
context.getCounter("counterGroup", "countera").increment(1);
組名和計(jì)數(shù)器名稱隨便起,但最好有意義。
(3)計(jì)數(shù)結(jié)果在程序運(yùn)行后的控制臺(tái)上查看。
2)案例實(shí)操

20、MapReduce開(kāi)發(fā)總結(jié)

在編寫(xiě)mapreduce程序時(shí),需要考慮的幾個(gè)方面:
1)輸入數(shù)據(jù)接口:InputFormat
默認(rèn)使用的實(shí)現(xiàn)類是:TextInputFormat
TextInputFormat的功能邏輯是:一次讀一行文本,然后將該行的起始偏移量作為key,行內(nèi)容作為value返回。
KeyValueTextInputFormat每一行均為一條記錄,被分隔符分割為key,value。默認(rèn)分隔符是tab(\t)。
NlineInputFormat按照指定的行數(shù)N來(lái)劃分切片。
CombineTextInputFormat可以把多個(gè)小文件合并成一個(gè)切片處理,提高處理效率。
用戶還可以自定義InputFormat。
2)邏輯處理接口:Mapper
用戶根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)其中三個(gè)方法:map() setup() cleanup ()
3)Partitioner分區(qū)
有默認(rèn)實(shí)現(xiàn) HashPartitioner,邏輯是根據(jù)key的哈希值和numReduces來(lái)返回一個(gè)分區(qū)號(hào);key.hashCode()&Integer.MAXVALUE % numReduces
如果業(yè)務(wù)上有特別的需求,可以自定義分區(qū)。
4)Comparable排序
當(dāng)我們用自定義的對(duì)象作為key來(lái)輸出時(shí),就必須要實(shí)現(xiàn)WritableComparable接口,重寫(xiě)其中的compareTo()方法。
部分排序:對(duì)最終輸出的每一個(gè)文件進(jìn)行內(nèi)部排序。
全排序:對(duì)所有數(shù)據(jù)進(jìn)行排序,通常只有一個(gè)Reduce。
二次排序:排序的條件有兩個(gè)。
5)Combiner合并
Combiner合并可以提高程序執(zhí)行效率,減少io傳輸。但是使用時(shí)必須不能影響原有的業(yè)務(wù)處理結(jié)果。
6)reduce端分組:Groupingcomparator
reduceTask拿到輸入數(shù)據(jù)(一個(gè)partition的所有數(shù)據(jù))后,首先需要對(duì)數(shù)據(jù)進(jìn)行分組,其分組的默認(rèn)原則是key相同,然后對(duì)每一組kv數(shù)據(jù)調(diào)用一次reduce()方法,并且將這一組kv中的第一個(gè)kv的key作為參數(shù)傳給reduce的key,將這一組數(shù)據(jù)的value的迭代器傳給reduce()的values參數(shù)。
利用上述這個(gè)機(jī)制,我們可以實(shí)現(xiàn)一個(gè)高效的分組取最大值的邏輯。

自定義一個(gè)bean對(duì)象用來(lái)封裝我們的數(shù)據(jù),然后改寫(xiě)其compareTo方法產(chǎn)生倒序排序的效果。然后自定義一個(gè)Groupingcomparator,將bean對(duì)象的分組邏輯改成按照我們的業(yè)務(wù)分組id來(lái)分組(比如訂單號(hào))。這樣,我們要取的最大值就是reduce()方法中傳進(jìn)來(lái)key。
7)邏輯處理接口:Reducer
用戶根據(jù)業(yè)務(wù)需求實(shí)現(xiàn)其中三個(gè)方法:reduce() setup() cleanup ()
8)輸出數(shù)據(jù)接口:OutputFormat
默認(rèn)實(shí)現(xiàn)類是TextOutputFormat,功能邏輯是:將每一個(gè)KV對(duì)向目標(biāo)文本文件中輸出為一行。
SequenceFileOutputFormat將它的輸出寫(xiě)為一個(gè)順序文件。如果輸出需要作為后續(xù) MapReduce任務(wù)的輸入,這便是一種好的輸出格式,因?yàn)樗母袷骄o湊,很容易被壓縮。
用戶還可以自定義OutputFormat。

下一篇:085-BigData-13MapReduce案例分析

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容