前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >充电桩项目实战:消息丢失和重复消费问题

充电桩项目实战:消息丢失和重复消费问题

作者头像
田维常
发布2024-04-10 16:19:06
1070
发布2024-04-10 16:19:06
举报

你好,我是田哥

在我的充电桩项目中,有个用户积分模块,原型图如下:

我的积分

下面来聊聊这个项目中,积分增加场景。

  • 用户充值完成后,赠送积分,比如:充100元,给用户积分新增100个。
  • 用户充电支付完成(非余额支付),赠送积分。
  • 用户邀请新人注册,赠送积分
  • 新用户认证完成,赠送积分
  • ....

关于积分增加策略,基本上都是由运营来定,总之,很多项目中都有这么个功能。

常规系统就是用户上面的行为伴随着用户积分处理完成(在同一个事物里),但,咱们为了提升系统性能和用户体验,我们通常把积分增加这类业务采用异步方式去实现。

比如:线程池、各种消息队列等

在我们这个版本中,采用的是RabbitMQ消息队列来实现的。

问题

既然使用RabbitMQ,那我们就不得不考虑关于消息的问题:

  • 消息丢失问题
  • 重复消费问题
消息丢失问题

这里对用户积分增加,如果把消息搞丢了,用户积分最终并不会得到增加,那用户肯定不干了,为了防止这类问题出现,我们采用下面的解决方案。

1:我们采用confirm模式+持久化+手动ack

2:消息丢失种鸽问题:消息发送失败,我们采用失败消息记录表

3:定时任务轮训失败消息记录,再次发送

这里为了防止多次重试问题,所以设置一个重试上限,并加入警告(比如一条消息最多重发5次,一旦到5次了,就给运维/开发/测试发送邮件警告)。

重复消费问题

这里是对用户积分增加,所以,绝对不能重复消费,不然这样会导致用户积分暴增,数据会出现一致性问题。解决:

1:每个消息有一个唯一的reqId,reqId=业务前缀+UUID+年月日时分秒毫秒的时间戳

2:在对比是否重复消费之前,对用户加上分布式锁,key=固定用户分布式锁前缀+userId

整体流程图

标配版:

标配版

为了更好地监控消息发送失败问题,我们还可以对标配版进行升级。

升级版

其他问题

我们上面说了,为了防止消息丢失,采用confirm模式+持久化+手动ack

但,实现起来并非那么简单,如果没有做过,很多东西是无法体会到的。

在使用confirm模式时,新的问题来了。

问题

我们Spring中,一个Bean默认是单列的,这样的话会造成一个RabbitMQTemplate只能绑定一个confirm,这就不对了,我们需要RabbitMQTemplate不受Spring这个影响,很多人第一印象想到的就是采用原型模式。也就是在bean上添加注解:**@Scope("prototype")** 但,问题来了,比如在一个producer bean里注入RabbitMQTemplate,他最终还是认为你这个RabbitMQTemplate是单列,又和上面原型违背了,网上很多办法是给这个producer也搞成原型模式。

这个确实能解决这个问题。

说白了就是 从请求开始的bean开始到最后发送消息,这个过程的bean要都是原型模式才行。 比如:controller--service--producer

挖了个蛐蛐,问题又来了,项目中定时任务采用的是xxl-job,它的每个job都必须是单列的,上面的办法又不行了。

绝招:用Spring中的ApplicationContext的getBean方法直接获取对应的Bean就不存在问题。

核心代码

定义原型的rabbitTemplate。

代码语言:javascript
复制
@Configuration
public class RabbitConfig {
    //省略部分非核心代码
    @Bean
    @Scope("prototype")
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory); 
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }
}

confirm模式

代码语言:javascript
复制
@Component
public class UserPointProducer { 
    @Resource
    private RetryMessageMapper chargeUserMapper;

    public void sendMessage(String message) {
        RabbitTemplate rabbitTemplate = ApplicationContextFactory.getBean(RabbitTemplate.class);
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("UserPointConfirm ConfirmCallback 关联数据:{},投递成功,确认情况:{}", correlationData, ack);
            } else {
                RetryMessage retryMessage = new RetryMessage();
                retryMessage.setContent(message);
                retryMessage.setRetry(5);
                retryMessage.setCreateTime(new Date());
                retryMessage.setStatus(0);
                retryMessage.setRetriedTimes(0);
                retryMessage.setType(RabbitMQConstantEnum.USER_POINT.getType());
                chargeUserMapper.insert(retryMessage);
                log.info("UserPointConfirm ConfirmCallback 关联数据:{},投递失败,确认情况:{},原因:{}", correlationData, ack, cause);
            }
        });
 
        //后面+1 主要是为了掩饰消息发送失败
        rabbitTemplate.convertAndSend(RabbitMQConstantEnum.USER_POINT.getExchange()+"1"
                , RabbitMQConstantEnum.USER_POINT.getRoutingKey(), message, message1 -> {
                    message1.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置消息持久化
                    return message1;
                }, correlationId);
    }
}

下面来写个测试发送案例:

代码语言:javascript
复制
/**
 * {@code @description:} 测试案例
 *
 * @author tianwc 公众号:Java后端技术全栈
 * 在线刷题 1200+java面试题和1000+篇技术文章:https://woaijava.cc/
 * {@code @date:} 2024-03-24 9:19
 * {@code @version:} 1.0
 */
@GetMapping("/send")
public void send() {
    UserPointMessage userPointMessage = new UserPointMessage();
    userPointMessage.setType(UserUpdatePointEnum.ADD.getType());
    userPointMessage.setUserId(1L);
    userPointMessage.setPoint(9);
    userPointMessage.setReqId(MessageReqIdPrefixConstant.USER_POINT + UUID.randomUUID()+ DateUtils.formatDefaultDateMs());
    userPointProducer.sendMessage(JSON.toJSONString(userPointMessage));
}

消费者:

代码语言:javascript
复制
/**
 * {@code @description:} 用户积分消息消费者
 *
 * @author tianwc 公众号:Java后端技术全栈
 * 在线刷题 1200+java面试题和1000+篇技术文章:<a href="https://woaijava.cc/">博客地址</a>
 * {@code @date:} 2024-03-24 9:19
 * {@code @version:} 1.0
 */
@RabbitListener(queues = "user.point.queue")
@Component
@Slf4j
public class UserPointConsumer {

    @Resource
    private UserPointService userPointService;

    @RabbitHandler
    public void process(Object data, Channel channel, Message message) throws IOException {
        try {
            log.info("消费者接受到的消息是:{},消息体为:{}", data, message);
            UserPointMessage userPointMessage = JSON.parseObject(new String(message.getBody()), UserPointMessage.class);
            userPointService.updateUserPoint(userPointMessage);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception exception) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

具体业务逻辑实现:

代码语言:javascript
复制
/**
 * {@code @description:} 用户积分扣除消费者服务实现类
 *
 * @author tianwc 公众号:Java后端技术全栈
 * 在线刷题 1200+java面试题和1000+篇技术文章:<a href="https://woaijava.cc/">博客地址</a>
 * {@code @date:} 2024-03-24 9:19
 * {@code @version:} 1.0
 */
@Slf4j
@Service
public class UserPointServiceImpl implements UserPointService {
    @Resource
    private ChargeUserMapper chargeUserMapper;
    @Resource
    private RedissonClient redissonClient;
    @Resource
    private PointsChangeRecordMapper pointsChangeRecordMapper;

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void updateUserPoint(UserPointMessage userPointMessage) {
        RLock lock = redissonClient.getLock(RedisConstantPre.USER_INFO_ID_PRE + userPointMessage.getUserId());
        lock.lock();
        try {
            int count = pointsChangeRecordMapper.selectByReqId(userPointMessage.getReqId());
            if (count > 0) {
                log.info("消息体参数 【重复消息】:{}", userPointMessage);
                return;
            }
            PointsChangeRecord pointsChangeRecord = new PointsChangeRecord();
            pointsChangeRecord.setUserId(userPointMessage.getUserId());
            pointsChangeRecord.setPoint(userPointMessage.getPoint());
            pointsChangeRecord.setType(userPointMessage.getType());
            pointsChangeRecord.setCreateTime(new Date());
            pointsChangeRecord.setReqId(userPointMessage.getReqId());
            //积分变动记录
            pointsChangeRecordMapper.insert(pointsChangeRecord);

            ChargeUser chargeUser = chargeUserMapper.selectByPrimaryKey(userPointMessage.getUserId());
            if (userPointMessage.getType() == UserUpdatePointEnum.ADD.getType()) {
                chargeUser.setPoint(chargeUser.getPoint() + userPointMessage.getPoint());
            } else {
                chargeUser.setPoint(chargeUser.getPoint() - userPointMessage.getPoint());
            }
            //用户积分变动
            chargeUserMapper.updateByPrimaryKey(chargeUser);
        } finally {
            lock.unlock();
        }
    }
}

这里的分布式锁的好处:

  • 保证了这个重复消费部分代码的原子性
  • 保证了此时只有一个线程对用户积分进行修改

其实,正常情况下,不会走失败消息记录表,但是作为程序不得不多考虑点。

定时任务部分代码:

代码语言:javascript
复制
/**
 * {@code @description:} 用户积分消息发送job
 *
 * @author tianwc 公众号:Java后端技术全栈
 * 在线刷题 1200+java面试题和1000+篇技术文章:https://woaijava.cc/
 * {@code @date:} 2024-03-24 9:19
 * {@code @version:} 1.0
 */
@Slf4j
@Component
public class UserPointRetryMessageJob {

    @Resource
    private RetryMessageMapper retryMessageMapper;
    @Resource
    private UserPointProducer userPointProducer;

    @XxlJob("userPointRetryMessageJob")
    public void process() {
        log.info("开始执行 userPointRetryMessageJob 定时任务");
        XxlJobHelper.log("start userPointRetryMessageJob job");
        int countRetryMessage = retryMessageMapper.countRetryMessage(0, 0);
        if (countRetryMessage == 0) {
            log.info(" 执行结束 userPointRetryMessageJob 没有消息需要重发");
        }
        List<RetryMessage> retryMessages = retryMessageMapper.selectRetryMessage(0, 0);
        for (RetryMessage retryMessage : retryMessages) {
            userPointProducer.sendMessage(retryMessage);
        }
    }
}

这里还可以优化,你能想到吗? List<RetryMessage> retryMessages = retryMessageMapper.selectRetryMessage(0, 0); 这里是一次性全部查出来了,如果出现大量消息发送失败,一次性放到本地缓存里,很容易出问题,所以,我们可以再优化成分页进行处理,比如:每次处理50条,再根据count来计算需要进行分页。

定时任务中生产者代码实现:

代码语言:javascript
复制
/**
 * {@code @description:} 用户积分消息发送生产者
 *
 * @author tianwc 公众号:Java后端技术全栈
 * 在线刷题 1200+java面试题和1000+篇技术文章:https://woaijava.cc/
 * {@code @date:} 2024-03-24 9:19
 * {@code @version:} 1.0
 */
@Slf4j
@Component
public class UserPointProducer {
    @Resource
    private RetryMessageMapper chargeUserMapper;

    public void sendMessage(RetryMessage retryMessage) {
        log.info("用户积分消息重试补发,{}", retryMessage);
        String message = retryMessage.getContent();
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        RabbitTemplate rabbitTemplate = ApplicationContextFactory.getBean(RabbitTemplate.class);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                retryMessage.setStatus(1);
                chargeUserMapper.updateByPrimaryKey(retryMessage);
                log.info("UserPointConfirm ConfirmCallback 关联数据:{},投递成功,确认情况:{}", correlationData, ack);
            } else {
                retryMessage.setRetriedTimes(retryMessage.getRetriedTimes() + 1);
                chargeUserMapper.updateByPrimaryKey(retryMessage);
                log.info("UserPointConfirm ConfirmCallback 关联数据:{},投递失败,确认情况:{},原因:{}", correlationData, ack, cause);
            }
        });

        rabbitTemplate.convertAndSend(RabbitMQConstantEnum.USER_POINT.getExchange()
                , RabbitMQConstantEnum.USER_POINT.getRoutingKey(), message, message1 -> {
                    message1.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置消息持久化
                    return message1;
                }, correlationId);
        log.info("用户积分消息重试补发完成");
    }
}
失败消息表
代码语言:javascript
复制
CREATE TABLE `retry_message` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `type` int DEFAULT NULL,
  `content` text,
  `retried_times` int DEFAULT NULL,
  `retry_limit` int NOT NULL DEFAULT '1',
  `create_time` datetime DEFAULT NULL,
  `status` int DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 ;

一个数据库里就只要一张表即可,专门用来存储发送失败消息。

tyep:什么业务场景 content:具体消息内容 retried_times:已经重试了几次 retry_limit:重试次数限制 status:状态,是否需要重试

这里的重试次数限制,我们也可以采用配置的方式,这样就可以在分布式配置中心对此进行动态调整,不过,这个好像没什么必要,因为不会频繁地更换这个限制。直接存在表里还可以动态的针对某些业务做特殊处理,比如业务A限制次数2次,业务B次数改成3次....

没有完美的解决方法,但总有相对完美的解决方案即可。

关于积分模块,其实不止有增加积分,还有扣除积分。

比如:用户使用积分兑换优惠券,积分目前设计在用户中心,优惠券又在营销中心,所以,会涉及到分布式事务问题。 我们可以采用Seata、Atomikos、RockSeataetMQ等技术来解决,目前充电桩项目中用到过Atomikos,但是代码量实在是会增加不少,最后使用了Seata来解决分布式事务问题。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2024-03-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java后端技术全栈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题
    • 消息丢失问题
      • 重复消费问题
        • 整体流程图
          • 其他问题
            • 核心代码
              • 失败消息表
          相关产品与服务
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档