前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一文读懂Springboot+RocketMQ+Redis抢单实现10W级QPS

一文读懂Springboot+RocketMQ+Redis抢单实现10W级QPS

原创
作者头像
QGS
修改2024-04-21 16:56:11
2520
修改2024-04-21 16:56:11
举报
文章被收录于专栏:QGS探索

环境介绍

技术栈

springboot+mybatis-plus+mysql+rocketmq

软件

版本

mysql

8

IDEA

IntelliJ IDEA 2022.2.1

JDK

17

Spring Boot

3.1.7

Redis

7.0.4

mybatis-plus

3.5.3.2

rocketmq

4.9.4

QPS:每秒处理请求的数量

高并发:很短时间内处理大量请求

并发:多个请求在同一时间内执行(模拟淘宝抢单活动)

并行:多核CPU说多给任务在同一时刻进行

synchronized (this):同步方法支持一种简单的策略来防止线程受到干扰和内存一致性错误;如果一个对象对多个线程可见,则对该对象变量的所有读取或写入都是通过同步方法完成。通俗点来说就是程序中用于保护线程安全的一种机制。

抢单-十万级QPS

假设:一个用户ID只能抢一个商品

代码语言:java
复制
RedisTemplate常用方法
redisTemplate.hasKey(key);    //判断是否有key所对应的值,有则返回true,没有则返回false
redisTemplate.opsForValue().get(key); //有则取出key值所对应的值
redisTemplate.delete(key);    //删除单个key值
redisTemplate.delete(keys);    //其中keys:Collection<K> keys
redisTemplate.dump(key);    //将当前传入的key值序列化为byte[]类型
redisTemplate.expire(key, timeout, unit); //设置过期时间
redisTemplate.expireAt(key, date);  //设置过期时间
redisTemplate.keys(pattern);   //查找匹配的key值,返回一个Set集合类型
redisTemplate.rename(oldKey, newKey); //返回传入key所存储的值的类型
redisTemplate.renameIfAbsent(oldKey, newKey); //如果旧值存在时,将旧值改为新值
redisTemplate.randomKey();    //从redis中随机取出一个key
redisTemplate.getExpire(key);   //返回当前key所对应的剩余过期时间
redisTemplate.getExpire(key, unit);  //返回剩余过期时间并且指定时间单位
redisTemplate.persist(key);    //将key持久化保存
redisTemplate.move(key, dbIndex);  //将当前数据库的key移动到指定redis中数据库当中

String类型

代码语言:java
复制
ValueOperations opsForValue = redisTemplate.opsForValue();
opsForValue.set(key, value); //设置当前的key以及value值
opsForValue.set(key, value, offset);//用 value 参数覆写给定 key 所储存的字符串值,从偏移量 offset 开始
opsForValue.set(key, value, timeout, unit);  //设置当前的key以及value值并且设置过期时间
opsForValue.setBit(key, offset, value); //将二进制第offset位值变为value
opsForValue.setIfAbsent(key, value);//重新设置key对应的值,如果存在返回false,否则返回true
opsForValue.get(key, start, end); //返回key中字符串的子字符
opsForValue.getAndSet(key, value); //将旧的key设置为value,并且返回旧的key
opsForValue.multiGet(keys);   //批量获取值
opsForValue.size(key);    //获取字符串的长度
opsForValue.append(key, value); //在原有的值基础上新增字符串到末尾
opsForValue.increment(key,double increment);//以增量的方式将double值存储在变量中
opsForValue.increment(key,long increment); //通过increment(K key, long delta)方法以增量方式存储long值(正值则自增,负值则自减)
Map valueMap = new HashMap();
valueMap.put("valueMap1","map1");
valueMap.put("valueMap2","map2");
valueMap.put("valueMap3","map3");
opsForValue.multiSetIfAbsent(valueMap);  //如果对应的map集合名称不存在,则添加否则不做修改
opsForValue.multiSet(valueMap);    //设置map集合到redis

数据库表

Yml配置

代码语言:java
复制
redis:
 host: 192.168.68.133
 port: 6379
 password: xxxxxx
 database: 12
mapper

代码语言:java
复制
@Mapper
public interface GoodsMapper extends BaseMapper<Goods> {

 @Select("SELECT id,stocks FROM goods WHERE `status`= 1 and spike = 1")
 List<Goods> getAllGoodsFlash();
}
@Mapper
public interface SnlogsMapper extends BaseMapper<Snlogs> {

}

Service

代码语言:java
复制
public interface GoodsService extends IService<Goods> {

 void FlashSaleBusiness(Integer userID, Integer goodsID);
}
public interface SnlogsService extends IService<Snlogs> {
}
@Service
public class GoodsServiceImpl extends ServiceImpl<GoodsMapper, Goods>
 implements GoodsService{
 @Resource
 private GoodsMapper goodsMapper;
 @Resource
 private SnlogsMapper snlogsMapper;

 @Override
 @Transactional(rollbackFor = Exception.class)
 public void FlashSaleBusiness(Integer userID, Integer goodsID) {
 //减库存
 Goods goods = goodsMapper.selectById(goodsID);
 Integer stocks = goods.getStocks()-1;
 if (stocks < 0){
 throw new RuntimeException("商品ID"+goods.getId()+"用户ID:"+userID+"抢单失败");
 }
 goods.setStocks(stocks);
 goods.setUpdateTime(new Date());
 goodsMapper.updateById(goods);
 //生成订单
 Snlogs snlogs = new Snlogs();
 snlogs.setCreateTime(new Date());
 snlogs.setUserId(userID);
 snlogs.setGoodsId(goodsID);
 String sn = String.valueOf(UUID.randomUUID());
 snlogs.setOrderSn(userID+"-"+goodsID+"-"+sn);
 snlogsMapper.insert(snlogs);
 }
}
@Service
public class SnlogsServiceImpl extends ServiceImpl<SnlogsMapper, Snlogs>
 implements SnlogsService{
}

初始化Mysql同步Redis

代码语言:java
复制
@Component
public class DataSync {
 //定时任务,将mysql库存定时同步到redis
 //@Scheduled(initialDelay = 1000,fixedDelay = 1000) initialDelay初始化一秒后执行,fixedDelay每间隔1秒后执行
 //@Scheduled(cron = "0 0 8 0 0 ?") //秒分时年月 每天8点执行
 //public void MysqlSyncRedis(){
 //
 //}
 @Resource
 private GoodsMapper goodsMapper;

 @Resource
 private RedisTemplate redisTemplate;

 /**
 * 项目启动后,将mysql库存同步到redis
 * 初始化(PostConstruct前,InitializingBean中,BeanPostProcessor后)
 */
 @PostConstruct
 public void initGoodsData(){
 List<Goods> goods =goodsMapper.getAllGoodsFlash();
 if (!CollectionUtils.isEmpty(goods)){
 goods.forEach(good->{
 redisTemplate.opsForValue().set("goodsId:"+String.valueOf(good.getId()),String.valueOf(good.getStocks()));
 });
 }
 }
}

生产者Controller

代码语言:java
复制
@RestController
@RequestMapping("/A")
public class FlashSaleController {

 @Resource
 private RedisTemplate redisTemplate;

 @Resource
 private RocketMQTemplate rocketMQTemplate;


 /**
 * 1.去重(查询该用户是否已经参与秒杀活动)
 * 2.扣减库存
 * 3.消息传入MQ
 * @param goodsId
 * @return
 */
 @GetMapping("/flashSaleA")
 public String flashSaleTest(Integer goodsId){
 //模拟用户id
 int userid = 200019987;
 /**
 * 1.查询去重-- 每个商品只能抢一次
 * userid + ":" + goodsId;
 */
 String uKey = userid + ":" + goodsId;
 System.out.println("uKey:"+uKey);
 //redis存入唯一标识
 Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(uKey, "");
 if (!aBoolean) {
 return "你已经秒杀过了";
 }

 /**s
 *2.扣减redis库存
 */
 Long count = redisTemplate.opsForValue().decrement("goodsId:"+goodsId);
 if (count < 0) {
 return "库存消耗完了";
 }
 /**
 *3.发送至MQ
 */
 rocketMQTemplate.asyncSend("FlashSaleTopic", uKey, new SendCallback() {
 @Override
 public void onSuccess(SendResult sendResult) {
 System.out.println("发送成功");
 }

 @Override
 public void onException(Throwable throwable) {
 System.out.println("发送失败:"+throwable.getMessage());
 }
 });

 return "抢单成功,请稍后查看清单信息";
 }


}

消费者FlashMQListener

代码语言:java
复制
@Component
@RocketMQMessageListener(
 topic = "FlashSaleTopic",
 consumerGroup = "FlashSaleConsumerGroup",
 consumeMode = ConsumeMode.CONCURRENTLY,
 consumeThreadMax = 10)
public class FlashMQListener implements RocketMQListener<MessageExt> {
 @Resource
 private GoodsService goodsService;
 /**
 * onMessage 消费者方法
 * @param messages 消息内容
 */
 @Override
 public void onMessage(MessageExt messages) {
 //userid + ":" + goodsId;
 System.out.println("接收到消息:"+new String(messages.getBody()));
 String msg = new String(messages.getBody());
 Integer userID = Integer.valueOf(msg.split(":")[0]);
 Integer goodsID = Integer.valueOf(msg.split(":")[1]);
 //4.减库存,添加订单信息--1、业务代码事务外加锁可实现线程安全 或2、数据库语句行锁update goods set stocks = stocks -1,update_time = now() where id = 1;
 synchronized (this){
 goodsService.FlashSaleBusiness(userID,goodsID);
 }

 }
}

验证

压力测试

压力测试工具apache-jmeter

https://dlcdn.apache.org//jmeter/binaries/apache-jmeter-5.6.3.zip

消息堆积问题

单条队列消息差值大于5w条,算消息堆积问题,根据引用场景定义

消息堆积的情况:

一、生产者生产太快

处理方式:

1、 生产者做业务限流

2、动态增加消费者数量(rocketmq-dashboard-1.0.0.jar)

3、增加消费者数量,但是消费者线程数量要<=队列数。队列确定后不建议更改,根据实际场景修改

IO型:逻辑处理器数*2, cpu型:逻辑处理器数+1

consumeThreadMax = 24

二、消费者产问题(程序奔溃,BUG等)

处理方式:

1、重启消费者程序,新增消费者

2、重置消费位点(再次消费)

3、 跳过堆积

消息丢失

1、生产者使用同步发送模式,MQ消息中间件返回确认后,执行业务程序,数据写入消息状态和创建时间

2、消费者消费后 ,修改数据状态

3、开启MQ的trace机制,消息跟踪机制

4、使用集群模式,主倍模式,将消息持久化在不同硬盘

5、MQ的刷盘机制设置为同步刷盘,性能相对不高(磁盘:随机读写,顺序读写),机械(随机读写比固态快)

6、数据库持久化,log日记

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档