聊聊storm的direct grouping

本文主要研究一下storm的direct grouping

direct grouping

direct grouping是一种特殊的grouping,它是由上游的producer直接指定下游哪个task去接收它发射出来的tuple。direct grouping的使用有如下几个步骤:

1、上游在prepare方法保存下游bolt的taskId列表

public class SentenceDirectBolt extends BaseRichBolt {
    private static final Logger LOGGER = LoggerFactory.getLogger(SentenceDirectBolt.class);
    private OutputCollector collector;
    private List<Integer> taskIds;
    private int numCounterTasks;
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        //NOTE 1 这里要取到下游的bolt的taskId,用于emitDirect时指定taskId
        this.taskIds = context.getComponentTasks("count-bolt");
        this.numCounterTasks = taskIds.size();
    }
    //......
}

这里保存了下游的bolt的taskId列表,用于emitDirect时选择taskId

2、上游在declareOutputFields使用declareStream声明streamId

public class SentenceDirectBolt extends BaseRichBolt {
    //......
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
        //NOTE 2 这里要通过declareStream声明direct stream,并指定streamId
        declarer.declareStream("directStreamDemo1",true,new Fields("word"));
        declarer.declareStream("directStreamDemo2",true,new Fields("word"));
    }
}

这里声明了两个streamId,一个是directStreamDemo1,一个是directStreamDemo2

3、上游采用emitDirect指定下游taskId及streamId

public class SentenceDirectBolt extends BaseRichBolt {
    //......
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for(String word : words){
            int targetTaskId = getWordCountTaskId(word);
            LOGGER.info("word:{} choose taskId:{}",word,targetTaskId);
            // NOTE 3 这里指定发送给下游bolt的哪个taskId,同时指定streamId
            if(targetTaskId % 2 == 0){
                this.collector.emitDirect(targetTaskId,"directStreamDemo1",new Values(word));
            }else{
                this.collector.emitDirect(targetTaskId,"directStreamDemo2",new Values(word));
            }
        }
        this.collector.ack(tuple);
    }
}

这里使用emitDirect(int taskId, String streamId, List<Object> tuple)方法指定了下游的taskId以及要发送到的streamId

4、下游使用directGrouping连接上游bolt及streamId

    @Test
    public void testDirectGrouping() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sentence-spout", new SentenceSpout());
        // SentenceSpout --> SplitSentenceBolt
        builder.setBolt("split-bolt", new SentenceDirectBolt()).shuffleGrouping("sentence-spout");
        // SplitSentenceBolt --> WordCountBolt
        //NOTE 4这里要指定上游的bolt以及要处理的streamId
        builder.setBolt("count-bolt", new WordCountBolt(),5).directGrouping("split-bolt","directStreamDemo1");
        // WordCountBolt --> ReportBolt
        builder.setBolt("report-bolt", new ReportBolt()).globalGrouping("count-bolt");
        submitRemote(builder);
    }

这里count-bolt作为split-bolt的下游,使用了directGrouping,同时指定了要接收的streamId为directStreamDemo1

小结

  • direct grouping是一种特殊的grouping,它是由上游的producer直接指定下游哪个task去接收它发射出来的tuple。
  • 下游使用directGrouping连接上游同时指定要消费的streamId,上游在prepare的时候保存下游的taskId列表,然后在declareOutputFields的时候使用declareStream来声明streamId,最后在execute方法里头使用emitDirect(int taskId, String streamId, List tuple)方法指定了下游的taskId以及要发送到的streamId

doc

  • Concepts
  • Common Topology Patterns
  • 关于Storm Stream grouping

原文发布于微信公众号 - 码匠的流水账(geek_luandun)

原文发表时间:2018-10-19

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏ASP.NET MVC5 后台权限管理系统

ASP.NET MVC5+EF6+EasyUI 后台管理系统(29)-T4模版

本节不再适合本系统,在58,59节已经重构。请超过本节 这讲适合所有的MVC程序 很荣幸,我们的系统有了体验的地址了。演示地址 之前我们发布了一个简单的代码生...

40070
来自专栏ASP.NET MVC5 后台权限管理系统

ASP.NET MVC5+EF6+EasyUI 后台管理系统(87)-MVC Excel导入和导出

前言: 导入导出实在多例子,很多成熟的组建都分装了导入和导出,这一节演示利用LinqToExcel组件对Excel的导入,这个是一个极其简单的例子。 我...

80090
来自专栏码农阿宇

.Net Core中利用TPL(任务并行库)构建Pipeline处理Dataflow

在学习的过程中,看一些一线的技术文档很吃力,而且考虑到国内那些技术牛人英语都不差的,要向他们看齐,所以每天下班都在疯狂地背单词,博客有些日子没有更新了,见谅见谅...

26610
来自专栏知识分享

51采集PCF8591数据通过ESP8266上传C#上位机android 之TCP客户端编程ESP8266使用详解NodeMCU初探ESP8266刷AT固件与nodemcu固件ESP8266使用详解-

这两天测试程序还发现一个bug就是如果客户端断开了,应该检测一下哪个断开了,数据就不应该发向那个连接,,,否则就会报错,然后模块会复位重启 所以加上这段代码 c...

60950
来自专栏GIS讲堂

C#连接Sqlite

SQLite,是一款轻型的数据库,是遵守ACID的关联式数据库管理系统,它的设计目标是嵌入式的,而且目前已经在很多嵌入式产品中使用了它,它占用资源非常的低,在嵌...

15120
来自专栏木宛城主

SharePoint中在线编辑文档

我一直以为只有在Document Library里面的File才会支持在线编辑。直到今天早上我才发现用IE打开List里面的Attachments也是支持在线...

40060
来自专栏JackieZheng

Java豆瓣电影爬虫——模拟登录的前世今生与验证码的爱恨情仇

前言 并不是所有的网站都能够敞开心扉让你看个透彻,它们总要给你出些难题让你觉得有些东西是来之不易的,往往,这也更加激发你的激情和斗志! 从《为了媳妇的一张号,...

50060
来自专栏jeremy的技术点滴

开发小技巧备忘

34970
来自专栏柠檬先生

Angularjs基础(四)

AngularJS过滤器     过滤器可以使用一个管道符(|)添加到表达式和指令中。       AngularJS过滤器可用于转换数据:    ...

21390
来自专栏码农阿宇

.Net Core中利用TPL(任务并行库)构建Pipeline处理Dataflow

在学习的过程中,看一些一线的技术文档很吃力,而且考虑到国内那些技术牛人英语都不差的,要向他们看齐,所以每天下班都在疯狂地背单词,博客有些日子没有更新了,见谅见谅...

11410

扫码关注云+社区

领取腾讯云代金券