hadoop Partitioner使用及注意点

前言
hadoop已经出来这么长时间了,分区的文章早已经多如牛毛,为何你还要写呢?
其实呢,这篇文章主要是想要介绍一下使用MR自定义分区需要注意的一些点。

前言

hadoop已经出来这么长时间了,分区的文章早已经多如牛毛,为何你还要写呢?
其实呢,这篇文章主要是想要介绍一下使用MR自定义分区需要注意的一些点。
可能早有前辈已经指出该问题了。但还是容我自己做一个小小的记录,哈哈哈~~~

我们知道map数据会写入到分区,默认的分区只有一个,但是我想要10个又或者是100个,可以吗?
当然是可以的是。

你只需要创建一个类继承org.apache.hadoop.mapreduce.Partitioner
类就可以完全定义自己想要的分区方式。
然后在job中设置自定义的Partitioner类即可。
但是,这样写真的就结束了吗?

一个例子

PartitionMapper.java

1
2
3
4
5
6
7
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] tokens = value.toString().split(",");
String gender = tokens[2];
String nameAgeScore = tokens[0] + "," + tokens[1] + "," + tokens[3];
context.write(new Text(gender), new Text(nameAgeScore));
}

AgePartitioner.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public int getPartition(Text key, Text value, int numPartitions) {
String[] nameAgeScore = value.toString().split(",");
String age = nameAgeScore[1];
int ageInt = Integer.parseInt(age);
if (numPartitions == 0) {
return 0;
}
if (ageInt <= 20) {
return 0;
}
if (ageInt > 20 && ageInt <= 50) {
return 1 % numPartitions;
}
return 2 % numPartitions;
}

ParitionReducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int maxScore = Integer.MIN_VALUE;
String name = "";
String age = "";
String gender = "";
int score = 0;
for (Text val : values) {
String[] valTokens = val.toString().split(",");
score = Integer.parseInt(valTokens[2]);
if (score > maxScore) {
name = valTokens[0];
age = valTokens[1];
gender = key.toString();
maxScore = score;
}
}
context.write(new Text(name), new Text("age- " + age + "," + gender + "," + " score-" + maxScore));
}

驱动类,具体的模板代码我就不再写入,只将Partitioner设置展示

1
job.setPartitionerClass(AgePartitioner.class);

以上这个例子,是我在其它文章中截取下来的,具体地址,会在链接中给出。
现在,你可以运行该例子,你会发现Reduce输出的只有一个文件,然后你还会发现其实使用的并非是自定义
的Partitioner类。

一开始的时候,我有点懵逼了。what?我的设置没有生效吗?
你的设置是没有问题的,但是你却忘记了一项重要的事情。究竟是什么事情呀,快点说说呀(臭鱼)。
在说出这个秘密之前,我们看看map的context.write()这个方法是怎么做的吧。

MapTask.java

1
2
3
4
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}

其中partitioner是在哪里定义的呢?
在NewOutputCollector类中,该类作为MapTask内部类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private class NewOutputCollector<K,V>
extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
private final MapOutputCollector<K,V> collector;
private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
private final int partitions;
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
collector = createSortingCollector(job, reporter);
partitions = jobContext.getNumReduceTasks();
if (partitions > 1) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}
};
}
}
...

我们来看看构造函数之中,如果partitions大于1就从配置中读取我们自己的Partitioner对象并实例化给引用,否则自己就创建一个实例。
那partitions是从jobContext.getNumReduceTasks();读取出来的,这个要怎么配置呢?


job.setNumReduceTasks(number);

配置该值之后,那么就可以使用我们自己定义的分区函数了。
好了,文章到这里也就结束了,欢迎大家拍砖!!!

链接


就这里的例子,数据这里也有

人生苦短,我要打赏!