MapReduce怎么優(yōu)雅地實(shí)現(xiàn)全局排序

思考

想到全局排序,是否第一想到的是,從map端收集數(shù)據(jù),shuffle到reduce來(lái),設(shè)置一個(gè)reduce,再對(duì)reduce中的數(shù)據(jù)排序,顯然這樣和單機(jī)器并沒有什么區(qū)別,要知道m(xù)apreduce框架默認(rèn)是對(duì)key來(lái)排序的,當(dāng)然也可以將value放到key上面來(lái)達(dá)到對(duì)value排序,最后在reduce時(shí)候?qū)φ{(diào)回去,另外排序是針對(duì)相同分區(qū),即一個(gè)reduce來(lái)排序的,這樣其實(shí)也不能充分運(yùn)用到集群的并行,那么如何更優(yōu)雅地實(shí)現(xiàn)全局排序呢?

摘要

hadoop中的排序分為部分排序,全局排序,輔助排序,二次排序等,本文主要介紹如何實(shí)現(xiàn)key全局排序,共有三種實(shí)現(xiàn)方式:

  1. 設(shè)置一個(gè)reduce
  2. 利用自定義partition 將數(shù)據(jù)按順序分批次分流到多個(gè)分區(qū)
  3. 利用框架自實(shí)現(xiàn)TotalOrderPartitioner 分區(qū)器來(lái)實(shí)現(xiàn)

實(shí)現(xiàn)

首先準(zhǔn)備一些輸入數(shù)據(jù):https://github.com/hulichao/bigdata-code/tree/master/data/job,如下,

/data/job/file.txt
2
32
654
32
15
756
65223

通過(guò)設(shè)置一 個(gè)reduce來(lái)實(shí)現(xiàn)全局排序

利用一個(gè)reduce來(lái)實(shí)現(xiàn)全局排序,可以說(shuō)不需要做什么特別的操作,mapper,reduce,driver實(shí)現(xiàn)如下:

package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class JobMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value,
                       Context context) throws IOException, InterruptedException {
        IntWritable intWritable = new IntWritable(Integer.parseInt(value.toString()));
        context.write(intWritable, intWritable);
    }
}
package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class JobReducer  extends
        Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    private int index = 0;//全局排序計(jì)數(shù)器
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        for (IntWritable value : values)
            context.write(new IntWritable(++index), value);
    }
}
package com.hoult.mr.job;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("input-path output-path");
            System.exit(1);
        }

        Job job = Job.getInstance(getConf());
        job.setJarByClass(JobDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(JobMapper.class);
        job.setReducerClass(JobReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);
        //使用一個(gè)reduce來(lái)排序
        job.setNumReduceTasks(1);
        job.setJobName("JobDriver");
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args)throws Exception{

//        int exitCode = ToolRunner.run(new JobDriver(), args);
        int exitCode = ToolRunner.run(new JobDriver(), new String[] {"data/job/", "data/job/output"});
        System.exit(exitCode);
    }
}

//加了排序索引,最后輸出一個(gè)文件,內(nèi)容如下:
1   2
2   6
3   15
4   22
5   26
6   32
7   32
8   54
9   92
10  650
11  654
12  756
13  5956
14  65223

PS; 以上通過(guò)hadoop自帶的ToolRunner工具來(lái)啟動(dòng)任務(wù),后續(xù)代碼涉及到重復(fù)的不再列出,只針對(duì)差異性的代碼。

利用自定義partition 將數(shù)據(jù)按順序分批次分流到多個(gè)分區(qū)

通過(guò)自定義分區(qū)如何保證數(shù)據(jù)的全局有序呢?我們知道key值分區(qū),會(huì)通過(guò)默認(rèn)分區(qū)函數(shù)HashPartition將不同范圍的key發(fā)送到不同的reduce,所以利用這一點(diǎn),這樣來(lái)實(shí)現(xiàn)分區(qū)器,例如有數(shù)據(jù)分布在1-1億,可以將1-1000萬(wàn)的數(shù)據(jù)讓reduce1來(lái)跑,1000萬(wàn)+1-2000萬(wàn)的數(shù)據(jù)來(lái)讓reduce2來(lái)跑。。。。最后可以對(duì)這十個(gè)文件,按順序組合即可得到所有數(shù)據(jù)按分區(qū)有序的全局排序數(shù)據(jù),由于數(shù)據(jù)量較小,采用分11個(gè)分區(qū),分別是1-1000,10001-2000,。跟第一種方式實(shí)現(xiàn)不同的有下面兩個(gè)點(diǎn),

//partitionner實(shí)現(xiàn)
package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class JobPartitioner extends Partitioner<IntWritable, IntWritable> {
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int keyValue = Integer.parseInt(key.toString());

        for (int i = 0; i < 10; i++) {
            if (keyValue < 1000 * (i+1) && keyValue >= 1000 * (i-1)) {
                System.out.println("key:" + keyValue + ", part:" + i);
                return i;
            }
        }

        return 10;
    }
}

//driver處需要增加:
        //設(shè)置自定義分區(qū)器
        job.setPartitionerClass(JobPartitioner.class);
        
//driver處需要修改reduce數(shù)量
        job.setNumReduceTasks(10);

執(zhí)行程序,結(jié)果會(huì)產(chǎn)生10個(gè)文件,文件內(nèi)有序。

part-r-00000
part-r-00001
part-r-00002
part-r-00003
part-r-00004
part-r-00005
part-r-00006
part-r-00007
part-r-00008
part-r-00009

注意:需要注意一點(diǎn),partition含有數(shù)據(jù)的分區(qū)要小于等于reduce數(shù),否則會(huì)包Illegal partiion錯(cuò)誤。另外缺點(diǎn)分區(qū)的實(shí)現(xiàn)如果對(duì)數(shù)據(jù)知道較少可能會(huì)導(dǎo)致數(shù)據(jù)傾斜和OOM問題。

利用框架自實(shí)現(xiàn)TotalOrderPartitioner 分區(qū)器來(lái)實(shí)現(xiàn)

既然想到了第二種自定義方式,其實(shí)可以解決多數(shù)傾斜問題,但是實(shí)際上,在數(shù)據(jù)分布不了解之前,對(duì)數(shù)據(jù)的分布評(píng)估,只能去試,看結(jié)果值有哪些,進(jìn)而自定義分區(qū)器,這不就是取樣嗎,針對(duì)取樣然后實(shí)現(xiàn)分區(qū)器這種方式,hadoop已經(jīng)幫我們實(shí)現(xiàn)好了,并且解決了數(shù)據(jù)傾斜和OOM 問題,那就是TotalOrderPartitioner類,其類提供了數(shù)據(jù)采樣器,對(duì)key值進(jìn)行部分采樣,然后按照采樣結(jié)果尋找key值的最佳分割點(diǎn),從而將key均勻分布在不同分區(qū)中。

TotalOrderPartitioner提供了三個(gè)采樣器如下:

  • SplitSampler 分片采樣器,從數(shù)據(jù)分片中采樣數(shù)據(jù),該采樣器不適合已經(jīng)排好序的數(shù)據(jù)
  • RandomSampler隨機(jī)采樣器,按照設(shè)置好的采樣率從一個(gè)數(shù)據(jù)集中采樣
  • IntervalSampler間隔采樣機(jī),以固定的間隔從分片中采樣數(shù)據(jù),對(duì)于已經(jīng)排好序的數(shù)據(jù)效果非常好

采樣器實(shí)現(xiàn)了K[] getSample(InputFormat<K,V> info, Job job) 方法,返回的是采樣數(shù)組,其中InputFormat是map輸入端前面的輸入輔助類,根據(jù)返回的K[]的長(zhǎng)度進(jìn)而生成數(shù)組長(zhǎng)度-1個(gè)partition,最后按照分割點(diǎn)范圍將對(duì)應(yīng)數(shù)據(jù)發(fā)送到相應(yīng)分區(qū)中。

代碼實(shí)現(xiàn):

//mapper和driver的類型略有不同
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalMapper extends Mapper<Text, Text, Text, IntWritable> {
    @Override
    protected void map(Text key, Text value,
                       Context context) throws IOException, InterruptedException {
        System.out.println("key:" + key.toString() + ", value:" + value.toString());
        context.write(key, new IntWritable(Integer.parseInt(key.toString())));
    }
}
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalReducer extends Reducer<Text, IntWritable, IntWritable, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        for (IntWritable value : values)
            context.write(value, NullWritable.get());
    }
}
//比較器
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 自定義比較器來(lái)比較key的順序
 * @author hulichao
 * @date 20-9-20
 **/
public class KeyComparator extends WritableComparator {
    protected KeyComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        int num1 = Integer.valueOf(w1.toString());
        int num2 = Integer.valueOf(w2.toString());
        return num1 - num2;
    }
}
package com.hoult.mr.job.totalsort;

//driver 實(shí)現(xiàn)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //設(shè)置非分區(qū)排序
        conf.set("mapreduce.totalorderpartitioner.naturalorder", "false");
        Job job = Job.getInstance(conf, "Total Driver");
        job.setJarByClass(TotalDriver.class);

        //設(shè)置讀取文件的路徑,都是從HDFS中讀取。讀取文件路徑從腳本文件中傳進(jìn)來(lái)
        FileInputFormat.addInputPath(job,new Path(args[0]));
        //設(shè)置mapreduce程序的輸出路徑,MapReduce的結(jié)果都是輸入到文件中
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        //設(shè)置比較器,用于比較數(shù)據(jù)的大小,然后按順序排序,該例子主要用于比較兩個(gè)key的大小
        job.setSortComparatorClass(KeyComparator.class);
        job.setNumReduceTasks(10);//設(shè)置reduce數(shù)量

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);

        //設(shè)置保存partitions文件的路徑
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));
        //key值采樣,0.01是采樣率,
        InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.1, 3, 100);
        //將采樣數(shù)據(jù)寫入到分區(qū)文件中
        InputSampler.writePartitionFile(job, sampler);

        job.setMapperClass(TotalMapper.class);
        job.setReducerClass(TotalReducer.class);
        //設(shè)置分區(qū)類。
        job.setPartitionerClass(TotalOrderPartitioner.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args)throws Exception{
//        int exitCode = ToolRunner.run(new TotalDriver(), new String[] {"data/job/input", "data/job/output", "data/job/partition","data/job/partitio2"});
        int exitCode = ToolRunner.run(new TotalDriver(), args);
        System.exit(exitCode);
    }
}

結(jié)果和第二種實(shí)現(xiàn)類似,需要注意只在集群測(cè)試時(shí)候才有效,本地測(cè)試可能會(huì)報(bào)錯(cuò)

2020-09-20 16:36:10,664 WARN [org.apache.hadoop.util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0
    at com.hoult.mr.job.totalsort.TotalDriver.run(TotalDriver.java:32)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
    at com.hoult.mr.job.totalsort.TotalDriver.main(TotalDriver.java:60)

吳邪,小三爺,混跡于后臺(tái),大數(shù)據(jù),人工智能領(lǐng)域的小菜鳥。
更多請(qǐng)關(guān)注
<center> <img src="https://raw.githubusercontent.com/hulichao/myblog_pic/master/blog/wechat.png" width="40%" /> </center>

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容