前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ学习笔记(二)——RabbitMQ快速上手

RabbitMQ学习笔记(二)——RabbitMQ快速上手

作者头像
不愿意做鱼的小鲸鱼
发布2022-09-26 18:59:39
5220
发布2022-09-26 18:59:39
举报
文章被收录于专栏:web全栈

RabbitMQ快速上手的学习案例使用一个高可用外卖系统的demo。

RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客
RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客

高可用外卖系统

高可用外卖系统需求分析

  1. 一个外卖后端系统,用户可以在线下单外卖
  2. 用户下单后,可以实时查询订单进度
  3. 系统可以承受短时间的大量并发请求

架构设计

使用微服务系统,组件之间充分解耦 使用消息中间件,解耦业务逻辑 使用数据库,持久化业务数据

RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客
RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客

什么是微服务架构

将应用程序构建为松耦合、可独立部署的一组服务 服务:一个单一的、可独立部署的软件组件,实现了一些有用的功能 松耦合:封装服务的实现细节,通过API调用

如何拆分微服务

根据系统操作进行微服务拆分 根据业务能力进行微服务拆分(推荐使用) 根据子域进行微服务拆分

根据业务能力进行微服务拆分

RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客
RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客

合理的交换机和队列设置

  • 交换机数量不能过多,一般来说同一个业务,或者同一类业务使用同一个交换机
  • 合理设置队列数量,一般来说一个微服务监听一个队列,或者一个微服务的一个业务监听一个队列
  • 合理配置交换机类型,使用Topic模式时仔细设置绑定键
  • 尽量使用自动化 配置将创建交换机/队列的操作固化在应用代码中,免去复杂的运维操作,高效且不易出错

业务流程时序图

RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客
RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客

接口需求

新建订单接口 查询订单接口 接口采用REST风格

微服务的数据库设计原则

每个微服务使用自己的数据库 不要使用共享数据库的方式进行通信 不要使用外键,对于数据量非常少的表慎用索引

RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客
RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客

food.sql

代码语言:javascript
复制
-- ----------------------------
-- Table structure for deliveryman
-- ----------------------------
DROP TABLE IF EXISTS `deliveryman`;
CREATE TABLE `deliveryman`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '骑手id',
  `name` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '名称',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of deliveryman
-- ----------------------------
INSERT INTO `deliveryman` VALUES (1, 'wangxiaoer', 'AVALIABLE', '2020-06-10 20:30:17');

-- ----------------------------
-- Table structure for order_detail
-- ----------------------------
DROP TABLE IF EXISTS `order_detail`;
CREATE TABLE `order_detail`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '订单id',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `address` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '订单地址',
  `account_id` int(11) NULL DEFAULT NULL COMMENT '用户id',
  `product_id` int(11) NULL DEFAULT NULL COMMENT '产品id',
  `deliveryman_id` int(11) NULL DEFAULT NULL COMMENT '骑手id',
  `settlement_id` int(11) NULL DEFAULT NULL COMMENT '结算id',
  `reward_id` int(11) NULL DEFAULT NULL COMMENT '积分奖励id',
  `price` decimal(10, 2) NULL DEFAULT NULL COMMENT '价格',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 27 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of order_detail
-- ----------------------------
INSERT INTO `order_detail` VALUES (9, 'SETTLEMENT_CONFIRMED', '深圳', 12145, 2, 1, 2, NULL, 23.25, '2022-04-04 17:57:02');
INSERT INTO `order_detail` VALUES (10, 'ORDER_CREATED', '深圳', 12145, 2, 1, 3, 1, 23.25, '2022-04-05 23:57:19');

-- ----------------------------
-- Table structure for product
-- ----------------------------
DROP TABLE IF EXISTS `product`;
CREATE TABLE `product`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '产品id',
  `name` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '名称',
  `price` decimal(9, 2) NULL DEFAULT NULL COMMENT '单价',
  `restaurant_id` int(11) NULL DEFAULT NULL COMMENT '地址',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of product
-- ----------------------------
INSERT INTO `product` VALUES (2, 'eqwe', 23.25, 1, 'AVALIABLE', '2020-05-06 19:19:04');

-- ----------------------------
-- Table structure for restaurant
-- ----------------------------
DROP TABLE IF EXISTS `restaurant`;
CREATE TABLE `restaurant`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '餐厅id',
  `name` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '名称',
  `address` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '地址',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `settlement_id` int(11) NULL DEFAULT NULL COMMENT '结算id',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of restaurant
-- ----------------------------
INSERT INTO `restaurant` VALUES (1, 'qeqwe', '2weqe', 'OPEN', 1, '2020-05-06 19:19:39');

-- ----------------------------
-- Table structure for reward
-- ----------------------------
DROP TABLE IF EXISTS `reward`;
CREATE TABLE `reward`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '奖励id',
  `order_id` int(11) NULL DEFAULT NULL COMMENT '订单id',
  `amount` decimal(9, 2) NULL DEFAULT NULL COMMENT '积分量',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 4 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of reward
-- ----------------------------
INSERT INTO `reward` VALUES (1, 10, 23.25, 'SUCCESS', '2022-04-06 00:00:01');

-- ----------------------------
-- Table structure for settlement
-- ----------------------------
DROP TABLE IF EXISTS `settlement`;
CREATE TABLE `settlement`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '结算id',
  `order_id` int(11) NULL DEFAULT NULL COMMENT '订单id',
  `transaction_id` int(11) NULL DEFAULT NULL COMMENT '交易id',
  `amount` decimal(9, 2) NULL DEFAULT NULL COMMENT '金额',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 6 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of settlement
-- ----------------------------
INSERT INTO `settlement` VALUES (2, 9, 571087981, 23.25, 'SUCCESS', '2022-04-04 17:59:08');

原生RabbitMQ快速上手步骤

订单微服务搭建步骤:

  1. 目录结构
RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客
RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客
  1. 导入pom.xml
代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.4</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.kt</groupId>
<artifactId>food</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>food</name>
<description>food System</description>
<properties>
    <java.version>1.8</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
        <version>2.2.2</version>
    </dependency>
 
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.6</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
 
<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <excludes>
                    <exclude>
                        <groupId>org.projectlombok</groupId>
                        <artifactId>lombok</artifactId>
                    </exclude>
                </excludes>
            </configuration>
        </plugin>
    </plugins>
</build>
</project>
 

  1. 编写配置文件application.properties
代码语言:javascript
复制
#订单微服务配置类
server.port=8080
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/food?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
#Rabbitmq相关配置
rabbitmq.host=192.168.137.133
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
#本服务使用的交换机
rabbitmq.exchange=exchange.food
 

  1. 编写PO、VO、DTO等数据传输对象

OrderDetailPO.java(存数据库所用类型)

代码语言:javascript
复制
package cn.kt.food.orderservicemanager.po;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import lombok.Data;
import java.math.BigDecimal;
import java.util.Date;
/**
 * @author tao
 * @date 2022-03-22 21:36
 * 概要:存数据库所用类型
 */

@Data
public class OrderDetailPO {
    private Integer id;
    private OrderStatusEnum status;
    private String address;
    private Integer accountId;
    private Integer productId;
    private Integer deliverymanId;
    private Integer settlementId;
    private Integer rewardId;
    private BigDecimal price;
    private Date date;
}

OrderCreateVO.java(前端传进来的数据)

代码语言:javascript
复制
package cn.kt.food.orderservicemanager.vo;
import lombok.Data;

/**
 * @author tao
 * @date 2022-03-22 21:25
 * 概要:  vo:前端传进来的数据
 */
@Data
public class OrderCreateVO {
    /**
     * 用户ID
     */
    private Integer accountId;

    /**
     * 地址
     */
    private String address;

    /**
     * 产品ID
     */
    private Integer productId;
}

OrderMessageDTO.java(消息体,用于传输数据)

代码语言:javascript
复制
package cn.kt.food.orderservicemanager.dto;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import lombok.Data;
import java.math.BigDecimal;

/**
 * @author tao
 * @date 2022-03-22 21:27
 * 概要:dto:消息体,用于传输数据
 */
@Data
public class OrderMessageDTO {

    /**
     * 订单ID
     */
    private Integer orderId;

    /**
     * 订单状态
     */
    private OrderStatusEnum orderStatus;

    /**
     * 价格
     */
    private BigDecimal price;

    /**
     * 骑手ID
     */
    private Integer deliverymanId;

    /**
     * 产品ID
     */
    private Integer productId;

    /**
     * 用户ID
     */
    private Integer accountId;

    /**
     * 结算ID
     */
    private Integer settlementId;

    /**
     * 积分结算ID
     */
    private Integer rewardId;

    /**
     * 积分奖励数量
     */
    private BigDecimal rewardAmount;

    /**
     * 确认
     */
    private Boolean confirmed;
}
  1. 编写订单状态枚举类OrderStatusEnum.java
代码语言:javascript
复制
package cn.kt.food.orderservicemanager.enums;
/**
* @author tao
* @date 2022-03-22 21:29
* 概要:  订单状态枚举
*/
public enum OrderStatusEnum {
 
/**
 * 创建中
 */
ORDER_CREATING,
 
/**
 * 餐厅已确认
 */
 
RESTAURANT_CONFIRMED,
 
/**
 * 骑手确认
 */
DELIVERYMAN_CONFIRMED,
 
/**
 * 已结算
 */
SETTLEMENT_CONFIRMED,
 
/**
 * 订单已创建
 */
ORDER_CREATED,
 
/**
 * 订单创建失败
 */
FAILED;
}
 

  1. 编写数据库dao层
  2. OrderDetailDao.java
代码语言:javascript
复制
package cn.kt.food.orderservicemanager.dao;
import cn.kt.food.orderservicemanager.po.OrderDetailPO;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;
/**
* @author tao
* @date 2022-03-22 21:39
* 概要:
*/
@Mapper
@Repository
public interface OrderDetailDao {
 
@Insert("INSERT INTO order_detail (status, address, account_id, product_id, deliveryman_id, settlement_id, " +
        "reward_id, price, date) VALUES(#{status}, #{address},#{accountId},#{productId},#{deliverymanId}," +
        "#{settlementId}, #{rewardId},#{price}, #{date})")
@Options(useGeneratedKeys = true, keyProperty = "id")
void insert(OrderDetailPO orderDetailPO);
 
@Update("update order_detail set status =#{status}, address =#{address}, account_id =#{accountId}, " +
        "product_id =#{productId}, deliveryman_id =#{deliverymanId}, settlement_id =#{settlementId}, " +
        "reward_id =#{rewardId}, price =#{price}, date =#{date} where id=#{id}")
void update(OrderDetailPO orderDetailPO);
 
@Select("SELECT id,status,address,account_id accountId, product_id productId,deliveryman_id deliverymanId," +
        "settlement_id settlementId,reward_id rewardId,price, date FROM order_detail WHERE id = #{id}")
OrderDetailPO selectOrder(Integer id);
}

  1. 编写处理用户订单的业务请求service OrderService.java
代码语言:javascript
复制
package cn.kt.food.orderservicemanager.service;
import cn.kt.food.orderservicemanager.dao.OrderDetailDao;
import cn.kt.food.orderservicemanager.dto.OrderMessageDTO;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import cn.kt.food.orderservicemanager.po.OrderDetailPO;
import cn.kt.food.orderservicemanager.vo.OrderCreateVO;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;
/**
* @author tao
* @date 2022-03-24 21:13
* 概要:  处理用户关于订单的业务请求
*/
@Slf4j
@Service
public class OrderService {
@Autowired
private OrderDetailDao orderDetailDao;
@Autowired
RabbitTemplate rabbitTemplate;
 
@Value("${rabbitmq.host}")
public String host;
@Value("${rabbitmq.exchange}")
public String exchangeName;
 
private ObjectMapper objectMapper = new ObjectMapper();
 
// 创建订单
public void createOrder(OrderCreateVO orderCreateVO) throws IOException, TimeoutException {
    log.info("createOrder:orderCreateVO:{}", orderCreateVO);
    OrderDetailPO orderPO = new OrderDetailPO();
    orderPO.setAddress(orderCreateVO.getAddress());
    orderPO.setAccountId(orderCreateVO.getAccountId());
    orderPO.setProductId(orderCreateVO.getProductId());
    orderPO.setStatus(OrderStatusEnum.ORDER_CREATING);
    orderPO.setDate(new Date());
    // 会返回数据库自动生成的数据
    orderDetailDao.insert(orderPO);
 
    OrderMessageDTO orderMessageDTO = new OrderMessageDTO();
    orderMessageDTO.setOrderId(orderPO.getId());
    orderMessageDTO.setProductId(orderPO.getProductId());
    orderMessageDTO.setAccountId(orderCreateVO.getAccountId());
 
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(host);
 
    // 创建订单之后给restaurant发消息
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
        // 配置channel,开启确认模式
        channel.confirmSelect();
 
        //单条同步确认机制
        if (channel.waitForConfirms()) {
            log.info("RabbitMQ confirm success");
        } else {
 
            log.info("RabbitMQ confirm failed");
        }
 
        // 异步同步确认机制
        ConfirmListener confirmListener = new ConfirmListener() {
            @Override
            public void handleAck(long l, boolean b) throws IOException {
                log.info("Ack deliveryTag:{},mutiple:{}", l, b);
                // 消息发送成功
            }
 
            @Override
            public void handleNack(long l, boolean b) throws IOException {
                log.info("Nack deliveryTag:{},mutiple:{}", l, b);
                // 消息发送失败
            }
        };
        channel.addConfirmListener(confirmListener);
 
        String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
 
        //(exchange,routingKey,消息特殊参数,消息体本身(字节))
        // channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
 
        // 设置单条消息的过期时间(时间到期后消息会被消费)
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("15000").build();
        channel.basicPublish("exchange.order.restaurant", "key.restaurant", properties, messageToSend.getBytes());
        /*for (int i = 0; i < 50; i++) {
            channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
            log.info("message sent");
        }*/
 
        // 发送多条消息
        /*for (int i = 0; i < 10; i++) {
            channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
            log.info("message sent");
        }
        Thread.sleep(10000);*/
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
}

  1. 编写消息处理相关业务逻辑service OrderMessageService.java
代码语言:javascript
复制
package cn.kt.food.orderservicemanager.service;
import cn.kt.food.orderservicemanager.dao.OrderDetailDao;
import cn.kt.food.orderservicemanager.dto.OrderMessageDTO;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import cn.kt.food.orderservicemanager.po.OrderDetailPO;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author tao
* @date 2022-03-24 21:15
* 概要:消息处理相关业务逻辑
*/
@Slf4j
@Service
public class OrderMessageService {
@Value("${rabbitmq.host}")
public String host;
@Value("${rabbitmq.exchange}")
public String exchangeName;
 
@Autowired
private OrderDetailDao orderDetailDao;
ObjectMapper objectMapper = new ObjectMapper();
/**
 * 声明消息队列、交换机、绑定、消息的处理
 * (异步线程调用这个方法,且异步线程不能退出,注册完消费者之后sleep,需要设置线程池)
 */
@Async
public void handleMessage() throws IOException, TimeoutException, InterruptedException {
    log.info("start linstening message");
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(host);
    try (Connection connection = connectionFactory.newConnection();
         Channel channel = connection.createChannel()) {
 
        /*---------------------restaurant微服务(声明)---------------------*/
        // order交换机
        channel.exchangeDeclare(
                "exchange.order.restaurant",    //交换机名称
                BuiltinExchangeType.DIRECT,     //交换机类型
                true,   //是否持久化
                false,  //是否交换机长时间不使用删除
                null);  //是否交换机长时间不使用删除
 
        // 订单队列
        channel.queueDeclare(
                "queue.order",  //队列名称
                true,   //是否持久化
                false,  // 队列是否独占(独占只允许一个应用连接)
                false,  //是否交换机长时间不使用删除
                null);  //是否交换机长时间不使用删除
 
        // 队列绑定交换机
        channel.queueBind(
                "queue.order",  //队列名称
                "exchange.order.restaurant",    //交换机名称
                "key.order");   //路由键,用来指示消息的路由转发,相当于快递的地址
 
        /*---------------------deliveryman微服务---------------------*/
        // 骑手交换机
        channel.exchangeDeclare(
                "exchange.order.deliveryman",
                BuiltinExchangeType.DIRECT,
                true,
                false,
                null);
 
        channel.queueBind(
                "queue.order",
                "exchange.order.deliveryman",
                "key.order");
 
        /*---------------------settlement微服务---------------------*/
        // 结算交换机
        channel.exchangeDeclare(
                "exchange.order.settlement",
                BuiltinExchangeType.FANOUT,
                true,
                false,
                null);
 
        channel.queueBind(
                "queue.order",
                "exchange.settlement.order",
                "key.order");
 
        /*---------------------reward微服务---------------------*/
        // 积分交换机
        channel.exchangeDeclare(
                "exchange.order.reward",
                BuiltinExchangeType.TOPIC,
                true,
                false,
                null);
 
        channel.queueBind(
                "queue.order",
                "exchange.order.reward",
                "key.order");// 降级使用,没有使用到TOPIC的特性
 
        /**
         * 监听订单状态
         * (队列,是不是ACK,回调函数,消费者标签)
         */
        channel.basicConsume("queue.order", true, deliverCallback, consumerTag -> {
        });
        while (true) {
            Thread.sleep(100000);
        }
    }
}
 
DeliverCallback deliverCallback = (consumerTag, message) -> {
    String messageBody = new String(message.getBody());
    log.info("deliverCallback:messageBody:{}", messageBody);
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost(host);
    try {
        // 将消息体反序列化成DTO
        OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
                OrderMessageDTO.class);
        // 读取数据库中的PO
        OrderDetailPO orderPO = orderDetailDao.selectOrder(orderMessageDTO.getOrderId());
        switch (orderPO.getStatus()) {
            case ORDER_CREATING:
                // 修改订单状态
                if (orderMessageDTO.getConfirmed() && null != orderMessageDTO.getPrice()) {
                    orderPO.setStatus(OrderStatusEnum.RESTAURANT_CONFIRMED);
                    orderPO.setPrice(orderMessageDTO.getPrice());
                    orderDetailDao.update(orderPO);
                    // 订单状态更新后给骑手发消息
                    try (Connection connection = connectionFactory.newConnection();
                         Channel channel = connection.createChannel()) {
                        String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                        channel.basicPublish("exchange.order.deliveryman",
                                "key.deliveryman",
                                null,
                                messageToSend.getBytes());
                    }
                } else {
                    orderPO.setStatus(OrderStatusEnum.FAILED);
                    orderDetailDao.update(orderPO);
                }
                break;
            case RESTAURANT_CONFIRMED:
                if (null != orderMessageDTO.getDeliverymanId()) {
                    orderPO.setStatus(OrderStatusEnum.DELIVERYMAN_CONFIRMED);
                    orderPO.setDeliverymanId(orderMessageDTO.getDeliverymanId());
                    orderDetailDao.update(orderPO);
                    // 发消息
                    try (Connection connection = connectionFactory.newConnection();
                         Channel channel = connection.createChannel()) {
                        String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                        channel.basicPublish("exchange.order.settlement",
                                "key.settlement",
                                null,
                                messageToSend.getBytes());
                    }
                } else {
                    orderPO.setStatus(OrderStatusEnum.FAILED);
                    orderDetailDao.update(orderPO);
                }
                break;
            case DELIVERYMAN_CONFIRMED:
                if (null != orderMessageDTO.getSettlementId()) {
                    orderPO.setStatus(OrderStatusEnum.SETTLEMENT_CONFIRMED);
                    orderPO.setSettlementId(orderMessageDTO.getSettlementId());
                    orderDetailDao.update(orderPO);
                    try (Connection connection = connectionFactory.newConnection();
                         Channel channel = connection.createChannel()) {
                        String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                        channel.basicPublish("exchange.order.reward", "key.reward", null, messageToSend.getBytes());
                    }
                } else {
                    orderPO.setStatus(OrderStatusEnum.FAILED);
                    orderDetailDao.update(orderPO);
                }
                break;
            case SETTLEMENT_CONFIRMED:  // 订单创建完成
                if (null != orderMessageDTO.getRewardId()) {
                    orderPO.setStatus(OrderStatusEnum.ORDER_CREATED);
                    orderPO.setRewardId(orderMessageDTO.getRewardId());
                    orderDetailDao.update(orderPO);
                } else {
                    orderPO.setStatus(OrderStatusEnum.FAILED);
                    orderDetailDao.update(orderPO);
                }
                break;
        }
 
    } catch (JsonProcessingException | TimeoutException e) {
        e.printStackTrace();
    }
};
}
 

  1. 编写接口controller OrderController.java
代码语言:javascript
复制
package cn.kt.food.orderservicemanager.controller;
import cn.kt.food.orderservicemanager.service.OrderService;
import cn.kt.food.orderservicemanager.vo.OrderCreateVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author tao
* @date 2022-03-24 22:12
* 概要:
*/
@Slf4j
@RestController
@RequestMapping("api/v1")
public class OrderController {
@Autowired
OrderService orderService;
 
@PostMapping("/orders")
public void createOrder(@RequestBody OrderCreateVO orderCreateDTO) throws IOException, TimeoutException {
    log.info("createOrder:orderCreateDTO:{}", orderCreateDTO);
    orderService.createOrder(orderCreateDTO);
}
}
 

  1. 线程池配置类和自动监听配置 线程池配置类:AsyncTaskConfig.java
代码语言:javascript
复制
package cn.kt.food.orderservicemanager.config;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
* @author tao
* @date 2022-03-24 22:44
* 概要:  线程池配置类
*/
@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {
 
// ThredPoolTaskExcutor的处理流程
// 当池子大小小于corePoolSize,就新建线程,并处理请求
// 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
// 当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
// 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
 
@Override
@Bean
public Executor getAsyncExecutor() {
    // 起一个线程池
    ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
    //设置核心线程数
    threadPool.setCorePoolSize(10);
    //设置最大线程数
    threadPool.setMaxPoolSize(100);
    //线程池所使用的缓冲队列
    threadPool.setQueueCapacity(10);
    //等待任务在关机时完成--表明等待所有线程执行完
    threadPool.setWaitForTasksToCompleteOnShutdown(true);
    // 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
    threadPool.setAwaitTerminationSeconds(60);
    //  线程名称前缀
    threadPool.setThreadNamePrefix("Rabbit-Async-");
    // 初始化线程
    threadPool.initialize();
    return threadPool;
}
 
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return null;
}
}
 

RabbitMQ需要自动执行并且实时监听,因此需要配置自动执行OrderMessageService中handleMessage方法 RabbitConfig.java

代码语言:javascript
复制
package cn.kt.food.orderservicemanager.config;
import cn.kt.food.orderservicemanager.service.OrderMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author tao
 * @date 2022-03-24 22:58
 * 概要:  自动执行OrderMessageService中handleMessage方法(配置了RabbitMQ的交换机等)
 */
@Slf4j
@Configuration
public class RabbitConfig {
    @Autowired
    OrderMessageService orderMessageService;
    //配置类中的@Autowired方法会被自动调用
    @Autowired
    public void startListenMessage() throws IOException, TimeoutException, InterruptedException {
        orderMessageService.handleMessage();
    }
}

订单微服务和RabbitMQ的创建大致如上,因此也还有:商家微服务、骑手微服务、结算微服务、积分微服务。 其功能是在订单的每个阶段处理相应的业务逻辑,其中在每个微服务的消息通讯时使用RabbitMQ进行消息的路由和转发,套路和订单微服务差不多一致。

注:其余微服务和总代码放在文章末尾

RabbitMQ使用总结

  1. 新建ConnectionFactory
代码语言:javascript
复制
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setHost("localhost");
 

  1. Channel处理相关配置和使用basicPublish发送消息 注意:channel.basicPublish(exchange,routingKey,消息特殊参数,消息体本身(字节)) RabbitMQ发送的消息体本身是字节
代码语言:javascript
复制
try (Connection connection = connectionFactory.newConnection();
       Channel channel = connection.createChannel()) {
       // 业务逻辑
 
       // 发送消息处理
       ObjectMapper objectMapper = new ObjectMapper();
       String messageToSend = objectMapper.writeValueAsString("需要发送的消息");
       //(exchange,routingKey,消息特殊参数,消息体本身(字节))
       channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
 
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
 

  1. 配置RabbitMQ的Exchange和queue
代码语言:javascript
复制
// 声明交换机
channel.exchangeDeclare(
 "exchange.name",
 BuiltinExchangeType.DIRECT,
 true,
 false,
 null);
// 声明消息队列
channel.queueDeclare(
 "queue.name",
 true,
 false,
 false,
 null);
// 队列绑定交换机
channel.queueBind(
 "queue.name",
 "exchange.name",
 "key.name");
 

  1. 使用basicConsume消费消息
代码语言:javascript
复制
@Async
public void handleMessage() {
  /**
    * 监听订单状态
    * (队列,是不是ACK,回调函数,消费者标签)
   */
 channel.basicConsume("queue.name", true, deliverCallback, consumerTag -> {
 });
}
 

  1. 定义回调函数 收到消息后进入的回调函数
代码语言:javascript
复制
DeliverCallback deliverCallback = (consumerTag, message) -> {
//业务逻辑
};
 

  1. 配置线程池
代码语言:javascript
复制
@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {
// ThredPoolTaskExcutor的处理流程
// 当池子大小小于corePoolSize,就新建线程,并处理请求
// 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
// 当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
// 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
 
@Override
@Bean
public Executor getAsyncExecutor() {
    // 起一个线程池
    ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
    //设置核心线程数
    threadPool.setCorePoolSize(10);
    //设置最大线程数
    threadPool.setMaxPoolSize(100);
    //线程池所使用的缓冲队列
    threadPool.setQueueCapacity(10);
    //等待任务在关机时完成--表明等待所有线程执行完
    threadPool.setWaitForTasksToCompleteOnShutdown(true);
    // 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
    threadPool.setAwaitTerminationSeconds(60);
    //  线程名称前缀
    threadPool.setThreadNamePrefix("Rabbit-Async-");
    // 初始化线程
    threadPool.initialize();
    return threadPool;
}
 
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return null;
}
}
 

  1. 使用线程池启动basicConsume
代码语言:javascript
复制
//配置类中的@Autowired方法会被自动调用
@Autowired
public void startListenMessage() throws IOException, TimeoutException, InterruptedException {
    orderMessageService.handleMessage();
}
 

使用原生RabbitMQ项目中的不足之处

消息真的发出去了吗?

消息发送后,发送端不知道RabbitMQ是否真的收到了消息 若RabbitMQ异常,消息丢失后,订单处理流程停止,业务异常 需要使用RabbitMQ发送端确认机制,确认消息发送

消息真被路由了吗?

消息发送后,发送端不知道消息是否被正确路由,若路由异常,消息会被丢弃 消息丢弃后,订单处理流程停止,业务异常 需要使用RabbitMQ消息返回机制,确认消息被正确路由

消费端处理的过来吗?

业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推送给接收端,造成接收端服务崩溃 需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定

消费端处理异常怎么办?

默认情况下,消费端接收消息时,消息会被自动确认(ACK) 消费端消息处理异常时,发送端与消息中间件无法得知消息处理情况 需要使用RabbitMQ消费端确认机制,确认消息被正确处理

队列爆满怎么办?

默认情况下,消息进入队列,会永远存在,直到被消费 大量堆积的消息会给RabbitMQ产生很大的压力 需要使用RabbitMQ消息过期时间,防止消息大量积压

如何转移过期消息?

消息被设置了过期时间,过期后会直接被丢弃 直接被丢弃的消息,无法对系统运行异常发出警报 需要使用RabbitMQ死信队列,收集过期消息,以供分析

不足之处总结

目前项目急需引入的RabbitMQ新特性: 发送端确认机制 消费端确认机制 消息返回机制 消息过期机制 消费端限流机制 死信队列

解决这些不足之处需要用到RabbitMQ的高级特性。

实际开发中经验及小结

  1. 使用线程池:对于频繁创建与销毁的线程,必须使用线程池,否则极易线程溢出,造成“线程爆炸”
  2. POJO类单一职责 a. 各种POJO数据结构必须单一职责,混用会导致代码混乱 b. PO/DO: (Persistent Object/Data Object)持久对象 c. DTO:(Data Transfer Object)数据传输对象 d. BO:(Business Object)业务对象 e. vo: (View Object)显示层对象

源代码:

https://gitee.com/KT1205529635/rabbit-mq/tree/master/food_master_1

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 高可用外卖系统
    • 高可用外卖系统需求分析
      • 架构设计
        • 什么是微服务架构
          • 如何拆分微服务
            • 根据业务能力进行微服务拆分
            • 合理的交换机和队列设置
              • 业务流程时序图
                • 接口需求
                  • 微服务的数据库设计原则
                  • 原生RabbitMQ快速上手步骤
                  • RabbitMQ使用总结
                  • 使用原生RabbitMQ项目中的不足之处
                    • 消息真的发出去了吗?
                      • 消息真被路由了吗?
                        • 消费端处理的过来吗?
                          • 消费端处理异常怎么办?
                            • 队列爆满怎么办?
                              • 如何转移过期消息?
                                • 不足之处总结
                                • 实际开发中经验及小结
                                • 源代码:
                                相关产品与服务
                                数据库
                                云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
                                领券
                                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档