storm drpc实例

本文主要演示一下storm drpc实例

配置

version: '2'
services:
    supervisor:
        image: storm
        container_name: supervisor
        command: storm supervisor -c storm.local.hostname="192.168.99.100" -c drpc.servers='["192.168.99.100"]' -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
        depends_on:
            - nimbus
            - zookeeper
        links:
            - nimbus
            - zookeeper
        restart: always
        ports:
            - 6700:6700
            - 6701:6701
            - 6702:6702
            - 6703:6703
            - 8000:8000
    drpc:
        image: storm
        container_name: drpc
        command: storm drpc -c storm.local.hostname="192.168.99.100" -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
        depends_on:
            - nimbus
            - supervisor
            - zookeeper
        links:
            - nimbus
            - supervisor
            - zookeeper
        restart: always
        ports:
            - 3772:3772
            - 3773:3773
            - 3774:3774
  • 这里对supervisor配置drpc.servers及drpc.port、drpc.invocations.port,好让worker通过drpc.invocations.port去访问drpc节点
  • 对于drpc服务,则暴露drpc.port(好让外部的DRPCClient访问)、drpc.invocations.port(让worker访问)

TridentTopology

    @Test
    public void testDeployDRPCStateQuery() throws InterruptedException, TException {
        TridentTopology topology = new TridentTopology();
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"),
                new Values("four score and seven years ago"),
                new Values("how many apples can you eat"));
        spout.setCycle(true);
        TridentState wordCounts =
                topology.newStream("spout1", spout)
                        .each(new Fields("sentence"), new Split(), new Fields("word"))
                        .groupBy(new Fields("word"))
                        //NOTE transforms a Stream into a TridentState object
                        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                        .parallelismHint(6);

        topology.newDRPCStream("words")
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

        StormTopology stormTopology = topology.build();

        //远程提交 mvn clean package -Dmaven.test.skip=true
        //storm默认会使用System.getProperty("storm.jar")去取,如果不设定,就不能提交
        System.setProperty("storm.jar",TOPOLOGY_JAR);

        Config conf = new Config();
        conf.put(Config.NIMBUS_SEEDS,Arrays.asList("192.168.99.100")); //配置nimbus连接主机地址,比如:192.168.10.1
        conf.put(Config.NIMBUS_THRIFT_PORT,6627);//配置nimbus连接端口,默认 6627
        conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); //配置zookeeper连接主机地址,可以使用集合存放多个
        conf.put(Config.STORM_ZOOKEEPER_PORT,2181); //配置zookeeper连接端口,默认2181

        StormSubmitter.submitTopology("DRPCStateQuery", conf, stormTopology);
    }
  • 这里newStream创建了一个TridentState,然后newDRPCStream创建了一个DRPCStream,其stateQuery指定为前面创建的TridentState
  • 由于TridentState把结果存储到了MemoryMapState,因而这里的DRPCStream通过drpc进行stateQuery

DRPCClient

    @Test
    public void testLaunchDrpcClient() throws TException {
        Config conf = new Config();
        //NOTE 要设置Config.DRPC_THRIFT_TRANSPORT_PLUGIN属性,不然client直接跑空指针
        conf.put(Config.DRPC_THRIFT_TRANSPORT_PLUGIN,SimpleTransportPlugin.class.getName());
        conf.put(Config.STORM_NIMBUS_RETRY_TIMES,3);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL,10000);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING,10000);
        conf.put(Config.DRPC_MAX_BUFFER_SIZE, 104857600); // 100M
        DRPCClient client = new DRPCClient(conf, "192.168.99.100", 3772);
        System.out.println(client.execute("words", "cat dog the man"));
    }
  • 注意这里的配置项不能少,否则会引发空指针
  • Config.DRPC_THRIFT_TRANSPORT_PLUGIN这里使用的是SimpleTransportPlugin.class.getName(),虽然该类被废弃了,不过还可以跑通
  • 由于使用了SimpleTransportPlugin.class,因而这里要配置Config.DRPC_MAX_BUFFER_SIZE
  • DRPCClient配置了drpc的地址及port
  • client.execute这里要传入newDRPCStream指定的function名称

小结

  • 使用drpc的时候,需要通过storm drpc启动drpc server服务节点,另外要暴露两个端口,一个drpc.port是供外部DRPCClient调用,一个drpc.invocations.port是给worker来访问;drpc.http.port端口是暴露给http协议调用的(DRPCClient使用的是thrift协议调用)
  • supervisor要配置drpc.servers、drpc.invocations.port,好让worker去访问到drpc server
  • DRPCClient使用drpc.port指定的端口来访问,另外client.execute这里要传入newDRPCStream指定的function名称

doc

  • Trident Tutorial
  • Distributed RPC
  • Running Apache Storm Securely

本文分享自微信公众号 - 码匠的流水账(geek_luandun)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-10-22

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • storm drpc实例

    codecraft
  • 聊聊Elasticsearch的RunOnce

    elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurren...

    codecraft
  • webmagic小试牛刀

    codecraft
  • storm drpc实例

    codecraft
  • Python字符串截取

    py3study
  • Spring Data REST不完全指南(三)

    上一篇我们介绍了使用Spring Data REST时的一些高级特性,以及使用代码演示了如何使用这些高级的特性。本文将继续讲解前面我们列出来的七个高级特性中的后...

    东溪陈姓少年
  • 阅读器多种翻页的设计与实现

    UIKit提供UIPageViewController可以很方便实现平移的页面切换效果,使用流程: 1、创建UIPageViewController;

    落影
  • 像投资比特币一样投资旅游民宿

    引导全球的民宿、精品酒店、度假地产发行代表其使用权的通证并流通,为区块链和全球旅游爱好者提供一个安全、公平、开放的交易平台,让用户可以像投资比特币一样投资全球旅...

    区块链领域
  • IOS 导航栏 UINavigationController 常用

    1 创建:FirstViewController、SecondViewController 2、在FirstViewController的viewDidLoa...

    用户5760343
  • PL/SQL --> 动态SQL调用包中函数或过程

          动态SQL主要是用于针对不同的条件或查询任务来生成不同的SQL语句。最常用的方法是直接使用EXECUTE IMMEDIATE来执行动态SQL语句字符...

    Leshami

扫码关注云+社区

领取腾讯云代金券