在控制台上完成集群、Vhost 等资源配置后,您可以使用我们提供的 SDK Demo 连接集群,进行消息收发测试。本文以调用 Java SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
前提条件
已完成前期的 RabbitMQ 集群资源创建。
已参考准备工作完成 Linux 服务器准备和环境配置。
一、准备配置
1. 下载 SDK Demo。
2. 将下载下来的 Demo 上传到同一个 VPC 下的 Linux 服务器,然后登录 Linux 服务器,进入
src/***/tdmq/rabbitmq/demo
目录下。3. 修改该目录下的 Constant.java 文件。
package com.tencent.tdmq.rabbitmq.demo;public class Constant {/*** RabbitMq服务地址* 集群详情-客户端接入页面,复制该接入点* 比如amqp://1.1.1.1:5672,这里只需要填写ip即可*/public static final String URI = "1.1.1.1";/*** 用户名* 需要先在控制台上创建该用户,也可以使用集群详情-web控制台访问地址页面,admin账号*/public static final String USERNAME = "test";/*** 密码* 需要先在控制台上创建该密码*/public static final String PASSWORD = "test";/*** 用于指定Virtual Hosts* 这里填写自定义的vhost,需要先在控制台上创建该vhost*/public static final String VHOST_NAME = "test";}
参数 | 说明 |
URI | 集群的接入地址,在集群基本信息页面的客户端接入模块获取。比如接入地址为 amqp://1.1.1.1:5672 ,这里只需要填写 IP 即可。![]() |
USERNAME | 用户名称,填写在控制台创建的用户名称。 ![]() |
PASSWORD | 用户密码,填写在控制台创建用户时填写的密码。 |
VHOST_NAME | Vhost 名称,在控制台的 Vhost 列表页面获取。 ![]() |
二、生产消息
1. 编译并运行生产消息程序 MessageProducer.java。此处以
hello world
目录下的简单消息收发程序为例。package com.tencent.tdmq.rabbitmq.demo.helloworld;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.tencent.tdmq.rabbitmq.demo.Constant;/*** rabbitmq Hello World 模型示例* 消息生产者*/public class MessageProducer {/*** 消息队列名称*/public static final String QUEUE_NAME = "hello-world-queue";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置服务地址factory.setHost(Constant.URI);// 设置Virtual Hostsfactory.setVirtualHost(Constant.VHOST_NAME);// 设置用户名factory.setUsername(Constant.USERNAME);// 设置密码factory.setPassword(Constant.PASSWORD);Connection connection = null;Channel channel = null;try {// 获取连接connection = factory.newConnection();// 建立通道channel = connection.createChannel();// 绑定消息队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);String message = "Hello World!";// 发布消息 (hello world 消息模型无需指定交换机类型)channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [Producer(hello world)] Sent '" + message + "'");} catch (Exception e) {e.printStackTrace();} finally {// 释放资源if (channel != null) {channel.close();}if (connection != null) {connection.close();}}}}
说明:
QUEUE_NAME 为 Queue 名称,在控制台的 Queue 列表页面获取。

2. 运行结果如下:
[Producer(hello world)] Sent 'Hello World!'
三、消费消息
1. 编译并运行消费消息程序 MessageConsumer.java。
package com.tencent.tdmq.rabbitmq.demo.helloworld;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DeliverCallback;import com.tencent.tdmq.rabbitmq.demo.Constant;import java.nio.charset.StandardCharsets;/*** rabbitmq Hello World 模型示例* 消息消费者*/public class MessageConsumer {/*** 消息队列名称*/public static final String QUEUE_NAME = "hello-world-queue";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置服务地址factory.setHost(Constant.URI);// 设置Virtual Hostsfactory.setVirtualHost(Constant.VHOST_NAME);// 设置用户名factory.setUsername(Constant.USERNAME);// 设置密码factory.setPassword(Constant.PASSWORD);// 获取连接、建立通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 绑定消息队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);System.out.println(" [Consumer(hello world)] Waiting for messages.");// 消息处理回调;当收到消息后会执行该消息处理逻辑DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println(" [Consumer(hello world)] Received '" + message + "'");};// 订阅消息 (第二个参数为true表示收到消息自动确认)channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {System.out.println("consumerTag = " + consumerTag);});}}
说明:
QUEUE_NAME 为 Queue 名称,在控制台的 Queue 列表页面获取。

2. 运行结果如下。
[Consumer(hello world)] Waiting for messages.[Consumer(hello world)] Received 'Hello World!'consumerTag = amq.ctag-xxx(具体的 tag 字符串)