Computing UV with MapReduce

Why write this post?

PV and UV are about the most basic and most common statistics there are; anyone who works with data knows them. For this kind of requirement we usually just run a Hive query and we're done, it's very simple: for each url, count the number of page views and the number of unique visitors.

select url, count(gi) as pv, count(distinct gi) as uv
from table where cdate = '2016-06-01' group by url

So how do we compute the same thing with MapReduce?
I suspect most people do it by keeping a Set or List in the reducer and checking whether each visitor is already in the collection, counting it only if it is not. And indeed, searching around I found plenty of blog posts that all take this approach with roughly the same code; the first MapReduce version I wrote worked this way too.

With a small amount of data this approach looks fine, but as soon as the data gets large it falls over immediately:
all of those visitor ids sit in memory, so it is very easy to hit an OOM.
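For reference, here is a minimal sketch of that naive single-job approach (the class name NaiveUVReducer and the url-as-key / uid-as-value layout are my own illustration, not code from any particular post):

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// single-job approach: the mapper emits (url, uid) and this reducer
// deduplicates the uids of each url in memory
public class NaiveUVReducer extends Reducer<Text, Text, Text, LongWritable> {
    private LongWritable uv = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // every distinct uid of this url is held in this Set at once;
        // for a hot url with millions of visitors this is an easy OOM
        Set<String> uids = new HashSet<String>();
        for (Text uid : values) {
            uids.add(uid.toString());
        }
        uv.set(uids.size());
        context.write(key, uv);
    }
}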

What we actually need is two MapReduce jobs.
In the first job, the map splits each line and emits url + uid as the key with a value of 1.
The data looks like this:

http://www.google.com,zhangsan 1
http://www.google.com,zhangsan 1
http://www.google.com,zhangsan 1

Records with the same key all go to the same reduce, so all of zhangsan's 1s for that url land together; the reduce does not have to do anything except write out the key.
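Taking the zhangsan records above as the example, the first job's output collapses those three identical lines into a single one:

http://www.google.com,zhangsan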

In the second map we split the first job's output back apart to get the url and the uid. Since the first job already grouped identical (url, uid) pairs into one record, there are no duplicates left, so from here on the computation is exactly the same as wordcount.
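Continuing the example, the second map turns each deduplicated line back into a (url, 1) pair:

http://www.google.com 1

and the second reduce sums the 1s per url, which is exactly the UV of that page.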

Practice

That was a lot of talk and probably getting dull, so let's look at some code to clear our heads, heh.

Using Hadoop 2.7.0.

Test data (each line is url,date,uid):

http://www.google.com,2016-01-02,dsadasd-dasd-as-das
https://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-name-node/,2016-01-02,000-111-11-22
http://www.jd.com/?keyword=dadas&keywordid=34879410794&re_dcp=202m0QjIIg==&traffic_source=1004&test=1&enc=utf8&cu=true&utm_source=baidu-search&utm_medium=cpc&utm_campaign=t_262767352_baidusearch&utm_term=34879410794_0_b0d37d1995654fdb9c013c4eb7544071,2016-01-02,dasdsa-ds-ad-as-da
http://mall.jd.com/index-56654.html,2016-01-02,d99dsa-dsdasdsa-dasdj
http://mall.jd.com/index-56654.html,2016-01-02,d99dsa-dsdasdsa-dasdj
http://mall.jd.com/index-56654.html,2016-01-02,d99dsa-dsddddd-dsss
http://mall.jd.com/index-56654.html,2016-01-02,d99dsa-dsdasdsa-dasdj
http://item.jd.com/3148810.html,2016-01-02,d99dsa-dsdasdsa-dasdj
http://item.jd.com/3148810.html,2016-01-02,d99dsa-dsdasdasda-sadas
http://item.jd.com/3148762.html,2016-01-02,d99dsa-dsdasdsa-xxxx
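For this test data the final output of the second job should look like the following (with a single reducer the urls come out sorted; the number after the tab is the UV of that url):

http://item.jd.com/3148762.html	1
http://item.jd.com/3148810.html	2
http://mall.jd.com/index-56654.html	2
http://www.google.com	1
http://www.jd.com/?keyword=dadas&keywordid=34879410794&re_dcp=202m0QjIIg==&traffic_source=1004&test=1&enc=utf8&cu=true&utm_source=baidu-search&utm_medium=cpc&utm_campaign=t_262767352_baidusearch&utm_term=34879410794_0_b0d37d1995654fdb9c013c4eb7544071	1
https://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-name-node/	1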

UVMapper, the mapper of the first job:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UVMapper extends Mapper<Object, Text, Text, LongWritable> {
    private Text k = new Text();
    private LongWritable v = new LongWritable(1);

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // each input line is url,date,uid
        String line = value.toString();
        String[] tokens = line.split(",");
        // emit url + uid as the key, 1 as the value
        k.set(tokens[0] + "," + tokens[2]);
        context.write(k, v);
    }
}
UVReducer, the reducer of the first job:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class UVReducer extends Reducer<Text, LongWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // all duplicates of the same url,uid pair arrive as one key,
        // so writing the key once is enough to deduplicate
        context.write(key, NullWritable.get());
    }
}
UVMapperUp, the mapper of the second job; its input is the first job's output:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UVMapperUp extends Mapper<Object, Text, Text, LongWritable> {
    private Text k = new Text();
    private LongWritable v = new LongWritable(1);

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // each input line is url,uid (the key written by UVReducer)
        String line = value.toString();
        String[] tokens = line.split(",");
        if (tokens.length != 2) {
            return;
        }
        // emit url as the key, 1 as the value -- from here on it is wordcount
        String url = tokens[0];
        k.set(url);
        context.write(k, v);
    }
}
UVReducerUp, the reducer of the second job:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class UVReducerUp extends Reducer<Text, LongWritable, Text, LongWritable> {
    private LongWritable res = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // sum the 1s: each value represents one distinct uid of this url
        long sum = 0;
        for (LongWritable val : values) {
            sum += val.get();
        }
        res.set(sum);
        context.write(key, res);
    }
}
UVApp, the driver that chains the two jobs:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class UVApp extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            // input file, first job's output dir, final output dir
            args = new String[]{"in/browse.txt", "uv_out", "f_uv_out"};
            ToolRunner.run(new Configuration(), new UVApp(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job1 = Job.getInstance(conf, "uv");
        Job job2 = Job.getInstance(conf, "uv");

        job1.setJarByClass(UVApp.class);
        job2.setJarByClass(UVApp.class);

        job1.setMapperClass(UVMapper.class);
        job1.setReducerClass(UVReducer.class);
        job2.setMapperClass(UVMapperUp.class);
        job2.setReducerClass(UVReducerUp.class);

        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(LongWritable.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(LongWritable.class);

        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(NullWritable.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(LongWritable.class);

        // job2 reads job1's output directory
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));
        FileInputFormat.addInputPath(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        // run the jobs one after the other; job2 only starts if job1 succeeded
        int code = job1.waitForCompletion(true) ? 0 : 1;
        if (code != 0) {
            System.exit(1);
        }
        return job2.waitForCompletion(true) ? 0 : 1;
    }
}
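To run it on a cluster rather than from the IDE, something along these lines should do, assuming the classes are packaged into a jar called uv.jar (the jar name is my own placeholder) and in/browse.txt exists under your HDFS working directory:

hadoop jar uv.jar UVApp

Since main() overwrites args with hard-coded paths, the intermediate output lands in uv_out and the final UV counts in f_uv_out.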

Summary

If we are reaching for a distributed framework in the first place, that usually means the data is fairly large, so keeping it all in memory is clearly unreasonable.
Looks like code quality out there still needs work!

Life is short, tips are appreciated!