首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >20-SpringBoot整合RabbitMQ

20-SpringBoot整合RabbitMQ

作者头像
彼岸舞
发布2022-10-06 08:43:20
3060
发布2022-10-06 08:43:20
举报

SpringBoot整合RabbitMQ

整合就直接使用单机版的了, 一直开着5个虚拟机, 我电脑不太行

新建SpringBoot工程

你已经是一个长大的IDEA了, 要学会自己新建工程, 然后IDEA自己创建了rabbitmq-consumer和rabbitmq-producer工程

添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

POM.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.72</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

都是在创建工程的时候随便点的功能

启动类就不用我粘贴了吧

生产者代码实现

配置文件

spring:
  application:
    name: rabbitmq-producer
  rabbitmq:
    # 集群用
#    addresses: 192.168.247.142:5672
    username: root                         # 用户名
    password: 123456                       # 密码
    host: 192.168.247.142                  # IP地址
    port: 5672                             # 端口号
    virtual-host: /                        # 虚拟地址
    connection-timeout: 15000              # 连接超时时间
    # https://blog.csdn.net/yaomingyang/article/details/108410286 : 详解了publisher-confirm-type 与 publisher-confirms的关系
    publisher-confirm-type: correlated     # confirm回调模式
    publisher-returns: true                # 回调监听, 没有被路由到的消息, 配合下面的mandatory使用
    template:
      mandatory: true

server:
  port: 8001
  servlet:
    context-path: /

消息发送类

package com.dance.rabbitmqproducer.component;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;

@Component
public class RabbitMQSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * correlationData : 唯一标识
     * ack : 是否成功持久化
     * cause : 失败原因
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = (correlationData, ack, cause) -> {
        System.out.println("消息ID: " + correlationData.getId());
        System.out.println("ack: " + ack);
        System.out.println("cause: " + cause);
    };

    /**
     * 发送消息
     *
     * @param messageBody 消息提
     * @param headers     参数
     * @throws Exception 异常
     */
    public void sendMessage(Object messageBody, Map<String, Object> headers) throws Exception {
        // 消息头
        MessageHeaders messageHeaders = new MessageHeaders(headers);
        Message<Object> message = MessageBuilder.createMessage(messageBody, messageHeaders);
        // 设置消息确认监听
        rabbitTemplate.setConfirmCallback(confirmCallback);
        /**
         * exchange : 交换机
         * routingKey : 路由键
         * message : 消息体
         * messagePostProcess : 消息后置处理器
         * correlation : 消息唯一ID
         */
        rabbitTemplate.convertAndSend("exchange-test",
                "test.order",
                message,
                postMessage -> {
                    System.out.println("post process message: " + postMessage);
                    return postMessage;
                },
                new CorrelationData(UUID.randomUUID().toString().replace("-", "")));
    }

}

测试类

package com.dance.rabbitmqproducer;

import com.dance.rabbitmqproducer.component.RabbitMQSender;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.HashMap;

@SpringBootTest
public class TestSendMsg {

    @Autowired
    private RabbitMQSender rabbitMQSender;

    @Test
    public void testSendMsg() throws Exception {
        String msg = "hello world!";
        rabbitMQSender.sendMessage(msg,new HashMap<>());
    }

}

消费者代码实现

配置文件

spring:
  application:
    name: rabbitmq-consumer
  rabbitmq:
    # 集群用
    #    addresses: 192.168.247.142:5672
    username: root                         # 用户名
    password: 123456                       # 密码
    host: 192.168.247.142                  # IP地址
    port: 5672                             # 端口号
    virtual-host: /                        # 虚拟地址
    connection-timeout: 15000              # 连接超时时间
    listener:
      simple:
        acknowledge-mode: manual           # 手工ACK, 默认为auto, 自动ack
        concurrency: 5                                     # 默认通道数量
        max-concurrency: 10                                 # 最大通道数量
        prefetch: 1                        # 限制消费流量, ack 1个后再接收
      order:
        exchange:
          value: exchange-test
          type: topic
          durable: true
          ignoreDeclarationExceptions: true
        queue:
          value: test-queue
          durable: true
        bindings:
          key: test.#

server:
  port: 8002
  servlet:
    context-path: /

消息消费类

package com.dance.rabbitmqconsumer.component;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQRReceiver {

    /**
     * 消费消息
     *
     * @param messageBody 消息体
     * @param channel     通道
     * @throws Exception 异常
     * @RabbitListener 监听消息
     * @QueueBinding 绑定
     * @Queue 队列
     * @Exchange 交换机
     */
    // 直接写死[不建议]
//    @RabbitListener(
//            bindings = {
//                    @QueueBinding(
//                            value = @Queue(value = "test-queue", durable = "true"),
//                            exchange = @Exchange(
//                                    value = "exchange-test",
//                                    type = "topic",
//                                    durable = "true",
//                                    ignoreDeclarationExceptions = "true"
//                            ),
//                            key = "test.#"
//                    )
//            }
//    )
    // 采用配置文件的方式
    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(
                                    value = "${spring.rabbitmq.listener.order.queue.value}",
                                    durable = "${spring.rabbitmq.listener.order.queue.durable}"
                            ),
                            exchange = @Exchange(
                                    value = "${spring.rabbitmq.listener.order.exchange.value}",
                                    type = "${spring.rabbitmq.listener.order.exchange.type}",
                                    durable = "${spring.rabbitmq.listener.order.exchange.durable}",
                                    ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"
                            ),
                            key = "${spring.rabbitmq.listener.order.bindings.key}"
                    )
            }
    )
    @RabbitHandler
    public void onMessage(Message<Object> messageBody, Channel channel) throws Exception {
        System.out.println("消费消息 :" + messageBody.getPayload());
        MessageHeaders headers = messageBody.getHeaders();
        long deliveryTag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // 手工ACK
        channel.basicAck(deliveryTag, false);
    }

}

测试

启动消费者

启动成功

查看控制台

多了exchange-test交换机

多了test-queue队列

并且存在绑定关系, 路由key是test.#

对了一个连接, 这个就是消费者

在channel中多了5个通道, 这个就是我们在配置文件中设置的初始值

点进去可以看到消费的是哪个队列

启动生产者测试类

可以看到, 在confirm监听中, 得到了消息ID, ack为true, 没有异常, 消息发送成功

查看消费者

消费成功, SpringBoot成功集成RabbitMQ

当然这只是一个Demo, 具体开发中使用, 该需要各位自行改造

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-10-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SpringBoot整合RabbitMQ
    • 新建SpringBoot工程
      • 添加依赖
        • POM.xml
          • 生产者代码实现
            • 配置文件
            • 消息发送类
            • 测试类
          • 消费者代码实现
            • 配置文件
            • 消息消费类
          • 测试
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档