专栏首页数据之美storm kafka 编程指南

storm kafka 编程指南

一、原理及关键步骤介绍

storm中的storm-kafka组件提供了storm与kafka交互的所需的所有功能,请参考其官方文档:https://github.com/apache/storm/tree/master/external/storm-kafka#brokerhosts

(一)使用storm-kafka的关键步骤

1、创建ZkHosts

当storm从kafka中读取某个topic的消息时,需要知道这个topic有多少个分区,以及这些分区放在哪个kafka节点(broker)上,ZkHosts就是用于这个功能。  创建zkHosts有2种形式:

   public ZkHosts(String brokerZkStr, String brokerZkPath) 
   public ZkHosts(String brokerZkStr)

(1)默认情况下,zk信息被放到/brokers中,此时可以使用第2种方式: 

new ZkHosts(“192.168.172.117:2181,192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181”);

(2)若zk信息被放置在/kafka/brokers中(我们的集群就是这种情形),则可以使用: 

new ZkHosts(“192.168.172.117:2181,192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181”,“/kafka”)

或者直接:

new ZkHosts("192.168.172.117:2181,192.168.172.98:2181,192.168.172.111:2181,192.168.172.114:2181,192.168.172.116:2181/kafka”)

默认情况下,每60秒去读取一次kafka的分区信息,可以通过修改host.refreshFreqSecs来设置。

(3)除了使用ZkHosts来读取分析信息外,storm-kafka还提供了一种静态指定的方法(不推荐此方法),如:

    Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
    Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
    Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
    GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
    partitionInfo.addPartition(0, brokerForPartition0);//mapping form partition 0 to brokerForPartition0
    partitionInfo.addPartition(1, brokerForPartition1);//mapping form partition 1 to brokerForPartition1
    partitionInfo.addPartition(2, brokerForPartition2);//mapping form partition 2 to brokerForPartition2
    StaticHosts hosts = new StaticHosts(partitionInfo);

由此可以看出,ZkHosts完成的功能就是指定了从哪个kafka节点读取某个topic的哪个分区。

2、创建KafkaConfig

(1)有2种方式创建KafkaConfig 

public KafkaConfig(BrokerHosts hosts, String topic) 
public KafkaConfig(BrokerHosts hosts, String topic, String clientId) 

BrokerHosts就是上面创建的实例,topic就是要订阅的topic名称,clientId用于指定存放当前topic consumer的offset的位置,这个id 应该是唯一的,否则多个拓扑会引起冲突。  事实上,trident的offset并不保存在这个位置,见下面介绍。  真正使用时,有2种扩展,分别用于一般的storm以及trident。  (2)core storm 

Spoutconfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer’s offset. The id should uniquely identify your spout.  public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);  public SpoutConfig(BrokerHosts hosts, String topic, String id);  In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:

    // setting for how often to save the current kafka offset to ZooKeeper
    public long stateUpdateIntervalMs = 2000;

    // Exponential back-off retry settings.  These are used when retrying messages after a bolt
    // calls OutputCollector.fail().
    // Note: be sure to set backtype.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
    // resubmitting the message while still retrying.
    public long retryInitialDelayMs = 0;
    public double retryDelayMultiplier = 1.0;
    public long retryDelayMaxMs = 60 * 1000;

KafkaSpout 只接受 SpoutConfig作为参数

(3)TridentKafkaConfig,TridentKafkaEmitter只接受TridentKafkaConfig使用参数  trident消费kafka的offset位置是在建立拓扑中指定,如:

topology.newStream(test, kafkaSpout).

则offset的位置为:

/transactional/test/coordinator/currtx

(4)KafkaConfig的一些默认参数

    public int fetchSizeBytes = 1024 * 1024;
    public int socketTimeoutMs = 10000;
    public int fetchMaxWait = 10000;
    public int bufferSizeBytes = 1024 * 1024;
    public MultiScheme scheme = new RawMultiScheme();
    public boolean forceFromStart = false;
    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    public long maxOffsetBehind = Long.MAX_VALUE;
    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
    public int metricsTimeBucketSizeInSecs = 60;

可以通过以下方式修改:

kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

3、设置MultiScheme

MultiScheme用于指定如何处理从kafka中读取到的字节,同时它用于控制输出字段名称。

 public Iterable<List<Object>> deserialize(byte[] ser);
 public Fields getOutputFields();

默认情况下,RawMultiScheme读取一个字段并返回一个字节,而发射的字段名称为bytes。  可以通过SchemeAsMultiScheme和 KeyValueSchemeAsMultiScheme改变这种默认行为:

 kafkaConfig.scheme =new SchemeAsMultiScheme(new StringScheme());

上面的语句指定了将字节转化为字符。  同时建立拓扑时:

topology.newStream(“test",kafkaSpout).each(new Fields("str"),new FilterFunction(),new Fields("word”))….

会指定发射的字段名称为str。

4、创建Spout

(1)core storm

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

(2)trident

 OpaqueTridentKafkaSpoutkafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);

5、建立拓扑:

(1)core-storm

builder.setSpout("kafka-reader",new KafkaSpout(spoutConf),12);

kafka-reader指定了spout的名称,12指定了并行度。

(2)trident

topology.newStream(“test", kafkaSpout). each(new Fields("str"), new FilterFunction(),new Fields("word”))….

test指定了放置offset的位置,也就是txid的位置。str指定了spout发射字段的名称。

完整示例:  Core Spout

BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Trident Spout

TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

(二)当拓扑出错时,如何从上一次的kafka位置继续处理消息

1、KafkaConfig.startOffsetTime

KafkaConfig有一个配置项为KafkaConfig.startOffsetTime,它用于指定拓扑从哪个位置上开始处理消息,可取的值有3个:  (1)kafka.api.OffsetRequest.EarliestTime(): 从最早的消息开始  (2)kafka.api.OffsetRequest.LatestTime(): 从最新的消息开始,即从队列队伍最末端开始。  (3)根据时间点:

kafkaConfig.startOffsetTime =  new SimpleDateFormat("yyyy.MM.dd-HH:mm:ss").parse(startOffsetTime).getTime();

可以参阅 How do I accurately get offsets of messages for a certain timestamp using OffsetRequest? 的实现原理。 

How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?  Kafka allows querying offsets of messages by time and it does so at segment granularity.The timestamp parameter is the unix timestamp and querying the offset by timestamp returns the latest possible offset of the message that is appended no later than the given timestamp. There are 2 special values of the timestamp - latest and earliest. For any other value of the unix timestamp, Kafka will get the starting offset of the log segment that is created no later than the given timestamp. Due to this, and since the offset request is served only at segment granularity, the offset fetch request returns less accurate results for larger segment sizes.  For more accurate results, you may configure the log segment size based on time (log.roll.ms) instead of size (log.segment.bytes). However care should be taken since doing so might increase the number of file handlers due to frequent log segment rolling. 

2、由于运行拓扑时,指定了offset在zk中保存的位置,当出现错误时,可以找出offset  当重新部署拓扑时,必须保证offset的保存位置不变,它才能正确的读取到offset。  (1)对于core storm,就是

SpoutConfigspoutConf = new SpoutConfig(brokerHosts,topic, zkRoot,id);

后2个参数不能变化  (2)对于trident而言,就是

topology.newStream(“test", kafkaSpout).

第1个参数不能变化。  3、也就是说只要拓扑运行过一次KafkaConfig.startOffsetTime,之后重新部署时均可从offset中开始 再看看这2个参数

   public booleanforceFromStart =false;
   public long startOffsetTime= kafka.api.OffsetRequest.EarliestTime();

如果将forceFromStart(旧版本是ignoreZkOffsets)设置为true,则每次拓扑重新启动时,都会从开头读取消息。  如果为false,则:  第一次启动,从开头读取,之后的重启均是从offset中读取。  一般使用时,将数值设置为以上2个即可。

(三)结果写回kafka

如果想把结果写回kafka,并保证事务性,可以使用 storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and storm.kafka.trident.TridentKafkaUpdater.

以下是官方说明。 

Writing to Kafka as part of your topology  You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you are using trident you can use storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and storm.kafka.trident.TridentKafkaUpdater.  You need to provide implementation of following 2 interfaces TupleToKafkaMapper and TridentTupleToKafkaMapper  These interfaces have 2 methods defined:

    K getKeyFromTuple(Tuple/TridentTuple tuple);
    V getMessageFromTuple(Tuple/TridentTuple tuple);

as the name suggests these methods are called to map a tuple to kafka key and kafka message. If you just want one field as key and one field as value then you can use the provided FieldNameBasedTupleToKafkaMapper.Java implementation. In the KafkaBolt, the implementation always looks for a field with field name “key” and “message” if you use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility reasons. Alternatively you could also specify a different key and message field by using the non default constructor. In the TridentKafkaState you must specify what is the field name for key and message as there is no default constructor. These should be specified while constructing and instance of FieldNameBasedTupleToKafkaMapper. KafkaTopicSelector and trident KafkaTopicSelector  This interface has only one method

publicinterface KafkaTopicSelector {
    StringgetTopics(Tuple/TridentTuple tuple);
}

The implementation of this interface should return topic to which the tuple’s key/message mapping needs to be published You can return a null and the message will be ignored. If you have one static topic name then you can use DefaultTopicSelector.java and set the name of the topic in the constructor. Specifying kafka producer properties  You can provide all the produce properties , seehttp://kafka.apache.org/documentation.html#producerconfigs section “Important configuration properties for the producer”, in your storm topology config by setting the properties map with key kafka.broker.properties.

附带2个官方的示例  For the bolt :

        TopologyBuilder builder = new TopologyBuilder();

        Fields fields = new Fields("key", "message");
        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
                    new Values("storm", "1"),
                    new Values("trident", "1"),
                    new Values("needs", "1"),
                    new Values("javadoc", "1")
        );
        spout.setCycle(true);
        builder.setSpout("spout", spout, 5);
        KafkaBolt bolt = new KafkaBolt()
                .withTopicSelector(new DefaultTopicSelector("test"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
        builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");

        Config conf = new Config();
        //set producer properties.
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);

        StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());

For Trident:

        Fields fields = new Fields("word", "count");
        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
                new Values("storm", "1"),
                new Values("trident", "1"),
                new Values("needs", "1"),
                new Values("javadoc", "1")
        );
        spout.setCycle(true);

        TridentTopology topology = new TridentTopology();
        Stream stream = topology.newStream("spout1", spout);

        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
        stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());

        Config conf = new Config();
        //set producer properties.
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
        StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());

二、完整示例

(一)基本订阅

基本场景:订阅kafka的某个topic,然后在读取的消息前加上自定义的字符串,然后写回到kafka另外一个topic。

从Kafka读取数据的Spout使用storm.kafka.KafkaSpout,向Kafka写数据的Bolt使用storm.kafka.bolt.KafkaBolt。中间进行进行数据处理的Bolt定义为TopicMsgBolt。闲言少叙,奉上代码:

public class TopicMsgTopology {
    public static void main(String[] args) throws Exception {
        // 配置Zookeeper地址
        BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2281,zk3:2381");
        // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "msgTopic1", "/topology/root", "topicMsgTopology");
        // 配置KafkaBolt中的kafka.broker.properties
        Config conf = new Config();
        Properties props = new Properties();
        // 配置Kafka broker地址
        props.put("metadata.broker.list", "dev2_55.wfj-search:9092");
        // serializer.class为消息的序列化类
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", props);
        // 配置KafkaBolt生成的topic
        conf.put("topic", "msgTopic2");
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("msgKafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("msgSentenceBolt", new TopicMsgBolt()).shuffleGrouping("msgKafkaSpout");
        builder.setBolt("msgKafkaBolt", new KafkaBolt<String, Integer>()).shuffleGrouping("msgSentenceBolt");
        if (args.length == 0) {
            String topologyName = "kafkaTopicTopology";
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, conf, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(topologyName);
            cluster.shutdown();
        } else {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
    }
}

storm.kafka.ZkHosts构造方法的参数是zookeeper标准配置地址的形式(ZooKeeper环境搭建可以查看ZooKeeper安装部署),zk1、zk2、zk3在本地配置了host,因为服务器使用的伪分布式模式,因此几个端口号不是默认的2181。

storm.kafka.SpoutConfig构造方法第一个参数为上述的storm.kafka.ZkHosts对象,第二个为待订阅的topic名称,第三个参数zkRoot为写读取topic时的偏移量offset数据的节点(zk node),第四个参数为该节点上的次级节点名(有个地方说这个是spout的id)。

backtype.storm.Config对象是配置storm的topology(拓扑)所需要的基础配置。

backtype.storm.spout.SchemeAsMultiScheme的构造方法输入的参数是订阅kafka数据的处理参数,这里的MessageScheme是自定义的,代码如下:

public class MessageScheme implements Scheme {
    private static final Logger logger = LoggerFactory.getLogger(MessageScheme.class);

    @Override
    public List<Object> deserialize(byte[] ser) {
        try {
            String msg = new String(ser, "UTF-8");
            logger.info("get one message is {}", msg);
            return new Values(msg);
        } catch (UnsupportedEncodingException ignored) {
            return null;
        }
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("msg");
    }
}

MessageScheme类中getOutputFields方法是KafkaSpout向后发送tuple(storm传输数据的最小结构)的名字,需要与接收数据的Bolt中统一(在这个例子中可以不统一,因为后面直接取第0条数据,但是在wordCount的那个例子中就需要统一了)。

TopicMsgBolt类是从storm.kafka.KafkaSpout接收数据的Bolt,对接收到的数据进行处理,然后向后传输给storm.kafka.bolt.KafkaBolt。代码如下:

public class TopicMsgBolt extends BaseBasicBolt {
    private static final Logger logger = LoggerFactory.getLogger(TopicMsgBolt.class);

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValue(0);
        String out = "Message got is '" + word + "'!";
        logger.info("out={}", out);
        collector.emit(new Values(out));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}

此处需要特别注意的是,要使用backtype.storm.topology.base.BaseBasicBolt对象作为父类,否则不会在zk记录偏移量offset数据。

需要编写的代码已完成,接下来就是在搭建好的storm、kafka中进行测试:

# 创建topic
./bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2281,zk3:2381 --replication-factor 1 --partitions 1 --topic msgTopic1
./bin/kafka-topics.sh --create --zookeeper zk1:2181,zk2:2281,zk3:2381 --replication-factor 1 --partitions 1 --topic msgTopic2

接下来需要分别对msgTopic1、msgTopic2启动producer(生产者)与consumer(消费者):

# 对msgTopic1启动producer,用于发送数据
./bin/kafka-console-producer.sh --broker-list dev2_55.wfj-search:9092 --topic msgTopic1
# 对msgTopic2启动consumer,用于查看发送数据的处理结果
./bin/kafka-console-consumer.sh --zookeeper zk1:2181,zk2:2281,zk3:2381 --topic msgTopic2 --from-beginning

然后将打好的jar包上传到storm的nimbus(可以使用远程上传或先上传jar包到nimbus节点所在服务器,然后本地执行):

# ./bin/storm jar topology TopicMsgTopology.jar cn.howardliu.demo.storm.kafka.topicMsg.TopicMsgTopology TopicMsgTopology

待对应的worker启动好之后,就可以在msgTopic1的producer对应终端输入数据,然后在msgTopic2的consumer对应终端查看输出结果了。

有几点需要注意的:

  1. 必须先创建msgTopic1、msgTopic2两个topic;
  2. 定义的bolt必须使用BaseBasicBolt作为父类,不能够使用BaseRichBolt,否则无法记录偏移量;
  3. zookeeper最好使用至少三个节点的分布式模式或伪分布式模式,否则会出现一些异常情况;
  4. 在整个storm下,spout、bolt的id必须唯一,否则会出现异常。
  5. TopicMsgBolt类作为storm.kafka.bolt.KafkaBolt前的最后一个Bolt,需要将输出数据名称定义为message,否则KafkaBolt无法接收数据。

(二)Topic 消费与回写:wordCount

简单的输入输出做完了,来点复杂点儿的场景:从某个topic定于消息,然后根据空格分词,统计单词数量,然后将当前输入的单词数量推送到另一个topic。

首先规划需要用到的类:

  1. 从KafkaSpout接收数据并进行处理的backtype.storm.spout.Scheme子类;
  2. 数据切分bolt:SplitSentenceBolt
  3. 计数bolt:WordCountBolt
  4. 报表bolt:ReportBolt
  5. topology定义:WordCountTopology
  6. 最后再加一个原样显示订阅数据的bolt:SentenceBolt

backtype.storm.spout.Scheme子类可以使用上面已经定义过的MessageScheme,此处不再赘述。

SplitSentenceBolt是对输入数据进行分割,简单的使用String类的split方法,然后将每个单词命名为“word”,向后传输,代码如下:

public class SplitSentenceBolt extends BaseBasicBolt {
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getStringByField("msg");
        String[] words = sentence.split(" ");
        Arrays.asList(words).forEach(word -> collector.emit(new Values(word)));
    }
}

SentenceBolt是从KafkaSpout接收数据,然后直接输出。在拓扑图上就是从输入分叉,一个进入SplitSentenceBolt,一个进入SentenceBolt。这种结构可以应用在Lambda架构中,代码如下:

public class SentenceBolt extends BaseBasicBolt {
    private static final Logger logger = LoggerFactory.getLogger(SentenceBolt.class);

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        String msg = tuple.getStringByField("msg");
        logger.info("get one message is {}", msg);
        basicOutputCollector.emit(new Values(msg));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}

WordCountBolt是对接收到的单词进行汇总统一,然后将单词“word”及其对应数量“count”向后传输,代码如下:

public class WordCountBolt extends BaseBasicBolt {
    private Map<String, Long> counts = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counts = new ConcurrentHashMap<>();
        super.prepare(stormConf, context);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        collector.emit(new Values(word, count));
    }
}

ReportBolt是对接收到的单词及数量进行整理,拼成json格式,然后继续向后传输,代码如下:

public class ReportBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        Long count = input.getLongByField("count");
        String reportMessage = "{'word': '" + word + "', 'count': '" + count + "'}";
        collector.emit(new Values(reportMessage));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("message"));
    }
}

最后是定义topology(拓扑)WordCountTopology,代码如下:

public class WordCountTopology {
    private static final String KAFKA_SPOUT_ID = "kafkaSpout";
    private static final String SENTENCE_BOLT_ID = "sentenceBolt";
    private static final String SPLIT_BOLT_ID = "sentenceSplitBolt";
    private static final String WORD_COUNT_BOLT_ID = "sentenceWordCountBolt";
    private static final String REPORT_BOLT_ID = "reportBolt";
    private static final String KAFKA_BOLT_ID = "kafkabolt";
    private static final String CONSUME_TOPIC = "sentenceTopic";
    private static final String PRODUCT_TOPIC = "wordCountTopic";
    private static final String ZK_ROOT = "/topology/root";
    private static final String ZK_ID = "wordCount";
    private static final String DEFAULT_TOPOLOGY_NAME = "sentenceWordCountKafka";

    public static void main(String[] args) throws Exception {
        // 配置Zookeeper地址
        BrokerHosts brokerHosts = new ZkHosts("zk1:2181,zk2:2281,zk3:2381");
        // 配置Kafka订阅的Topic,以及zookeeper中数据节点目录和名字
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, CONSUME_TOPIC, ZK_ROOT, ZK_ID);
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(KAFKA_SPOUT_ID, new KafkaSpout(spoutConfig));
        builder.setBolt(SENTENCE_BOLT_ID, new SentenceBolt()).shuffleGrouping(KAFKA_SPOUT_ID);
        builder.setBolt(SPLIT_BOLT_ID, new SplitSentenceBolt()).shuffleGrouping(KAFKA_SPOUT_ID);
        builder.setBolt(WORD_COUNT_BOLT_ID, new WordCountBolt()).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        builder.setBolt(REPORT_BOLT_ID, new ReportBolt()).shuffleGrouping(WORD_COUNT_BOLT_ID);
        builder.setBolt(KAFKA_BOLT_ID, new KafkaBolt<String, Long>()).shuffleGrouping(REPORT_BOLT_ID);

        Config config = new Config();
        Map<String, String> map = new HashMap<>();
        map.put("metadata.broker.list", "dev2_55.wfj-search:9092");// 配置Kafka broker地址
        map.put("serializer.class", "kafka.serializer.StringEncoder");// serializer.class为消息的序列化类
        config.put("kafka.broker.properties", map);// 配置KafkaBolt中的kafka.broker.properties
        config.put("topic", PRODUCT_TOPIC);// 配置KafkaBolt生成的topic

        if (args.length == 0) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology(DEFAULT_TOPOLOGY_NAME);
            cluster.shutdown();
        } else {
            config.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        }
    }
}

除了上面提过应该注意的地方,此处还需要注意,storm.kafka.SpoutConfig定义的zkRoot与id应该与第一个例子中不同(至少保证id不同,否则两个topology将使用一个节点记录偏移量)。

Refer:

[1] storm-kafka编程指南

http://blog.csdn.net/lujinhong2/article/details/47132287

[2] kafka集群编程指南

http://blog.csdn.net/lujinhong2/article/details/47146693

[3] 关于kafka中的timestamp与offset的对应关系

http://blog.csdn.net/lujinhong2/article/details/49661309

[4] storm笔记:Storm+Kafka简单应用

http://www.howardliu.cn/a-few-notes-about-storm/

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 聊聊 Java 中 HashMap 初始化的另一种方式

    如果你接触过不同的语言,从语法和代码层面来说,Java 是一种不折不扣的“臃肿、啰嗦”的语言,从另一方面来说这种臃肿和啰嗦也体现了它严谨的一面,作为适合构建大型...

    用户1177713
  • MapReduce 中的两表 join 几种方案简介

    1. 概述 在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的。而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独...

    用户1177713
  • Hive 基础(1):分区、桶、Sort Merge Bucket Join

    Hive 已是目前业界最为通用、廉价的构建大数据时代数据仓库的解决方案了,虽然也有 Impala 等后起之秀,但目前从功能、稳定性等方面来说,Hive 的地位...

    用户1177713
  • 核心技术靠化缘是要不来的——自己动手写ORM框架

    开源市场上的Java的ORM框架一个都不好用,所以花了几天时间自己撸了一个 OrmKids,欢迎大家下载学习。遇到问题请关注公众号进群大家一起讨论。

    老钱
  • java基础第十四篇之Map

    一,Map集合的特点: * * 1.Map集合和Collection集合,没有关系 * * 2.Map集合的元素是成对存在(夫妻关系)...

    海仔
  • android之listview缓存图片(缓存优化)

    网上关于这个方面的文章也不少,基本的思路是线程+缓存来解决。下面提出一些优化: 1、采用线程池 2、内存缓存+文件缓存 3、内存缓存中网上很多是采用SoftRe...

    xiangzhihong
  • Flink-Kafka-Connector Flink结合Kafka实战

    启动zk:nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

    王知无
  • 我终于搞清楚了和String有关的那点事儿。

    String,是Java中除了基本数据类型以外,最为重要的一个类型了。很多人会认为他比较简单。但是和String有关的面试题有很多,下面我随便找两道面试题,看看...

    java思维导图
  • What You See Is What You Get

    腾讯ISUX
  • 简析 Jenkins 专有用户数据库加密算法

    其中,安全域可以采用三种形式,分别为:Jenkins 专有用户数据库、LDAP、Servlet 容器代理。

    LinuxSuRen

扫码关注云+社区

领取腾讯云代金券