storm drpc实例

本文主要演示一下storm drpc实例

配置

version: '2'
services:
    supervisor:
        image: storm
        container_name: supervisor
        command: storm supervisor -c storm.local.hostname="192.168.99.100" -c drpc.servers='["192.168.99.100"]' -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
        depends_on:
            - nimbus
            - zookeeper
        links:
            - nimbus
            - zookeeper
        restart: always
        ports:
            - 6700:6700
            - 6701:6701
            - 6702:6702
            - 6703:6703
            - 8000:8000
    drpc:
        image: storm
        container_name: drpc
        command: storm drpc -c storm.local.hostname="192.168.99.100" -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
        depends_on:
            - nimbus
            - supervisor
            - zookeeper
        links:
            - nimbus
            - supervisor
            - zookeeper
        restart: always
        ports:
            - 3772:3772
            - 3773:3773
            - 3774:3774
  • 这里对supervisor配置drpc.servers及drpc.port、drpc.invocations.port,好让worker通过drpc.invocations.port去访问drpc节点
  • 对于drpc服务,则暴露drpc.port(好让外部的DRPCClient访问)、drpc.invocations.port(让worker访问)

TridentTopology

    @Test
    public void testDeployDRPCStateQuery() throws InterruptedException, TException {
        TridentTopology topology = new TridentTopology();
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
                new Values("the cow jumped over the moon"),
                new Values("the man went to the store and bought some candy"),
                new Values("four score and seven years ago"),
                new Values("how many apples can you eat"));
        spout.setCycle(true);
        TridentState wordCounts =
                topology.newStream("spout1", spout)
                        .each(new Fields("sentence"), new Split(), new Fields("word"))
                        .groupBy(new Fields("word"))
                        //NOTE transforms a Stream into a TridentState object
                        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                        .parallelismHint(6);

        topology.newDRPCStream("words")
                .each(new Fields("args"), new Split(), new Fields("word"))
                .groupBy(new Fields("word"))
                .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
                .each(new Fields("count"), new FilterNull())
                .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

        StormTopology stormTopology = topology.build();

        //远程提交 mvn clean package -Dmaven.test.skip=true
        //storm默认会使用System.getProperty("storm.jar")去取,如果不设定,就不能提交
        System.setProperty("storm.jar",TOPOLOGY_JAR);

        Config conf = new Config();
        conf.put(Config.NIMBUS_SEEDS,Arrays.asList("192.168.99.100")); //配置nimbus连接主机地址,比如:192.168.10.1
        conf.put(Config.NIMBUS_THRIFT_PORT,6627);//配置nimbus连接端口,默认 6627
        conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); //配置zookeeper连接主机地址,可以使用集合存放多个
        conf.put(Config.STORM_ZOOKEEPER_PORT,2181); //配置zookeeper连接端口,默认2181

        StormSubmitter.submitTopology("DRPCStateQuery", conf, stormTopology);
    }
  • 这里newStream创建了一个TridentState,然后newDRPCStream创建了一个DRPCStream,其stateQuery指定为前面创建的TridentState
  • 由于TridentState把结果存储到了MemoryMapState,因而这里的DRPCStream通过drpc进行stateQuery

DRPCClient

    @Test
    public void testLaunchDrpcClient() throws TException {
        Config conf = new Config();
        //NOTE 要设置Config.DRPC_THRIFT_TRANSPORT_PLUGIN属性,不然client直接跑空指针
        conf.put(Config.DRPC_THRIFT_TRANSPORT_PLUGIN,SimpleTransportPlugin.class.getName());
        conf.put(Config.STORM_NIMBUS_RETRY_TIMES,3);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL,10000);
        conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING,10000);
        conf.put(Config.DRPC_MAX_BUFFER_SIZE, 104857600); // 100M
        DRPCClient client = new DRPCClient(conf, "192.168.99.100", 3772);
        System.out.println(client.execute("words", "cat dog the man"));
    }
  • 注意这里的配置项不能少,否则会引发空指针
  • Config.DRPC_THRIFT_TRANSPORT_PLUGIN这里使用的是SimpleTransportPlugin.class.getName(),虽然该类被废弃了,不过还可以跑通
  • 由于使用了SimpleTransportPlugin.class,因而这里要配置Config.DRPC_MAX_BUFFER_SIZE
  • DRPCClient配置了drpc的地址及port
  • client.execute这里要传入newDRPCStream指定的function名称

小结

  • 使用drpc的时候,需要通过storm drpc启动drpc server服务节点,另外要暴露两个端口,一个drpc.port是供外部DRPCClient调用,一个drpc.invocations.port是给worker来访问;drpc.http.port端口是暴露给http协议调用的(DRPCClient使用的是thrift协议调用)
  • supervisor要配置drpc.servers、drpc.invocations.port,好让worker去访问到drpc server
  • DRPCClient使用drpc.port指定的端口来访问,另外client.execute这里要传入newDRPCStream指定的function名称

doc

  • Trident Tutorial
  • Distributed RPC
  • Running Apache Storm Securely

原文发布于微信公众号 - 码匠的流水账(geek_luandun)

原文发表时间:2018-10-22

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏NetCore

[原创]Fluent NHibernate之旅(四)-- 关系(上)

经过了前面三篇的介绍,相信大家对Fluent NHibernate已经有一定的了解了,在我们学习中,Fluent 也已经进入了RTM版本。这次的版本发布离RC版...

2216
来自专栏张善友的专栏

重新审视SqlDataReader的使用

      ADO.NET 1.x 利用SqlDataReader读取数据,针对每个结果集需要一个独立的连接。当然,你还必须管理这些连接并且要付出相应的内存和潜...

2079
来自专栏老码农专栏

TodoBackend展示应用以及ActFramework的实现

1365
来自专栏知识分享

4-MSP430定时器_定时器中断

一开始没写好就上传了,,,,,,,,这次来个全的 自己学MSP430是为了写一篇关于PID的文章,需要430在proteus上做仿真,一则认为在自动控制算法上P...

3656
来自专栏Kubernetes

深度解析Kubernetes Local Persistent Volume(二)

摘要:上一篇博客”深度解析Kubernetes Local Persistent Volume(一)“对local volume的基本原理和注意事项进行了分析,...

1.6K3
来自专栏互联网技术栈

Netflix Archaius 分布式配置管理依赖构件

archaius是Netflix公司开源项目之一,基于java的配置管理类库,主要用于多配置存储的动态获取。主要功能是对apache common config...

1542
来自专栏Java成神之路

分布式_事务_02_2PC框架raincat源码解析

上一节已经将raincat demo工程运行起来了,这一节来分析下raincat的源码

2271
来自专栏思考的代码世界

Hexo+Next指定文章隐藏侧栏

近期在幕布的活动比较多,想新增一个幕布的作品集页面,所以就依葫芦画瓢,新建一个幕布作品集的页面。

2765
来自专栏工科狗和生物喵

FreeRTOS-Qemu 实现三任务同步通信机制以及API信息

---- 1. 本次作业的考察要点: 作业地址:https://github.com/HustWolfzzb/STM32F429DiscoveryFreeRTO...

1.1K6
来自专栏码匠的流水账

storm drpc实例

2622

扫码关注云+社区

领取腾讯云代金券