前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringBoot RabbitMQ实现消息可靠投递

SpringBoot RabbitMQ实现消息可靠投递

作者头像
喜欢天文的pony站长
发布2020-06-29 12:16:08
6110
发布2020-06-29 12:16:08
举报

image.png

「消息投递时 可能发生丢失的场景:」

  1. 生产者------msg------> MQ 。可开启消息投递结果回调,确保每条消息都收到了回调。
  2. MQ。将Queue与消息设置成可持久化,搭建镜像集群队列。
  3. MQ-------callback---->生产者。回调时失败,某条消息在一段时间内未收到回调,则默认投递失败,生产者需要再次投递该消息到MQ。(该场景下会导致同一条消息被重复投递,消费者端需要自行保证消息幂等消费)

一、实现思路

「使用技术:」

  • SpringBoot
  • RabbitMQ
  • Mysql
  • MybatisPlus
  • XxlJob

二、准备,框架搭建

  • 数据库Entity:
    • Message
/**
 * 消息发送历史
 *
 * @author futao
 * @date 2020/3/31.
 */
@Getter
@Setter
@Builder
@TableName("message")
public class Message extends IdTimeEntity {

    @Tolerate
    public Message() {
    }

    /**
     * 消息承载的业务数据
     */
    @TableField("msg_data")
    private String msgData;

    /**
     * 交换机名称
     */
    @TableField("exchange_name")
    private String exchangeName;

    /**
     * 路由键
     */
    @TableField("routing_key")
    private String routingKey;

    /**
     * 消息状态
     *
     * @see com.futao.springboot.learn.rabbitmq.doc.reliabledelivery.model.enums.MessageStatusEnum
     */
    @TableField("status")
    private int status;

    /**
     * 重试次数
     */
    @TableField("retry_times")
    private int retryTimes;

    /**
     * 下一次重试时间
     */
    @TableField("next_retry_date_time")
    private LocalDateTime nextRetryDateTime;

}
  • 消息状态枚举
/**
 * 消息状态枚举
 *
 * @author futao
 * @date 2020/3/31.
 */
@Getter
@AllArgsConstructor
public enum MessageStatusEnum {

    /**
     * 1=发送中
     */
    SENDING(1, "发送中"),

    /**
     * 2=发送失败
     */
    SUCCESS(2, "发送成功"),

    /**
     * 3=发送失败
     */
    FAIL(3, "发送失败");

    private int status;
    private String description;
}
  • RabbitMQ配置
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: futao
    password: 123456789
    virtual-host: reliable-delivery
    connection-timeout: 15000
    # 发送确认
    publisher-confirms: true
    # 路由失败回调
    publisher-returns: true
    template:
      # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
      mandatory: true

app:
  rabbitmq:
    retry:
      # 消息最大重试次数
      max-retry-times: 5
      # 每次重试时间间隔
      retry-interval: 5s
    # 队列定义
    queue:
      user: user-queue
    # 交换机定义
    exchange:
      user: user-exchange

三、编码

  • 队列交换机定义与绑定
/**
 * RabbitMQ队列定义与绑定
 *
 * @author futao
 * @date 2020/3/31.
 */
@Configuration
public class Declare {

    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName) {
        return QueueBuilder
                .durable(userQueueName)
                //.withArgument("x-max-length", 2)
                .build();
    }

    @Bean
    public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) {
        return ExchangeBuilder
                .topicExchange(userExchangeName)
                .durable(true)
                .build();
    }

    @Bean
    public Binding userBinding(Queue userQueue, Exchange userExchange) {
        return BindingBuilder
                .bind(userQueue)
                .to(userExchange)
                .with("user.*")
                .noargs();
    }
}
  • 对RabbitTemplate进行增强,设置confirmCallback()消息投递回调方法与returnCallback()消息路由失败回调方法
/**
 * Bean增强
 * 【严重警告】: 不可在该类中注入Bean,被注入的Bean不会被BeanPostProcessor增强,造成误伤。
 * 必须通过容器来获取需要注入的Bean
 *
 * @author futao
 * @date 2020/3/20.
 */
@Slf4j
@Component
public class BeanEnhance implements BeanPostProcessor {

//    @Resource
//    private MessageMapper messageMapper;

    /**
     * 消息的最大重试次数
     */
    @Value("${app.rabbitmq.retry.max-retry-times}")
    private int maxRetryTimes;

    /**
     * 每次重试时间间隔
     */
    @Value("${app.rabbitmq.retry.retry-interval}")
    private Duration retryInterval;

//    @Autowired
//    private RabbitTemplate rabbitTemplate;
//
//    @Autowired
//    private BeanEnhance enhance;


    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        //增强RabbitTemplate
        if (RabbitTemplate.class.equals(bean.getClass())) {
            //消息投递成功与否的监听,可以用来保证消息100%投递到rabbitMQ。(如果某条消息(通过id判定)在一定时间内未收到该回调,则重发该消息)
            //需要设置 publisher-confirms: true
            ((RabbitTemplate) bean).setConfirmCallback((correlationData, ack, cause) -> {
                String correlationDataId = correlationData.getId();
                if (ack) {
                    //ACK
                    log.debug("消息[{}]投递成功,将DB中的消息状态设置为投递成功", correlationDataId);
                    ApplicationContextHolder.getBean(MessageMapper.class).update(null,
                            Wrappers.<Message>lambdaUpdate()
                                    .set(Message::getStatus, MessageStatusEnum.SUCCESS.getStatus())
                                    .eq(Message::getId, correlationDataId)
                    );
                } else {
                    log.debug("消息[{}]投递失败,cause:{}", correlationDataId, cause);
                    //NACK,消息重发
                    ApplicationContextHolder.getBean(BeanEnhance.class).reSend(correlationDataId);
                }
            });

            //消息路由失败的回调--需要设置   publisher-returns: true 并且   template: mandatory: true 否则rabbit将丢弃该条消息
            ((RabbitTemplate) bean).setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                log.warn("消息路由失败回调...做一些补偿或者记录.............................................");
                log.warn("message{}", message);
                log.warn("replyCode{}", replyCode);
                log.warn("replyText{}", replyText);
                log.warn("exchange{}", exchange);
                log.warn("routingKey{}", routingKey);
            });
        }
        return bean;
    }


    /**
     * NACK时进行消息重发
     *
     * @param correlationDataId
     */
    @Transactional(rollbackFor = Exception.class)
    public void reSend(String correlationDataId) {
        Message message = ApplicationContextHolder.getBean(MessageMapper.class).selectById(correlationDataId);
        if (message.getRetryTimes() < maxRetryTimes) {
            //进行重试
            ApplicationContextHolder.getBean(RabbitTemplate.class).convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), new CorrelationData(correlationDataId));
            //更新DB消息状态
            ApplicationContextHolder.getBean(MessageMapper.class).update(null,
                    Wrappers.<Message>lambdaUpdate()
                            .set(Message::getStatus, MessageStatusEnum.SENDING.getStatus())
                            .set(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                            .set(Message::getRetryTimes, message.getRetryTimes() + 1)
                            .eq(Message::getId, correlationDataId)
            );
        }
    }
}
  • 生产者:

/**
 * @author futao
 * @date 2020/3/31.
 */
@Component
public class Sender {

    @Value("${app.rabbitmq.retry.retry-interval}")
    private Duration retryInterval;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MessageMapper messageMapper;

    @Value("${app.rabbitmq.exchange.user}")
    private String userExchangeName;

    public void send(User user) {
        Message message = Message.builder()
                .msgData(JSON.toJSONString(user))
                .exchangeName(userExchangeName)
                .routingKey("user.abc")
                .status(MessageStatusEnum.SENDING.getStatus())
                //下次重试时间
                .nextRetryDateTime(LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                .retryTimes(0)
                .build();
        //消息落库
        messageMapper.insert(
                message
        );
        CorrelationData correlationData = new CorrelationData(message.getId());
        //消息投递到MQ
        rabbitTemplate.convertAndSend(userExchangeName, "user.abc", JSON.toJSONString(user), correlationData);
    }
}
  • 定时任务扫描DB中的消息状态,如果存在发送中的消息,且当前时间>=下一次投递时间 and 重试次数<=最大重试次数,则再次进行投递。
    • XxlJob配置
xxl:
  job:
    switch: ON
    admin:
      ### 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
      addresses: http://127.0.0.1:9090/xxl-job-admin
    executor:
      ### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
      appname: xxl-job-executor-rabbitmq
      ### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
      # ip:
      ### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
      port: 9999
      ### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
      logpath: data/applogs/xxl-job/jobhandler
      ### 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
      logretentiondays: 30
    ### 执行器通讯TOKEN [选填]:非空时启用;
    accessToken:
  • Configuration
/**
 * XXL-JOB配置
 *
 * @author futao
 * @date 2020/4/1.
 */
@Setter
@Getter
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "xxl.job")
public class XxlJobConfig {


    private final Admin admin = new Admin();
    private final Executor executor = new Executor();

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor(XxlJobConfig xxlJobConfig) {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(xxlJobConfig.getAdmin().getAddresses());
        xxlJobSpringExecutor.setAppName(xxlJobConfig.getExecutor().getAppName());
        xxlJobSpringExecutor.setIp(xxlJobConfig.getExecutor().getIp());
        xxlJobSpringExecutor.setPort(xxlJobConfig.getExecutor().getPort());
        xxlJobSpringExecutor.setLogPath(xxlJobConfig.getExecutor().getLogPath());
        xxlJobSpringExecutor.setLogRetentionDays(xxlJobConfig.getExecutor().getLogRetentionDays());
        return xxlJobSpringExecutor;
    }

    @Getter
    @Setter
    public static class Admin {
        private String addresses;
    }

    @Getter
    @Setter
    public static class Executor {
        private String appName;
        private String ip;
        private int port;
        private String logPath;
        private int logRetentionDays;
    }
}
  • 定时扫描任务编写

/**
 * 扫描数据库中需要重新投递的消息并重新投递
 *
 * @author futao
 * @date 2020/4/1.
 */
@Slf4j
@Component
public class MessageReSendJob extends IJobHandler {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Resource
    private MessageMapper messageMapper;

    @Autowired
    private MessageReSendJob messageReSendJob;

    /**
     * 最大重试次数
     */
    @Value("${app.rabbitmq.retry.max-retry-times}")
    private int retryTimes;

    /**
     * 重试时间间隔
     */
    @Value("${app.rabbitmq.retry.retry-interval}")
    private Duration retryInterval;

    /**
     * 批量从数据库中读取的消息
     */
    private static final int PAGE_SIZE = 100;


    @XxlJob(value = "MessageReSendJob", init = "jobHandlerInit", destroy = "jobHandlerDestroy")
    @Override
    public ReturnT<String> execute(String s) throws Exception {
        long startTime = System.currentTimeMillis();
        log.info("开始扫描需要进行重试投递的消息");
        XxlJobLogger.log("开始扫描需要进行重试投递的消息");
        service(1);
        log.info("扫描需要进行重试投递的消息任务结束,耗时[{}]ms", System.currentTimeMillis() - startTime);
        XxlJobLogger.log("扫描需要进行重试投递的消息任务结束,耗时[{}]ms", System.currentTimeMillis() - startTime);
        return ReturnT.SUCCESS;
    }

    public void service(int pageNum) {
        IPage<Message> messageIPage = messageMapper.selectPage(new Page<>(pageNum, PAGE_SIZE),
                Wrappers.<Message>lambdaQuery()
                        //发送中的消息
                        .eq(Message::getStatus, MessageStatusEnum.SENDING.getStatus())
                        //已到达下次发送时间
                        .le(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)))
        );
        List<Message> messages = messageIPage.getRecords();
        messages.forEach(message -> {
            if (retryTimes <= message.getRetryTimes()) {
                //已达到最大投递次数,将消息设置为投递失败
                messageMapper.update(null, Wrappers.<Message>lambdaUpdate().set(Message::getStatus, MessageStatusEnum.FAIL.getStatus()).eq(Message::getId, message.getId()));
            } else {
                messageReSendJob.reSend(message);
            }
        });
        if (PAGE_SIZE == messages.size()) {
            service(++pageNum);
        }
    }


    /**
     * 重新投递消息
     *
     * @param message
     */
    public void reSend(Message message) {
        messageMapper.update(null,
                Wrappers.<Message>lambdaUpdate()
                        .set(Message::getRetryTimes, message.getRetryTimes() + 1)
                        .set(Message::getNextRetryDateTime, LocalDateTime.now(ZoneOffset.ofHours(8)).plus(retryInterval))
                        .eq(Message::getId, message.getId())
        );
        try {
            //再次投递
            rabbitTemplate.convertAndSend(message.getExchangeName(), message.getRoutingKey(), message.getMsgData(), new CorrelationData(message.getId()));
        } catch (Exception e) {
            log.error("消息[{}]投递失败", JSON.toJSONString(message));
        }
    }

    public void jobHandlerInit() {
        log.info("before job execute...");
        XxlJobLogger.log("before job handler init...");
    }

    public void jobHandlerDestroy() {
        log.info("after job execute...");
        XxlJobLogger.log("after job execute...");
    }
}
  • XxlJob 新增调度任务

四、测试

  • 测试接口
/**
 * @author futao
 * @date 2020/4/1.
 */
@RequestMapping("/user")
@RestController
public class UserController {

    @Autowired
    private Sender sender;

    @RequestMapping("/send")
    public void send() {
        sender.send(User
                .builder()
                .userName("天文")
                .birthday(LocalDate.of(1995, 1, 31))
                .address("浙江杭州")
                .build());
    }
}
  • 正常场景:

消息落库,状态为1=发送中

回调

消息设置成投递成功

  • 异常场景

启动生产者服务后停止MQ 发送消息

因为收不到该条消息的ACK。所以一直处于发送中。开启任务调度再次进行投递(投递次数+1,且更新下次投递时间)

当投递次数达到最大投递次数,下一次,将消息设置成投递失败

调度日志

image.png

# Next
  • 消息可靠消费
  • 消费端限流保护
  • 死信队列
  • 延迟队列
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-04-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 喜欢天文 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • # Next
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档