Using Storm as a Consumer with Kafka 0.10.x+

00 Background

As Kafka has been upgraded, the way Storm consumes from Kafka 0.10.x+ differs from the approach used for earlier versions. This post records the new approach as a reference for future scenarios where Storm needs to process data from a newer Kafka version in real time.

01 Architecture Overview

The architecture is shown in the figure below.

Data is collected with a two-tier Flume deployment: the first tier collects, the second tier aggregates, and the split also provides load balancing. The second tier forwards the data to Kafka; Storm reads from Kafka in real time, processes the data as required, and writes the results to the appropriate storage layer. This post focuses on the part where Storm reads data from Kafka and processes it.

02 Demo

Step 1: Write data into the file being collected

Step 2: Check the Kafka topic to confirm that the data has been written to Kafka successfully

Step 3: Run the Storm word count program and check the results
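As a side note to Steps 1 and 2: if you only want to verify the Storm side without setting up the Flume tiers, you can write a few test lines straight into the word-count topic with the Kafka producer API. The following is a minimal sketch, reusing the broker list from the topology below; the sample line of text is an assumption, not data from the original article.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092,slave:9092,slave3:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // Send one sample poem line to the topic the topology reads from
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("word-count", "白日依山尽,黄河入海流。"));
        }
    }
}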

03 Code

Storm processes the data as shown in the figure below: the KafkaSpout reads records from the word-count topic, KafkaSplitSentenceBolt splits each record into single-character words, KafkaWordCountBolt keeps a running count per word, and KafkaOutputBolt collects and outputs the final results.

KafkaWordCountTopology:

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * @author ccc
 */
public class KafkaWordCountTopology {

    private static final String TOPICS = "word-count";
    private static final String KEYS = "word";
    private static final String BOOTSTRAP_SERVERS = "master:9092,slave:9092,slave3:9092";
    private static final String KAFKA_WORD_COUNT_SPOUT_ID = "KafkaWordCountSpout";
    private static final String SPLIT_SENTENCE_BOLT_ID = "SplitSentenceBolt";
    private static final String KAFKA_WORD_COUNT_BOLT_ID = "KafkaWordCountBolt";
    private static final String KAFKA_OUTPUT_BOLT_ID = "KafkaOutputBolt";

    public static void main(String[] args) throws Exception {
        // Configure the spout from storm-kafka-client, which uses the Kafka 0.10.x+ consumer API
        KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPICS)
                .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
                .setTupleTrackingEnforced(true)
                .build();
        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);

        TopologyBuilder tp = new TopologyBuilder();
        tp.setSpout(KAFKA_WORD_COUNT_SPOUT_ID, kafkaSpout, 2);
        tp.setBolt(SPLIT_SENTENCE_BOLT_ID, new KafkaSplitSentenceBolt(), 2)
                .setNumTasks(2)
                .shuffleGrouping(KAFKA_WORD_COUNT_SPOUT_ID);
        // fieldsGrouping on "word" so that the same word is always counted by the same task
        tp.setBolt(KAFKA_WORD_COUNT_BOLT_ID, new KafkaWordCountBolt(), 4)
                .setNumTasks(2)
                .fieldsGrouping(SPLIT_SENTENCE_BOLT_ID, new Fields(KEYS));
        tp.setBolt(KAFKA_OUTPUT_BOLT_ID, new KafkaOutputBolt(), 2)
                .setNumTasks(2)
                .globalGrouping(KAFKA_WORD_COUNT_BOLT_ID);

        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            // Submit to the cluster
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, tp.createTopology());
        } else {
            // Local test
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka-word-count-topology", conf, tp.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}
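The spout configuration above only sets the processing guarantee. Depending on your storm-kafka-client version, the builder also lets you pass raw Kafka consumer properties through setProp; the sketch below shows how a consumer group id and a poll cap could be pinned. Treat it as a hedged example under the assumption of storm-kafka-client 1.2.x+ (the group id value and poll cap are placeholders, not settings from the original article).

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class TunedSpoutConfig {
    public static KafkaSpoutConfig<String, String> build() {
        // Assumes KafkaSpoutConfig.Builder.setProp is available to forward raw consumer properties
        return KafkaSpoutConfig.builder("master:9092,slave:9092,slave3:9092", "word-count")
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "storm-word-count")   // hypothetical group id
                .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500)          // cap records per poll
                .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
                .build();
    }
}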

KafkaSplitSentenceBolt:

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * @author ccc
 */
public class KafkaSplitSentenceBolt implements IBasicBolt {

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // The KafkaSpout's default translator emits "topic", "partition", "offset", "key", "value",
        // so the record payload is read from the "value" field
        String poetry = tuple.getStringByField("value");
        // Split the line into sentences on the comma, then split each sentence into single characters
        List<String> sentences = Arrays.asList(poetry.split(","));
        sentences.forEach(sentence -> {
            List<String> words = Arrays.asList(sentence.replace("。", "").split(""));
            words.forEach(word -> collector.emit(new Values(word)));
        });
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
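To make the splitting behaviour concrete, the fragment below runs the same logic outside Storm on one sample line (the sample text is an assumption); with the empty-string split, each Chinese character becomes its own word tuple.

import java.util.Arrays;

public class SplitLogicDemo {
    public static void main(String[] args) {
        String poetry = "白日依山尽,黄河入海流。";        // hypothetical input line
        for (String sentence : poetry.split(",")) {
            // Drop the full stop and split into single characters, as the bolt does
            for (String word : sentence.replace("。", "").split("")) {
                System.out.println(word);                // one character per line
            }
        }
    }
}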

KafkaWordCountBolt:

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * @author ccc
 */
public class KafkaWordCountBolt implements IBasicBolt {

    // Per-task running counts; fieldsGrouping on "word" guarantees a given word
    // always reaches the same task, so the local map stays consistent
    private Map<String, Integer> wordCounts = new HashMap<>();

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getStringByField("word");
        Integer counts = wordCounts.get(word);
        if (counts == null) {
            counts = 0;
        }
        counts++;
        wordCounts.put(word, counts);
        // Emit the updated count for this word downstream
        collector.emit(new Values(word, counts));
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "counts"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

KafkaOutputBolt:

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

/**
 * @author ccc
 */
public class KafkaOutputBolt implements IBasicBolt {

    // Latest count received for each word
    private Map<String, Integer> wordCounts = new HashMap<>();

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getStringByField("word");
        Integer counts = tuple.getIntegerByField("counts");
        this.wordCounts.put(word, counts);
    }

    @Override
    public void cleanup() {
        // Print the final counts in sorted key order; note that cleanup() is only
        // reliably invoked in local mode, so use a real sink in production
        List<String> keyList = new ArrayList<>(wordCounts.keySet());
        Collections.sort(keyList);
        keyList.forEach(key -> System.out.println(key + " : " + wordCounts.get(key)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: no output fields
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

The code above is only a skeleton; in real development it can be optimized further.
