有奖捉虫:行业应用 & 管理与支持文档专题 HOT

操作场景

本文以调用 Java SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

完成VhostQueue用户创建。Vhost 打开 Trace 插件。

操作步骤

步骤1:安装 Java 依赖库

在 pom.xml 添加以下依赖:
<!-- in your <dependencies> block -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.13.0</version>
</dependency>

步骤2:生产消息

1. 创建并编译运行 MessageProducer.java。
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * rabbitmq Hello World 模型示例 * 消息生产者 */ public class MessageProducer { /** * 消息队列名称 */ public static final String QUEUE_NAME = "hello-world-queue"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置服务地址 factory.setUri("amqp://*****"); // 设置Virtual Hosts factory.setVirtualHost("VHOST-NAME"); // 设置用户名 factory.setUsername("USERNAME"); // 设置密码 factory.setPassword("PASSWORD"); Connection connection = null; Channel channel = null; try { // 获取连接 connection = factory.newConnection(); // 建立通道 channel = connection.createChannel(); // 绑定消息队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); String message = "Hello World!"; // 发布消息 (hello world 消息模型无需指定交换机类型) channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [Producer(hello world)] Sent '" + message + "'"); } catch (Exception e) { e.printStackTrace(); } finally { // 释放资源 if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } } }
参数
说明
QUEUE_NAME
Queue名称,在控制台 Queue 列表获取。
factory.setUri
集群接入地址,集群接入地址,在集群基本信息页面的 客户端接入 模块获取。



factory.setVirtualHost
Vhost 名称,在控制台 Vhost 列表获取。
factory.setUsername
用户名称,填写在控制台创建的用户名称。
factory.setPassword
用户密码,填写在控制台创建用户时填写的密码。
2. 可以在消息查询界面,选中指定的 Vhost 和 Queue,查询消息是否发送成功。




步骤3:消费消息

创建并编译运行 MessageConsumer.java。
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; /** * rabbitmq Hello World 模型示例 * 消息消费者 */ public class MessageConsumer { /** * 消息队列名称 */ public static final String QUEUE_NAME = "hello-world-queue"; public static void main(String[] args) throws Exception { // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置服务地址 factory.setUri("amqp://*****"); // 设置Virtual Hosts factory.setVirtualHost("VHOST-NAME"); // 设置用户名 factory.setUsername("USERNAME"); // 设置密码 factory.setPassword("PASSWORD"); // 获取连接、建立通道 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 绑定消息队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println(" [Consumer(hello world)] Waiting for messages."); // 消息处理回调;当收到消息后会执行该消息处理逻辑 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [Consumer(hello world)] Received '" + message + "'"); }; // 订阅消息 (第二个参数为true表示收到消息自动确认) channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { System.out.println("consumerTag = " + consumerTag); }); } }
参数
说明
QUEUE_NAME
Queue名称,在控制台 Queue 列表获取。
factory.setUri
集群接入地址,在集群基本信息页面的 客户端接入 模块获取。



factory.setVirtualHost
Vhost 名称,在控制台 Vhost 列表获取。
factory.setUsername
用户名称,填写在控制台创建的用户名称。
factory.setPassword
用户密码,填写在控制台创建用户时填写的密码。

步骤4:查看消费者

可以在控制台 集群管理 > Queue 基本信息页面查看接入的消费者情况。



说明
上述是基于 RabbitMQ 的发布订阅模型的一个简单示例。其他示例可参见 DemoRabbitMQ 官网 实例。