
RocketMQ Cluster NameServer High-Availability Test

Author: loong576
Published: 2021-08-06 15:32:51
Column: 运维ABC

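I. Environment

| IP address | Hostname | OS version | RocketMQ version | JDK version | Maven version | Notes |
|---|---|---|---|---|---|---|
| 172.16.7.91 | nameserver01 | CentOS 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | NameServer cluster |
| 172.16.7.92 | nameserver02 | CentOS 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | NameServer cluster |
| 172.16.7.93 | master01 | CentOS 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Broker cluster 1 |
| 172.16.7.94 | slave01 | CentOS 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Broker cluster 1 |
| 172.16.7.95 | master02 | CentOS 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Broker cluster 2 |
| 172.16.7.96 | slave02 | CentOS 7.6 | 4.8.0 | 1.8.0_291 | 3.6 | Broker cluster 2 |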

II. Deployment overview

[Figure: deployment diagram of the two NameServer nodes and the two broker master/slave groups listed in the environment table]

III. Normal message sending

1. Producer code

package com.my.maven.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("group_test_123");
        // Specify both name server addresses, separated by ';'.
        producer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");
        // Note: this setting applies to asynchronous sends only; the synchronous
        // send() below is governed by retryTimesWhenSendFailed instead.
        producer.setRetryTimesWhenSendAsyncFailed(2);
        // Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("topic_test_123" /* Topic */,
                "TagA" /* Tag */,
                ("NameServer Test" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Send synchronously; the message is delivered to one of the brokers.
            // Any exception thrown here propagates out of main and ends the loop.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // Shut down once the producer instance is no longer in use.
        producer.shutdown();
    }
}

The producer sends 100 messages with bodies "NameServer Test0" through "NameServer Test99".

2. Run the Producer

[Screenshot: running the Producer]

3. Check the send results

Log:

SendResult [sendStatus=SEND_OK, msgId=AC1B09F602B873D16E936A4028160000, offsetMsgId=AC10075D00002A9F0000000000000000, messageQueue=MessageQueue [topic=topic_test_123, brokerName=broker-a, queueId=8], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC1B09F602B873D16E936A4028470001, offsetMsgId=AC10075D00002A9F00000000000000CA, messageQueue=MessageQueue [topic=topic_test_123, brokerName=broker-a, queueId=9], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC1B09F602B873D16E936A40284C0002, offsetMsgId=AC10075D00002A9F0000000000000194, messageQueue=MessageQueue [topic=topic_test_123, brokerName=broker-a, queueId=10], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC1B09F602B873D16E936A4028520003, offsetMsgId=AC10075D00002A9F000000000000025E, messageQueue=MessageQueue [topic=topic_test_123, brokerName=broker-a, queueId=11], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC1B09F602B873D16E936A4028550004, offsetMsgId=AC10075D00002A9F0000000000000328, messageQueue=MessageQueue [topic=topic_test_123, brokerName=broker-a, queueId=12], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC1B09F602B873D16E936A4028590005, offsetMsgId=AC10075D00002A9F00000000000003F2, messageQueue=MessageQueue [topic=topic_test_123, brokerName=broker-a, queueId=13], queueOffset=0]
……

The log contains 100 send records in total.
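The `offsetMsgId` values in this log can be read by hand. Judging from the logs in this article, the 32 hex characters split into 8 for the broker IPv4 address, 8 for the port, and 16 for the commit log offset: the consumer log further down shows `msgId=AC10075F00002A9F00000000000023AD` next to `storeHost=/172.16.7.95:10911` and `commitLogOffset=9133`, which matches that layout. A minimal sketch of the decoding under that assumption (the class and method names are illustrative and not part of the RocketMQ client):

```java
class OffsetMsgIdDecoder {

    /** Decodes a 32-hex-character offsetMsgId into "ip:port@commitLogOffset". */
    static String decode(String offsetMsgId) {
        if (offsetMsgId.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters");
        }
        // First 8 hex characters: four IPv4 octets.
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 8; i += 2) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(Integer.parseInt(offsetMsgId.substring(i, i + 2), 16));
        }
        // Next 8 hex characters: broker port.
        int port = Integer.parseInt(offsetMsgId.substring(8, 16), 16);
        // Last 16 hex characters: offset in the broker's commit log.
        long offset = Long.parseUnsignedLong(offsetMsgId.substring(16), 16);
        return ip + ":" + port + "@" + offset;
    }

    public static void main(String[] args) {
        // Second record of the send log above.
        System.out.println(decode("AC10075D00002A9F00000000000000CA"));
        // prints 172.16.7.93:10911@202
    }
}
```

Decoded this way, the broker-a records point at 172.16.7.93, which is master01 in the environment table.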

Console view: the two broker groups (shards), broker-a and broker-b, received 52 and 48 messages respectively.
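The per-broker split can also be derived from the Producer output instead of the console. The sketch below counts the `brokerName=` field of each `SendResult` line; `BrokerTally` is a hypothetical helper, and the sample input is a set of fragments taken from the send log above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class BrokerTally {

    // Captures the value of brokerName up to the next ',' or ']'.
    private static final Pattern BROKER = Pattern.compile("brokerName=([^,\\]]+)");

    /** Counts SendResult lines per broker name; lines without the field are skipped. */
    static Map<String, Integer> countByBroker(List<String> sendResultLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : sendResultLines) {
            Matcher m = BROKER.matcher(line);
            if (m.find()) {
                counts.merge(m.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "messageQueue=MessageQueue [topic=topic_test_123, brokerName=broker-a, queueId=8], queueOffset=0]",
            "messageQueue=MessageQueue [topic=topic_test_123, brokerName=broker-a, queueId=9], queueOffset=0]",
            "messageQueue=MessageQueue [topic=topic_test_123, brokerName=broker-a, queueId=10], queueOffset=0]");
        System.out.println(countByBroker(lines));
    }
}
```

Fed the full 100-line log, the same method should reproduce the 52/48 split reported by the console.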

4. Consumer code

package com.my.maven.rocketmq;

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {

    public static void main(String[] args) throws InterruptedException,
            MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                "group_test_123");
        // Same two name server addresses as the Producer.
        consumer.setNamesrvAddr("172.16.7.91:9876;172.16.7.92:9876");

        consumer.subscribe("topic_test_123", "TagA || TagB");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName()
                        + " Receive New Messages: " + msgs);
                // Handle every message in the batch, not only the first one.
                for (MessageExt msg : msgs) {
                    if (!"topic_test_123".equals(msg.getTopic())) {
                        continue;
                    }
                    String tags = msg.getTags();
                    if ("TagA".equals(tags) || "TagB".equals(tags)) {
                        // Decode the message body with the charset the Producer used.
                        String message = new String(msg.getBody(), StandardCharsets.UTF_8);
                        System.out.println("receive " + tags + " message:" + message);
                    }
                }
                // Report success so the messages are not redelivered.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }

}

5. Run the Consumer

[Screenshot: running the Consumer]

6. Check consumption

Consumption log:

Consumer Started.
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=13, storeSize=203, queueOffset=2, sysFlag=0, bornTimestamp=1626851390760, bornHost=/172.16.7.1:49540, storeTimestamp=1626851374215, storeHost=/172.16.7.95:10911, msgId=AC10075F00002A9F00000000000023AD, commitLogOffset=9133, bodyCRC=938317384, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topic_test_123', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1626851850431, UNIQ_KEY=AC1B09F602B873D16E936A402D280055, CLUSTER=MyRocketmq, WAIT=true, TAGS=TagA}, body=[78, 97, 109, 101, 83, 101, 114, 118, 101, 114, 32, 84, 101, 115, 116, 56, 53], transactionId='null'}]]
receive TagA message:NameServer Test85
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=13, storeSize=203, queueOffset=0, sysFlag=0, bornTimestamp=1626851390103, bornHost=/172.16.7.1:49540, storeTimestamp=1626851373560, storeHost=/172.16.7.95:10911, msgId=AC10075F00002A9F0000000000000A4D, commitLogOffset=2637, bodyCRC=1248836315, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topic_test_123', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1626851850427, UNIQ_KEY=AC1B09F602B873D16E936A402A970015, CLUSTER=MyRocketmq, WAIT=true, TAGS=TagA}, body=[78, 97, 109, 101, 83, 101, 114, 118, 101, 114, 32, 84, 101, 115, 116, 50, 49], transactionId='null'}]]
receive TagA message:NameServer Test21
ConsumeMessageThread_9 Receive New Messages: [MessageExt [queueId=5, storeSize=203, queueOffset=1, sysFlag=0, bornTimestamp=1626851390599, bornHost=/172.16.7.1:49540, storeTimestamp=1626851374054, storeHost=/172.16.7.95:10911, msgId=AC10075F00002A9F00000000000010A5, commitLogOffset=4261, bodyCRC=458807620, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topic_test_123', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1626851850434, UNIQ_KEY=AC1B09F602B873D16E936A402C87002D, CLUSTER=MyRocketmq, WAIT=true, TAGS=TagA}, body=[78, 97, 109, 101, 83, 101, 114, 118, 101, 114, 32, 84, 101, 115, 116, 52, 53], transactionId='null'}]]
receive TagA message:NameServer Test45
ConsumeMessageThread_13 Receive New Messages: [MessageExt [queueId=9, storeSize=203, queueOffset=2, sysFlag=0, bornTimestamp=1626851390743, bornHost=/172.16.7.1:49540, storeTimestamp=1626851374198, storeHost=/172.16.7.95:10911, msgId=AC10075F00002A9F0000000000002081, commitLogOffset=8321, bodyCRC=813716049, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topic_test_123', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1626851850435, UNIQ_KEY=AC1B09F602B873D16E936A402D170051, CLUSTER=MyRocketmq, WAIT=true, TAGS=TagA}, body=[78, 97, 109, 101, 83, 101, 114, 118, 101, 114, 32, 84, 101, 115, 116, 56, 49], transactionId='null'}]]
receive TagA message:NameServer Test81
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=5, storeSize=203, queueOffset=0, sysFlag=0, bornTimestamp=1626851389767, bornHost=/172.16.7.1:49540, storeTimestamp=1626851373464, storeHost=/172.16.7.95:10911, msgId=AC10075F00002A9F00000000000003F5, commitLogOffset=1013, bodyCRC=256673844, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topic_test_123', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1626851850433, UNIQ_KEY=AC1B09F602B873D16E936A402947000D, CLUSTER=MyRocketmq, WAIT=true, TAGS=TagA}, body=[78, 97, 109, 101, 83, 101, 114, 118, 101, 114, 32, 84, 101, 115, 116, 49, 51], transactionId='null'}]]
receive TagA message:NameServer Test13
ConsumeMessageThread_13 Receive New Messages: [MessageExt [queueId=12, storeSize=203, queueOffset=1, sysFlag=0, bornTimestamp=1626851390629, bornHost=/172.16.7.1:49540, storeTimestamp=1626851374084, storeHost=/172.16.7.95:10911, msgId=AC10075F00002A9F0000000000001632, commitLogOffset=5682, bodyCRC=472350118, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topic_test_123', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1626851850436, UNIQ_KEY=AC1B09F602B873D16E936A402CA50034, CLUSTER=MyRocketmq, WAIT=true, TAGS=TagA}, body=[78, 97, 109, 101, 83, 101, 114, 118, 101, 114, 32, 84, 101, 115, 116, 53, 50], transactionId='null'}]]
receive TagA message:NameServer Test52
……

100 messages were consumed in total.
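The `body=[78, 97, ...]` field in each `MessageExt` line is the message body printed as decimal byte values. The Producer encoded the body with `RemotingHelper.DEFAULT_CHARSET` (UTF-8), so the bytes can be turned back into text as in the following sketch; `BodyDecoder` is an illustrative name, not a RocketMQ class.

```java
import java.nio.charset.StandardCharsets;

class BodyDecoder {

    /** Converts the decimal byte values printed in the log back into a string. */
    static String decode(int... values) {
        byte[] bytes = new byte[values.length];
        for (int i = 0; i < values.length; i++) {
            bytes[i] = (byte) values[i];
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Body of the first MessageExt record in the consumption log above.
        System.out.println(decode(78, 97, 109, 101, 83, 101, 114, 118, 101, 114,
                32, 84, 101, 115, 116, 56, 53));
        // prints NameServer Test85
    }
}
```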

Console view: the console also shows 100 messages consumed.

IV. Shut down one NameServer node

1. Shut down nameserver02

[root@nameserver02 ~]# init 0

2. Send messages

Send another 100 messages.

The Producer log and the console both show 100 new messages.

3. Consume messages

Consumption also increases by 100 messages.

4. Conclusion

Conclusion 1: when one NameServer node goes down, message sending and consumption are not affected.
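Conclusion 1 is consistent with both clients being configured with the two NameServer addresses: when one address stops responding, the other can still be used. The sketch below illustrates that idea only. It is not the RocketMQ client's actual selection logic, and `NameServerPicker` and its `reachable` predicate are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

class NameServerPicker {

    private final List<String> addresses;

    NameServerPicker(String namesrvAddr) {
        // Same "host:port;host:port" format that is passed to setNamesrvAddr.
        this.addresses = Arrays.asList(namesrvAddr.split(";"));
    }

    /** Returns the first configured address that the predicate reports as reachable. */
    Optional<String> pick(Predicate<String> reachable) {
        return addresses.stream().filter(reachable).findFirst();
    }

    public static void main(String[] args) {
        NameServerPicker picker =
                new NameServerPicker("172.16.7.91:9876;172.16.7.92:9876");
        // Simulate nameserver02 (172.16.7.92) being shut down.
        Optional<String> chosen = picker.pick(addr -> !addr.startsWith("172.16.7.92"));
        System.out.println(chosen.orElse("no NameServer available"));
        // prints 172.16.7.91:9876
    }
}
```

With every address filtered out, `pick` returns an empty `Optional`, which corresponds to the situation tested in the next section.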

V. Shut down all NameServer nodes

1. Sending messages

Send another 10000 messages and, while they are being sent, shut down nameserver01 as well:

[root@nameserver01 ~]# init 0

With nameserver01 shut down during the run, only 367 of the 10000 messages were sent.
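One detail may matter when reading the figure of 367: the Producer in section III declares `throws Exception` and calls `producer.send(msg)` without a try/catch, so the first send that fails ends the loop and the remaining messages are never attempted. Counting successes and failures separately would show how many sends were tried and how many failed. A minimal sketch, in which the `Sender` interface is a hypothetical stand-in for `producer.send` and the failure point is simulated:

```java
class SendCounter {

    /** Hypothetical stand-in for a call such as producer.send(msg). */
    interface Sender {
        void send(String body) throws Exception;
    }

    /** Attempts every send and returns {successCount, failureCount}. */
    static int[] sendAll(Sender sender, int total) {
        int ok = 0;
        int failed = 0;
        for (int i = 0; i < total; i++) {
            try {
                sender.send("NameServer Test" + i);
                ok++;
            } catch (Exception e) {
                // Keep going so that later messages are still attempted.
                failed++;
            }
        }
        return new int[] {ok, failed};
    }

    public static void main(String[] args) {
        // Simulated sender that starts failing after 367 successful sends.
        int[] sent = {0};
        int[] result = sendAll(body -> {
            if (sent[0] >= 367) {
                throw new IllegalStateException("simulated send failure");
            }
            sent[0]++;
        }, 10000);
        System.out.println("ok=" + result[0] + ", failed=" + result[1]);
        // prints ok=367, failed=9633
    }
}
```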

2. Consuming messages

No messages could be consumed, and no consumption records appeared.

3. Conclusion

Conclusion 2: in this test, once all NameServer nodes were down, messages could be neither sent nor received.

VI. Start nameserver01

1. Power on nameserver01

After nameserver01 is powered back on, message sending and consumption resume, but some messages are lost.

2. Conclusion

Conclusion 3: when the NameServer cluster recovers, sending and consumption resume for some messages, while other messages are lost.
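To quantify the loss in conclusion 3, the ids printed by the Producer can be compared with the ids seen by the Consumer. In the logs above, the Producer's `msgId` and the Consumer's `UNIQ_KEY` property share the same format (both start with `AC1B09F602B873D16E936A40`), so this sketch assumes they identify the same message. `LossReport` is a hypothetical helper, and the consumed set in `main` is illustrative rather than taken from the test.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

class LossReport {

    /** Returns the ids that were sent but never consumed, in send order. */
    static Set<String> missing(Collection<String> sentIds, Collection<String> consumedIds) {
        Set<String> result = new LinkedHashSet<>(sentIds);
        result.removeAll(new HashSet<>(consumedIds));
        return result;
    }

    public static void main(String[] args) {
        // msgId values from the first three lines of the send log.
        Set<String> lost = missing(
            Arrays.asList("AC1B09F602B873D16E936A4028160000",
                          "AC1B09F602B873D16E936A4028470001",
                          "AC1B09F602B873D16E936A40284C0002"),
            // Illustrative: pretend only the first and third were consumed.
            Arrays.asList("AC1B09F602B873D16E936A4028160000",
                          "AC1B09F602B873D16E936A40284C0002"));
        System.out.println(lost);
        // prints [AC1B09F602B873D16E936A4028470001]
    }
}
```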

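VII. Summary

Summary: for the RocketMQ cluster to keep serving clients, at least one NameServer must remain running. In this test, once every NameServer was down, messages could be neither sent nor consumed.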
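Since at least one NameServer has to stay up, it can be useful to check the configured addresses before a test run. The sketch below tries a TCP connection to each address from the `setNamesrvAddr` string. It is an assumption-laden helper, not part of RocketMQ: `NameServerCheck` is a hypothetical class, and an accepted connection only shows that the port is open, not that the NameServer is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class NameServerCheck {

    /** Splits "host:port" into a socket address. */
    static InetSocketAddress parse(String hostPort) {
        int colon = hostPort.lastIndexOf(':');
        String host = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        return new InetSocketAddress(host, port);
    }

    /** Returns true if a TCP connection can be opened within the timeout. */
    static boolean isReachable(String hostPort, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(parse(hostPort), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String namesrvAddr = "172.16.7.91:9876;172.16.7.92:9876";
        for (String addr : namesrvAddr.split(";")) {
            System.out.println(addr + " reachable: " + isReachable(addr, 2000));
        }
    }
}
```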

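All code and configuration files for this article have been uploaded to GitHub: RocketMQ_NameServer_HA_Test

Single-node RocketMQ setup: see "Centos7.6搭建RocketMQ4.8全纪录"

Clustered RocketMQ setup: see "RocketMQ4.8集群搭建全纪录"

Starting and stopping the cluster: see "RocketMQ集群启停手册"

Cluster send/receive testing: see "RocketMQ集群消息收发测试全纪录"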

Originally published 2021-07-21 on the author's personal site/blog.
