00 背景
随着Kafka版本的升级,Storm作为消费者对接Kafka 0.10.x+版本的方式与之前存在差异,现将新的方式记录下来,为以后使用Storm实时处理新版Kafka数据的场景提供参考。
01 架构简介
架构如下图所示。
使用Flume组件采集数据时,采用双层架构,第一层的作用是采集,第二层的作用是聚合,这种架构能够达到负载均衡的效果。第二层会将数据发送到Kafka,Storm会实时从Kafka读取数据,根据需求进行处理,然后将处理后的数据发送到对应的存储层。本篇重点关注Storm从Kafka读取并处理数据。
02 演示
第一步:向采集文件写入数据
第二步:观察Kafka topic情况。由以上结果可知,数据已成功写入Kafka。
第三步:执行Storm word count程序,查看结果
03 代码
Storm处理数据流程如下图所示。
KafkaWordCountTopology部分
/**
 * Word-count topology that reads records from Kafka and counts the words they contain.
 *
 * <p>Wiring: KafkaSpout -> KafkaSplitSentenceBolt (shuffle grouping) -> KafkaWordCountBolt
 * (fields grouping on "word") -> KafkaOutputBolt (global grouping).
 *
 * <p>When a command-line argument is given, the topology is submitted to the cluster under
 * that name; otherwise it runs in a {@code LocalCluster} for ten seconds and is shut down.
 *
 * @author ccc
 */
public class KafkaWordCountTopology {
    private static final String TOPICS = "word-count";
    private static final String KEYS = "word";
    private static final String BOOTSTRAP_SERVERS = "master:9092,slave:9092,slave3:9092";
    private static final String KAFKA_WORD_COUNT_SPOUT_ID = "KafkaWordCountSpout";
    private static final String SPLIT_SENTENCE_BOLT_ID = "SplitSentenceBolt";
    private static final String KAFKA_WORD_COUNT_BOLT_ID = "KafkaWordCountBolt";
    private static final String KAFKA_OUTPUT_BOLT_ID = "KafkaOutputBolt";

    /**
     * Builds the topology and either submits it to a cluster or runs it locally.
     *
     * @param args optional; {@code args[0]} is the topology name used for cluster submission
     * @throws Exception if submission fails or the local run is interrupted
     */
    public static void main(String[] args) throws Exception {
        KafkaSpoutConfig<String, String> kafkaSpoutConfig =
                KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPICS)
                        .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
                        .setTupleTrackingEnforced(true)
                        .build();
        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);

        TopologyBuilder tp = new TopologyBuilder();
        tp.setSpout(KAFKA_WORD_COUNT_SPOUT_ID, kafkaSpout, 2);
        tp.setBolt(SPLIT_SENTENCE_BOLT_ID, new KafkaSplitSentenceBolt(), 2)
                .setNumTasks(2)
                .shuffleGrouping(KAFKA_WORD_COUNT_SPOUT_ID);
        // NOTE(review): parallelism hint (4) exceeds the task count (2); confirm which of the
        // two values is intended.
        tp.setBolt(KAFKA_WORD_COUNT_BOLT_ID, new KafkaWordCountBolt(), 4)
                .setNumTasks(2)
                // Fields grouping on the word keeps each word's running count in a single task.
                .fieldsGrouping(SPLIT_SENTENCE_BOLT_ID, new Fields(KEYS));
        tp.setBolt(KAFKA_OUTPUT_BOLT_ID, new KafkaOutputBolt(), 2)
                .setNumTasks(2)
                .globalGrouping(KAFKA_WORD_COUNT_BOLT_ID);

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // Submit to the cluster.
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, tp.createTopology());
        } else {
            // Local test run.
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("kafka-word-count-topology", conf, tp.createTopology());
                Thread.sleep(10000);
            } finally {
                // Always stop the in-process cluster, even if submission or the sleep fails.
                cluster.shutdown();
            }
        }
    }
}
KafkaSplitSentenceBolt部分
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Splits the "value" field of each incoming tuple into single-character words.
 *
 * <p>The value is first split on "," into sentences; every "。" is removed from each
 * sentence, and each remaining character is emitted as one tuple with the field "word".
 *
 * @author ccc
 */
public class KafkaSplitSentenceBolt implements IBasicBolt {
    // The raw Map parameter is kept so the override matches the interface method whether it
    // declares a raw or a parameterized Map.
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // No state to initialise.
    }

    /**
     * Emits one "word" tuple per character of the incoming value.
     *
     * @param tuple     input tuple; its "value" field is read as a String
     * @param collector collector used to emit the word tuples
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String poetry = tuple.getStringByField("value");
        if (poetry == null) {
            // Nothing to split when the value is absent.
            return;
        }
        for (String sentence : poetry.split(",")) {
            // split("") breaks the sentence into individual characters.
            for (String word : sentence.replace("。", "").split("")) {
                // An empty sentence yields a single empty token; do not count it as a word.
                if (!word.isEmpty()) {
                    collector.emit(new Values(word));
                }
            }
        }
    }

    @Override
    public void cleanup() {
        // Nothing to release.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map getComponentConfiguration() {
        // No component-specific configuration.
        return null;
    }
}
KafkaWordCountBolt部分
import java.util.HashMap;
import java.util.Map;
/**
 * Keeps a running count per word and emits the updated count for every incoming word.
 *
 * <p>Input field: "word". Output fields: "word", "counts".
 *
 * @author ccc
 */
public class KafkaWordCountBolt implements IBasicBolt {
    /** Running count per word, held in memory by this bolt instance. */
    private final Map<String, Integer> wordCounts = new HashMap<>();

    // The raw Map parameter is kept so the override matches the interface method whether it
    // declares a raw or a parameterized Map.
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // No state to initialise beyond the field initialiser.
    }

    /**
     * Increments the count of the incoming word and emits (word, new count).
     *
     * @param tuple     input tuple; its "word" field is read as a String
     * @param collector collector used to emit the updated count
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getStringByField("word");
        // merge() stores 1 for a new word, otherwise adds 1 to the existing count.
        Integer counts = wordCounts.merge(word, 1, Integer::sum);
        collector.emit(new Values(word, counts));
    }

    @Override
    public void cleanup() {
        // Nothing to release.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "counts"));
    }

    @Override
    public Map getComponentConfiguration() {
        // No component-specific configuration.
        return null;
    }
}
KafkaOutputBolt部分
import java.util.*;
/**
 * Terminal bolt that remembers the latest count received for each word and prints all
 * counts, sorted by word, when the bolt is cleaned up.
 *
 * <p>Input fields: "word", "counts". Declares no output fields.
 *
 * @author ccc
 */
public class KafkaOutputBolt implements IBasicBolt {
    /** Latest count seen per word. */
    private final Map<String, Integer> wordCounts = new HashMap<>();

    // The raw Map parameter is kept so the override matches the interface method whether it
    // declares a raw or a parameterized Map.
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // No state to initialise beyond the field initialiser.
    }

    /**
     * Records the most recent count for the incoming word, replacing any earlier value.
     *
     * @param tuple     input tuple with a String "word" and an Integer "counts" field
     * @param collector unused; this bolt emits nothing
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getStringByField("word");
        Integer counts = tuple.getIntegerByField("counts");
        this.wordCounts.put(word, counts);
    }

    /**
     * Prints the collected counts in ascending word order.
     *
     * <p>NOTE(review): Storm's documentation states that cleanup() is only guaranteed to be
     * called in local mode, so this report should not be relied on in a cluster deployment.
     */
    @Override
    public void cleanup() {
        List<String> keyList = new ArrayList<>(wordCounts.keySet());
        Collections.sort(keyList);
        System.out.println("--- FINAL COUNTS ---");
        for (String key : keyList) {
            System.out.println(key + " : " + wordCounts.get(key));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: no output stream.
    }

    @Override
    public Map getComponentConfiguration() {
        // No component-specific configuration.
        return null;
    }
}
代码大体框架如上面所示,在实际开发过程中可以对其进行优化。
领取专属 10元无门槛券
私享最新 技术干货