前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >#导入MD文档图片#RocketMQ集群Broker高可用测试

#导入MD文档图片#RocketMQ集群Broker高可用测试

作者头像
loong576
发布2021-08-06 15:31:58
4310
发布2021-08-06 15:31:58
举报
文章被收录于专栏:运维ABC运维ABC

一、环境说明

ip地址

主机名

操作系统版本

RocketMQ版本

JDK版本

maven版本

备注

172.16.7.91

nameserver01

centos 7.6

4.8.0

1.8.0_291

3.6

Name Server集群

172.16.7.92

nameserver03

centos 7.6

4.8.0

1.8.0_291

3.6

Name Server集群

172.16.7.93

master01

centos 7.6

4.8.0

1.8.0_291

3.6

Broker集群1

172.16.7.94

slave01

centos 7.6

4.8.0

1.8.0_291

3.6

Broker集群1

172.16.7.95

master02

centos 7.6

4.8.0

1.8.0_291

3.6

Broker集群2

172.16.7.96

slave02

centos 7.6

4.8.0

1.8.0_291

3.6

Broker集群2

二、部署概况

三、测试准备

1.清空消息

代码语言:javascript
复制
[root@master01 ~]# init 6
[root@master01 ~]# cd /root/logs/rocketmqlogs/
[root@master01 rocketmqlogs]# rm -rf *
[root@master01 rocketmqlogs]# cd /root/store/
[root@master01 store]# rm -rf *

以master01为例,首先停止所有rocketmq进程,然后删除日志和存储信息。所有服务器都执行该操作。

2.测试前集群查看

启动各节点服务,查看集群状态

3.新建topic

新增主题topic_broker_test

主题配置如下:

查看新增的主题

4.新建订阅组

新建订阅组group_broker_test

配置如下:

查看新建的订阅组

5.producer代码

代码语言:javascript
复制
package com.my.maven.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
            DefaultMQProducer("group_broker_test");
        // Specify name server addresses.
        producer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");
        producer.setRetryTimesWhenSendAsyncFailed(2);
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 10000; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("topic_broker_test" /* Topic */,
                "TagA" /* Tag */,
                ("Broker HA Test" +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

使用循环方式产生多条消息

6.consumer代码

代码语言:javascript
复制
package com.my.maven.rocketmq;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {

    public static void main(String[] args) throws InterruptedException,
            MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                "group_broker_test");
        consumer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");

        consumer.subscribe("topic_broker_test", "TagA || tagB");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName()
                        + " Receive New Messages: " + msgs);
                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals("topic_broker_test")) {
                    if (msg.getTags() != null && msg.getTags().equals("tagA")) {
                        // 获取消息体
                        String message = new String(msg.getBody());
                        System.out.println("receive tagA message:" + message);
                    } else if (msg.getTags() != null
                            && msg.getTags().equals("tagB")) {
                        // 获取消息体
                        String message = new String(msg.getBody());
                        System.out.println("receive tagB message:" + message);
                    }

                }
                // 成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

}

四、消息发送高可用测试

1.broker-a master重启

代码语言:javascript
复制
[root@master01 rocketmq]# init 6

主机名

状态

broker-a master

发送时重启

broker-a slave

正常运行

broker-b master

正常运行

broker-b slave

正常运行

发送5000条消息,在消息发送的同时关闭broker-a master

消息发送会暂停,一共发送了153条

结论:消息发送时如果有master宕机,则消息发送会终止,主机起来后消息也不会继续发送。

2.broker-a slave重启

代码语言:javascript
复制
[root@slave01 rocketmq]# init 6

主机名

状态

broker-a master

宕机

broker-a slave

发送时重启

broker-b master

正常运行

broker-b slave

正常运行

发送5000条消息,在发送过程中同时重启broker-a slave

消息发送会暂停,一共发送了339条

结论:消息发送时如果有slave宕机,则消息发送会终止,主机起来后消息也不会继续发送。

3.所有 slave关机

主机名

状态

broker-a master

正常运行

broker-a slave

宕机

broker-b master

正常运行

broker-b slave

宕机

代码语言:javascript
复制
[root@slave01 rocketmq]# init 0
[root@slave02 rocketmq]# init 0

发送5000条消息,在发送过程中同时关闭所有的slave

消息发送会暂停,一共发送了401条,这也验证了上面的结论:消息发送时如果有slave宕机,则消息发送会终止

保持两个slave宕机状态,继续发送5000条消息

console显示消息记录数为5000条

结论:slave都宕机不影响消息发送。

4.broker-b master关机

主机名

状态

broker-a master

正常运行

broker-a slave

宕机

broker-b master

宕机

broker-b slave

宕机

代码语言:javascript
复制
[root@master02 rocketmq]# init 0

发送5000条测试消息,发送前broker-b master关机,只保留broker-a master运行

console显示发送了5000条消息

结论:集群只有一台master消息发送正常。

5.关闭所有的master

主机名

状态

broker-a master

宕机

broker-a slave

正常运行

broker-b master

宕机

broker-b slave

正常运行

关闭所有的master,启动所有的slave,发送5000条消息

消息发送前:

消息发送:

console报错,消息无法发送

结论:master都宕机消息无法正常发送。

五、消息消费高可用测试

在消息消费高可用测试前先清空消息,然后发送1万条消息

1.broker-a master关机

代码语言:javascript
复制
[root@master01 rocketmq]# init 0

在消息消费时将broker-a master关机

主机名

状态

broker-a master

消费时关机

broker-a slave

正常运行

broker-b master

正常运行

broker-b slave

正常运行

消费刚发送的1万条消息,消费过程中将broker-a master关机

console日志显示消息消费了1万条

结论:某台master宕机不影响消息消费。

2.broker-a slave关机

先发送1万条消息,然后消费,消费过程中broker-a slave关机

代码语言:javascript
复制
[root@slave01 rocketmq]# init 0

dashboard的消费统计不是很准确,以eclipse的console日志为准。

主机名

状态

broker-a master

消费时关机

broker-a slave

消费时关机

broker-b master

正常运行

broker-b slave

正常运行

消费刚发送的1万条消息,消费过程中将broker-a slave关机

console显示消费了1万条

结论:某台slave宕机不影响消息消费

3.所有slave关机

先发送1万条消息,然后消费,消费过程中broker-b slave关机

代码语言:javascript
复制
[root@slave02 rocketmq]# init 0

主机名

状态

broker-a master

消费时关机

broker-a slave

消费时关机

broker-b master

正常运行

broker-b slave

消费时关机

消费刚发送的1万条消息,消费过程中将broker-b slave关机

console消费1万条消息

结论:slave都宕机不影响消息消费

4.所有master关机

拉起broker-a slave或者broker-b slave,保持broker-b master开机状态,发送1万条消息,再将所有master关机最后消费

主机名

状态

broker-a master

关机

broker-a slave

关机

broker-b master

关机

broker-b slave

正常运行

消费刚发送的1万条消息

console显示消费了1w条记录

结论:master都宕机不影响消息发送

5.所有slave关机

拉起broker-a master或者broker-b master,关闭所有的slave,发送1万条消息,然后消费

主机名

状态

broker-a master

正常运行

broker-a slave

关机

broker-b master

关机

broker-b slave

关机

消费刚发送的1万条消息

console显示消息全部被消费

结论:slave都宕机不影响消息消费

六、测试总结

  • 1.消息发送过程中只要有任意一台master或者slave宕机则发送程序暂停;
  • 2.消息发送前slave都宕机不影响消息发送;
  • 3.master或者slave都宕机不影响消息消费;
  • 4.为保证消息正常的收发,集群最小配置为必需要有一台master主机;

本文所有代码和配置文件已上传github:RocketMQ_Broker_HA_Test

单机版RocketMQ搭建详见: Centos7.6搭建RocketMQ4.8全纪录

集群版RocketMQ搭建详见: RocketMQ4.8集群搭建全纪录

集群启停详见: RocketMQ集群启停手册

集群消息收发测试: RocketMQ集群消息收发测试全纪录

NameServer高可用测试:RocketMQ集群NameServer高可用测试

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-08-02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、环境说明
  • 二、部署概况
  • 三、测试准备
    • 1.清空消息
      • 2.测试前集群查看
        • 3.新建topic
          • 4.新建订阅组
            • 5.producer代码
              • 6.consumer代码
              • 四、消息发送高可用测试
                • 1.broker-a master重启
                  • 2.broker-a slave重启
                    • 3.所有 slave关机
                      • 4.broker-b master关机
                        • 5.关闭所有的master
                        • 五、消息消费高可用测试
                          • 1.broker-a master关机
                            • 2.broker-a slave关机
                              • 3.所有slave关机
                                • 4.所有master关机
                                  • 5.所有slave关机
                                  • 六、测试总结
                                  领券
                                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档