
Splitting and merging streams in Storm

Original article
Author: code4it
Published: 2018-10-27 21:12:19
Included in the column: 码匠的流水账

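This article looks at how a stream can be split into several streams, and how those streams can be merged again, in a Storm topology.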

Example

    @Test
    public void testStreamSplitJoin() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("sentence-spout", new SentenceSpout());
        // SentenceSpout --> SplitStreamBolt
        builder.setBolt("split-bolt", new SplitStreamBolt())
                .shuffleGrouping("sentence-spout");
        // SplitStreamBolt splits its output into two streams --> CountStreamBolt
        // NOTE: specify both the upstream bolt and the streamId to consume
        builder.setBolt("long-word-count-bolt", new CountStreamBolt(),5)
                .shuffleGrouping("split-bolt","longWordStream");
        builder.setBolt("short-word-count-bolt", new CountStreamBolt(),5)
                .shuffleGrouping("split-bolt","shortWordStream");
        // Both CountStreamBolt outputs are merged --> ReportBolt
        builder.setBolt("report-bolt", new ReportBolt())
                .shuffleGrouping("long-word-count-bolt")
                .shuffleGrouping("short-word-count-bolt");

        submitRemote(builder);
    }
  • SplitStreamBolt splits the stream in two. Two CountStreamBolt components each process one of the two streams, and their outputs are then merged: ReportBolt subscribes to both and consumes the tuples from a single input.
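The wiring above can be pictured as a table from "upstream component and stream id" to consumers. The following is a minimal plain-Java sketch of that idea, with no Storm dependency. The class `TopologyWiring` and its methods are hypothetical names introduced for illustration; this is a simplified model, not how Storm stores subscriptions internally. The one Storm fact it relies on is that `shuffleGrouping(componentId)` without a stream id subscribes to the stream named `default`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of topology wiring; not Storm's internal data structure.
class TopologyWiring {
    static final String DEFAULT_STREAM = "default";

    // Key: "componentId/streamId" of the producer; value: ids of the consuming components.
    private final Map<String, List<String>> consumers = new HashMap<>();

    // Subscribes a consumer to a named stream of an upstream component.
    void subscribe(String consumer, String upstream, String streamId) {
        consumers.computeIfAbsent(upstream + "/" + streamId, k -> new ArrayList<>()).add(consumer);
    }

    // Subscribing without a stream id targets the default stream.
    void subscribe(String consumer, String upstream) {
        subscribe(consumer, upstream, DEFAULT_STREAM);
    }

    List<String> consumersOf(String upstream, String streamId) {
        return consumers.getOrDefault(upstream + "/" + streamId, Collections.emptyList());
    }

    // Mirrors the subscriptions made in testStreamSplitJoin.
    static TopologyWiring example() {
        TopologyWiring w = new TopologyWiring();
        w.subscribe("split-bolt", "sentence-spout");
        w.subscribe("long-word-count-bolt", "split-bolt", "longWordStream");
        w.subscribe("short-word-count-bolt", "split-bolt", "shortWordStream");
        // The merge: one consumer subscribes to two upstream components.
        w.subscribe("report-bolt", "long-word-count-bolt");
        w.subscribe("report-bolt", "short-word-count-bolt");
        return w;
    }

    public static void main(String[] args) {
        TopologyWiring w = example();
        System.out.println(w.consumersOf("split-bolt", "longWordStream"));
        System.out.println(w.consumersOf("split-bolt", "default"));
    }
}
```

In this model nothing subscribes to the default stream of split-bolt, which matches the topology: only the two named streams are consumed downstream.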

SplitStreamBolt

public class SplitStreamBolt extends BaseRichBolt {

    private static final Logger LOGGER = LoggerFactory.getLogger(SplitStreamBolt.class);

    private OutputCollector collector;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    // NOTE: BaseRichBolt does not ack automatically, so the tuple must be acked here
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for(String word : words){
            // NOTE: emit to an explicitly named streamId
            if(word.length() > 4){
                this.collector.emit("longWordStream",new Values(word));
            }else{
                this.collector.emit("shortWordStream",new Values(word));
            }
        }
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
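        // NOTE: declareStream declares additional streams, each with its own streamId.
        // They are declared as regular (non-direct) streams here, because the topology
        // consumes them with shuffleGrouping and execute() uses emit(), not emitDirect().
        declarer.declareStream("longWordStream",new Fields("word"));
        declarer.declareStream("shortWordStream",new Fields("word"));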
    }
}
  • Two additional streams are declared here: longWordStream and shortWordStream
  • Words longer than 4 characters are emitted to longWordStream; words of 4 characters or fewer are emitted to shortWordStream
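The routing rule is easy to get wrong at the boundary (a 4-character word is a short word), so it can help to isolate it as a pure function. Below is a minimal sketch in plain Java without any Storm dependency. `WordStreamRouter`, `streamFor`, and `route` are hypothetical names introduced for illustration; they only reproduce the decision made inside `SplitStreamBolt.execute`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Storm: it models only the routing decision.
class WordStreamRouter {
    static final String LONG_STREAM = "longWordStream";
    static final String SHORT_STREAM = "shortWordStream";

    // Words longer than 4 characters go to the long stream, all others to the short stream.
    static String streamFor(String word) {
        return word.length() > 4 ? LONG_STREAM : SHORT_STREAM;
    }

    // Groups the words of a sentence by the stream they would be emitted on.
    static Map<String, List<String>> route(String sentence) {
        Map<String, List<String>> byStream = new LinkedHashMap<>();
        byStream.put(LONG_STREAM, new ArrayList<>());
        byStream.put(SHORT_STREAM, new ArrayList<>());
        for (String word : sentence.split(" ")) {
            byStream.get(streamFor(word)).add(word);
        }
        return byStream;
    }

    public static void main(String[] args) {
        System.out.println(route("storm splits words into two streams"));
    }
}
```

Keeping the rule in one function also means the stream ids appear as constants in a single place, rather than as string literals repeated across the bolt and the topology definition.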

CountStreamBolt

public class CountStreamBolt extends BaseBasicBolt{

    private static final Logger LOGGER = LoggerFactory.getLogger(CountStreamBolt.class);

    // Per-instance running counts, kept separately for each source stream
    Map<String, Integer> longWordCounts = new HashMap<String, Integer>();
    Map<String, Integer> shortWordCounts = new HashMap<String, Integer>();

    // BaseBasicBolt acks the input tuple automatically after execute returns
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // The streamId the tuple was emitted on by the upstream bolt
        String sourceStreamId = input.getSourceStreamId();

        String word = input.getString(0);

        if(sourceStreamId.equals("longWordStream")){
            Integer count = longWordCounts.get(word);
            if (count == null) count = 0;
            count++;
            longWordCounts.put(word, count);
            LOGGER.info("long word:{} -> {}",word,count);
            collector.emit(new Values(word, count));
            return ;
        }

        if(sourceStreamId.equals("shortWordStream")){
            Integer count = shortWordCounts.get(word);
            if (count == null) count = 0;
            count++;
            shortWordCounts.put(word, count);
            LOGGER.info("short word:{} -> {}",word,count);
            collector.emit(new Values(word, count));
            return ;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Both branches emit on the default stream with the same fields
        declarer.declare(new Fields("word", "count"));
    }
}
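  • To show how sourceStreamId distinguishes the inputs, both streams are handled by the same bolt class; in the topology, however, it is registered as two separate components
  • In practice, two different bolt classes could just as well process the two streams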
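The two if-branches in CountStreamBolt differ only in which map they update. One possible alternative is to key the counts by source stream id, which removes the duplication and works for any number of streams. The sketch below is plain Java with no Storm dependency; `StreamWordCounter` and `count` are hypothetical names, and the class only mirrors the bolt's bookkeeping, not its emit or logging behavior.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java counter; it mirrors CountStreamBolt's bookkeeping without Storm.
class StreamWordCounter {
    // Outer key: source stream id; inner map: word -> running count.
    private final Map<String, Map<String, Integer>> countsByStream = new HashMap<>();

    // Increments and returns the count of a word within its source stream.
    int count(String sourceStreamId, String word) {
        return countsByStream
                .computeIfAbsent(sourceStreamId, id -> new HashMap<>())
                .merge(word, 1, Integer::sum);
    }

    public static void main(String[] args) {
        StreamWordCounter counter = new StreamWordCounter();
        counter.count("longWordStream", "storm");
        // Second occurrence on the same stream.
        System.out.println(counter.count("longWordStream", "storm"));
        // First occurrence on a different stream is counted independently.
        System.out.println(counter.count("shortWordStream", "storm"));
    }
}
```

Inside a bolt, the first argument would come from `input.getSourceStreamId()`, as in the original code.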

Summary

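  • OutputFieldsDeclarer can declare multiple streams, each with its own streamId, through the declareStream method
  • OutputCollector can send a tuple to a specific stream through emit(String streamId, List<Object> tuple)
  • OutputCollector also has emit overloads without a streamId parameter; internally these use Utils.DEFAULT_STREAM_ID (the string "default") as the actual streamId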
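The relationship between the two emit variants can be sketched with a small stand-in collector. This is plain Java with no Storm dependency: `RecordingCollector` and `tuplesOn` are hypothetical names, and the class is a simplified model for illustration rather than Storm's OutputCollector. The only value taken from Storm is the default stream id, `"default"`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-in for a collector; it is not Storm's OutputCollector.
class RecordingCollector {
    // Same value as Storm's Utils.DEFAULT_STREAM_ID.
    static final String DEFAULT_STREAM_ID = "default";

    // Tuples recorded per stream id, in emit order.
    private final Map<String, List<List<Object>>> emitted = new HashMap<>();

    // Emits a tuple on an explicitly named stream.
    void emit(String streamId, List<Object> tuple) {
        emitted.computeIfAbsent(streamId, id -> new ArrayList<>()).add(tuple);
    }

    // Emitting without a stream id delegates to the default stream.
    void emit(List<Object> tuple) {
        emit(DEFAULT_STREAM_ID, tuple);
    }

    List<List<Object>> tuplesOn(String streamId) {
        return emitted.getOrDefault(streamId, Collections.emptyList());
    }

    public static void main(String[] args) {
        RecordingCollector collector = new RecordingCollector();
        collector.emit(Arrays.<Object>asList("storm", 1));
        collector.emit("longWordStream", Arrays.<Object>asList("storm"));
        System.out.println(collector.tuplesOn("default"));
        System.out.println(collector.tuplesOn("longWordStream"));
    }
}
```

This is why CountStreamBolt can call `collector.emit(new Values(word, count))` and ReportBolt can subscribe with `shuffleGrouping("long-word-count-bolt")`: both sides fall back to the same default stream id.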

doc

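Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.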
