前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >整合RabbitMQ&Spring

整合RabbitMQ&Spring

作者头像
用户1212940
发布2022-04-13 15:34:12
2580
发布2022-04-13 15:34:12
举报
文章被收录于专栏:Lambda

RabbitAdmin

RabbitAdmin类可以很好的操作RabbitMQ,在spring中直接进行注入即可

代码语言:javascript
复制
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
  RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
  rabbitAdmin.setAutoStartUp(true);
  return rabbitAdmin;
}
  • 注意: autoStartUp必须设置为true,否则Spring容器不会加载RabbitAdmin类
  • RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean方式的声明
  • 然后使用RabbitTemplate的execute方法指定对应的声明、修改、删除等一系列RabbitMQ基础功能操作。
  • 例如:添加一个交换机、删除一个绑定、清空一个队列的消息等等就要使用的RabbitAdmin

实例:

  1. 添加maven依赖
代码语言:javascript
复制
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
    <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.6.5</version>
    </dependency>
  1. 编写RabbitMQConfig类
代码语言:javascript
复制
package com.pyy.spring;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan({"com.pyy.spring.*"})
public class RabbitMQConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("192.168.43.113:5672");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}
  1. 编写测试类
代码语言:javascript
复制
@Test
    public void testAdmin() {
        rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));

        rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));

        rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));

        rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));

        rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));

        rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));

        rabbitAdmin.declareBinding(new Binding("test.direct.queue",
                Binding.DestinationType.QUEUE, "test.direct", "direct", new HashMap<>()));


        rabbitAdmin.declareBinding(BindingBuilder.bind(
                new Queue("test.topic.queue1", false))  // 直接创建队列
                .to(new TopicExchange("test.topic", false, false))// 直接创建交换机建立关系
                .with("user.#"));// 直接指定路由键

        rabbitAdmin.declareBinding(BindingBuilder.bind(
                new Queue("test.topic.queue1", false))  // 直接创建队列
                .to(new FanoutExchange("test.topic", false, false)));// 直接创建交换机建立关系
    }

SpringAMQP-RabbitMQ声明式配置使用

  • 在Rabbit基础API里面声明一个Exchange、声明一个绑定、一个队列: channel.exchangeDeclare(exchangeName, exchangeType, true, false, false,null); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingkey);
  • 使用SpringAMQP的声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式:
代码语言:javascript
复制
@Bean
public TopicExchange exchange() {
  return new TopicExchange("topicExchange", true, false);
}

@Bean
public Queue queue() {
  return new Queue("queue", true);
}

@Bean
public Binding binding() {
  return new BindingBuilder.bind(queue()).to(exchange()).with("spring.*");
}

SpringAMQP消息模板组件-RabbitTemplate实战

  • RabbitTemplate, 即消息模板 我们在与SpringAMQP整合的时候进行发送消息的关键类
  • 该类提供了丰富的发送消息方法,包括可靠性消息投递方法回调监听消息接口ConfirmCallback返回值确认接口ReturnCallback等等。同样我们需要进行注入到Spring容器中,然后直接使用。
  • 在与Spring整合时需要实例化,当时在与SpringBoot整合时,在配置文件中添加配置即可。

配置注入:

代码语言:javascript
复制
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //rabbitTemplate.setConfirmCallback(null);
        // rabbitTemplate.setReturnCallback(null);
        return rabbitTemplate;
    }

编写测试类:

代码语言:javascript
复制
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage() {
        // 1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc", "信息描述:。。。");
        messageProperties.getHeaders().put("type", "自定义消息类型");

        Message message = new Message("hello Rabbitmq".getBytes(), messageProperties);

        // 2 发送消息
        rabbitTemplate.convertAndSend("test.topic.exchange", "user.#", message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.out.println("------添加额外设置--------");
                message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                message.getMessageProperties().getHeaders().put("attr", "额外新加的描述");
                return message;
            }
        });
    }

    @Test
    public void testSendMessage2() {
        // 1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        
        Message message = new Message("消息1234".getBytes(), messageProperties);

        // 2 发送消息
        rabbitTemplate.convertAndSend("test.topic.exchange", "user.#", message);

        rabbitTemplate.convertAndSend("test.topic.exchange", "user.#", "hello object message send");

        rabbitTemplate.convertAndSend("test.topic.exchange", "user.abc", "12234");
        
        rabbitTemplate.send("test.topic.exchange", "user.#", message);
    }

SpringAMQP消息容器-SimpleMessageListenerContainer详解

简单消息监听容器

  • 这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
  • 监听队列(多个队列)、自动启动、自动声明功能
  • 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
  • 设置消费者数量、最小最大数量、批量消费配置
  • 设置消息确认(签收)和自动确认模式、是否重回队列、异常捕获handler函数
  • 设置消费者标签生成策略、是否独占模式、消费者属性等
  • 设置具体的监听器、消息转换器等等。

注意:SimpleMessageListenerContainer 可以进行动态设置,比如在运行中的应用可以动态修改其消费者数量大小、接收消息的模式等。

很多基于RabbitMQ的自定制的后端管控台在进行动态设置的时候,也是根据这一特性实现的。所有可以看出SpringAMQP非常强大。

代码语言:javascript
复制
@Bean
    public Queue queue001() {
        Queue queue = new Queue("queue001", true);
        return queue;
    }

    @Bean
    public Queue queue002() {
        Queue queue = new Queue("queue002", true);
        return queue;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002());
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(5);
        container.setDefaultRequeueRejected(false);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                System.out.println("---消费者---" + msg);
            }
        });
        return container;
    }

思考问题:SimpleMessageListenerContainer为什么可以动态感知配置变更?

SpringAMQP消息适配器-MessageListenerAdapter使用

MessageListenerAdapter 即消息监听适配器

MessageDelegate:

代码语言:javascript
复制
package com.pyy.spring;
public class MessageDelegate {

    /**
     * 方法名称固定、参数类型固定
     * @param messageBody
     */
    public void hadnleMessage(byte[] messageBody) {
        System.out.println("默认方法,消息内容:" + new String(messageBody));
    }

    public void consumeMessage(byte[] messageBody) {
        System.out.println("字节数组方法,消息内容:" + new String(messageBody));
    }

    public void consumeMessage(String messageBody) {
        System.out.println("字符串方法,消息内容:" + new String(messageBody));
    }
}

TextConverter:

代码语言:javascript
复制
package com.pyy.spring;


import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

public class TextMessageConverter implements MessageConverter {


    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(object.toString().getBytes(), messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        if(null != contentType && contentType.contains("text")) {
            return new String(message.getBody());
        }
        return message.getBody();
    }
}

RabbitConfig:

代码语言:javascript
复制
@Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue001(), queue002());
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(5);
        container.setDefaultRequeueRejected(false);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });
//        container.setMessageListener(new ChannelAwareMessageListener() {
//            @Override
//            public void onMessage(Message message, Channel channel) throws Exception {
//                String msg = new String(message.getBody());
//                System.out.println("---消费者---" + msg);
//            }
//        });

        // 适配器方式,默认方法名称:handleMessage
        // 可以自定义方法名称:
        // 也可以添加一个转换器:从字节数组转换为String
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");// 设置默认监听方法名称
        adapter.setMessageConverter(new TextMessageConverter());

        container.setMessageListener(adapter);
        return container;
    }
  • 通过messgeListenerAdapter的代码我们可以看出如下核心属性:
  • defaultListenerMethod默认监听方法名称:用于设置监听方法名称
  • Delegate委托对象:实际真是的委托对象,用于处理消息

SpringAMQP消息转换器-MessageConverter

  • 我们在进行发送消息时候,正常情况下消息体为二进制数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter
  • 自定义常用转换器:MessageConverter,一般来讲都需要实现这个接口 重写下面两个方法: toMessage: java对象转换为Message formMessage: Message对象转换为java对象
  • Json转换器:Jackson2JsonMessageConverter:可以进行java对象的转换功能
  • DefaultJackson2JavaTypeMapper映射器:可以进行java对象的映射关系
  • 自定义二进制转换器:比如图片类型、PDF、PPT、流媒体
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018/10/19 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RabbitAdmin
  • SpringAMQP-RabbitMQ声明式配置使用
  • SpringAMQP消息模板组件-RabbitTemplate实战
  • SpringAMQP消息容器-SimpleMessageListenerContainer详解
  • SpringAMQP消息适配器-MessageListenerAdapter使用
  • SpringAMQP消息转换器-MessageConverter
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档