前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringBoot基础(四、整合RabbitMQ)

SpringBoot基础(四、整合RabbitMQ)

作者头像
营琪
发布2019-11-04 16:52:40
4450
发布2019-11-04 16:52:40
举报
文章被收录于专栏:营琪的小记录营琪的小记录

因为在做一个秒杀项目,优化环节需要使用到RabbitMQ,所以这里也记录一下。

环境安装

RabbitMQ是用Erlang语言编写的,所以需要安装此语言,

下载地址:Erlang语言下载地址:RabbitMQ

先安装Erlang语言(一路确定即可),再安装RabbitMQ服务(一路确定即可,注意WIN10是否中文名字)。

安装完RabbitMQ后,启动管理插件和RabbitMQ服务看看。

代码语言:javascript
复制
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.16\sbin>rabbitmq-plugins enable rabbitmq_management
代码语言:javascript
复制
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.16\sbin>net start RabbitMQ

进入http://127.0.0.1:15672 ,默认密码为guest/guest

SpringBoot集成RabbitMQ

安装完成,那么我们就开始Springboot集成RabbitMQ。

(可以接着前一章的项目开始)

使用Maven并添加SpringBoot的父依赖,web的起步依赖也可以加上,方便后续是使用。

spring推出一个amqp的标准,消息队列的标准,只需要添加此依赖即可使用RabbitMQ服务。

代码语言:javascript
复制
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.8.RELEASE</version>
    </parent>

 <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

配置Rabbitmq的参数,需要配什么可以在官网查,这里只介绍一点。

代码语言:javascript
复制
#rabbitmq
#4个最基础配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

添加消息队列

代码语言:javascript
复制
package com.yingqi.rabbitmq;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MQConfig {
    public static final String QUEUE = "queue";
    @Bean
    public Queue queue() {
        return new Queue(QUEUE, true);
    }
}

创建消息发送者

代码语言:javascript
复制
package com.yingqi.rabbitmq;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MQSender {
    @Autowired
    AmqpTemplate amqpTemplate;
    public void send(String message) {
        amqpTemplate.convertAndSend(MQConfig.QUEUE,message);
    }
}

创建消息接收者

代码语言:javascript
复制
package com.yingqi.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class MQReceiver {
    @RabbitListener(queues = MQConfig.QUEUE)
    public void receive(String message) {
        System.out.println("接收者"+message);
    }
}

使用​​​​​​​SpringBootTest测试,就不要web起步依赖了

代码语言:javascript
复制
import com.yingqi.QuickStartOneApplication;
import com.yingqi.rabbitmq.MQSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = QuickStartOneApplication.class)
public class RabbitMQTest {
    @Autowired
    private MQSender sender;
    @Test
    public void mq(){
        sender.send("hello,flash");
    }

}

//启动spring boot
@SpringBootApplication
@MapperScan("com.yingqi.mapper")
public class QuickStartOneApplication {
    public static void main(String[] args) {
        SpringApplication.run(QuickStartOneApplication.class);
    }
}

测试结果

SpringBoot整合RabbitMQ就介绍完毕了,下面的是四种消息队列的伪代码。

简要说一下四种模式

config:添加消息队列

代码语言:javascript
复制
/**
 * 配置消息交换机
 * 针对消费者配置
 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
 HeadersExchange :通过添加属性key-value匹配
 DirectExchange:按照routingkey分发到指定队列
 TopicExchange:多关键字匹配
 **/
@Configuration
public class MQConfig {
    public static final String QUEUE = "queue";
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String HEADERS_QUEUE = "headers.queue2";
    public static final String TOPIC_EXCHAGE = "topic.exchage";
    public static final String FANOUT_EXCHAGE = "fanout.exchage";
    public static final String HEADERS_EXCHAGE = "headers.exchage";
    public static final String ROUTING_KEY1 = "topic.key1";
    public static final String ROUTING_KEY2 = "topic.#";

    //Direct模式 交换机模式 exchange
    @Bean
    public Queue quese() {
        return new Queue(QUEUE, true);
    }   

    //Fanout 模式
    @Bean
    public FanoutExchange fanoutExchange() {
        return  new FanoutExchange(FANOUT_EXCHAGE);
    }
    @Bean
    public Binding fanoutBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(fanoutExchange());
    }

    //topic模式 交换机模式 exchange
    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE1, true);
    }
    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE2, true);
    }
    @Bean
    public TopicExchange topicExchange() {
        return  new TopicExchange(TOPIC_EXCHAGE);
    }
    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.key1");
    }
    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");
    }

    //Header模式
    @Bean
    public HeadersExchange headersExchange() {
        return new HeadersExchange(HEADERS_EXCHAGE);
    }
    @Bean
    public Queue queueHeaders() {
        return new Queue(HEADERS_QUEUE);
    }
    @Bean
    public Binding headersBinding() {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("key", "123321");
        return BindingBuilder.bind(queueHeaders()).to(headersExchange()).whereAll(map).match();
    }
}

sender: 创建消息发送者

代码语言:javascript
复制
@Service
public class MQSender {

    @Autowired
    AmqpTemplate amqpTemplate;

    public void send(Object message) {
        String msg = JSON.toJSONString(message);
        System.out.println(msg);
        amqpTemplate.convertAndSend(MQConfig.QUEUE,msg);
    }

    //topic模式 交换机模式 exchange
    public void sendTopic(Object message) {
        String msg = JSON.toJSONString(message);
        System.out.println("send+topic模式"+msg+new Date());
        //发两条不同key消息,接收者收到3条
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHAGE,"topic.key1",msg+"topic.key1"+new Date());//发送的时候topicQueue1和2都接收到了
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHAGE,"topic.key2",msg+"topic.key2"+new Date());
    }

    //Fanout 模式
    public void sendFanout(Object message) {
        String msg = JSON.toJSONString(message);
        System.out.println("send+Fanout 模式:"+msg+new Date());
        amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHAGE,"",msg+new Date());
    }

    //Header模式
    public void sendHeaders(Object message, String key ,String val) {
        String msg = JSON.toJSONString(message);
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader(key,val);
        Message s = new Message(msg.getBytes(),messageProperties);
        System.out.println("send+Header模式:"+s.toString()+new Date()+"请验证,是否提交队列");
        amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHAGE,"",s);
    }

receiver:创建消息接收者

代码语言:javascript
复制
@Service
public class MQReceiver {

    @RabbitListener(queues = MQConfig.QUEUE )
    public void receive(String message) {
        System.out.println(message);
    }
 //topic模式 交换机模式 exchange
    @RabbitListener(queues = MQConfig.TOPIC_QUEUE1 )
    public void receiveTopic1(String message) {
        System.out.println("Receiver:TOPIC_QUEUE1-"+message);
    }
    //Fanout 模式
    @RabbitListener(queues = MQConfig.TOPIC_QUEUE2 )
    public void receiveTopic2(String message) {
        System.out.println("Receiver:TOPIC_QUEUE2-"+message);
    }
    //Header模式
    @RabbitListener(queues = MQConfig.HEADERS_QUEUE )
    public void receiveHeaders(String message) {
        System.out.println("Receiver:HEADERS_QUEUE-"+message);
    }

测试结果

代码语言:javascript
复制
public static void main(String[] args) {

        sender.send("hello,flash");
//输出hello,flash

        sender.sendTopic("hello,flash");
//send+topic模式hello,flashSat Jul 13 16:52:03 CST 2019
//Receiver:TOPIC_QUEUE1-hello,flashtopic.key1Sat Jul 13 16:52:03 CST 2019
//Receiver:TOPIC_QUEUE2-hello,flashtopic.key2Sat Jul 13 16:52:03 CST 2019
//Receiver:TOPIC_QUEUE2-hello,flashtopic.key1Sat Jul 13 16:52:03 CST 2019

        sender.sendFanout("hello,flash");
//send+Fanout 模式:hello,flashSat Jul 13 16:52:17 CST 2019
//Receiver:TOPIC_QUEUE2-hello,flashSat Jul 13 16:52:17 CST 2019
//Receiver:TOPIC_QUEUE1-hello,flashSat Jul 13 16:52:17 CST 2019
        String key = "key";
        String val = "123321";

        sender.sendHeaders("hello,flash",key,val);
//send+Header模式:。。。。。
//Receiver:HEADERS_QUEUE-104,101,108,108,111,44,102,108,97,115,104

}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/07/13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 环境安装
  • SpringBoot集成RabbitMQ
    • 添加消息队列
      • 创建消息发送者
        • 创建消息接收者
        • 测试结果
        • 简要说一下四种模式
        相关产品与服务
        消息队列 CMQ 版
        消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档