前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ 学习笔记3 - Java 使用 RabbitMQ 示例

RabbitMQ 学习笔记3 - Java 使用 RabbitMQ 示例

作者头像
张云飞Vir
发布2021-07-23 16:00:11
6930
发布2021-07-23 16:00:11
举报
文章被收录于专栏:写代码和思考写代码和思考

1. 背景

本节讲述 Java 使用 RabbitMQ 的示例,和 发送者确认回调,消费者回执的内容。

2.知识

高级消息队列协议 (AMQP) 是面向消息的中间件的平台中立的协议。Spring AMQP 项目将 Spring 的概念应用于 AMQP,形成解决方案的开发。

AMQP 的一些基本概念:

开始之前, 要使用 RabbitMQ 首先要了解 AMQP 协议的基本概念,更多可阅读我的另一篇文章

  • 生产者:一个发送消息的程序,它产生消息并发送到队列。这里是用Go写的发送端示程序例。
  • 消息队列:即 RabbitMQ 内部的队列,它安装在一个服务器中。做为消息中间件,它与具体开发语言无关,支持 Go,Java等接入连接。
  • 消费者:消费者是一个等待消息,接收消息的接收端程序示例
  • 交换机(Exchange)可以理解成邮局,交换机将收到的消息根据路由规则分发给绑定的队列(Queue)

image.png

安装 RabbitMQ

参考我的另一篇文章:https://cloud.tencent.com/developer/article/1611599

我们使用 Spring AMQP 框架来 操作 RabbitMQ 收发消息。

Spring AMQP 框架

Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。它提供了一个“模板”作为发送和接收消息的高级抽象。

该项目由两部分组成;spring-amqp 是基础抽象,spring-rabbit 是 RabbitMQ 实现。

Spring AMQP 的特征

  • 用于异步处理入站消息的侦听器容器
  • RabbitTemplate 用于发送和接收消息
  • RabbitAdmin 用于自动声明队列、交换和绑定

3. 示例

下面通过一个示例看下基本的收发消息的操作。

3.1 编写程序“生产者”

第一步:配置Rabbit的数据连接

编辑 application.yml, 指定 rabbitmq 的服务器地址,端口号,账户名密码等。

代码语言:javascript
复制
spring:
  application:
    name: producer
  rabbitmq:
    host: localhost
    virtual-host: /
    port: 5672
    username: admin
    password: admin

第二步:配置好 队列,交换机,和绑定(queue,exchange,binding)

队列里存储了消息,交换机类似邮局,而“绑定”是个“ 队列+交换机”关联关系。通过“绑定” binding 将 交换机和 队列连线在一起。

代码语言:javascript
复制
@Configuration
public class RabbitConfig {
    // 路由的 key
    public static final String ROUTING_KEY = "hello_routing_key";
    public static final String EXCHANGE_NAME = "zyf_direct_exchange";
    public static final String QUEUE_NAME = "first_queue";

    //  一个队列
    @Bean
    public Queue getFirstQueue() {
        return new Queue(QUEUE_NAME);
    }

    // 一个 直接交换机
    @Bean
    public DirectExchange getDirectExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    // 进行绑定
    @Bean
    public Binding getBinding() {
        return BindingBuilder.bind(getFirstQueue()).to(getDirectExchange()).with(ROUTING_KEY);
    }
}

第三步:发消息

RabbitTemplate 是操作发送消息的 “模板方法”,springboot 已帮忙配置好注入关系,直接拿来用就可以了。调用: rabbitTemplate.convertAndSend(...) 来发消息,需要指明 交换机的名称,和路由key。

代码语言:javascript
复制
@Service
public class BusinessService {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    public void sendMessage(String msg) {
        System.out.println("# sendMessage,msg=" + msg);


        String routingKey = RabbitConfig.ROUTING_KEY;
        String exchangeName = RabbitConfig.EXCHANGE_NAME;
        rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
    }

}

总结:

  • 先配置好从 “交换机”到“队列”的连线。
  • 发送者 就可以通过 交换机名称和路由 key 来发送消息。

3.2 编写程序“消费者”

然后就是准备接收消息了。

第一步:配置好 rabbitmq 的数据连接。

和上面的 发送者一样,编辑 application.yml, 指定 rabbitmq 的服务器地址,端口号,账户名密码等。

第二步:配置 异步消息的监听器

接收消息配置一个回调即可。使用 @RabbitMessageListener 注解标注。

代码语言:javascript
复制
@Component
public class RabbitMessageListener {
    public static final String QUEUE_NAME = "first_queue";

    @RabbitListener(queues = QUEUE_NAME)
    public void receive(String msg) {
        System.out.println(msg);
    }
}

至此就完成了收发消息。我的代码示例见:https://github.com/vir56k/java_demo/tree/master/rabbitmq_demo1

4. 更多扩展

4.1 生产者发送时的结果回调(确认模式)

发布是异步的——如何检测成功和失败?

发布消息是一种异步机制,默认情况下,"无法路由的消息" 会被 RabbitMQ 丢弃。为了成功发布,您可以收到异步确认,如相关发布者确认和返回 中所述。

考虑两种失败情况:

  • 发消息到不存在的交换机。
  • 发消息到交换机,但没有匹配的队列。

第一种情况的场景是 指定了 错误的交换机名称。

第二种情况的场景是 “发送者的退货” 。

(1)发送者发送消息后的 “消息确认” 回调事件

对于发布者确认 ,RabbitTemplate 需要 设置:

代码语言:javascript
复制
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
和
rabbitTemplate.setConfirmCallback(confirmCallback());

然后注册回调的实习,示例:

代码语言:javascript
复制
// 用来确认生产者 producer 将消息发送到 broker 的回调
 @Bean
 public RabbitTemplate.ConfirmCallback confirmCallback() {
     return new RabbitTemplate.ConfirmCallback() {
         @Override
         public void confirm(CorrelationData correlationData, boolean ack, String cause) {
             String id = correlationData == null ? "" : correlationData.getId();
             if (ack) {
                 // log.info(String.format("# [%s] 投递到 broker 成功!, cause=%s", id, cause));
             } else {
                 log.error(String.format("# [%s] 投递到 broker 失败!, cause=%s", id, cause));
             }
         }
     };
 }

上面的 ack 参数 指示了是否投递(到交换机)成功。

注意:一个 ConfirmCallback 仅支持 一个RabbitTemplate。

**(2)发送者的 “退货” 回调事件

对于返回的消息,模板的 mandatory 属性必须设置为true 。也需要将 CachingConnectionFactory 其 publisherReturns属性设置为true

即:

代码语言:javascript
复制
connectionFactory.setPublisherReturns(true);
...
rabbitTemplate.setReturnsCallback(returnsCallback());
// 强制标志,当 setReturnsCallback 被设置时,这里要设置为 true
rabbitTemplate.setMandatory(true);

示例:

代码语言:javascript
复制
// 发布到交换机,但没有匹配的目标队列 时,退货
@Bean
public RabbitTemplate.ReturnsCallback returnsCallback() {
    return new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            int replyCode = returnedMessage.getReplyCode();
            String replyText = returnedMessage.getReplyText();
            String exchange = returnedMessage.getExchange();
            System.out.println(String.format("# 退货消息:原因=%s, replyCode=%s, exchange=%s", replyText, replyCode, exchange));
        }
    };
}

上面方法里的 ReturnedMessage 具有以下属性:

  • message - 返回的消息本身
  • replyCode - 指示退货原因的代码
  • replyText - 退货的文字原因 - 例如 NO_ROUTE
  • exchange - 消息发送到的交换
  • routingKey - 使用的路由密钥

每个 ReturnsCallback 仅支持一个RabbitTemplate。

4.2消费者回执(确认模式)

消息接收回执是指 消息接收者 收到消息后 向 “broker” 消息代理 回复的“ 确认消息 ”

注意:这里的回执和 发送者 “没有任何关系” 。它通知到 rabbitmq ,rabbitmq 根据回执决定是 重复,或者放弃。

有三种回执模式:

  • NONE:不发送确认。RabbitMQ 将此称为“自动确认”,因为代理假定所有消息都已确认,而消费者没有采取任何行动。
  • MANUAL:侦听器必须通过调用来确认所有消息Channel.basicAck()。
  • AUTO:容器自动确认消息,除非MessageListener抛出异常。

实现手动回执时,注入 ackMode 就可以了。

代码语言:javascript
复制
    @RabbitListener(queues = QUEUE_NAME, ackMode = "MANUAL")

示例:

代码语言:javascript
复制
    int i = 0;

    // 异步 接收消息
    @RabbitListener(queues = QUEUE_NAME, ackMode = "MANUAL")
    public void receiveForManual(String msg, Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.out.println(msg);

        String returned_message_correlation = message.getMessageProperties().getHeader("spring_returned_message_correlation");
        log.info(String.format("# 触发 receiveForManual msg =%s ,correlationId = %s", msg, returned_message_correlation));

        i++;
        if (i % 3 == 0) {
            log.info("# 接收消息...");
            channel.basicAck(tag, false);
        } else if (i % 3 == 1) {
            log.error("# 消息已重复处理失败,拒绝再次接收...");
            channel.basicReject(tag, false);
        } else if (i % 3 == 2) {
            log.error("# 消息即将再次返回队列处理...");
            channel.basicNack(tag, false, true);
        }

    }

我的代码示例见:https://github.com/vir56k/java_demo/tree/master/rabbitmq_demo2

5.参考:

Spring AMQP 文档

https://spring.io/projects/spring-amqp

https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.messaging.amqp

https://github.com/spring-projects/spring-amqp-samples

https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#scheduling

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 背景
  • 2.知识
  • 3. 示例
    • 3.1 编写程序“生产者”
      • 第一步:配置Rabbit的数据连接
      • 第二步:配置好 队列,交换机,和绑定(queue,exchange,binding)
      • 第三步:发消息
    • 3.2 编写程序“消费者”
      • 第一步:配置好 rabbitmq 的数据连接。
      • 第二步:配置 异步消息的监听器
  • 4. 更多扩展
    • 4.1 生产者发送时的结果回调(确认模式)
      • 4.2消费者回执(确认模式)
      • 5.参考:
      相关产品与服务
      消息队列 CMQ 版
      消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档