前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ学习笔记(七)——RabbitMQ分布式事务框架

RabbitMQ学习笔记(七)——RabbitMQ分布式事务框架

作者头像
不愿意做鱼的小鲸鱼
发布2022-09-26 19:08:18
3500
发布2022-09-26 19:08:18
举报
文章被收录于专栏:web全栈web全栈

分布式事务框架分析

事务为什么要分布式

  1. 什么是事务 ◆ 事务指的是一 系列业务操作,只能同时成功或同时失败 ◆ 传统事务有4个主要特性:原子性、一致性、隔离性、持久性
  2. 微服务化带来的挑战 ◆ 在传统单体应用中,事务在本地即可完成 ◆ 随着后端架构的微服务化,事务无法在本地完成 ◆ 所以需要将事务“分布式化”
  3. 传统单体应用 ◆ 在传统单体应用中,事务在本地即可完成
  1. 微服务应用 ◆ 随着后端架构的微服务化,事务无法在本地完成

◆ 所以需要将事务"分布式化"

事务的前提理论

  1. 分布式框架理论 ACID 事务正确执行的四个基本要素 ◆ 原子性(Atomicity) ◆ 一致性(Consistency) ◆ 隔离性(Isolation) ◆ 持久性(Durability)
  2. 分布式框架理论 CAP 一致性、可用性、分区容忍性不可能三者兼顾 ◆ 一致性(Consistency) ◆ 可用性(Availability) ◆ 分区容忍性 (Partition tolerance)
  3. 分布式框架理论 BASE 由于CAP无法同时满足,基于I程实际,提出了BASE理论 ◆ Basically Available (基本可用) ◆ Soft state (软状态) ◆ Eventually consistent (最终一致性)

分布式事务的取舍

◆ ACID往往针对传统本地事务,分布式事务无法满足原子性和隔离性,需要舍弃传统ACID理论

◆ 基于BASE理论,业务状态不需要在微服务系统内强一致

◆ 基于BASE理论,订单状态要做到最终一致性即可

◆ 为了做到最终一致性, 要保证消息不丢失,发送处理的流程要有重试机制,重试多次失败后要有告警

分布式事务框架设计

根据上述分析,分布式事务框架应该包含以下部分

◆ 发送失败重试

◆ 消费失败重试

◆ 死信告警

数据表设计

分布式事务框架搭建

要用到的相关技术:

◆ 声明ConnectionFactory、RabbitAdmin、RabbitListenerContainerFactory、RabbitTemplate

◆ 声明枚举、PO、 开发dao层

◆ 声明定时任务

分布式事务相关说明

  1. 消息发送失败重试 ◆ 发送消息前消息持久化 ◆ 发送成功时删除消息 ◆ 定时巡检未发送成功消息、重试发送
  1. 消息消费失败重试 ◆ 收到消息时先进行持久化 ◆ 消息处理成功,消费端确认(ACK),删除消息 ◆ 消息处理失败,延时,不确认消息(NACK),记录次数 ◆ 再次处理消息
  1. 死信消息告警 ◆ 声明死信队列、交换机、绑定 ◆ 普通队列加入死信设置 ◆ 监听到死信,持久化、告警

步骤如下

该消息发送失败重试、消息消费失败重试、死信消息告警的事务框架功能我们写在一个统一的包下面,以便于以后的复用。包名为moodymq。

目录结构如下:

开发为在RabbitMQ学习笔记(四)——RabbitMQ与SpringBoot适配的源码基础上新增代码。

1. 新建数据表
代码语言:javascript
复制
DROP TABLE IF EXISTS `trans_message`;
CREATE TABLE `trans_message`  (
  `id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '消息ID',
  `service` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '服务名称',
  `type` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '消息类型',
  `exchange` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '交换机',
  `routing_Key` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '路由键',
  `queue` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '队列',
  `sequence` int(11) NULL DEFAULT NULL COMMENT '序号',
  `payload` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '消息内容',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`, `service`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
2. 修改配置文件

application.properties增加moodymq

代码语言:javascript
复制
#订单微服务配置类
server.port=8080
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/food?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
rabbitmq.host=192.168.137.138
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.exchange=exchange.food
spring.rabbitmq.addresses=192.168.137.138
spring.rabbitmq.host=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 自动ack
spring.rabbitmq.listener.direct.acknowledge-mode=auto

moodymq.service=orderService
moodymq.host=192.168.137.138
moodymq.port=5672
moodymq.username=guest
moodymq.password=guest
moodymq.vhost=/
# 重复消费最多五次
moodymq.resendTimes=5
# 重复消费间隔时长
moodymq.resendFreq=5000
3. 新建状态枚举、Po和Dao层

TransMessageType.java

代码语言:javascript
复制
package cn.kt.food.orderservicemanager.moodymq.enummeration;
public enum TransMessageType {
    SEND,
    RECEIVE,
    DEAD;
}

TransMessagePO.java

代码语言:javascript
复制
package cn.kt.food.orderservicemanager.moodymq.po;

import cn.kt.food.orderservicemanager.moodymq.enummeration.TransMessageType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.util.Date;

@Getter
@Setter
@ToString
public class TransMessagePO {
    private String id;
    private String service;
    private TransMessageType type;
    private String exchange;
    private String routingKey;
    private String queue;
    private Integer sequence;
    private String payload;
    private Date date;
}

TransMessageDao.java

代码语言:javascript
复制
package cn.kt.food.orderservicemanager.moodymq.dao;

import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
import java.util.List;

@Mapper
@Repository
public interface TransMessageDao {

    @Insert("INSERT INTO trans_message (id, type, service, " +
            "exchange, routing_key, queue, sequence, payload," +
            "date) " +
            "VALUES(#{id}, #{type}, #{service},#{exchange}," +
            "#{routingKey},#{queue},#{sequence}, #{payload},#{date})")
    void insert(TransMessagePO transMessagePO);

    @Update("UPDATE trans_message set type=#{type}, " +
            "service=#{service}, exchange =#{exchange},"+
            "routing_key =#{routingKey}, queue =#{queue}, " +
            "sequence =#{sequence}, payload =#{payload}, " +
            "date =#{date} " +
            "where id=#{id} and service=#{service}")
    void update(TransMessagePO transMessagePO);

    @Select("SELECT id, type, service, exchange, " +
            "routing_key routingKey, queue, sequence, " +
            "payload, date " +
            "FROM trans_message " +
            "where id=#{id} and service=#{service}")
    TransMessagePO selectByIdAndService(@Param("id") String id,
                                        @Param("service") String service);

    @Select("SELECT id, type, service, exchange, " +
            "routing_key routingKey, queue, sequence, " +
            "payload, date " +
            "FROM trans_message " +
            "WHERE type = #{type} and service = #{service}")
    List<TransMessagePO> selectByTypeAndService(
            @Param("type") String type,
            @Param("service") String service);

    @Delete("DELETE FROM trans_message " +
            "where id=#{id} and service=#{service}")
    void delete(@Param("id") String id,
                @Param("service") String service);
}
4. 发送消息封装send

TransMessageSender.java

代码语言:javascript
复制
package cn.kt.food.orderservicemanager.moodymq.sender;

import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TransMessageSender {

    @Autowired
    RabbitTemplate rabbitTemplate;
    @Autowired
    TransMessageService transMessageService;

    // 发送消息封装
    public void send(String exchange, String routingKey, Object payload) {
        log.info("send(): exchange:{} routingKey:{} payload:{}",
                exchange, routingKey, payload);
        try {
            ObjectMapper mapper = new ObjectMapper();
            String payloadStr = mapper.writeValueAsString(payload);
            System.out.println(payloadStr);
            // 发送前暂存消息
            TransMessagePO transMessagePO =
                    transMessageService.messageSendReady(
                            exchange,
                            routingKey,
                            payloadStr
                    );

            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("application/json");
            // 封装消息
            Message message = new Message(payloadStr.getBytes(), messageProperties);
            message.getMessageProperties().setMessageId(transMessagePO.getId());
            // 发送消息
            rabbitTemplate.convertAndSend(exchange, routingKey, message,
                    new CorrelationData(transMessagePO.getId()));

            log.info("message sent, ID:{}", transMessagePO.getId());

        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}
5. 新建service层和实现类

TransMessageService.java

代码语言:javascript
复制
package cn.kt.food.orderservicemanager.moodymq.service;
import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import java.util.List;

public interface TransMessageService {
    /**
     * 发送前暂存消息
     *
     * @param exchange   exchange
     * @param routingKey routingKey
     * @param body       body
     * @return TransMessagePO
     */
    TransMessagePO messageSendReady(String exchange, String routingKey, String body);

    /**
     * 设置消息发送成功,需要把消息删除
     *
     * @param id 消息ID
     */
    void messageSendSuccess(String id);

    /**
     * 设置消息返回,将消息持久化
     *
     * @param id         id
     * @param exchange   exchange
     * @param routingKey routingKey
     * @param body       body
     * @return TransMessagePO
     */
    TransMessagePO messageSendReturn(
            String id, String exchange, String routingKey, String body);

    /**
     * 查询应发未发消息(之前发送出错的消息,还需要重发,未到告警)
     *
     * @return List<TransMessagePO>
     */
    List<TransMessagePO> listReadyMessages();

    /**
     * 记录消息发送次数
     *
     * @param id id
     */
    void messageResend(String id);

    /**
     * 消息重发多次,放弃
     *
     * @param id id
     */
    void messageDead(String id);

    /**
     * 保存监听到的死信消息
     * @param id
     * @param exchange
     * @param routingKey
     * @param queue
     * @param body
     */
    void messageDead(String id, String exchange,
                     String routingKey, String queue,
                     String body);

    /**
     * 消息消费前保存
     *
     * @param id
     * @param exchange
     * @param routingKey
     * @param queue
     * @param body
     * @return
     */
    TransMessagePO messageReceiveReady(
            String id,
            String exchange,
            String routingKey,
            String queue,
            String body);

    /**
     * 消息消费成功
     *
     * @param id
     */
    void messageReceiveSuccess(String id);

}

TransMessageServiceImpl.java

代码语言:javascript
复制
package cn.kt.food.orderservicemanager.moodymq.service;

import cn.kt.food.orderservicemanager.moodymq.dao.TransMessageDao;
import cn.kt.food.orderservicemanager.moodymq.enummeration.TransMessageType;
import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;
import java.util.UUID;

@Service
public class TransMessageServiceImpl implements TransMessageService {

    @Autowired
    TransMessageDao transMessageDao;

    @Value("${moodymq.service}")
    String serviceName;

    @Override
    public TransMessagePO messageSendReady(String exchange, String routingKey, String body) {
        final String messageId = UUID.randomUUID().toString();
        TransMessagePO transMessagePO = new TransMessagePO();
        transMessagePO.setId(messageId);
        transMessagePO.setService(serviceName);
        transMessagePO.setExchange(exchange);
        transMessagePO.setRoutingKey(routingKey);
        transMessagePO.setPayload(body);
        transMessagePO.setDate(new Date());
        transMessagePO.setSequence(0);
        transMessagePO.setType(TransMessageType.SEND);
        transMessageDao.insert(transMessagePO);
        return transMessagePO;
    }

    @Override
    public void messageSendSuccess(String id) {
        transMessageDao.delete(id, serviceName);
    }

    @Override
    public TransMessagePO messageSendReturn(String id, String exchange, String routingKey, String body) {
       TransMessagePO selectByIdAndService = transMessageDao.selectByIdAndService(id, serviceName);
        if (selectByIdAndService == null) {
            TransMessagePO transMessagePO = new TransMessagePO();
            transMessagePO.setId(id);
            transMessagePO.setService(serviceName);
            transMessagePO.setExchange(exchange);
            transMessagePO.setRoutingKey(routingKey);
            transMessagePO.setPayload(body);
            transMessagePO.setDate(new Date());
            transMessagePO.setSequence(0);
            transMessagePO.setType(TransMessageType.SEND);
            transMessageDao.insert(transMessagePO);

            return transMessagePO;
        } else {
            return selectByIdAndService;
        }
//        return messageSendReady(exchange, routingKey, body);
    }

    @Override
    public List<TransMessagePO> listReadyMessages() {
        return transMessageDao.selectByTypeAndService(
                TransMessageType.SEND.toString(), serviceName
        );
    }

    @Override
    public void messageResend(String id) {
        TransMessagePO transMessagePO = transMessageDao.selectByIdAndService(id, serviceName);
        transMessagePO.setSequence(transMessagePO.getSequence() + 1);
        transMessageDao.update(transMessagePO);
    }

    @Override
    public void messageDead(String id) {
        TransMessagePO transMessagePO = transMessageDao.selectByIdAndService(id, serviceName);
        transMessagePO.setType(TransMessageType.DEAD);
        transMessageDao.update(transMessagePO);
    }

    @Override
    public void messageDead(String id, String exchange, String routingKey, String queue, String body) {
        TransMessagePO transMessagePO = new TransMessagePO();
        transMessagePO.setId(id);
        transMessagePO.setService(serviceName);
        transMessagePO.setExchange(exchange);
        transMessagePO.setRoutingKey(routingKey);
        transMessagePO.setQueue(queue);
        transMessagePO.setPayload(body);
        transMessagePO.setDate(new Date());
        transMessagePO.setSequence(0);
        transMessagePO.setType(TransMessageType.DEAD);
        transMessageDao.insert(transMessagePO);
    }

    @Override
    public TransMessagePO messageReceiveReady(
            String id, String exchange,
            String routingKey, String queue, String body) {

        TransMessagePO transMessagePO =
                transMessageDao.selectByIdAndService(id, serviceName);
        if (null == transMessagePO) {
            // 说明是第一次消费
            transMessagePO = new TransMessagePO();
            transMessagePO.setId(id);
            transMessagePO.setService(serviceName);
            transMessagePO.setExchange(exchange);
            transMessagePO.setRoutingKey(routingKey);
            transMessagePO.setQueue(queue);
            transMessagePO.setPayload(body);
            transMessagePO.setDate(new Date());
            transMessagePO.setSequence(0);
            transMessagePO.setType(TransMessageType.RECEIVE);
            transMessageDao.insert(transMessagePO);
        } else {
            // 否则消费次数 + 1
            transMessagePO.setSequence(transMessagePO.getSequence() + 1);
            transMessageDao.update(transMessagePO);
        }
        return transMessagePO;
    }

    @Override
    public void messageReceiveSuccess(String id) {
        // 消费成功后删除消息
        transMessageDao.delete(id, serviceName);
    }
}
6. 新建config配置RabbitAdmin和RabbitTemplate实现消息的监听和确认逻辑

消息监听使用手动ack

消息确认机制消息投递至交换机失败进行消息重发

MoodyRabbitConfig.java

代码语言:javascript
复制
package cn.kt.food.orderservicemanager.moodymq.config;

import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class MoodyRabbitConfig {

    @Autowired
    TransMessageService transMessageService;

    @Value("${moodymq.host}")
    String host;
    @Value("${moodymq.port}")
    int port;
    @Value("${moodymq.username}")
    String username;
    @Value("${moodymq.password}")
    String password;
    @Value("${moodymq.vhost}")
    String vhost;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        // CORRELATED:发送消息的成功还是失败需要有id的参数,因为确认消息是异步的,需要确认哪条消息被确认,
        // 体现在发送消息前持久化时设置id:message.getMessageProperties().setMessageId(transMessagePO.getId());
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        // 消息无法路由的时候需要设置消息返回
        connectionFactory.setPublisherReturns(true);
        connectionFactory.createConnection();
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        admin.setAutoStartup(true);
        return admin;
    }

    /* 配置消费端消息监听 */
    @Bean
    public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory =
                new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        // 手动ack
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    /* 消息是否路由的消息确认机制 */
    @Bean
    public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        // 消息确认回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("correlationData:{}, ack:{}, cause:{}",
                    correlationData, ack, cause);
            // 如果消息确认接收
            if (ack && null != correlationData) {
                String messageId = correlationData.getId();
                log.info("消息已经正确投递到交换机, id:{}", messageId);
                transMessageService.messageSendSuccess(messageId);
            } else {
                // 如果消息确认接收失败,则消息保留,等待下次重发
                log.error("消息投递至交换机失败,correlationData:{}", correlationData);
            }
        });
        // 当消息未进入队列时回调
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            log.error("消息无法路由!message:{}, replyCode:{} replyText:{} exchange:{} routingKey:{}",
                    returnedMessage.getMessage(), returnedMessage.getReplyCode(),
                    returnedMessage.getReplyText(), returnedMessage.getExchange(),
                    returnedMessage.getRoutingKey());
            transMessageService.messageSendReturn(
                    returnedMessage.getMessage().getMessageProperties().getMessageId(),
                    returnedMessage.getExchange(),
                    returnedMessage.getRoutingKey(),
                    new String(returnedMessage.getMessage().getBody())
            );
        });
        return rabbitTemplate;
    }
}
7. 新建抽象类实现ChannelAwareMessageListener完成消息监听

监听消息的接收和业务执行是否异常,如果消息处理异常,则消息重回队列

AbstractMessageListener.java

代码语言:javascript
复制
package cn.kt.food.orderservicemanager.moodymq.listener;

import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import java.io.IOException;

@Slf4j
public abstract class AbstractMessageListener implements ChannelAwareMessageListener {

    @Autowired
    TransMessageService transMessageService;
    @Value("${moodymq.resendTimes}")
    Integer resendTimes;

    public abstract void receviceMessage(Message message) throws JsonProcessingException;

    @Override
    public void onMessage(Message message, Channel channel) throws IOException, InterruptedException {
        MessageProperties messageProperties = message.getMessageProperties();
        // deliveryTag:跟消息接收确认有关的数字
        long deliveryTag = messageProperties.getDeliveryTag();
        // 持久化接收到的消息
        log.info("收到的消息{}", new String(message.getBody()));
        TransMessagePO transMessagePO =
                transMessageService.messageReceiveReady(
                        messageProperties.getMessageId(),
                        messageProperties.getReceivedExchange(),
                        messageProperties.getReceivedRoutingKey(),
                        messageProperties.getConsumerQueue(),
                        new String(message.getBody())
                );
        log.info("收到消息{}, 消费次数{}",
                messageProperties.getMessageId(), transMessagePO.getSequence());

        try {
            // 该方法让业务去执行,这里抓异常
            receviceMessage(message);
            // 消息处理完成
            channel.basicAck(deliveryTag, false);
            transMessageService.messageReceiveSuccess(messageProperties.getMessageId());
        } catch (Exception e) {
            // 消息处理异常
            log.error(e.getMessage(), e);
            // 判断该消息的消费次数
            if (transMessagePO.getSequence() >= resendTimes) {
                // 消费次数超限,拒收消息
                channel.basicReject(deliveryTag, false);
            } else {
                // 消息重回队列
                Thread.sleep((long) (Math.pow(2, transMessagePO.getSequence())) * 1000);
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }
}
8. 配置死信消息告警

声明死信交换机、队列和绑定

DlxConfig.java

代码语言:javascript
复制
package cn.kt.food.orderservicemanager.moodymq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty("moodymq.dlxEnabled")
public class DlxConfig {
    /*
    * 声明死信交换机、队列和绑定
    */
    @Bean
    public TopicExchange dlxExchange() {
        return new TopicExchange("exchange.dlx");
    }

    @Bean
    public Queue dlxQueue() {
        return new Queue("queue.dlx",
                true,
                false,
                false);
    }

    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("#");
    }
}

监听死信消息

DlxListener.java

代码语言:javascript
复制
package cn.kt.food.orderservicemanager.moodymq.listener;

import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

@Component
@Slf4j
//配置是不是需要监听死信:@ConditionalOnProperty实现是通过havingValue与配置文件中的值对比,返回为true则配置类生效,反之失效.
@ConditionalOnProperty("moodymq.dlxEnabled")
public class DlxListener implements ChannelAwareMessageListener {
    @Autowired
    TransMessageService transMessageService;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String messageBody = new String(message.getBody());
        log.error("dead letter! message:{}", message);
        //发邮件、打电话、发短信
        //XXXXX()
        MessageProperties messageProperties = message.getMessageProperties();
        transMessageService.messageDead(
                messageProperties.getMessageId(),
                messageProperties.getReceivedExchange(),
                messageProperties.getReceivedRoutingKey(),
                messageProperties.getConsumerQueue(),
                messageBody
        );
        // 单条确认
        channel.basicAck(messageProperties.getDeliveryTag(), false);
    }
}
9. 配置定时任务

每隔5秒巡检异常消息

ResendTask.java

代码语言:javascript
复制
package cn.kt.food.orderservicemanager.moodymq.task;

import cn.kt.food.orderservicemanager.moodymq.enummeration.TransMessageType;
import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;

@EnableScheduling
@Configuration
@Component
@Slf4j
public class ResendTask {

    @Autowired
    TransMessageService transMessageService;
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Value("${moodymq.resendTimes}")
    Integer resendTimes;

    // 在配置类中取出来
    @Scheduled(fixedDelayString = "${moodymq.resendFreq}")
    public void resendMessage(){
        log.info("resendMessage() invoked.");
        List<TransMessagePO> messagePOS =
                transMessageService.listReadyMessages();
        log.info("resendMessage(): messagepos:{}", messagePOS);

        for (TransMessagePO po: messagePOS) {
            log.info("resendMessage(): po:{}", po);
            // 过滤dead消息
            if(po.getSequence() > resendTimes){
                log.error("resend too many times!");
                transMessageService.messageDead(po.getId());
                continue;
            }
            // 封装和发送消息
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("application/json");
            Message message = new Message(po.getPayload().getBytes(), messageProperties);
            message.getMessageProperties().setMessageId(po.getId());
            rabbitTemplate.convertAndSend(
                    po.getExchange(),
                    po.getRoutingKey(),
                    message,
                    new CorrelationData(po.getId()));

            log.info("message sent, ID:{}", po.getId());
            // 消息重发,发送次数+1
            transMessageService.messageResend(po.getId());
        }
    }
}
10. 改造moddymq包外的业务代码
  1. 继承moddymq包中抽象出来的监听方法
  2. 注解绑定交换机队列配置改用RabbitConfig配置类中使用@Bean配置
  3. 监听消息的handMessage改用抽象类的方法receviceMessage进行重写
  4. 在moddymq包外的业务代码中使用包内封装的发送方法 具体实现详情看下面源码

源码下载

https://gitee.com/KT1205529635/rabbit-mq/tree/master/food_master_3

小结

  1. 消息发送失败重试,消息消费失败重试,死信消息告警只是有效的保证rabbitMQ消息的事务一致性,有效的解决了消息失败的结果。
  2. 在实际项目中可以把开发的分布式事务框架包moddymq新建另外一个项目,并打成jar包,统一使用规范供多微服务模块使用
  3. 本moddymq中并无注明给死信队列queue.dlx发送消息的场景,实际开发中可以定时将状态为DEAD的消息发送至死信队列进行死信告警。告警方法方法已给出,但具体告警逻辑可以根据实际场景需要进行完善。
  4. 在源代码中,沿用了RabbitMQ快速上手中的订单微服务的案例,改造使用了该分布式的事务框架。
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 分布式事务框架分析
    • 事务为什么要分布式
      • 事务的前提理论
        • 分布式事务的取舍
          • 分布式事务框架设计
          • 分布式事务框架搭建
            • 分布式事务相关说明
              • 步骤如下
                • 1. 新建数据表
                • 2. 修改配置文件
                • 3. 新建状态枚举、Po和Dao层
                • 4. 发送消息封装send
                • 5. 新建service层和实现类
                • 6. 新建config配置RabbitAdmin和RabbitTemplate实现消息的监听和确认逻辑
                • 7. 新建抽象类实现ChannelAwareMessageListener完成消息监听
                • 8. 配置死信消息告警
                • 9. 配置定时任务
                • 10. 改造moddymq包外的业务代码
            • 源码下载
            • 小结
            相关产品与服务
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档