前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringBoot动态创建绑定rabbitMq队列

SpringBoot动态创建绑定rabbitMq队列

作者头像
半月无霜
发布2024-03-07 09:19:30
3330
发布2024-03-07 09:19:30
举报
文章被收录于专栏:半月无霜半月无霜

SpringBoot动态创建绑定rabbitMq队列

一、介绍

在以前,我写过一篇如何使用SpringBoot整合rabbitMq的文章。

SpringBoot整合rabbitMq | 半月无霜 (banmoon.top)

上面这种方法,是自己创建队列,交换机,绑定。生成Bean,从而实现队列等等的创建。

这种方式太过于繁琐,有没有一种方法可以快速创建呢,我们只管使用就行了

还真的有,只需要在配置文件中配置队列、交换机等信息,就可以在服务启动的时候自动创建并绑定。

一次偶然间,在csdn上看到了,动态创建rabbitMq队列的文章。

拉出来魔改了一下,只要再配置文件中配置了相关的实现,实现了队列、交换机的绑定。

同时还解决了,多个开发连接同一个rabbitMq,导致自己生产的消息,被其他同事消费走的问题。

二、代码

1)读取配置的代码

这是RabbitModuleInfoProperties.java,读取配置文件中的信息,生成信息对象

代码语言:javascript
复制
package com.banmoon.config.properties;

import com.banmoon.enums.HeadersTypeEnum;
import com.banmoon.enums.RabbitExchangeTypeEnum;
import lombok.Data;

import java.util.List;
import java.util.Map;

@Data
public class RabbitModuleInfoProperties {

    /**
     * 路由key
     */
    private String routingKey;

    /**
     * 队列信息
     */
    private Queue queue;

    /**
     * 多个队列
     */
    private List<Queue> queues;

    /**
     * 交换机信息
     */
    private Exchange exchange;

    @Data
    public static class Queue {
        /**
         * 队列名称
         */
        private String name;

        /**
         * 是否持久化,默认true持久化,重启消息不会丢失
         */
        private boolean durable = true;

        /**
         * 是否具有排他性,默认false,可多个消费者消费同一个队列
         */
        private boolean exclusive = false;

        /**
         * 当消费者均断开连接,是否自动删除队列,默认false,不自动删除,避免消费者断开队列丢弃消息
         */
        private boolean autoDelete = false;

        /**
         * 绑定死信队列的队列名称
         */
        private String deadLetterQueue;

        /**
         * 绑定死信队列的交换机名称
         */
        private String deadLetterExchange;

        /**
         * 绑定死信队列的路由key
         */
        private String deadLetterRoutingKey;

        /**
         * 其他属性设置
         */
        private Map<String, Object> arguments;
    }

    @Data
    public static class Exchange {
        /**
         * 交换机类型,默认直连交换机
         */
        private String type = RabbitExchangeTypeEnum.DIRECT.getCode();

        /**
         * 交换机名称
         */
        private String name;

        /**
         * 是否持久化,默认true持久化,重启消息不会丢失
         */
        private boolean durable = true;

        /**
         * 当所有队绑定列均不在使用时,是否自动删除交换机
         */
        private boolean autoDelete = false;

        /**
         * 是否为txl延迟交换机
         */
        private boolean txlDelay = false;

        /**
         * 交换机其他参数
         */
        private Map<String, Object> arguments;

        /**
         * 头部交换机的参数
         */
        private Map<String, Object> headersMap;

        /**
         * 头部交换机的参数匹配类型,默认是所有参数都要匹配
         */
        private String headersType = HeadersTypeEnum.ALL.getCode();
    }

}

这是RabbitModuleProperties.java,上面有多个绑定配置

代码语言:javascript
复制
package com.banmoon.config.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

@Data
@Configuration
@ConfigurationProperties("spring.rabbitmq")
public class RabbitModuleProperties {

    private List<RabbitModuleInfoProperties> modules;

}

2)配置文件

这个是配置,请注意交换机,队列前缀,这个就是保证不同开发之间消息隔离的关键

代码语言:javascript
复制
spring:
  rabbitmq:
    host: rabbitMq服务地址
    port: rabbitMq服务端口
    username: 帐号
    password: 密码
    virtual-host: /
    # 确认消息是否发送至交换机
    publisher-confirm-type: correlated
    publisher-confirms: true
    # 确认消息是否发送至队列
    publisher-returns: true
    # 交换机,队列前缀
    prefix: whc
    modules: 
      - routingKey: test.direct.routingKey
        queue:
          name: test.direct.queue
        exchange: 
          name: test.direct.exchange
      - routingKey: test.fanout.router.key
        queues:
          - name: test.fanout.queue.a
          - name: test.fanout.queue.b
          - name: test.fanout.queue.c
        exchange: 
          name: test.fanout.exchange
          type: fanout
      - routingKey: test.topic.routerKey.#
        queue:
          name: test.topic.queue.log
        exchange: 
          name: test.topic.exchange
          type: topic
      - routingKey: test.topic.routerKey.text
        queue:
          name: test.topic.queue.text
        exchange: 
          name: test.topic.exchange
          type: topic
      - routingKey: test.topic.routerKey.image
        queue:
          name: test.topic.queue.image
        exchange: 
          name: test.topic.exchange
          type: topic
      - routingKey: test.headers.routerKey
        queue:
          name: test.headers.queue
        exchange: 
          name: test.headers.exchange
          type: headers
          headers-map:
            authentication: "半月无霜"
      - routingKey: test.ttl.routerKey
        queue:
          name: test.ttl.queue
          deadLetterQueue: test.ttl.death.queue
          deadLetterExchange: test.ttl.death.exchange
          deadLetterRoutingKey: test.ttl.death.routerKey
          arguments: 
            x-message-ttl: 5000
        exchange:
          name: test.ttl.exchange
      - routingKey: test.txl.routerKey
        queue:
          name: test.txl.queue
        exchange:
          name: test.txl.exchange
          txl-delay: true

3)初始化时创建队列、交换机

RabbitmqConfig.java;这是一个配置类,主要得到了AmqpAdmin对象、RabbitModuleProperties对象、以及定义的前缀

代码语言:javascript
复制
package com.banmoon.config;

import com.banmoon.config.init.RabbitModuleInitializer;
import com.banmoon.config.properties.RabbitModuleProperties;
import com.banmoon.constant.RabbitmqConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class RabbitmqConfig {

    @Value(RabbitmqConstant.RABBITMQ_PREFIX)
    private String rabbitPrefix;

    @Bean
    @ConditionalOnMissingBean
    public RabbitModuleInitializer rabbitModuleInitializer(AmqpAdmin amqpAdmin, RabbitModuleProperties rabbitModuleProperties) {
        return new RabbitModuleInitializer(amqpAdmin, rabbitPrefix, rabbitModuleProperties.getModules());
    }

}

RabbitModuleInitializer.java,初始化类,主要声明队列、交换机,以及绑定都在其中

代码语言:javascript
复制
package com.banmoon.config.init;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.banmoon.config.properties.RabbitModuleInfoProperties;
import com.banmoon.enums.HeadersTypeEnum;
import com.banmoon.enums.RabbitExchangeTypeEnum;
import com.banmoon.utils.stream.StreamUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.SmartInitializingSingleton;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@Slf4j
@AllArgsConstructor
public class RabbitModuleInitializer implements SmartInitializingSingleton {

    private AmqpAdmin amqpAdmin;

    private String rabbitmqPrefix;

    private List<RabbitModuleInfoProperties> modules;

    @Override
    public void afterSingletonsInstantiated() {
        log.info("RabbitMQ 根据配置动态创建和绑定队列、交换机");
        declareRabbitModule();
    }

    /**
     * RabbitMQ 根据配置动态创建和绑定队列、交换机
     */
    private void declareRabbitModule() {
        if (CollUtil.isEmpty(modules)) {
            return;
        }
        for (RabbitModuleInfoProperties rabbitModuleInfo : modules) {
            // 配置参数校验
            configParamValidate(rabbitModuleInfo);
            // 队列
            List<Queue> queues = convertQueue(rabbitModuleInfo.getQueues(), rabbitModuleInfo.getQueue());
            // 交换机
            RabbitModuleInfoProperties.Exchange exchangeInfo = rabbitModuleInfo.getExchange();
            Exchange exchange = convertExchange(exchangeInfo);
            // 绑定关系
            String routingKey = rabbitmqPrefix + rabbitModuleInfo.getRoutingKey();
            // 创建队列
            queues.forEach(amqpAdmin::declareQueue);
            // 创建交换机
            amqpAdmin.declareExchange(exchange);
            // 队列 绑定 交换机
            queues.forEach(queue -> {
                Binding binding;
                if (RabbitExchangeTypeEnum.HEADERS.getCode().equals(exchange.getType())) {
                    HeadersExchange headersExchange = (HeadersExchange) exchange;
                    if (HeadersTypeEnum.ALL.getCode().equals(exchangeInfo.getHeadersType())) {
                        binding = BindingBuilder.bind(queue).to(headersExchange).whereAll(exchangeInfo.getHeadersMap()).match();
                    } else {
                        binding = BindingBuilder.bind(queue).to(headersExchange).whereAny(exchangeInfo.getHeadersMap()).match();
                    }
                } else {
                    binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).and(null);
                }
                amqpAdmin.declareBinding(binding);
            });
        }
    }

    /**
     * RabbitMQ动态配置参数校验
     */
    public void configParamValidate(RabbitModuleInfoProperties rabbitModuleInfo) {
        String routingKey = rabbitModuleInfo.getRoutingKey();

        Assert.isTrue(StrUtil.isNotBlank(routingKey), "RoutingKey 未配置");

        Assert.isTrue(rabbitModuleInfo.getExchange() != null, "routingKey:{}未配置exchange", routingKey);
        Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getExchange().getName()), "routingKey:{}未配置exchange的name属性", routingKey);

        Assert.isTrue(Objects.nonNull(rabbitModuleInfo.getQueue()) || CollUtil.isNotEmpty(rabbitModuleInfo.getQueues()), "routingKey:{}未配置queue", routingKey);
    }

    public List<Queue> convertQueue(List<RabbitModuleInfoProperties.Queue> queues, RabbitModuleInfoProperties.Queue queueInfo) {
        if (CollUtil.isNotEmpty(queues)) {
            if (Objects.nonNull(queueInfo)) {
                queues.add(queueInfo);
            }
            return StreamUtil.listToList(queues, this::convertQueue);
        }
        Queue queue = convertQueue(queueInfo);
        return CollUtil.newArrayList(queue);
    }

    /**
     * 转换生成RabbitMQ队列
     */
    public Queue convertQueue(RabbitModuleInfoProperties.Queue queue) {
        String name = rabbitmqPrefix + queue.getName();
        Map<String, Object> arguments = queue.getArguments();
        // 转换ttl的类型为long
        if (arguments != null && arguments.containsKey("x-message-ttl")) {
            arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));
        }
        // 设置队列的优先级
        if (arguments != null && arguments.containsKey("x-max-priority")) {
            arguments.put("x-max-priority", Convert.toLong(arguments.get("x-max-priority")));
        }
        // 是否需要绑定死信队列
        String deadLetterQueue = queue.getDeadLetterQueue();
        String deadLetterExchange = queue.getDeadLetterExchange();
        String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
        if (StrUtil.isNotBlank(deadLetterQueue) && StrUtil.isNotBlank(deadLetterExchange) && StrUtil.isNotBlank(deadLetterRoutingKey)) {
            if (arguments == null) {
                arguments = new HashMap<>();
            }
            deadLetterQueue = rabbitmqPrefix + deadLetterQueue;
            deadLetterExchange = rabbitmqPrefix + deadLetterExchange;
            deadLetterRoutingKey = rabbitmqPrefix + deadLetterRoutingKey;
            arguments.put("x-dead-letter-exchange", deadLetterExchange);
            arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
            // 绑定死新队列
            Queue deadQueue = new Queue(deadLetterQueue, queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
            amqpAdmin.declareQueue(deadQueue);
            Exchange deadExchange = new DirectExchange(deadLetterExchange, true, true, null);
            amqpAdmin.declareExchange(deadExchange);
            Binding binding = BindingBuilder.bind(deadQueue).to(deadExchange).with(deadLetterRoutingKey).and(null);
            amqpAdmin.declareBinding(binding);
        }
        return new Queue(name, queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
    }

    /**
     * 转换生成RabbitMQ交换机
     */
    public Exchange convertExchange(RabbitModuleInfoProperties.Exchange exchange) {
        String type = exchange.getType();
        boolean txlDelay = exchange.isTxlDelay();
        String exchangeName = rabbitmqPrefix + exchange.getName();
        boolean isDurable = exchange.isDurable();
        boolean isAutoDelete = exchange.isAutoDelete();
        Map<String, Object> arguments = exchange.getArguments();
        if (txlDelay) {
            return RabbitExchangeTypeEnum.getTxlDelayExchangeByCode(type, exchangeName, isDurable, isAutoDelete, arguments);
        }
        return RabbitExchangeTypeEnum.getExchangeByCode(type, exchangeName, isDurable, isAutoDelete, arguments);
    }

}

4)其它代码

4.1)常量

这是一个常量类,里面记录着相关的队列名称,主要是给生产者、消费者使用的。太杂乱了不好打理,故专门弄了一个常量类来进行管理

代码语言:javascript
复制
package com.banmoon.constant;

/**
 * 记录rabbitmq相关的队列,交换机,路由KEY名称
 *
 * @author banmoon
 * @date 2024/02/27 12:22:13
 */
public interface RabbitmqConstant {

    /**
     * 定义的前缀
     */
    String RABBITMQ_PREFIX = "#{'${spring.rabbitmq.prefix:}'.empty ? '' : '${spring.rabbitmq.prefix:}' + '.'}";

    /**
     * 直连测试队列
     */
    String DIRECT_TEST_QUEUE = RABBITMQ_PREFIX + "test.direct.queue";
    String DIRECT_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.direct.exchange";
    String DIRECT_TEST_ROUTING_KEY = RABBITMQ_PREFIX + "test.direct.routingKey";


    /**
     * 扇形测试队列
     */
    String FANOUT_TEST_QUEUE_A = RABBITMQ_PREFIX + "test.fanout.queue.a";
    String FANOUT_TEST_QUEUE_B = RABBITMQ_PREFIX + "test.fanout.queue.b";
    String FANOUT_TEST_QUEUE_C = RABBITMQ_PREFIX + "test.fanout.queue.c";
    String FANOUT_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.fanout.exchange";
    String FANOUT_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.fanout.routerKey";

    /**
     * 主题测试队列
     */
    String TOPIC_TEST_QUEUE_LOG = RABBITMQ_PREFIX + "test.topic.queue.log";
    String TOPIC_TEST_QUEUE_TEXT = RABBITMQ_PREFIX + "test.topic.queue.text";
    String TOPIC_TEST_QUEUE_IMAGE = RABBITMQ_PREFIX + "test.topic.queue.image";
    String TOPIC_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.topic.exchange";
    String TOPIC_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.topic.routerKey.#";
    String TOPIC_TEST_ROUTER_KEY_TEXT = RABBITMQ_PREFIX + "test.topic.routerKey.text";
    String TOPIC_TEST_ROUTER_KEY_IMAGE = RABBITMQ_PREFIX + "test.topic.routerKey.image";

    /**
     * 头部测试队列
     */
    String HEADERS_TEST_QUEUE = RABBITMQ_PREFIX + "test.headers.queue";
    String HEADERS_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.headers.exchange";
    String HEADERS_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.headers.routerKey";

    /**
     * TTL测试队列
     */
    String TTL_TEST_QUEUE = RABBITMQ_PREFIX + "test.ttl.queue";
    String TTL_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.ttl.exchange";
    String TTL_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.ttl.routerKey";
    String TTL_TEST_DEATH_QUEUE = RABBITMQ_PREFIX + "test.ttl.death.queue";
    String TTL_TEST_DEATH_EXCHANGE = RABBITMQ_PREFIX + "test.ttl.death.exchange";
    String TTL_TEST_DEATH_ROUTER_KEY = RABBITMQ_PREFIX + "test.ttl.death.routerKey";

    /**
     * TXL测试队列
     */
    String TXL_TEST_QUEUE = RABBITMQ_PREFIX + "test.txl.queue";
    String TXL_TEST_EXCHANGE = RABBITMQ_PREFIX + "test.txl.exchange";
    String TXL_TEST_ROUTER_KEY = RABBITMQ_PREFIX + "test.txl.routerKey";

}
4.2)枚举代码

在上面的创建中,我们用到了两个枚举类,没什么可说的,直接贴出来

代码语言:javascript
复制
package com.banmoon.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;

/**
 * @author banmoon
 * @date 2024/03/04 16:35:27
 */
@Getter
@AllArgsConstructor
public enum HeadersTypeEnum {

    ANY("any", "任一"),
    ALL("all", "所有"),
    ;

    private final String code;
    private final String msg;

}
代码语言:javascript
复制
package com.banmoon.enums;

import cn.hutool.core.map.MapUtil;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.springframework.amqp.core.*;

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;

@Getter
@AllArgsConstructor
public enum RabbitExchangeTypeEnum {

    DIRECT("direct", "直连交换机"),
    FANOUT("fanout", "扇形交换机"),
    TOPIC("topic", "主题交换机"),
    HEADERS("headers", "头部交换机"),
    ;

    private final String code;
    private final String msg;

    public static RabbitExchangeTypeEnum getByCode(String code) {
        return getByCode(code, null);
    }

    public static RabbitExchangeTypeEnum getByCode(String code, RabbitExchangeTypeEnum defaultEnum) {
        return Arrays.stream(values()).filter(e -> e.getCode().equalsIgnoreCase(code)).findFirst().orElse(defaultEnum);
    }

    public static Exchange getExchangeByCode(String type, String exchangeName, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        AbstractExchange exchange = null;
        switch (RabbitExchangeTypeEnum.getByCode(type)) {
            case DIRECT:
                exchange = new DirectExchange(exchangeName, durable, autoDelete, arguments);
                break;
            case TOPIC:
                exchange = new TopicExchange(exchangeName, durable, autoDelete, arguments);
                break;
            case FANOUT:
                exchange = new FanoutExchange(exchangeName, durable, autoDelete, arguments);
                break;
            case HEADERS:
                exchange = new HeadersExchange(exchangeName, durable, autoDelete, arguments);
                break;
        }
        return exchange;
    }

    public static Exchange getTxlDelayExchangeByCode(String type, String exchangeName, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        RabbitExchangeTypeEnum typeEnum = RabbitExchangeTypeEnum.getByCode(type);
        Map<String, Object> argMap = Optional.ofNullable(arguments).orElse(MapUtil.newHashMap(2));
        argMap.put("x-delayed-type", typeEnum.getCode());
        return new CustomExchange(exchangeName, "x-delayed-message", durable, autoDelete, argMap);
    }
}

三、生产者、消费者

1)生产者

这是一个生产者抽象类,我自己写的生产者都需要继承它

代码语言:javascript
复制
package com.banmoon.queues;

import cn.hutool.extra.spring.SpringUtil;
import com.banmoon.utils.JsonUtil;
import lombok.Data;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * 基础的生产者
 *
 * @author banmoon
 * @date 2024/02/28 11:44:28
 */
@Data
public abstract class AbstractProducer {

    private AmqpTemplate amqpTemplate;

    private String queueName;

    private String exchangeName;

    private String routingKey;

    public AbstractProducer(AmqpTemplate amqpTemplate, String queueName, String exchangeName, String routingKey) {
        this.amqpTemplate = amqpTemplate;
        // TODO: 2024/3/2 这边还要进行修改
        this.queueName = SpringUtil.getProperty(queueName);
        this.exchangeName = SpringUtil.getProperty(exchangeName);
        this.routingKey = SpringUtil.getProperty(routingKey);
    }

    public AbstractProducer(AmqpTemplate amqpTemplate) {
        this.amqpTemplate = amqpTemplate;
    }

    public void send(Object obj) {
        String msg = JsonUtil.toJSONString(obj);
        amqpTemplate.convertAndSend(exchangeName, routingKey, msg);
    }

    public void sendTtlMesssage(Object obj, Integer delayMillisecond) {
        Map<String, Object> map = new HashMap<>(2);
        map.put("x-message-ttl", delayMillisecond);
        send(obj, map);
    }

    public void sendTxlMesssage(Object obj, Integer delayMillisecond) {
        send(obj, message -> {
            MessageProperties properties = message.getMessageProperties();
            properties.setDelay(delayMillisecond);
            return message;
        });
    }

    public void send(Object obj, Map<String, Object> headers) {
        String msg = JsonUtil.toJSONString(obj);
        amqpTemplate.convertAndSend(exchangeName, routingKey, msg, message -> {
            MessageProperties properties = message.getMessageProperties();
            properties.setHeaders(headers);
            return message;
        });
    }

    public void send(Object obj, MessagePostProcessor messagePostProcessor) {
        String msg = JsonUtil.toJSONString(obj);
        amqpTemplate.convertAndSend(exchangeName, routingKey, msg, messagePostProcessor);
    }

}

比如说直连交换机队列的生产者

代码语言:javascript
复制
package com.banmoon.queues.producer;

import com.banmoon.constant.RabbitmqConstant;
import com.banmoon.queues.AbstractProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * 直连测试队列生产者
 *
 * @author banmoon
 * @date 2024/02/28 16:39:13
 */
@Slf4j
@Component
public class TestDirectProducer extends AbstractProducer {

    public TestDirectProducer(AmqpTemplate amqpTemplate) {
        super(amqpTemplate);
    }

    @Override
    @Value(RabbitmqConstant.DIRECT_TEST_QUEUE)
    public void setQueueName(String queueName) {
        super.setQueueName(queueName);
    }

    @Override
    @Value(RabbitmqConstant.DIRECT_TEST_EXCHANGE)
    public void setExchangeName(String exchangeName) {
        super.setExchangeName(exchangeName);
    }

    @Override
    @Value(RabbitmqConstant.DIRECT_TEST_ROUTING_KEY)
    public void setRoutingKey(String routingKey) {
        super.setRoutingKey(routingKey);
    }
}

2)消费者

代码语言:javascript
复制
package com.banmoon.queues.consumer;

import com.banmoon.constant.RabbitmqConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 直连测试队列消费者
 *
 * @author banmoon
 * @date 2024/02/27 12:08:12
 */
@Slf4j
@Component
public class TestDirectConsumer {

    @RabbitListener(queues = RabbitmqConstant.DIRECT_TEST_QUEUE)
    public void test(String message) {
        log.info("直连测试队列消费者:{}", message);
    }

}

四、最后

关于上面几种交换机类型,以及TTL死信队列、TXL延迟队列都有做了配置示例。

主要是没有生产者、消费者的代码示例,相信大家都知道怎么写了。

那个,关于生产者的那个抽象类AbstractProducer.java 有一个地方一直没有调通,就是如何将spel表达式获取配置文件中的配置信息 只能退而求其次,使用@Value注解来进行获取了。 相信注解能获取的,一定有注解解析器,这边也一定可以的。 又要看源码喽! 还有那个开发环境队列隔离问题 有些公司开发使用的是同一个配置文件,这样会导致前缀都是同一个,那样设置前缀就没有意义了。 其实可以这样,如果是使用nacos的远端配置的,可以创建自己的命名空间,修改前缀。 如果是在本地resources文件夹里面,可以使用maven编译后替换变量的那个功能。 如何读取到maven中profile设置的参数 | 半月无霜 (banmoon.top) 上面两种方法,都是可以实现的

我是半月,你我一同共勉!!!

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-03-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SpringBoot动态创建绑定rabbitMq队列
    • 一、介绍
      • 二、代码
        • 1)读取配置的代码
        • 2)配置文件
        • 3)初始化时创建队列、交换机
        • 4)其它代码
      • 三、生产者、消费者
        • 1)生产者
        • 2)消费者
      • 四、最后
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档