前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ专题1:入门

RocketMQ专题1:入门

作者头像
SecondWorld
发布2018-10-08 10:58:32
1.8K0
发布2018-10-08 10:58:32
举报
文章被收录于专栏:Java开发者杂谈

RocketMQ入门

源码和应用下载

​ 这里以RocketMQ的4.3.0版本为例,本地环境为windows10,jdk1.8, maven3.2.1.

源码下载地址: http://mirrors.hust.edu.cn/apache/rocketmq/4.3.0/rocketmq-all-4.3.0-source-release.zip 应用下载地址: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.3.0/rocketmq-all-4.3.0-bin-release.zip

启动

​ Windows下需要配置环境变量,ROCKETMQ_HOME, 我这里配置为: E:\software\rocketmq-all-4.3.0-bin-release

​ 配置完环境变量后,就可以进入到bin目录:

  • 启动server: 直接运行bin目录下的mqnamesrv.cmd
  • 启动broker: 运行mqbroker.cmd,发现一闪而过,查看bin目录下的bk.log日志,发现错误日志如下: 错误: 找不到或无法加载主类 Files\Java\jdk1.8.0_121\lib;C:\Program 再查看mqbroker.cmd源码,发现其最终调用了runbroker.cmd。该脚本的倒数第二行为: set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%" 知道问题所在: CLASSPATH的配置中是包含空格的,而空格导致最终解析出来的路径错误。最终我修改倒数第二行为: set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%"" ​至此可以顺利启动

​ 本以为启动之后就能就行消息收发了,于是我按照官网示例进入RocketMQ的bin目录,并通过命令向broker发送消息:

代码语言:javascript
复制
tools org.apache.rocketmq.example.quickstart.Producer

​ 结果一直报错,搜索得知在windows下需要配置环境变量NAMESRV_ADDR127.0.0.1:9876

​ 配置完成之后,再依次启动mqnamesrv和mqbroker,重新测试Producer发现Producer的输出大致如下:

代码语言:javascript
复制
......
SendResult [sendStatus=SEND_OK, msgId=C0A8029D46D461BBE9BA5A115E9C03E5, offsetMsgId=C0A8130100002A9F000000000002BC96, messageQueue=MessageQueue [topic=TopicTest, brokerName=Ziyuqi-0431, queueId=0], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8029D46D461BBE9BA5A115E9E03E6, offsetMsgId=C0A8130100002A9F000000000002BD4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=Ziyuqi-0431, queueId=1], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8029D46D461BBE9BA5A115EA003E7, offsetMsgId=C0A8130100002A9F000000000002BDFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=Ziyuqi-0431, queueId=2], queueOffset=249]
11:44:47.790 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.19.1:10911] result: true
11:44:47.791 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
11:44:47.793 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[192.168.19.1:10909] result: true

​ 在通过命令行运行Consumer:

代码语言:javascript
复制
tools org.apache.rocketmq.example.quickstart.Consumer

​ 发现Consumer的输出为:

代码语言:javascript
复制
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=249, sysFlag=0, bornTimestamp=1537242287776, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287778, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002BDFE, commitLogOffset=179710, bodyCRC=638172955, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409812, UNIQ_KEY=C0A8029D46D461BBE9BA5A115EA003E7, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 57], transactionId='null'}]]
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=248, sysFlag=0, bornTimestamp=1537242287768, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287768, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002BB2E, commitLogOffset=178990, bodyCRC=801108784, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409811, UNIQ_KEY=C0A8029D46D461BBE9BA5A115E9803E3, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 53], transactionId='null'}]]
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=247, sysFlag=0, bornTimestamp=1537242287761, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287761, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002B85E, commitLogOffset=178270, bodyCRC=684865321, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409807, UNIQ_KEY=C0A8029D46D461BBE9BA5A115E9103DF, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 57, 49], transactionId='null'}]]
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=246, sysFlag=0, bornTimestamp=1537242287753, bornHost=/192.168.19.1:53847, storeTimestamp=1537242287753, storeHost=/192.168.19.1:10911, msgId=C0A8130100002A9F000000000002B58E, commitLogOffset=177550, bodyCRC=1487577949, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1537242409807, UNIQ_KEY=C0A8029D46D461BBE9BA5A115E8903DB, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 55], transactionId='null'}]]

关闭

​ 关闭的步骤与启动正好相反

  • 关闭brokermqshutdown broker
  • 关闭namesrvmqshutdown namesrv

简单示例

​ 在进行简单的示例之前,我们先要知道为什么会出现RocketMQ,下面一段话摘自RocketMQ官网:

代码语言:javascript
复制
Based on our research, with increased queues and virtual topics in use, ActiveMQ IO module reaches a bottleneck. We tried our best to solve this problem through throttling, circuit breaker or degradation, but it did not work well. So we begin to focus on the popular messaging solution Kafka at that time. Unfortunately, Kafka can not meet our requirements especially in terms of low latency and high reliability, see here for details.

In this context, we decided to invent a new messaging engine to handle a broader set of use cases, ranging from traditional pub/sub scenarios to high volume real-time zero-loss tolerance transaction system. We believe this solution can be beneficial, so we would like to open source it to the community. Today, more than 100 companies are using the open source version of RocketMQ in their business. We also published a commercial distribution based on RocketMQ, a PaaS product called the Alibaba Cloud Platform.

​ 可以知道RocketMQ是阿里在使用ActiveMQ时,出现了IO瓶颈,无法满足阿里业务所需要的低延迟和高可靠性要求时自己研发出来。并且最终捐赠给Apache,成为顶级开源项目的。high volume real-time zero-loss tolerance transaction system是其核心特点。

​ 下面通过一个简单的示例,来说明RocketMQ的基本使用:

引入pom依赖

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

Producer

​ Producer一般分为三种模式: 同步、异步和单向,具体代码如下:

代码语言:javascript
复制
public class SimpleProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException,
            MQBrokerException, InterruptedException {
        /**
         * 同步消息发送: 一般用来进行通知、短信等重要消息的同步
         */
        // syncProducer();
        
        /**
         * 异步消息发送: 一般用来对方法调用响应时间有较严格要求的情况下,异步调用,立即返回
         * 不同于同步的唯一在于: send方法调用的时候多携带一个回调接口参数,用来异步处理消息发送结果
         */
        asyncProducer();
        
        /**
         * 单向模式: 一般用来对可靠性有一定要求的消息发送,例如日志系统
         * 不同于同步的唯一之处在于: 调用的是sendOneway方法,且该方法不会给调用者任何返回值
         */
        // oneWayProducer();
    }

    private static void oneWayProducer() throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException {
        // STEP1: 创建Producer并且指定组名
        DefaultMQProducer oneWayProducer = new DefaultMQProducer("GroupA");

        // STEP2: 指定nameServer地址
        oneWayProducer.setNamesrvAddr("localhost:9876");

        // STEP3: 启动Producer
        oneWayProducer.start();

        // STEP4: 循环发送消息
        for (int i = 0; i < 50; i++) {
            Message message = new Message("OneWayTopic", "TagA",
                    ("OneWayMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            oneWayProducer.sendOneway(message);
        }

        // STEP5: 关闭Producer
        oneWayProducer.shutdown();
    }

    private static void asyncProducer() throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException {
        // STEP1: 创建Producer并且指定组名
        DefaultMQProducer asyncProducer = new DefaultMQProducer("GroupA");

        // STEP2: 指定nameServer地址
        asyncProducer.setNamesrvAddr("localhost:9876");

        // STEP3: 启动Producer
        asyncProducer.start();
        asyncProducer.setRetryTimesWhenSendAsyncFailed(0);      // 设置异步发送失败重试次数,默认为2

        // STEP4: 循环发送消息
        for (int i = 0; i < 50; i++) {
            Message message = new Message("AsyncTopic", "TagA",
                    ("AsyncMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 创建回调函数处理发送成功或者异常
            asyncProducer.send(message, new SendCallback() {
                
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
                
                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                }
            });
        }

        // STEP5: 关闭Producer
        TimeUnit.SECONDS.sleep(10); // 睡眠10秒,确保消息都发送出去
        asyncProducer.shutdown();
    }

    private static void syncProducer() throws MQClientException, UnsupportedEncodingException, RemotingException, MQBrokerException, InterruptedException {
        // STEP1: 创建Producer并且指定组名
        DefaultMQProducer syncProducer = new DefaultMQProducer("GroupA");

        // STEP2: 指定nameServer地址
        syncProducer.setNamesrvAddr("localhost:9876");

        // STEP3: 启动Producer
        syncProducer.start();

        // STEP4: 循环发送消息
        for (int i = 0; i < 50; i++) {
            Message message = new Message("SyncTopic", "TagA",
                    ("SyncMsg" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = syncProducer.send(message);
            System.out.println(sendResult);
        }
        
        // STEP5: 关闭Producer
        syncProducer.shutdown();
    }
}

Consumer

​ consumer的实现就较为简单了,定义一个事件监听接口即可.

代码语言:javascript
复制
public class SimpleConsumer {
    public static void main(String[] args) throws MQClientException {
        // STEP1: 创建默认Consumer并指定
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupA");
        
        // STEP2: 指定nameServer地址
        consumer.setNamesrvAddr("localhost:9876");
        
        // STEP3: 订阅对应主题和tag
        consumer.subscribe("AsyncTopic", "*");
        
        // STEP4: 注册接收到broker消息后的处理接口
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    System.out.println(new String(msgs.get(0).getBody(), RemotingHelper.DEFAULT_CHARSET));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        
        // STEP5: 启动consumer (必须在注册完消息监听器之后启动,否则会报错)
        consumer.start();
        
        System.out.println("Consumer started......");
    }
}

总结

  • 运行Producer的时候必须保证nameServer和broker都正常运行,否则会报org.apache.rocketmq.client.exception.MQClientException: No route info of this topic
  • 即使先运行Producer只要在运行Consumer之前,未重启broker或者nameServer。Consumer启动时还是能正常收到消息

参考链接

http://rocketmq.apache.org/docs/simple-example/

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-09-18 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RocketMQ入门
    • 源码和应用下载
      • 启动
        • 关闭
          • 简单示例
            • 总结
              • 参考链接
              相关产品与服务
              短信
              腾讯云短信(Short Message Service,SMS)可为广大企业级用户提供稳定可靠,安全合规的短信触达服务。用户可快速接入,调用 API / SDK 或者通过控制台即可发送,支持发送验证码、通知类短信和营销短信。国内验证短信秒级触达,99%到达率;国际/港澳台短信覆盖全球200+国家/地区,全球多服务站点,稳定可靠。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档