专栏首页OSChinaSpringboot 整合RabbitMQ ---基于Class的开发

Springboot 整合RabbitMQ ---基于Class的开发

1 加载配置文件

package com.zjxnjz.mall.core.config;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;  


/**
 * 
 * @author daiyy
 * RabbitMQ配置读取文件
 */
@Configuration
//@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {
	@Value("${spring.rabbitmq.addresses}")
    private String addresses;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.publisher-confirms}")
    private Boolean publisherConfirms;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    // 构建mq实例工厂
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirms(publisherConfirms);
        connectionFactory.setVirtualHost(virtualHost);
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }
}

2 创建实体对象(交换机,队列,俩者的绑定)

package com.zjxnjz.mall.core.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.zjxnjz.mall.core.util.RabbitMqEnum;

/**
 * 用于配置交换机和队列对应关系
 * 新增消息队列应该按照如下步骤
 * 1、增加queue bean,参见queueXXXX方法
 * 2、增加queue和exchange的binding
 * @author daiyy
 * @create 2018-4-12 上午10:33
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class RabbitMqExchangeConfig {
	/** logger */
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);

    /**
     * @Author:daiyy
     * @Description: 主题型交换机
     * @param
     * @return
     */
    /*@Bean
    TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){
        TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());
        rabbitAdmin.declareExchange(contractTopicExchange);
        logger.debug("完成主题型交换机bean实例化");
        return contractTopicExchange;
    }*/
    /**
     * 直连型交换机
     */
    @Bean
    DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {
        DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());
        rabbitAdmin.declareExchange(contractDirectExchange);
        logger.debug("完成直连型交换机bean实例化");
        return contractDirectExchange;
    }

    //在此可以定义队列

    @Bean
    Queue queueTest(RabbitAdmin rabbitAdmin){
        Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());
        rabbitAdmin.declareQueue(queue);
        logger.debug("测试队列实例化完成");
        return queue;
    }

    /*//topic 1
    @Bean
    Queue queueTopicTest1(RabbitAdmin rabbitAdmin){
        Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());
        rabbitAdmin.declareQueue(queue);
        logger.debug("话题测试队列1实例化完成");
        return queue;
    }
    //topic 2
    @Bean
    Queue queueTopicTest2(RabbitAdmin rabbitAdmin){
        Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());
        rabbitAdmin.declareQueue(queue);
        logger.debug("话题测试队列2实例化完成");
        return queue;
    }*/


    //在此处完成队列和交换机绑定
    @Bean
    Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){
        Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());
        rabbitAdmin.declareBinding(binding);
        logger.debug("测试队列与直连型交换机绑定完成");
        return binding;
    }
    //topic binding1
    /*@Bean
    Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){
        Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());
        rabbitAdmin.declareBinding(binding);
        logger.debug("测试队列与话题交换机1绑定完成");
        return binding;
    }

    //topic binding2
    @Bean
    Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){
        Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());
        rabbitAdmin.declareBinding(binding);
        logger.debug("测试队列与话题交换机2绑定完成");
        return binding;
    }*/
    
    
    /**
     * -----------------------------------app 端用户添加/和积分变动
     */
    
    // 话题性 交换机  
    @Bean
    TopicExchange appDirectExchange(RabbitAdmin rabbitAdmin){
        TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_APP_USERADD.getCode());
        rabbitAdmin.declareExchange(contractTopicExchange);
        logger.debug("完成主题型交换机bean实例化");
        return contractTopicExchange;
    }
    
    //  定义俩个对列   app端用户添加队列  -----queue1
    @Bean
    Queue queueTopicTest1(RabbitAdmin rabbitAdmin){
        Queue queue = new Queue(RabbitMqEnum.QueueName.QUEUE_APP_USERADD.getCode());
        rabbitAdmin.declareQueue(queue);
        logger.debug("--queue1-- 实例化完成");
        return queue;
    }
    //topic    app 端积分变动队列-----queue2
    @Bean
    Queue queueTopicTest2(RabbitAdmin rabbitAdmin){
        Queue queue = new Queue(RabbitMqEnum.QueueName.QUEUE_APP_INTRAGATLADD.getCode());
        rabbitAdmin.declareQueue(queue);
        logger.debug("--queue2-- 实例化完成");
        return queue;
    }
    
    //  积分商城用户添加队列-----queue3
    @Bean  
    Queue queueTopicTest3(RabbitAdmin rabbitAdmin){
        Queue queue = new Queue(RabbitMqEnum.QueueName.QUEUE_BUSS_USERADD.getCode());
        rabbitAdmin.declareQueue(queue);
        logger.debug("--queue3-- 实例化完成");
        return queue;
    }
    //topic   积分商城积分变动队列-----queue4
    @Bean
    Queue queueTopicTest4(RabbitAdmin rabbitAdmin){
        Queue queue = new Queue(RabbitMqEnum.QueueName.QUEUE_BUSS_INTRAGATLADD.getCode());
        rabbitAdmin.declareQueue(queue);
        logger.debug("--queue4-- 实例化完成");
        return queue;
    }
    
    
    //topic binding1 话题型交换机与队列的绑定
    @Bean
    Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){
        Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.APP_USERADD_KEY.getCode());
        rabbitAdmin.declareBinding(binding);
        logger.debug("队列与话题交换机绑定完成");
        return binding;
    }

    //topic binding2
    @Bean
    Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){
        Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.APP_INTRAGATLADD_KEY.getCode());
        rabbitAdmin.declareBinding(binding);
        logger.debug("队列与话题交换机绑定完成");
        return binding;
    }
}

3 创建发送者工具类

package com.zjxnjz.mall.core.util;

import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * rabbitmq发送消息工具类
 *
 * @author daiyy
 * @create 2018-4-12 上午10:33
 **/
@Component
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{
    /** logger */
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMqSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitTemplate.setConfirmCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        logger.info("confirm: " + correlationData.getId());
    }

    /**
     * 发送到 指定routekey的指定queue
     * @param routeKey
     * @param obj
     */
    public void sendRabbitmqDirect(String routeKey,Object obj)throws Exception {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        logger.info("send: " + correlationData.getId());
        this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);
    }

    /**
     * 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上
     * @param routeKey
     * @param obj
     */
    public void sendRabbitmqTopic(String routeKey,Object obj) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        logger.info("send: " + correlationData.getId());
        this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);
    }
    
    /**
     * ----------------------------------start - SHF-----------------------------------
     * 发送到   用户添加的队列中 
     * 指定交换机 和  路由规则
     * @param obj
     */
    public void sendUserAddTopic(Object obj)throws Exception {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        logger.info("send: " + correlationData.getId());
        this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_APP_USERADD.getCode(), RabbitMqEnum.QueueEnum.APP_USERADD_KEY.getCode(), obj, correlationData);
    }

    
    /**
     * 发送到 积分变动的 queue
     * 指定交换机 和  路由规则 
     * @param obj
     */
    public void sendIntragatlAddTopic(Object obj)throws Exception {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        logger.info("send: " + correlationData.getId());
        this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_APP_USERADD.getCode(),  RabbitMqEnum.QueueEnum.APP_INTRAGATLADD_KEY.getCode() , obj, correlationData);
    }
    //----------------------------------end-----------------------------------
    
    /**
     * 从指定队列中接受消息
     * @author SHF
     * @version 创建时间:2018年7月27日  下午2:17:33
     *  @param queueName
     *  @return
     */
    public Object reciveRabbitMqObject(String queueName) {
    	Object object = this.rabbitTemplate.receiveAndConvert(queueName);
    	return object;
    }

}

4 创建 消费者工具类 (监听)

package com.zjxnjz.mall.restfulapi.modular.intragatl;

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.zjxnjz.mall.core.config.RabbitMqConfig;
import com.zjxnjz.mall.core.util.RabbitMqEnum;

@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class UserIntragatlListener {
	
	@Autowired
	private UserSynchronization userSynchronization;
	
	@Autowired
	private IntragatlSynchronization intragatlSynchronization;
	
	
	/**
	 *  *************************用户添加 监听***************************
	 * @author SHF
	 * @version 创建时间:2018年7月30日  下午4:11:50
	 *  @param connectionFactory
	 *  @return
	 */
    @Bean("userAddContainer")
    public MessageListenerContainer userAddListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(RabbitMqEnum.QueueName.QUEUE_BUSS_USERADD.getCode());
        container.setMessageListener(userAddListener1());
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }

    @Bean("userAddListener")
    public ChannelAwareMessageListener userAddListener1(){
    	
        return new ChannelAwareMessageListener() {
			@Override
			public void onMessage(Message message, Channel channel) throws Exception {
				Map map= JSON.parseObject(new String(message.getBody()));
                System.out.println("QUEUE_APP_USERADD ===" + map);
                userSynchronization.reciveUserAddFromBuliss(map); //调用 用户添加函数
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
			}
        };
    }
    
    
    /**
     * *************************积分变动 监听***************************
     * @author SHF
     * @version 创建时间:2018年7月30日  下午4:12:15
     *  @param connectionFactory
     *  @return
     */
    @Bean("IntragatlChangeContainer")
    public MessageListenerContainer IntragatlChangeListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(RabbitMqEnum.QueueName.QUEUE_BUSS_INTRAGATLADD.getCode());
        container.setMessageListener(IntragatlChangeListener2());
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }

    @Bean("IntragatlChangeListener")
    public ChannelAwareMessageListener IntragatlChangeListener2(){
    	
        return new ChannelAwareMessageListener() {
			@Override
			public void onMessage(Message message, Channel channel) throws Exception {
				Map map= JSON.parseObject(new String(message.getBody()));
                System.out.println("QUEUE_APP_USERADD ===" + map);
                intragatlSynchronization.bqylIntragatlChange(map); //调用积分变动函数
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
			}
        };
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • FreeMarker整合Springmvc

    用户5927264
  • FastDFS的使用

    FastDFS安装(http://blog.csdn.net/LoveCarpenter/article/details/75913329)

    用户5927264
  • Hystrix

    用户5927264
  • Mybatis+Thymeleaf前端显示时间格式问题解决方法

    我的开发工具是IntelliJ IDEA,然后在SpringBoot集成Mybatis,前端用模块引擎Thymeleaf的过程中遇到几个问题,不过也花了点时间,...

    SmileNicky
  • 使用切片拦截Rest服务

    我们可以在建立的springboot的项目中建立新的类来是先Filter的接口,doFilter是过滤器中的主要方法,用来做处理逻辑,最后我们只需要在类上加@C...

    Dream城堡
  • FreeMarker整合Springmvc

    用户5927264
  • Spring Boot整合Scheduled定时任务器、整合Quartz定时任务框架

    首先说明一下,这里使用的是Springboot2.2.6.RELEASE版本,由于Springboot迭代很快,所以要注意版本问题。

    别先生
  • 作业调度框架Quartz

    版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。

    奋飛
  • springboot缓存之使用redis作为缓存管理

    (1)使用docker安装redis,可参照之前的docker安装使用,然后输入以下命令下载安装redis镜像。

    绝命生
  • flask + mysql + 微信小程序开发的校园微信报修小程序

    最近收到不少博友的消息 ,对微信小程序开发整体流程不是很了解 , 希望得到我的帮助 , 因之前版本的微信小程序,后端均由 Java 完成 , 基础不好的朋友一时...

    热心的程序员

扫码关注云+社区

领取腾讯云代金券