思考
想到全局排序,是否第一想到的是,從map端收集數(shù)據(jù),shuffle到reduce來(lái),設(shè)置一個(gè)reduce,再對(duì)reduce中的數(shù)據(jù)排序,顯然這樣和單機(jī)器并沒有什么區(qū)別,要知道m(xù)apreduce框架默認(rèn)是對(duì)key來(lái)排序的,當(dāng)然也可以將value放到key上面來(lái)達(dá)到對(duì)value排序,最后在reduce時(shí)候?qū)φ{(diào)回去,另外排序是針對(duì)相同分區(qū),即一個(gè)reduce來(lái)排序的,這樣其實(shí)也不能充分運(yùn)用到集群的并行,那么如何更優(yōu)雅地實(shí)現(xiàn)全局排序呢?
摘要
hadoop中的排序分為部分排序,全局排序,輔助排序,二次排序等,本文主要介紹如何實(shí)現(xiàn)key全局排序,共有三種實(shí)現(xiàn)方式:
- 設(shè)置一個(gè)reduce
- 利用自定義partition 將數(shù)據(jù)按順序分批次分流到多個(gè)分區(qū)
- 利用框架自實(shí)現(xiàn)TotalOrderPartitioner 分區(qū)器來(lái)實(shí)現(xiàn)
實(shí)現(xiàn)
首先準(zhǔn)備一些輸入數(shù)據(jù):https://github.com/hulichao/bigdata-code/tree/master/data/job,如下,
/data/job/file.txt
2
32
654
32
15
756
65223
通過(guò)設(shè)置一 個(gè)reduce來(lái)實(shí)現(xiàn)全局排序
利用一個(gè)reduce來(lái)實(shí)現(xiàn)全局排序,可以說(shuō)不需要做什么特別的操作,mapper,reduce,driver實(shí)現(xiàn)如下:
package com.hoult.mr.job;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class JobMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
@Override
protected void map(LongWritable key, Text value,
Context context) throws IOException, InterruptedException {
IntWritable intWritable = new IntWritable(Integer.parseInt(value.toString()));
context.write(intWritable, intWritable);
}
}
package com.hoult.mr.job;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class JobReducer extends
Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
private int index = 0;//全局排序計(jì)數(shù)器
@Override
protected void reduce(IntWritable key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
for (IntWritable value : values)
context.write(new IntWritable(++index), value);
}
}
package com.hoult.mr.job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("input-path output-path");
System.exit(1);
}
Job job = Job.getInstance(getConf());
job.setJarByClass(JobDriver.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(JobMapper.class);
job.setReducerClass(JobReducer.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(NullWritable.class);
//使用一個(gè)reduce來(lái)排序
job.setNumReduceTasks(1);
job.setJobName("JobDriver");
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args)throws Exception{
// int exitCode = ToolRunner.run(new JobDriver(), args);
int exitCode = ToolRunner.run(new JobDriver(), new String[] {"data/job/", "data/job/output"});
System.exit(exitCode);
}
}
//加了排序索引,最后輸出一個(gè)文件,內(nèi)容如下:
1 2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223
PS; 以上通過(guò)hadoop自帶的ToolRunner工具來(lái)啟動(dòng)任務(wù),后續(xù)代碼涉及到重復(fù)的不再列出,只針對(duì)差異性的代碼。
利用自定義partition 將數(shù)據(jù)按順序分批次分流到多個(gè)分區(qū)
通過(guò)自定義分區(qū)如何保證數(shù)據(jù)的全局有序呢?我們知道key值分區(qū),會(huì)通過(guò)默認(rèn)分區(qū)函數(shù)HashPartition將不同范圍的key發(fā)送到不同的reduce,所以利用這一點(diǎn),這樣來(lái)實(shí)現(xiàn)分區(qū)器,例如有數(shù)據(jù)分布在1-1億,可以將1-1000萬(wàn)的數(shù)據(jù)讓reduce1來(lái)跑,1000萬(wàn)+1-2000萬(wàn)的數(shù)據(jù)來(lái)讓reduce2來(lái)跑。。。。最后可以對(duì)這十個(gè)文件,按順序組合即可得到所有數(shù)據(jù)按分區(qū)有序的全局排序數(shù)據(jù),由于數(shù)據(jù)量較小,采用分11個(gè)分區(qū),分別是1-1000,10001-2000,。跟第一種方式實(shí)現(xiàn)不同的有下面兩個(gè)點(diǎn),
//partitionner實(shí)現(xiàn)
package com.hoult.mr.job;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author hulichao
* @date 20-9-20
**/
public class JobPartitioner extends Partitioner<IntWritable, IntWritable> {
@Override
public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
int keyValue = Integer.parseInt(key.toString());
for (int i = 0; i < 10; i++) {
if (keyValue < 1000 * (i+1) && keyValue >= 1000 * (i-1)) {
System.out.println("key:" + keyValue + ", part:" + i);
return i;
}
}
return 10;
}
}
//driver處需要增加:
//設(shè)置自定義分區(qū)器
job.setPartitionerClass(JobPartitioner.class);
//driver處需要修改reduce數(shù)量
job.setNumReduceTasks(10);
執(zhí)行程序,結(jié)果會(huì)產(chǎn)生10個(gè)文件,文件內(nèi)有序。
part-r-00000
part-r-00001
part-r-00002
part-r-00003
part-r-00004
part-r-00005
part-r-00006
part-r-00007
part-r-00008
part-r-00009
注意:需要注意一點(diǎn),partition含有數(shù)據(jù)的分區(qū)要小于等于reduce數(shù),否則會(huì)包Illegal partiion錯(cuò)誤。另外缺點(diǎn)分區(qū)的實(shí)現(xiàn)如果對(duì)數(shù)據(jù)知道較少可能會(huì)導(dǎo)致數(shù)據(jù)傾斜和OOM問題。
利用框架自實(shí)現(xiàn)TotalOrderPartitioner 分區(qū)器來(lái)實(shí)現(xiàn)
既然想到了第二種自定義方式,其實(shí)可以解決多數(shù)傾斜問題,但是實(shí)際上,在數(shù)據(jù)分布不了解之前,對(duì)數(shù)據(jù)的分布評(píng)估,只能去試,看結(jié)果值有哪些,進(jìn)而自定義分區(qū)器,這不就是取樣嗎,針對(duì)取樣然后實(shí)現(xiàn)分區(qū)器這種方式,hadoop已經(jīng)幫我們實(shí)現(xiàn)好了,并且解決了數(shù)據(jù)傾斜和OOM 問題,那就是TotalOrderPartitioner類,其類提供了數(shù)據(jù)采樣器,對(duì)key值進(jìn)行部分采樣,然后按照采樣結(jié)果尋找key值的最佳分割點(diǎn),從而將key均勻分布在不同分區(qū)中。
TotalOrderPartitioner提供了三個(gè)采樣器如下:
- SplitSampler 分片采樣器,從數(shù)據(jù)分片中采樣數(shù)據(jù),該采樣器不適合已經(jīng)排好序的數(shù)據(jù)
- RandomSampler隨機(jī)采樣器,按照設(shè)置好的采樣率從一個(gè)數(shù)據(jù)集中采樣
- IntervalSampler間隔采樣機(jī),以固定的間隔從分片中采樣數(shù)據(jù),對(duì)于已經(jīng)排好序的數(shù)據(jù)效果非常好
采樣器實(shí)現(xiàn)了K[] getSample(InputFormat<K,V> info, Job job) 方法,返回的是采樣數(shù)組,其中InputFormat是map輸入端前面的輸入輔助類,根據(jù)返回的K[]的長(zhǎng)度進(jìn)而生成數(shù)組長(zhǎng)度-1個(gè)partition,最后按照分割點(diǎn)范圍將對(duì)應(yīng)數(shù)據(jù)發(fā)送到相應(yīng)分區(qū)中。
代碼實(shí)現(xiàn):
//mapper和driver的類型略有不同
package com.hoult.mr.job.totalsort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author hulichao
* @date 20-9-20
**/
public class TotalMapper extends Mapper<Text, Text, Text, IntWritable> {
@Override
protected void map(Text key, Text value,
Context context) throws IOException, InterruptedException {
System.out.println("key:" + key.toString() + ", value:" + value.toString());
context.write(key, new IntWritable(Integer.parseInt(key.toString())));
}
}
package com.hoult.mr.job.totalsort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author hulichao
* @date 20-9-20
**/
public class TotalReducer extends Reducer<Text, IntWritable, IntWritable, NullWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
for (IntWritable value : values)
context.write(value, NullWritable.get());
}
}
//比較器
package com.hoult.mr.job.totalsort;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* 自定義比較器來(lái)比較key的順序
* @author hulichao
* @date 20-9-20
**/
public class KeyComparator extends WritableComparator {
protected KeyComparator() {
super(Text.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
int num1 = Integer.valueOf(w1.toString());
int num2 = Integer.valueOf(w2.toString());
return num1 - num2;
}
}
package com.hoult.mr.job.totalsort;
//driver 實(shí)現(xiàn)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* @author hulichao
* @date 20-9-20
**/
public class TotalDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
//設(shè)置非分區(qū)排序
conf.set("mapreduce.totalorderpartitioner.naturalorder", "false");
Job job = Job.getInstance(conf, "Total Driver");
job.setJarByClass(TotalDriver.class);
//設(shè)置讀取文件的路徑,都是從HDFS中讀取。讀取文件路徑從腳本文件中傳進(jìn)來(lái)
FileInputFormat.addInputPath(job,new Path(args[0]));
//設(shè)置mapreduce程序的輸出路徑,MapReduce的結(jié)果都是輸入到文件中
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setInputFormatClass(KeyValueTextInputFormat.class);
//設(shè)置比較器,用于比較數(shù)據(jù)的大小,然后按順序排序,該例子主要用于比較兩個(gè)key的大小
job.setSortComparatorClass(KeyComparator.class);
job.setNumReduceTasks(10);//設(shè)置reduce數(shù)量
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(NullWritable.class);
//設(shè)置保存partitions文件的路徑
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));
//key值采樣,0.01是采樣率,
InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.1, 3, 100);
//將采樣數(shù)據(jù)寫入到分區(qū)文件中
InputSampler.writePartitionFile(job, sampler);
job.setMapperClass(TotalMapper.class);
job.setReducerClass(TotalReducer.class);
//設(shè)置分區(qū)類。
job.setPartitionerClass(TotalOrderPartitioner.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args)throws Exception{
// int exitCode = ToolRunner.run(new TotalDriver(), new String[] {"data/job/input", "data/job/output", "data/job/partition","data/job/partitio2"});
int exitCode = ToolRunner.run(new TotalDriver(), args);
System.exit(exitCode);
}
}
結(jié)果和第二種實(shí)現(xiàn)類似,需要注意只在集群測(cè)試時(shí)候才有效,本地測(cè)試可能會(huì)報(bào)錯(cuò)
2020-09-20 16:36:10,664 WARN [org.apache.hadoop.util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0
at com.hoult.mr.job.totalsort.TotalDriver.run(TotalDriver.java:32)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
at com.hoult.mr.job.totalsort.TotalDriver.main(TotalDriver.java:60)
吳邪,小三爺,混跡于后臺(tái),大數(shù)據(jù),人工智能領(lǐng)域的小菜鳥。
更多請(qǐng)關(guān)注
<center> <img src="https://raw.githubusercontent.com/hulichao/myblog_pic/master/blog/wechat.png" width="40%" /> </center>