Analyzing duplicate consumption when integrating storm with kafka

Why this article?

I've recently been integrating storm with kafka and kept struggling with duplicate reads; worse, every restart of the topology re-scanned the kafka data from the beginning.
If the production logic is heavy and the bolts also insert rows into a database, you quickly end up with a pile of duplicate data!

Software environment

zookeeper-3.4.6.tar.gz
kafka_2.9.2-0.8.1.1
apache-storm-1.0.1.tar.gz

Learning by doing

We know that both kafka and storm depend on zk, and that the consumed offsets are also written into zk while the topology runs.
Yet my first program behaved strangely: zk never created the path and id I had specified.

Let's start with a "wrong" example.

package cn.base.sk.ex03;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class SplitBolt extends BaseRichBolt {

    private static final long serialVersionUID = -1380001209433177193L;

    private OutputCollector collector = null;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        String word = input.getString(0);
        System.out.println("source data => " + word);
        // note: the tuple is never acked here
        //collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
package cn.base.sk.ex03;

import java.util.Arrays;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaTopology {

    public static void main(String[] args) {
        String zks = "localhost:2181/kafka";
        String topic = "topic2";
        String zkRoot = "/topic2";
        String id = "split";

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.zkServers = Arrays.asList(new String[] {"localhost"});
        spoutConf.zkPort = 2181;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spoutx", new KafkaSpout(spoutConf));
        builder.setBolt("word-splitx", new SplitBolt()).shuffleGrouping("kafka-spoutx");

        Config conf = new Config();
        conf.setMaxTaskParallelism(3);

        String name = KafkaTopology.class.getSimpleName();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, conf, builder.createTopology());
        // Utils.sleep(10000);
        // cluster.shutdown();
    }
}

The example above never creates /topic2/split in zk; I'll explain why further down.
Since I had only started digging into this a few days earlier, I searched around and found an explanation in another blog post.

Quote from that post:
Note in particular that the bolt must use backtype.storm.topology.base.BaseBasicBolt as its parent class, otherwise the offset will not be recorded in zk.

After I changed the bolt to extend that class, the topic path was indeed created in zk, but the post never explained why.
Let's look at the modified code first.

package cn.base.sk.ex02;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class KafkaWordSplitter extends BaseBasicBolt {

    private static Log logger = LogFactory.getLog(KafkaWordSplitter.class);

    public void execute(Tuple input, BasicOutputCollector collector) {
        String line = input.getString(0);
        System.out.println("RECV[kafka -> splitter] " + line);
        String[] words = line.split(",");
        for (String word : words) {
            System.out.println("EMIT[splitter -> counter] " + word);
            collector.emit(new Values(word, 1));
        }
        // no explicit ack needed: BaseBasicBolt acks for us
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
package cn.base.sk.ex02;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {

    private static Log logger = LogFactory.getLog(WordCounter.class);

    private Map<String, AtomicInteger> countMap;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        countMap = new HashMap<String, AtomicInteger>();
    }

    @Override
    public void cleanup() {
        System.out.println("The final result:");
        Iterator<Entry<String, AtomicInteger>> iter = this.countMap.entrySet().iterator();
        while (iter.hasNext()) {
            Entry<String, AtomicInteger> entry = iter.next();
            System.out.println(entry.getKey() + "\t:\t" + entry.getValue().get());
        }
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getString(0);
        Integer count = input.getInteger(1);
        System.out.println("RECV[splitter -> counter] " + word + " : " + count);
        AtomicInteger ai = this.countMap.get(word);
        if (ai == null) {
            ai = new AtomicInteger();
            this.countMap.put(word, ai);
        }
        ai.addAndGet(count);
        System.out.println("CHECK statistics map: " + this.countMap);
        // again no explicit ack: BaseBasicBolt acks for us
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
}
package cn.base.sk.ex02;

import java.util.Arrays;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class MyKafkaTopology {

    public static void main(String[] args) {
        String zks = "localhost:2181/kafka";
        String topic = "topic1";
        String zkRoot = "/topic1";
        String id = "word";

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.zkServers = Arrays.asList(new String[] {"localhost"});
        spoutConf.zkPort = 2181;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConf));
        builder.setBolt("word-split", new KafkaWordSplitter()).shuffleGrouping("kafka-spout");
        builder.setBolt("word-count", new WordCounter()).fieldsGrouping("word-split", new Fields("word"));

        Config conf = new Config();
        conf.setMaxTaskParallelism(3);

        String name = MyKafkaTopology.class.getSimpleName();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(name, conf, builder.createTopology());
        // Utils.sleep(10000);
        // cluster.shutdown();
    }
}

Forgive me for writing two examples!
OK, that big chunk of code above is the modified version. Opening zkCli now, the path we need has been created,
and the offset is already recorded there.

(screenshot: zk data)

This is the point from which reads resume.
So, why does it work when the bolt extends BaseBasicBolt, but not when it extends BaseRichBolt?

Into the source code

First of all, KafkaSpout's open() method just does some initialization work;
the part we actually want to look at is below.

(screenshot: KafkaSpout#nextTuple)
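
The detail that matters in nextTuple() is that it periodically calls commit(). Roughly, as a paraphrased sketch of that tail end (lastUpdateMs is a simplified stand-in name, not the verbatim storm-kafka source):

// at the end of nextTuple(): flush offsets to zk at most every stateUpdateIntervalMs
long sinceLastUpdate = System.currentTimeMillis() - lastUpdateMs;
if (sinceLastUpdate > spoutConfig.stateUpdateIntervalMs) {
    commit();
}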

Don't worry about the other methods; go straight into commit().

(screenshot: KafkaSpout#commit)

See it? As long as that if condition holds, the offset data is created in zk. But why is the branch never entered? Look at lastCompletedOffset (a sketch of both methods follows below).

(screenshot: KafkaSpout#lastCompletedOffset)
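
To follow the argument without the screenshots, here is a paraphrase of the logic those two methods implement. The names pending, committedTo, emittedToOffset and writeOffsetToZk are simplified stand-ins for illustration, not the exact storm-kafka identifiers:

// key = offset of an emitted-but-not-yet-acked tuple, value = emit timestamp
SortedMap<Long, Long> pending = new TreeMap<Long, Long>();
long committedTo;      // offset last written to zk
long emittedToOffset;  // highest offset emitted so far

long lastCompletedOffset() {
    // while un-acked tuples remain, this is stuck at the oldest pending offset
    return pending.isEmpty() ? emittedToOffset : pending.firstKey();
}

void commit() {
    long lastCompleted = lastCompletedOffset();
    if (committedTo != lastCompleted) {   // the "if" from the screenshot
        writeOffsetToZk(lastCompleted);   // persists the offset under the configured zkRoot/id path
        committedTo = lastCompleted;
    }
}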

When you debug into it, the first key of the map is what gets fetched; the map's keys are offsets and its values are timestamps.
Each pass compares the result with the previous one, and eventually the newest offset is assigned.

Look closely: if the bolt extends BaseRichBolt, the map entry is still there after the call, whereas with BaseBasicBolt it gets removed. If the entry is never removed, the two offsets compared in commit() stay equal forever, so nothing is ever written.

So when is the entry removed? If it were you, when would you want to throw this piece of data away?
Right: once we have confirmed the tuple has been consumed, it can be removed.

And that is exactly what happens: after the ack, the map entry is deleted, and only then does the offset get created and written in zk.

(screenshot: KafkaSpout#ack)
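
In the same paraphrased terms as the sketch above, the ack path boils down to:

void ack(Long offset) {
    // removing the entry lets lastCompletedOffset() move past this offset,
    // so the next commit() sees a new value and writes it to zk
    pending.remove(offset);
}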

So, what if I insist on extending BaseRichBolt, is there still a way? Of course: you just need to ack the tuple yourself.
(screenshot: UserBolt#ack)
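
Concretely, the "wrong" SplitBolt from the beginning only needs its commented-out ack restored, something like:

public void execute(Tuple input) {
    String word = input.getString(0);
    System.out.println("source data => " + word);
    collector.ack(input); // explicit ack: the spout can now drop this offset from its pending map and commit it
}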

OK, open zkCli again and the specified path and id are now there, and restarting the topology no longer re-reads the historical data.

Summary

BaseBasicBolt does not ask you to call ack; it is called implicitly for you. With BaseRichBolt you have to call it explicitly.
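
In other words, Storm wraps a BaseBasicBolt in an executor that acks on your behalf once execute() returns normally, roughly like this (paraphrased sketch, not the verbatim Storm source; bolt, collector and basicCollector are stand-in names):

public void execute(Tuple input) {
    try {
        bolt.execute(input, basicCollector); // your BaseBasicBolt logic
        collector.ack(input);                // implicit ack after a normal return
    } catch (FailedException e) {
        collector.fail(input);               // a FailedException turns into fail()
    }
}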

Closing

Reference: http://www.howardliu.cn/a-few-notes-about-storm/

Life is short, tips are welcome!