前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringBoot RabbitMQ

SpringBoot RabbitMQ

作者头像
用户8682940
发布2021-12-02 14:12:43
5370
发布2021-12-02 14:12:43
举报
文章被收录于专栏:alexzhangalexzhang

RabbitMQ 介绍

RabbitMQ的流程是:生产者将消息发送到对应交换机上,交换机再将消息转发到绑定的队列上,消费者从绑定的队列获取消息进行消费。

image.png

交换机的主要作用是接收相应的消息并且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout.

SpringBoot整合RabbitMQ

添加依赖:

代码语言:javascript
复制
<!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

在application.yml文件中添加配置

代码语言:javascript
复制
spring:
  application:
    name: async-task
  rabbitmq:
    host: 192.168.255.255
    port: 5672
    username: xxx
    password: 123456

Direct模式

Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即创建消息队列的时候,指定一个路由键(RoutingKey).当发送者发送消息的时候,指定对应的Key.当Key和消息队列的RoutingKey一致的时候,消息将会被发送到该消息队列中.

代码语言:javascript
复制
@Configuration
public class RabbitMQConfig{
    // 交换机有四种类型,分别为Direct,topic,headers,Fanout.

    // Direct 模式创建队列
    // 创建队列
    @Bean
    public Queue testQueue() {
        return new Queue("queueName");
    }

    // 创建一个交换机
    @Bean
    public DirectExchange testExchange() {
        return new DirectExchange("exchangeName");
    }

    // 把队列和交换机绑定在一起
    @Bean
    public Binding testBinding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("routingKey");
    }
}

// 消息生产者
@Component
public class HelloSender {
    @Autowired
    private RabbitTemplate template;
    
    public void send() {
    template.convertAndSend("exchangeName", "routingKey", "hello,rabbit~");
    }
}

// 定义消费者
@Component
public class TestConsumer {

    @RabbitListener(queues = "queueName")
    public void process(String data){
        log.info("------------data: {}",data);
    }
}

AmqpTemplate 发送的消息数据还可以是对象,但对象必须序列化

代码语言:javascript
复制
// 消息生产者
@Component
public class HelloSender {
    @Autowired
    private RabbitTemplate template;
    
    public void send() {
    User user = new User("name","password");
    template.convertAndSend("exchangeName", "routingKey",user);
    }
}


// 定义消费者
@Component
public class TestConsumer {

    @RabbitListener(queues = "queueName")
    public void process(User user){
        log.info("------------user: {}",user);
    }

@RabbitListener 可以作用在类上,需要和 @RabbitHandler 配合使用

代码语言:javascript
复制
@Component
@RabbitListener(queues = "queueName")
public class TestConsumer {

    @RabbitHandler
    public void process(String data){
        log.info("------------data: {}",data);
    }

    @RabbitHandler
    public void process(UserInfo userInfo){
        log.info("------------userInfo: {}",userInfo);
    }
    
    public void test(){
        log.info("------------");
    }
}

上面的TestConsumer 消费者接收所有路由键为 routingKey 的消息,队列中的消息会转发到被@RabbitHandler修饰的方法然后被消费,不同的消息类型被转发到对应的方法中。test()方法不会消费消息 RabbitMq 服务启动后会创建一个默认的DirectExchange,这个交换机只接收 路由键routingKey 和 队列名称相同的消息,所以direct模式可以简化:

代码语言:javascript
复制
@Configuration
public class RabbitMQConfig{
    // Direct 模式创建队列
    // 创建队列
    @Bean
    public Queue testQueue() {
        return new Queue("routingKey");  // 队列名和routingKey相同
    }
}

// 消息生产者
@Component
public class HelloSender {
    @Autowired
    private RabbitTemplate template;
    
    public void send() {
    template.convertAndSend("routingKey", "hello,rabbit~"); // 没有交换机名称,消息会被发送到默认交换机,然后被转发到 名称和routingKey相同的队列上
    }
}

// 定义消费者
@Component
public class TestConsumer {

    @RabbitListener(queues = "routingKey")
    public void process(String data){
        log.info("------------data: {}",data);
    }
}

Topic模式

topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中. 通配符:* 表示一个词,# 表示零个或多个词 注意: 通配符是针对交换机的!!!也就是说消息进入交换机时才进行通配符匹配,匹配完了以后才进入固定的队列

代码语言:javascript
复制
@Configuration
public class RabbitMQConfig{

   // 交换机有四种类型,分别为Direct,topic,headers,Fanout.

   // topit 模式

   @Bean(name="queueName1")
   public Queue queueMessage1() {
       return new Queue("queueName1");      // 定义第一个队列,名称为 queueName1
   }
   @Bean(name="queueName2")
   public Queue queueMessage2() {
       return new Queue("queueName2");     // 定义第二个队列,名称为 queueName2
   }
   @Bean
   public TopicExchange exchange() {
       return new TopicExchange("exchangeName");  // 定义交换机
   }
   // 定义绑定关系,通过交换机 将名称为queueName1 的队列绑定到交换机上, routingKey 为 topic.key1
   @Bean
   public Binding bindingExchangeMessage(@Qualifier("queueName1") Queue queue, TopicExchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with("topic.key1");
   }
   // 定义另一个绑定关系,通过交换机 将名称为queueName2 的队列绑定到交换机上 ,routingKey 是符合 通配符topic.#  的路由键
   // 如:topic.xx、topic.yy 等
   @Bean
   public Binding bindingExchangeMessages(@Qualifier("queueName2") Queue queue, TopicExchange exchange) {
       return BindingBuilder.bind(queue).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
   }

消费者

代码语言:javascript
复制
    @RabbitListener(queues="queueName1")    //监听器监听指定的Queue
    public void process1(String str) {    
        System.out.println("message:"+str);
    }
    @RabbitListener(queues="queueName2")    //监听器监听指定的Queue
    public void process2(String str) {
        System.out.println("messages:"+str);
    }

然后发送消息

代码语言:javascript
复制
// 2个消费者都会收到消息
template.convertAndSend("exchangeName", "topic.key1", "data info");
// 只有第2个消费者收到消息
template.convertAndSend("exchangeName", "topic.key2", "data info");
// 只有第2个消费者收到消息
template.convertAndSend("exchangeName", "topic.key3", "data info");

Fanout模式

fanout是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略. 因此我们发送到交换机的消息会使得绑定到该交换机的每一个Queue接收到消息,这个时候就算指定了路由键(routingKey),或者规则(即上文中convertAndSend方法的参数2),也会被忽略!

代码语言:javascript
复制
// fanout模式
    @Bean(name="Amessage")
    public Queue AMessage() {
        return new Queue("fanout.A");
    }


    @Bean(name="Bmessage")
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean(name="Cmessage")
    public Queue CMessage() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");//配置广播路由器
    }

    @Bean
    Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }

然后发送消息

代码语言:javascript
复制
template.convertAndSend("fanoutExchange", "", "data info"); // 第二个参数会被忽略

消费者

代码语言:javascript
复制
    @RabbitListener(queues="fanout.A")
    public void processA(String str1) {
        System.out.println("ReceiveA:"+str1);
    }
    @RabbitListener(queues="fanout.B")
    public void processB(String str) {
        System.out.println("ReceiveB:"+str);
    }
    @RabbitListener(queues="fanout.C")
    public void processC(String str) {
        System.out.println("ReceiveC:"+str);
    }

结果三个都收到消息

image.png

RabbitMQ 实现延迟队列

rabbitMQ可以通过死信机制来实现延迟队列的功能,一些概念: 1、TTL ,即 Time-To-Live,存活时间,消息和队列都可以设置存活时间 2、Dead Letter,即死信,若给消息设置了存活时间,当超过存活时间后消息还没有被消费,则该消息变成了死信 3、Dead Letter Exchanges(DLX),即死信交换机 4、Dead Letter Routing Key (DLK),死信路由键 直接上代码:

代码语言:javascript
复制
@Configuration
public class RabbitMQConfig { 

    // 创建一个立即消费队列
    @Bean(QueueName.ImmediateQueue)
    public Queue immediateQueue() {
        return new Queue(QueueName.ImmediateQueue);
    }

    @Bean(ExchangeName.IMMEDIATE)
    public DirectExchange immediateExchange() {
        return new DirectExchange(ExchangeName.IMMEDIATE);
    }

    // 把 立即消费的队列 和 立即消费的exchange 绑定在一起
    @Bean
    public Binding immediateBinding(@Qualifier(QueueName.ImmediateQueue) Queue queue, @Qualifier(ExchangeName.IMMEDIATE) DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(RoutingKey.IMMEDIATE_ROUTING_KEY);
    }


    // 创建一个延时队列
    @Bean(QueueName.DelayQueue)
    public Queue delayQueue() {
        Map<String, Object> params = new HashMap<>();

        // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
        params.put("x-dead-letter-exchange", ExchangeName.IMMEDIATE);

        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key
        params.put("x-dead-letter-routing-key", RoutingKey.IMMEDIATE_ROUTING_KEY);

        // 设置队列中消息的过期时间,单位 毫秒
        params.put("x-message-ttl", 5 * 1000);

        return new Queue(QueueName.DelayQueue, true, false, false, params);
    }

    @Bean(ExchangeName.DELAY)
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(ExchangeName.DEAD_LETTER);
    }

    // 把 延迟消费的队列 和 延迟消费的exchange 绑定在一起
    @Bean
    public Binding delayBinding(@Qualifier(QueueName.DelayQueue) Queue queue, @Qualifier(ExchangeName.DELAY) DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(RoutingKey.DELAY_KEY);
    }
}

@Component
public class TestConsumer {
    @RabbitListener(queues = QueueName.ImmediateQueue)
    public void process2(UserInfo userInfo){
        log.info("------------userInfo: {}",userInfo);
    }
}

// 常量
public interface ExchangeName {
    String IMMEDIATE = "immediate";
    String DELAY = "delay";
}
public interface QueueName {
    String ImmediateQueue = "ImmediateQueue";
    String DelayQueue = "DelayQueue";
}
public interface RoutingKey {
    String IMMEDIATE_ROUTING_KEY = "immediate.key";
    String DELAY_KEY = "delay.key";
}

image.png

过程: 1、先创建一个普通队列,即上面的 ImmediateQueue,创建一个普通交换机 immediateExchange,绑定两者。 2、创建一个延迟队列,即创建时设置了参数:x-dead-letter-exchange,x-dead-letter-routing-key,x-message-ttl,该队列就相当于是一个延迟队列了 3、创建延迟交换机(其实也是普通交换机),和延迟队列绑定 4、给ImmediateQueue创建监听消费者,注意,延迟队列不要设置监听消费者,不然延迟队列就变成普通队列了,不起作用 到此延迟队列已完成,直接发送消息到延迟交换机即可

代码语言:javascript
复制
        UserInfo userInfo = new UserInfo();
        userInfo.setPhone("15800000000");
        userInfo.setUserName("aaaaaaa");

        log.info("开始发送消息");
        template.convertAndSend(ExchangeName.DELAY, RoutingKey.DELAY_KEY, userInfo);

原理:发送消息到延迟交换机,延迟交换机将消息转发到延迟队列,因为延迟队列没有监听消费者,所以消息不会被消费,直到消息超过存活时间(即 延迟)变成死信,这时延迟队列会将死信转发到死信交换机,即上面的immediateExchange(因为延迟队列绑定的死信交换机x-dead-letter-exchange指向了immediateExchange),immediateExchange将消息转发给ImmediateQueue,然后被监听消费者消费

测试结果:

image.png

可以看出过了5秒才消费消息

注意: 一个延迟队列只能设置一个存活时间,即该延迟队列里面的所有消息的存活时间都必须一致,如果需要设置不一样的存活时间,只能再创建一个延迟队列。原因是延迟队列并不会去扫描队列里面所有消息的存活时间,只会判断队列头的第一个消息是否过期,若过期了就转发消息,否则一直等待,即使队列后面已经有消息先过期,也只能等前面的消息被转发后,该消息才被转发。

消息确认机制

消息确认分为两部分: 生产确认 和 消费确认。 生产确认: 生产者生产消息后,将消息发送到交换机,触发确认回调;交换机将消息转发到绑定队列,若失败则触发返回回调。 消费确认: 默认情况下消息被消费者从队列中获取后即发送确认,不管消费者处理消息时是否失败,不需要额外代码,但是不能保证消息被正确消费。我们增加手动确认,则需要代码中明确进行消息确认。 在配置文件中添加:

代码语言:javascript
复制
spring:
 application:
   name: async-task
 rabbitmq:
   host: 192.168.0.0
   port: 5672
   username: username
   password: password
   publisher-confirms: true  # 开启发送确认
   publisher-returns: true   # 开启发送失败退回
   template:
     mandatory: true
   listener:
     type: simple
     simple:
       acknowledge-mode: manual # 开启消息消费手动确认

在RabbitMQConfig 中添加如下配置

代码语言:javascript
复制
    @Bean
    public RabbitTemplate getTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate template = new RabbitTemplate(connectionFactory);

        // 消息发送到交换器Exchange后触发回调
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                //  可以进行消息入库操作
                log.info("消息唯一标识 correlationData = {}", correlationData);
                log.info("确认结果 ack = {}", ack);
                log.info("失败原因 cause = {}", cause);
            }
        });

        // 配置这个,下面的ReturnCallback 才会起作用
        template.setMandatory(true);
        // 如果消息从交换器发送到对应队列失败时触发(比如 根据发送消息时指定的routingKey找不到队列时会触发)
        template.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                //  可以进行消息入库操作
                log.info("消息主体 message = {}", message);
                log.info("回复码 replyCode = {}", replyCode);
                log.info("回复描述 replyText = {}", replyText);
                log.info("交换机名字 exchange = {}", exchange);
                log.info("路由键 routingKey = {}", routingKey);
            }
        });

        return template;
    }

成功确认: void basicAck(long deliveryTag, boolean multiple) throws IOException; deliveryTag:该消息的index multiple:是否批量. true:将一次性ack所有小于deliveryTag的消息。 消费者成功处理后,调用channel.basicAck(message.getMessageProperties().getDeliveryTag(), false)方法对消息进行确认。

失败确认: void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException; deliveryTag:该消息的index。 multiple:是否批量. true:将一次性拒绝所有小于deliveryTag的消息。 requeue:是否重新入队列。

拒绝 void basicReject(long deliveryTag, boolean requeue) throws IOException; deliveryTag:该消息的index。 requeue:被拒绝的是否重新入队列。

channel.basicNack 与 channel.basicReject 的区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息

消费者消息确认代码:

代码语言:javascript
复制
@Component
public class OrderConsumer {

    @Autowired
    private TaskOrderFeign orderFeign;

    @RabbitListener(queues = QueueName.OrderCancelQueue)
    public void process(OrderDTO orderDTO, Message message, Channel channel){

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("---消费消息---------deliveryTag = {} ,  orderDTO: {}",deliveryTag ,orderDTO);

            // 取消订单
            orderDTO.setState(OrderStateEnum.DELAY_CANCEL.code);
            Response response = orderFeign.cancelOrder(orderDTO);

            //TODO 判断结果,是否需要重试

            // 成功确认消息
            channel.basicAck(deliveryTag, true);

        } catch (IOException e) {
            log.error("确认消息时抛出异常 , e = {}", PrintUtil.print(e));
            // 重新确认
            // 成功确认消息
            try {
                Thread.sleep(50);
                channel.basicAck(deliveryTag, true);
            } catch (IOException | InterruptedException e1) {
                log.error("确认消息时抛出异常 , e = {}", PrintUtil.print(e));
                // 可以考虑入库
            }

        } catch (Exception e) {

            log.error("取消订单失败 , e = {}", PrintUtil.print(e));

            try {
                // 失败确认
                channel.basicNack(deliveryTag, false, false);
            } catch (IOException e1) {
                log.error("消息失败确认失败 , e1 = {}", PrintUtil.print(e1));
            }
        }
    }
}

作者:万物归于简 链接:https://www.jianshu.com/p/2e6280ab2bc9 来源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RabbitMQ 介绍
  • SpringBoot整合RabbitMQ
  • Direct模式
  • Topic模式
  • Fanout模式
  • RabbitMQ 实现延迟队列
  • 消息确认机制
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档