前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ单机部署及实现延时队列

RocketMQ单机部署及实现延时队列

原创
作者头像
觉醒的光
修改2021-10-28 12:00:48
1.4K0
修改2021-10-28 12:00:48
举报
文章被收录于专栏:Guang的技术博客Guang的技术博客

主要以当前最新版 4.9.1 版的为标准讲述在linux服务器上部署单机 RocketMQ 实例,服务器默认需要事先安装好JDK。主要参考文章是RocketMQ官网的Quick Start:

https://rocketmq.apache.org/docs/quick-start/

下载安装包

Quick Start上有说明可以先下载源码包,然后本地编译生成二进制包,这里我们直接下载二进制安装包 https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip

将二进制安装包rocketmq-all-4.9.2-bin-release.zip上传到服务器的/usr/local路径下:

> unzip rocketmq-all-4.9.2-bin-release.zip > cd rocketmq-all-4.9.2-bin-release

启动Name Server

> nohup sh bin/mqnamesrv &

打开日志看下是否已经启动成功:

> tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success...

启动Broker

> nohup sh bin/mqbroker -n localhost:9876 &

打开日志看下是否已经启动成功:

> tail -f ~/logs/rocketmqlogs/broker.log The broker%s, 172.30.30.233:10911 boot success...

发送消息和接收消息

在发送和接收消息之前,需要告诉客户端Name servers的位置。方法有很多,可以通过简单设置NAMESRV_ADDR环境变量的方式实现:

> export NAMESRV_ADDR=localhost:9876

发送消息

然后启动 /usr/local/rocketmq-all-4.9.1-bin-release/lib/路径下的 rocketmq-example-4.9.1.jar 包里的生产者类org.apache.rocketmq.example.quickstart.Producer,来启动一个生产者,生产1000条消息发送到RocketMQ:

> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

SendResult [sendStatus=SEND_OK, msgId= ...

接收消息

可以另起一个会话窗口,启动 /usr/local/rocketmq-all-4.9.1-bin-release/lib/路径下的 rocketmq-example-4.9.1.jar 包里的消费者类org.apache.rocketmq.example.quickstart.Consumer ,来启动一个消费者,消费刚才生产的1000条消息:

> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer ConsumeMessageThread_%d Receive New Messages: [MessageExt...

关闭服务

先关闭Broker

> sh bin/mqshutdown broker

The mqbroker(36695) is running...

Send shutdown request to mqbroker(36695) OK

再关闭Name Server

> sh bin/mqshutdown namesrv

The mqnamesrv(36664) is running...

Send shutdown request to mqnamesrv(36664) OK

到此,我们完成了RocketMQ的单机部署,及消息的发送和接收。

需要说明的是

  1. 发生 create mapped file failed 错误怎么办?

在单机部署过程中,有可能会发生 create mapped file failed, server is busy or broken... 这种错误,我这边的原因是,需要手动在 ~/store/路径下手动先创建好文件夹 commitlog 和 consumequeue,作为消息的存储队列和消费队列的路径。

2. 怎么修改name server和broker的启动内存?

一开始按照官网的步骤来操作往往会直接启动不起来,提示内存不足,那是因为name server和broker默认的启动JVM内存是4G,而内存在不够的情况下自然就启动失败了。

这时候需要修改name server和broker的启动内存参数,broker的启动内存参数在runbroker.sh下修改,name server的启动内存参数是在runserver.sh下修改。

3. RocketMQ的延迟队列怎样实现?

下载RocketMQ 4.9.1 的源码,在rocketmq-all-4.9.1-source-release\example\src\main\java代码路径下的org.apache.rocketmq.example.quickstart.Producer类里,给生成者生产的消息添加一个msg.setDelayTimeLevel(5)设置消息延迟等级的动作。

代码语言:txt
复制
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {
   public static void main(String[] args) throws Exception {
      // 实例化一个生产者来产生延时消息
      DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
      // 启动生产者
      producer.start();
      int totalMessagesToSend = 100;
      for (int i = 0; i < totalMessagesToSend; i++) {
          Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
          // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
          message.setDelayTimeLevel(3);
          // 发送消息
          producer.send(message);
      }
       // 关闭生产者
      producer.shutdown();
  }
}

在消费者端启动消费者等待延时消息的消费:

代码语言:txt
复制
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class ScheduledMessageConsumer {
   public static void main(String[] args) throws Exception {
      // 实例化消费者
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
      // 订阅Topics
      consumer.subscribe("TestTopic", "*");
      // 注册消息监听者
      consumer.registerMessageListener(new MessageListenerConcurrently() {
          @Override
          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
              for (MessageExt message : messages) {
                  // Print approximate delay time period
                  System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");
              }
              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          }
      });
      // 启动消费者
      consumer.start();
  }
}

在windows本地将rocketmq-all-4.9.1-source-release\example以Artifacts的方式打包成jar包,替换服务器的 /usr/local/rocketmq-all-4.9.1-bin-release/lib/路径下的 rocketmq-example-4.9.1.jar,以上面的启动命令分别启动生成者和消费者。

可以看到消息的消费比存储时间晚10秒。

延迟时间和延迟等级的对应关系如下表所示,其具体实现在 org.apache.rocketmq.store.schedule.ScheduleMessageService类的parseDelayLevel()方法中;

References

1. RocketMQ Quick Start. https://rocketmq.apache.org/docs/quick-start/

2. 知乎:Apache RocketMQ单机部署. https://zhuanlan.zhihu.com/p/106097131

3. Apache RocketMQ官网说明文档之样例. https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md#31-%E5%90%AF%E5%8A%A8%E6%B6%88%E8%B4%B9%E8%80%85%E7%AD%89%E5%BE%85%E4%BC%A0%E5%85%A5%E8%AE%A2%E9%98%85%E6%B6%88%E6%81%AF

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档