前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ集群消息收发测试全纪录

RocketMQ集群消息收发测试全纪录

作者头像
loong576
发布2021-08-06 15:43:24
6130
发布2021-08-06 15:43:24
举报
文章被收录于专栏:运维ABC

一、环境说明

ip地址

主机名

操作系统版本

RocketMQ版本

JDK版本

maven版本

备注

172.16.7.91

nameserver01

centos 7.6

4.8.0

1.8.0_291

3.6

Name Server集群

172.16.7.92

nameserver03

centos 7.6

4.8.0

1.8.0_291

3.6

Name Server集群

172.16.7.93

master01

centos 7.6

4.8.0

1.8.0_291

3.6

Broker集群1

172.16.7.94

slave01

centos 7.6

4.8.0

1.8.0_291

3.6

Broker集群1

172.16.7.95

master02

centos 7.6

4.8.0

1.8.0_291

3.6

Broker集群2

172.16.7.96

slave02

centos 7.6

4.8.0

1.8.0_291

3.6

Broker集群2

二、部署概况

d58a913c84dc28e3b6a067557a821c6f.png
d58a913c84dc28e3b6a067557a821c6f.png

三、创建Maven Project

1.新建Maven project

0d7628342d3fbdee8e06961ffc8d99c4.png
0d7628342d3fbdee8e06961ffc8d99c4.png
4201fa1f1c4ab72149cb76b320acdaf9.png
4201fa1f1c4ab72149cb76b320acdaf9.png

选择Maven Project

989e7c423a3650fb2db7cf17919b14fc.png
989e7c423a3650fb2db7cf17919b14fc.png

配置目录

f4b4e26ca50d154f1d53321728990705.png
f4b4e26ca50d154f1d53321728990705.png

选择原型

ccd018cb678910e846b62f0af8067e48.png
ccd018cb678910e846b62f0af8067e48.png

自定义group id和artifact id,完成maven project的创建。

9aa1467cc6ef6c0249865e01bdb79bc1.png
9aa1467cc6ef6c0249865e01bdb79bc1.png

2.导入依赖库

修改pom.xml,加入如下代码

代码语言:javascript
复制
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.3.0</version>
    </dependency>
0154e63305d586f77eb2afa4dc3d8ced.png
0154e63305d586f77eb2afa4dc3d8ced.png

会发现多了很多依赖包

12d2712827dbfcde6530eb08571cd259.png
12d2712827dbfcde6530eb08571cd259.png

四、生产者测试

1.测试前集群查看

启动各节点服务,查看集群状态

dc29e8b502400e12c824f820920ed0f3.png
dc29e8b502400e12c824f820920ed0f3.png

测试前无消息生产和消费

2.新建topic

2.1新增主题topic_test_123
00099731754975b0b60b68061ca70e29.png
00099731754975b0b60b68061ca70e29.png

主题配置如下:

d1b1d6b0f33595bd797e32c3a56962a3.png
d1b1d6b0f33595bd797e32c3a56962a3.png

集群名为MyRocketmq,BROKER_NAME两个broker都选择

2.2查看新增的主题
cda1a32a26be4760758b828938436fed.png
cda1a32a26be4760758b828938436fed.png

4.新建订阅组

4.1新建订阅组group_test_123
5f1b20a40874e73553cabe580a5f81d7.png
5f1b20a40874e73553cabe580a5f81d7.png

配置如下:

f852109a48e12ce24763efaba1894bce.png
f852109a48e12ce24763efaba1894bce.png
4.2查看新建的订阅组
a9ab0bc4598cd4aacb37daa02751be54.png
a9ab0bc4598cd4aacb37daa02751be54.png

5.新建类Producer

10bce6412d0cc2065070ee71e2851c6e.png
10bce6412d0cc2065070ee71e2851c6e.png
16f959f5e68f4851b9e9030a9745d97a.png
16f959f5e68f4851b9e9030a9745d97a.png
13de4f12a198d4c3d635ebd447f06a58.png
13de4f12a198d4c3d635ebd447f06a58.png

新建类Producer

1e2ee332c48d7aaa8d6db7382b547c46.png
1e2ee332c48d7aaa8d6db7382b547c46.png

生产者消息发送代码:

代码语言:javascript
复制
package com.my.maven.rocketmq;
​
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
​
public class Producer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
            DefaultMQProducer("group_test_123");
        // Specify name server addresses.
        producer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");
        producer.setRetryTimesWhenSendAsyncFailed(2);
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("topic_test_123" /* Topic */,
                "TagA" /* Tag */,
                ("Message Test" +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}
​

生产者配置项 retryTimesWhenSendAsyncFailed 表示异步重试的次数,默认为 2 次,加上正常发送的1次,总共有3次发送机会。

发送消息Message Test0--Message Test99,共100条消息。

6.运行报错

运行Produce发送消息时报错,如图:

7ac1b187a09ea6d8920c10bbdeed50ed.png
7ac1b187a09ea6d8920c10bbdeed50ed.png

解决:

8c96a635a0a449523a152be1498522e4.png
8c96a635a0a449523a152be1498522e4.png

由于测试是在本地电脑虚机上进行的,同时开多个虚机和eclipse应用会占用很多内存,解决办法是进入eclipse的安装目录,修改文件eclipse.ini,将参数-Xms和-Xmx改小点即可。

7.运行Produce

9c85440e22ce05e5c63285a457d1ce32.png
9c85440e22ce05e5c63285a457d1ce32.png

8.发送消息状态查看

8.1集群查看
614a394f0968fc720d08f619a4f4348e.png
614a394f0968fc720d08f619a4f4348e.png

可以看到broker-a和broker-b各产生了50条消息

8.2消息查看
34c1d90c2d794119bee890145f810a69.png
34c1d90c2d794119bee890145f810a69.png

消息详情:

6173224bdd8cf175af226e7a57678fe0.png
6173224bdd8cf175af226e7a57678fe0.png
8.3消费者查看
360b24425202bdb64d30849eb4eda337.png
360b24425202bdb64d30849eb4eda337.png

此时还未消费

五、消费者测试

1.新建类Consumer

925e60bfa64d96b4d2f5dd37405f5306.png
925e60bfa64d96b4d2f5dd37405f5306.png

消费代码:

代码语言:javascript
复制
package com.my.maven.rocketmq;
​
import java.util.List;
​
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
​
public class Consumer {
​
    public static void main(String[] args) throws InterruptedException,
            MQClientException {
​
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                "group_test_123");
        consumer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");
​
        consumer.subscribe("topic_test_123", "TagA || TagB");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
​
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName()
                        + " Receive New Messages: " + msgs);
                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals("topic_test_123")) {
                    if (msg.getTags() != null && msg.getTags().equals("TagA")) {
                        // 获取消息体
                        String message = new String(msg.getBody());
                        System.out.println("receive TagA message:" + message);
                    } else if (msg.getTags() != null
                            && msg.getTags().equals("TagB")) {
                        // 获取消息体
                        String message = new String(msg.getBody());
                        System.out.println("receive TagB message:" + message);
                    }
​
                }
                // 成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
​
}

2.运行Consumer

be0871bb008e61753a723e4e83e03070.png
be0871bb008e61753a723e4e83e03070.png

3.消费消息状态查看

3.1消费者查看
3cf40c9ccdd04574df2295fa72aad867.png
3cf40c9ccdd04574df2295fa72aad867.png
3.2查看消费详情
3b675856402bcec2710463eb3dc7182e.png
3b675856402bcec2710463eb3dc7182e.png
3.3集群查看
a4e4d26bd55d83c598ca1378d7965549.png
a4e4d26bd55d83c598ca1378d7965549.png
3.4消息详情查看
f523661e5b53fd17c4eeb9ce3b4f5f50.png
f523661e5b53fd17c4eeb9ce3b4f5f50.png

发现消息已被消费

4消费者console日志

5ee10260627d3b1cf05b7be3dfc8fbe1.png
5ee10260627d3b1cf05b7be3dfc8fbe1.png

一共100条消息被消费

本文所有代码和配置文件已上传github:RocketMQ_Message_Test

单机版RocketMQ搭建详见:Centos7.6搭建RocketMQ4.8全纪录

集群版RocketMQ搭建详见:RocketMQ4.8集群搭建全纪录

集群启停详见:RocketMQ集群启停手册

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/07/12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、环境说明
  • 二、部署概况
  • 三、创建Maven Project
    • 1.新建Maven project
      • 2.导入依赖库
      • 四、生产者测试
        • 1.测试前集群查看
          • 2.新建topic
            • 2.1新增主题topic_test_123
            • 2.2查看新增的主题
          • 4.新建订阅组
            • 4.1新建订阅组group_test_123
            • 4.2查看新建的订阅组
          • 5.新建类Producer
            • 6.运行报错
              • 7.运行Produce
                • 8.发送消息状态查看
                  • 8.1集群查看
                  • 8.2消息查看
                  • 8.3消费者查看
              • 五、消费者测试
                • 1.新建类Consumer
                  • 2.运行Consumer
                    • 3.消费消息状态查看
                      • 3.1消费者查看
                      • 3.2查看消费详情
                      • 3.3集群查看
                      • 3.4消息详情查看
                    • 4消费者console日志
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档