操作场景
RocketMQ 5.0 提供了全新的基于 gRPC 协议的 5.x SDK,新版本 SDK 更加轻量化,多语言支持更好,建议优先使用。同时,消息队列 RocketMQ 5.x 系列也支持存量业务继续使用 4.x SDK 访问,本文以调用 4.x Java SDK 为例介绍通过开源 SDK 实现普通消息收发的操作过程。
前提条件
已完成前期的 RocketMQ 集群资源创建。
已参考准备工作完成 Linux 服务器准备和环境配置。
下载 Demo
操作步骤
步骤1:安装 Java 依赖库
在 Java 项目中引入相关依赖,以 Maven 工程为例,在 pom.xml 添加以下依赖:
<!-- in your <dependencies> block --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.7</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.9.7</version></dependency>
步骤2:生产消息
1. 在已创建的 Java 工程中,创建发送普通消息程序并运行。
// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer(groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限);// 设置NameServer的地址,地址就是形如xxx.tencenttdmq.com:8080 这样的接入地址。producer.setNamesrvAddr(nameserver);// 启动Producer实例producer.start();for (int i = 0; i < 10; i++) {// 创建消息实例,设置topic和消息内容.Message msg = new Message(topic_name, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}
参数 | 说明 |
accessKey | 角色密钥,在控制台的集群权限页面 AccessKey 列复制。 ![]() |
secretKey | 角色名称,在控制台的集群权限页面 SecretKey 列复制。 |
nameserver | 集群接入地址,在控制台集群基本信息页面的接入信息模块获取。 ![]() |
topic_name | Topic 的名称,在控制台 Topic 管理页面复制。 ![]() |
2. 运行结果如下:
SendResult [sendStatus=SEND_OK, msgId=0100017D1DC818B4AAC202F388CF0000, offsetMsgId=null, messageQueue=MessageQueue [topic=yourTopic, brokerName=broker-a, queueId=3], queueOffset=250]SendResult [sendStatus=SEND_OK, msgId=0100017D1DC818BD1CAC202F388CF0001, offsetMsgId=null, messageQueue=MessageQueue [topic=yourTopic, brokerName=broker-a, queueId=0], queueOffset=251]...SendResult [sendStatus=SEND_OK, msgId=0100017D1DC818B4AAC202F388CF0009, offsetMsgId=null, messageQueue=MessageQueue [topic=yourTopic, brokerName=broker-a, queueId=2], queueOffset=259]
步骤3:消费消息
1. 在已创建的 Java 工程中,创建订阅普通消息程序并运行。以下代码示例以 Push Consumer 为例,其他的可以参考更详细的 4.x 使用文档。
// 实例化消费者DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL权限// 设置NameServer的地址pushConsumer.setNamesrvAddr(nameserver);// 订阅topicpushConsumer.subscribe(topic_name, "*");// 注册回调实现类来处理从broker拉取回来的消息pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息处理逻辑System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费, 根据消费情况,返回处理状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动消费者实例pushConsumer.start();
参数 | 说明 |
accessKey | 角色密钥,在控制台的集群权限页面 AccessKey 列复制。 ![]() |
secretKey | 角色名称,在控制台的集群权限页面 SecretKey 列复制。 |
nameserver | 集群接入地址,在控制台集群基本信息页面的接入信息模块获取。 ![]() |
groupName | 消费者组名称,在控制台 Group 管理页面复制。 ![]() |
topic_name | Topic 的名称,在控制台 Topic 管理页面复制。 ![]() |
2. 发送消息后:
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=3, storeSize=287, queueOffset=250, sysFlag=0, bornTimestamp=1698765432100, bornHost=/192.168.1.100:53902, storeTimestamp=1698765432200, storeHost=/192.168.1.200:10911, msgId=0100017D1DC818B4AAC202F388CF0000, commitLogOffset=156789, bodyLength=16, body=Hello RocketMQ 0, topic=yourTopic, properties={MIN_OFFSET=0, MAX_OFFSET=251, CONSUME_START_TIME=1698765432300, UNIQ_KEY=0100017D1DC818B4AAC202F388CF0000, CLUSTER=DefaultCluster}, tags=null]]ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=0, storeSize=287, queueOffset=251, sysFlag=0, bornTimestamp=1698765432110, bornHost=/192.168.1.100:53902, storeTimestamp=1698765432210, storeHost=/192.168.1.200:10911, msgId=0100017D1DC818BD1CAC202F388CF0001, commitLogOffset=157045, bodyLength=16, body=Hello RocketMQ 1, topic=yourTopic, properties={MIN_OFFSET=0, MAX_OFFSET=252, CONSUME_START_TIME=1698765432310, UNIQ_KEY=0100017D1DC818BD1CAC202F388CF0001, CLUSTER=DefaultCluster}, tags=null]]



