文档中心>消息队列 RabbitMQ 版>快速入门>步骤4:使用 SDK 收发消息

步骤4:使用 SDK 收发消息

最近更新时间:2025-07-24 15:02:11

我的收藏
在控制台上完成集群、Vhost 等资源配置后,您可以使用我们提供的 SDK Demo 连接集群,进行消息收发测试。本文以调用 Java SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

已完成前期的 RabbitMQ 集群资源创建。
已参考准备工作完成 Linux 服务器准备和环境配置。

一、准备配置

2. 将下载下来的 Demo 上传到同一个 VPC 下的 Linux 服务器,然后登录 Linux 服务器,进入 src/***/tdmq/rabbitmq/demo 目录下。
3. 修改该目录下的 Constant.java 文件。
package com.tencent.tdmq.rabbitmq.demo;

public class Constant {


/**
* RabbitMq服务地址
* 集群详情-客户端接入页面,复制该接入点
* 比如amqp://1.1.1.1:5672,这里只需要填写ip即可
*/
public static final String URI = "1.1.1.1";

/**
* 用户名
* 需要先在控制台上创建该用户,也可以使用集群详情-web控制台访问地址页面,admin账号
*/
public static final String USERNAME = "test";

/**
* 密码
* 需要先在控制台上创建该密码
*/
public static final String PASSWORD = "test";

/**
* 用于指定Virtual Hosts
* 这里填写自定义的vhost,需要先在控制台上创建该vhost
*/
public static final String VHOST_NAME = "test";
}

参数
说明
URI
集群的接入地址,在集群基本信息页面的客户端接入模块获取。比如接入地址为 amqp://1.1.1.1:5672,这里只需要填写 IP 即可。

USERNAME
用户名称,填写在控制台创建的用户名称。

PASSWORD
用户密码,填写在控制台创建用户时填写的密码。
VHOST_NAME
Vhost 名称,在控制台的 Vhost 列表页面获取。


二、生产消息

1. 编译并运行生产消息程序 MessageProducer.java。此处以 hello world 目录下的简单消息收发程序为例。
package com.tencent.tdmq.rabbitmq.demo.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.tencent.tdmq.rabbitmq.demo.Constant;

/**
* rabbitmq Hello World 模型示例
* 消息生产者
*/
public class MessageProducer {

/**
* 消息队列名称
*/
public static final String QUEUE_NAME = "hello-world-queue";


public static void main(String[] args) throws Exception {
// 连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost(Constant.URI);
// 设置Virtual Hosts
factory.setVirtualHost(Constant.VHOST_NAME);
// 设置用户名
factory.setUsername(Constant.USERNAME);
// 设置密码
factory.setPassword(Constant.PASSWORD);
Connection connection = null;
Channel channel = null;
try {
// 获取连接
connection = factory.newConnection();
// 建立通道
channel = connection.createChannel();
// 绑定消息队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World!";
// 发布消息 (hello world 消息模型无需指定交换机类型)
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [Producer(hello world)] Sent '" + message + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放资源
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}
}
说明:
QUEUE_NAME 为 Queue 名称,在控制台的 Queue 列表页面获取。

2. 运行结果如下:
[Producer(hello world)] Sent 'Hello World!'

三、消费消息

1. 编译并运行消费消息程序 MessageConsumer.java。
package com.tencent.tdmq.rabbitmq.demo.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.tencent.tdmq.rabbitmq.demo.Constant;

import java.nio.charset.StandardCharsets;

/**
* rabbitmq Hello World 模型示例
* 消息消费者
*/
public class MessageConsumer {

/**
* 消息队列名称
*/
public static final String QUEUE_NAME = "hello-world-queue";

public static void main(String[] args) throws Exception {
// 连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost(Constant.URI);
// 设置Virtual Hosts
factory.setVirtualHost(Constant.VHOST_NAME);
// 设置用户名
factory.setUsername(Constant.USERNAME);
// 设置密码
factory.setPassword(Constant.PASSWORD);
// 获取连接、建立通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 绑定消息队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [Consumer(hello world)] Waiting for messages.");
// 消息处理回调;当收到消息后会执行该消息处理逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [Consumer(hello world)] Received '" + message + "'");
};
// 订阅消息 (第二个参数为true表示收到消息自动确认)
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
System.out.println("consumerTag = " + consumerTag);
});
}
}

说明:
QUEUE_NAME 为 Queue 名称,在控制台的 Queue 列表页面获取。

2. 运行结果如下。
[Consumer(hello world)] Waiting for messages.
[Consumer(hello world)] Received 'Hello World!'
consumerTag = amq.ctag-xxx(具体的 tag 字符串)