mapreduce join

前言
我们知道hive,mysql等sql语言都可以进行join操作。那么mapreduce是如何join的呢?
在说明mapreduce进行join开始,我们来先看看sql的语法。

前言

我们知道hive,mysql等sql语言都可以进行join操作。那么mapreduce是如何join的呢?
在说明mapreduce进行join开始,我们来先看看sql的语法。

1
2
3
4
5
6
7
8
SELECT
tab1.*,
tab2.*
FROM
table1 tab1
JOIN
table2 tb2
on tab1.id = tab2.id

两张表关联在一起,需要什么数据,就从不同的表中取出数据即可。

mapreduce如何JOIN

通过前面的铺垫,我们知道sql进行表关联是多么的简单。那么通过程序如何进行关联呢?
我们hive的数据是不是从HDFS上来的,HDFS上是不是文件数据。那么我们可以把表数据看成是一个文件。
那么两张表可不可以看成两个文件,hive也是拿到这两个文件进行关联的。

现在如果不使用MR进行计算,写一个程序来进行连接呢?
文件A假设1GB,文件B500MB。
此时,我么可以把文件B读入到内存之中。
文件B的数据结构:

1
Map<String, List<String>>

然后一行一行读取文件A的数据,判断key是否存在,如果存在,则把文件A的值+List的值遍历输出
是不是也可以得到呢。

上面的数据可以放置在内存中,我觉得挺合理的,应为单机能处理的数据表示数据量普遍不算很大。
但是,如果说你现在的数据有1T呢?还有可能放置在内存中吗?

我看过大部分博客内容,大部分都是相同的写法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static class MyReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values,
Reducer<Text, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
LinkedList<String> linkU = new LinkedList<String>(); //users值
LinkedList<String> linkL = new LinkedList<String>(); //login_logs值
for (Text tval : values) {
String val = tval.toString();
if(val.startsWith("u#")) {
linkU.add(val.substring(2));
} else if(val.startsWith("l#")) {
linkL.add(val.substring(2));
}
}
for (String u : linkU) {
for (String l : linkL) {
context.write(key, new Text(u + DELIMITER + l));
}
}
}
}

这种写法对吗,是对的。但是有没有更好的,有。(稍后让你看,😏😏😏😏)
但是,我么通常也会遇到数据倾斜的问题,可能是某一组值特别的大,那么如果在JOIN的时候
也遇到了特别多相同的key值,那么内存还放得下吗?
不过这位使用LinkedList也是非常不错的,插入删除速度也是优于ArrayList(点个赞)。

好了,说了这么多。你真的能比这些人的内容写的好的吗?不会再吹牛吧(尴尬表情)

来说说我如何去做。
自定义key值之后,由于我不是放入内存的,所以字段输出的顺序可能是有点问题的
所以还要进行二次排序,到reduce的时候一组数据已经在一起了,我么设置一个boolean值
在第一次遍历值的时候不写入文件中,而是记录在一个字符串,然后第二次boolean更改后
把上一次和这一次的值一起写入。

来看看代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class Pair implements WritableComparable<Pair> {
private Text first;
private Text second;
public Pair(String first, String second) {
set(new Text(first), new Text(second));
}
public Pair() {
set(new Text(), new Text());
}
public Pair(Text first, String second) {
set(first, new Text(second));
}
public Pair(String first, Text second) {
set(new Text(first), second);
}
public void set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getFirst() {
return first;
}
public Text getSecond() {
return second;
}
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
@Override
public int hashCode() {
return first.hashCode() * 163 + second.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Pair) {
Pair pair = (Pair) obj;
return first.equals(pair.first) && second.equals(pair.second);
}
return false;
}
@Override
public String toString() {
return this.first.toString();
}
@Override
public int compareTo(Pair pair) {
// int cmp = first.compareTo(pair.first);
int cmp = pair.first.compareTo(first);
if (cmp != 0) {
return cmp;
}
// return second.compareTo(pair.second);
return pair.second.compareTo(second);
}
public int compareTo(Pair pair, int index) {
if (index == 1) {
// return this.first.compareTo(pair.first);
return pair.first.compareTo(this.first);
} else {
// return this.second.compareTo(pair.second);
return pair.second.compareTo(this.second);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class LJoinMapper extends Mapper<Object, Text, Pair, Text> {
private Text key = new Text();
private Text val = new Text();
@Override
protected void map(Object line, Text value, Context context) throws IOException, InterruptedException {
String[] tokens = value.toString().split(",");
if (tokens.length < 3) {
return;
}
// key
String k = tokens[0] + "," + tokens[1];
Pair pair = new Pair(k, "0");
// val
// String v = "left" + "," + tokens[2];
String v = tokens[2];
// key.set(k);
val.set(v);
// context.write(key, val);
context.write(pair, val);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RJoinMapper extends Mapper<Object, Text, Pair, Text> {
// private Text key = new Text();
private Text val = new Text();
@Override
protected void map(Object line, Text value, Context context) throws IOException, InterruptedException {
String[] tokens = value.toString().split(",");
if (tokens.length < 5) {
return;
}
// key
String k = tokens[0] + "," + tokens[1];
Pair pair = new Pair(k, "1");
// val
// String v = "right" + "," + tokens[2] + "," + tokens[3] + "," + tokens[4];
String v = tokens[2] + "," + tokens[3] + "," + tokens[4];
// key.set(k);
val.set(v);
// context.write(key, val);
context.write(pair, val);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class JoinReducer extends Reducer<Pair, Text, NullWritable, Text> {
private Text val = new Text();
@Override
protected void reduce(Pair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String deptName = null;
boolean set = false;
for (Text v : values) {
String[] vs = v.toString().split(",");
if (!set) {
deptName = v.toString();
set = true;
} else {
System.out.println(key.toString() + "," + deptName + "," + v);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class JOINGroup extends WritableComparator {
public JOINGroup() {
super(Pair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Pair keyA = (Pair) a;
Pair keyB = (Pair) b;
// return keyA.compareTo(keyB, 1);
return keyB.compareTo(keyA, 1);
}
}
1
2
3
4
5
6
7
8
9
public class JOINPartition extends Partitioner<Pair, Text> {
@Override
public int getPartition(Pair key, Text value, int numPartitions) {
return (key.getFirst().hashCode() % numPartitions);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class JOINSort extends WritableComparator {
public JOINSort() {
super(Pair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Pair compositeKey1 = (Pair) a;
Pair compositeKey2 = (Pair) b;
return compositeKey2.compareTo(compositeKey1);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Dirver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 3) {
System.out.printf("Usage: %s [generic options] <input dir> <output dir>\n", getClass().getSimpleName());
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "join");
job.setJarByClass(getClass());
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, LJoinMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, RJoinMapper.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.setReducerClass(JoinReducer.class);
job.setGroupingComparatorClass(JOINGroup.class);
job.setPartitionerClass(JOINPartition.class);
job.setSortComparatorClass(JOINSort.class);
job.setMapOutputKeyClass(Pair.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) {
try {
args = new String[] {"in/l", "in/r", "ljoinout"};
ToolRunner.run(new Configuration(), new Dirver(), args);
} catch (Exception e) {
e.printStackTrace();
}
}

测试数据:

l.txt

1
2
3
4
1,job,beijing
2,jue,shanghai
3,role,shenzhen
4,jie,guangzhou

r.txt

1
2
3
4
5
6
1,job,30,man,333330000
2,jue,90,woman,9384832
3,role,100,man,9103841038
4,jie,0,man,103848103474
1,job,20,man,333330000
1,job,10,man,333330000

以下是输出结果:
join


以上程序作为内联展示给了大家,如果对文章内容有疑问,或者有更好的建议,又或者有土豪打赏
都不要吝啬。谢谢!

还不错的文章链接

http://codingjunkie.net/mapreduce-reduce-joins/
https://www.safaribooksonline.com/library/view/data-algorithms/9781491906170/ch01.html
https://chamibuddhika.wordpress.com/2012/02/26/joins-with-map-reduce/

方便后人,方便自己!!!

人生苦短,我要打赏!