rabbitMQ教程(三) spring整合rabbitMQ代码实例

一、开启rabbitMQ服务,导入MQ jar包和gson jar包(MQ默认的是jackson,但是效率不如Gson,所以我们用gson)

 二、发送端配置,在spring配置文件中配置

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

 <!-- 连接服务配置 如果MQ服务器在远程服务器上,请新建用户用新建的用户名密码  guest默认不允许远程登录-->  
    <rabbit:connection-factory id="connectionFactory"  
        host="localhost" username="guest" password="guest" port="5672"  
        virtual-host="/" channel-cache-size="5" />  
  <!-- 配置爱admin,自动根据配置文件生成交换器和队列,无需手动配置 -->
    <rabbit:admin connection-factory="connectionFactory" />  
  
    <!-- queue 队列声明 -->  
    <rabbit:queue  durable="true"  
        auto-delete="false" exclusive="false" name="spring.queue.tag" />  
  
  
    <!-- exchange queue binging key 绑定 -->  
    <rabbit:direct-exchange name="spring.queue.exchange"  
        durable="true" auto-delete="false">  
        <rabbit:bindings>  
            <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key" />  
        </rabbit:bindings>  
    </rabbit:direct-exchange>  
  
    <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于Gson的速度快于jackson,这里替换为Gson的一个实现 -->  
    <bean id="jsonMessageConverter"    class="sendMQ.Gson2JsonMessageConverter" />  
  
    <!-- spring template声明 -->  
    <rabbit:template id="amqpTemplate" exchange="spring.queue.exchange"  routing-key="spring.queue.tag.key"  
        connection-factory="connectionFactory" message-converter="jsonMessageConverter" /> 

发送端代码:GSON配置

package sendMQ;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultClassMapper;
import org.springframework.amqp.support.converter.MessageConversionException;

import com.google.gson.Gson;

public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter{

      private static Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);  
      
        private static  ClassMapper classMapper =  new DefaultClassMapper();  
      
        private static Gson gson = new Gson();  
      
        public Gson2JsonMessageConverter() {  
            super();  
        }  
      
        @Override  
        protected Message createMessage(Object object,  
                MessageProperties messageProperties) {  
            byte[] bytes = null;  
            try {  
                String jsonString = gson.toJson(object);  
                bytes = jsonString.getBytes(getDefaultCharset());  
            }  
            catch (IOException e) {  
                throw new MessageConversionException(  
                        "Failed to convert Message content", e);  
            }  
            messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);  
            messageProperties.setContentEncoding(getDefaultCharset());  
            if (bytes != null) {  
                messageProperties.setContentLength(bytes.length);  
            }  
            classMapper.fromClass(object.getClass(),messageProperties);  
            return new Message(bytes, messageProperties);  
        }  
      
        @Override  
        public Object fromMessage(Message message)  
                throws MessageConversionException {  
            Object content = null;  
            MessageProperties properties = message.getMessageProperties();  
            if (properties != null) {  
                String contentType = properties.getContentType();  
                if (contentType != null && contentType.contains("json")) {  
                    String encoding = properties.getContentEncoding();  
                    if (encoding == null) {  
                        encoding = getDefaultCharset();  
                    }  
                    try {  
                            Class<?> targetClass = getClassMapper().toClass(  
                                    message.getMessageProperties());  
                            content = convertBytesToObject(message.getBody(),  
                                    encoding, targetClass);  
                    }  
                    catch (IOException e) {  
                        throw new MessageConversionException(  
                                "Failed to convert Message content", e);  
                    }  
                }  
                else {  
                    log.warn("Could not convert incoming message with content-type ["  
                            + contentType + "]");  
                }  
            }  
            if (content == null) {  
                content = message.getBody();  
            }  
            return content;  
        }  
      
        private Object convertBytesToObject(byte[] body, String encoding,  
                Class<?> clazz) throws UnsupportedEncodingException {  
            String contentAsString = new String(body, encoding);  
            return gson.fromJson(contentAsString, clazz);  
        }  
}

发送类接口:

public interface MQProducer {
    /**
     * 发送消息到指定队列
     * @param queueKey
     * @param object
     */
    public void sendDataToQueue(String queueKey, Object object);
}

实现类:test是测试用的。

package sendMQ;

import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(value = SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
    "classpath:/spring-common.xml"})


@Component
public class MQProducerImpl implements MQProducer {

    @Autowired
    private  AmqpTemplate amqpTemplate;

        @Override
        public void sendDataToQueue(String queueKey, Object object) {
            System.out.println("--"+amqpTemplate);
            try {
                amqpTemplate.convertAndSend(object);
                System.out.println("------------消息发送成功");
            } catch (Exception e) {
                System.out.println(e);
            }

        }
        
        @Test
      public  void test() {  
            Map<String,Object> msg = new HashMap<>();
            msg.put("data","hello,456");
            while(true){
            amqpTemplate.convertAndSend(msg); 
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // TODO 自动生成的 catch 块
                e.printStackTrace();
            }
            }
           
        }  

}

接收端配置:

  <!-- 连接服务配置  -->  
    <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"  
        password="guest" port="5672" virtual-host="/"  channel-cache-size="5" />  
           
   <rabbit:admin connection-factory="connectionFactory"/>  
     
   <!-- queue 队列声明-->  
   <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="spring.queue.tag"/>  
      
      
   <!-- exchange queue binging key 绑定 -->  
    <rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false">  
        <rabbit:bindings>  
            <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key"/>  
        </rabbit:bindings>  
    </rabbit:direct-exchange>  
      
    <bean id="receiveMessageListener"  
        class="receiveMQ.QueueListenter" />  
       
    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->  
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >  
        <rabbit:listener queues="spring.queue.tag" ref="receiveMessageListener" />  
    </rabbit:listener-container>  

接收端代码:

package receiveMQ;

import java.io.UnsupportedEncodingException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class QueueListenter implements MessageListener{
    @Override
    public void onMessage(Message msg) {
         try {
            System.out.print("-------------------"+new String(msg.getBody(),"UTF-8"));
        } catch (UnsupportedEncodingException e) {
            // TODO 自动生成的 catch 块
            e.printStackTrace();
        }
    }

}

接收端测试启动:

package receiveMQ;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ConsumerMain {
    public static void main(String[] args) {  
        new ClassPathXmlApplicationContext("spring-common.xml");    
    }  
}

上面代码均有注释,应该不难看懂,复制即可使用,实现了MQ的简单功能。

说明:可以配置多个接收端,spring默认的是负载均衡机制,每个接收端接收一条的来,这些扩展功能待后面有时间再讲解

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏java、Spring、技术分享

Spring Import 三种用法与源码解读

  最近在看Spring Cloud相关的源码,每次引入一个新的starter,发现都会加一些enable的注解,比如:@EnableDiscoveryClie...

2304
来自专栏菩提树下的杨过

spring-boot 速成(9) druid+mybatis 多数据源及读写分离的处理

按上节继续学习,稍微复杂的业务系统,一般会将数据库按业务拆开,比如产品系统的数据库放在product db中,订单系统的数据库放在order db中...,然后...

7925
来自专栏清晨我上码

spring mvc 国际化的几种方案

通过设置浏览器请求测试:http://localhost:8080/xxx/nation/test

6183
来自专栏Android源码框架分析

获取Android设备DeviceId与反Xposed Hook技术

APP开发中常需要获取设备的DeviceId,以应对刷单,目前常用的几个设备识别码主要有IMEI(国际移动设备身份码 International Mobile ...

3312
来自专栏后端之路

jsp改造之sitemesh注意事项

背景 现在各种现代化的浏览器确实惯坏了开发者 智能纠错 无论是忘记关闭标签甚至重复等等都有可能被chrome这些浏览器智能纠错===》chrome会合并多个bo...

2604
来自专栏你不就像风一样

2小时学会Spring Boot(IDE:eclipse)

1.)使启动类继承SpringBootServletInitializer 覆写configure()方法。

3364
来自专栏向治洪

史上最强Spring mvc入门

一、SpringMVC基础入门,创建一个HelloWorld程序   1.首先,导入SpringMVC需要的jar包。 ?   2.添加Web.xml配置...

34110
来自专栏hbbliyong

5步搭建GO环境

Easy Go Programming Setup for Windows Dec 23, 2014 I’ve had to do this more t...

4007
来自专栏Java技术栈

SpringMVC表单验证器的使用

本章讲解SpringMVC中怎么通过注解对表单参数进行验证。 SpringBoot配置 使用springboot, spring-boot-starter-we...

2863
来自专栏颇忒脱的技术博客

Spring、Spring Boot和TestNG测试指南 - @TestConfiguration

@TestConfiguration是Spring Boot Test提供的一种工具,用它我们可以在一般的@Configuration之外补充测试专门用的Bea...

2044

扫码关注云+社区