前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis进阶学习04---秒杀优化和消息队列

Redis进阶学习04---秒杀优化和消息队列

作者头像
大忽悠爱学习
发布2022-05-09 14:15:08
9640
发布2022-05-09 14:15:08
举报
文章被收录于专栏:c++与qt学习

Redis进阶学习04---秒杀优化和消息队列

  • 秒杀优化
    • 秒杀优化的具体实现
      • 基于jdk阻塞队列完成的秒杀优化总结
  • Redis消息队列实现秒杀
    • 基于Redis的List实现消息队列
    • 基于Redis的PubSub实现消息队列
    • 基于Stream实现消息队列
    • 基于Stream的消息队列之消费者组
  • Redis-Stream详解
    • 追加新消息,XADD,生产消息
    • 从消息队列中获取消息,XREAD,消费消息
    • 消息ID说明
    • 消费者组模式,consumer group
    • Pending 等待列表
    • 消息转移
    • 坏消息问题,Dead Letter,死信问题
    • 信息监控,XINFO
    • 命令小结
  • JAVA伪代码实现Stream消费队列
    • Stream消息队列小结
  • 综上比较
  • 使用Stream作为消息队列优化之前的秒杀案例
  • 总结

秒杀优化

如果一个饭店只有一个服务员,并且这个服务员不仅需要负责客人的点餐服务,还需要负责炒菜服务,显然这样的话,只能是先处理完第一个客人所有的点餐,烧菜任务后,才能去处理下一个客人的点餐,烧菜任务,这样显然把任务给串行化了,效率大大降低。

而现在我们就面临这样的问题:

在这里插入图片描述
在这里插入图片描述

目前整个秒杀的过程都是串行化执行的,并且这个流程里面涉及多次数据库查询操作,数据库查询是最耗费时间的,因此优化的思路就是把最耗费时间的数据库写操作转换为异步执行,然后把数据库查询操作通过redis查询替换掉,这样整体就分为了两部分,一部分是主线程去redis判断校验,然后如果判断和校验都通过了,就将消息放入一个队列中,异步线程从该队列中取出消息,然后去执行数据库写操作。

在这里插入图片描述
在这里插入图片描述

此时redis就相当于服务员,负责库存数量判断和重复购买校验,然后将合法的订单交易,放入队列中,异步处理线程,从队列读取消息,进行数据库写处理,即扣减库存,创建订单的耗时逻辑,全部异步完成。


显然,关于redis那部分判断逻辑,应该都由lua脚本来完成,而非java代码

在这里插入图片描述
在这里插入图片描述

秒杀优化的具体实现

在这里插入图片描述
在这里插入图片描述

1.新增优惠卷的同时,将优惠卷信息保存到Redis中

代码语言:javascript
复制
    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // 保存优惠券
        save(voucher);
        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        seckillVoucher.setVoucherId(voucher.getId());
        seckillVoucher.setStock(voucher.getStock());
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);
        //保存优惠卷信息到Redis
        stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucher.getId(),voucher.getStock().toString());
    }

测试:

在这里插入图片描述
在这里插入图片描述

2.lua脚本编写

代码语言:javascript
复制
-- 1.参数列表
-- 1.1 优惠卷id
local voucherId= ARGV[1]
-- 1.2 用户id
local userId= ARGV[2]

--2.数据key
--2.1库存key
local storeKey="seckill:stock:" .. voucherId
--2.2订单key
local orderKey="seckill:order:" .. voucherId

--3.脚本业务
--3.1判断库存是否充足 get storeKey
if(tonumber(redis.call('get',storeKey))<=0) then
    --3.2库存不足,返回1
    return 1
end

--3.2判断用户是否下单--set集合的判断方法,判断某个集合中是否存在某个value 
if(redis.call('sismember',orderKey,userId)==1) then
      --3.3存在,说明是重复下单,返回2
    return 2
end

--3.4扣库存incrby storeKey -1
redis.call('incrby',storeKey,-1)
--3.5下单(保存用户)sadd orderkey userId
redis.call('sadd',orderKey,userId)
return 0

3.修改抢购逻辑

代码语言:javascript
复制
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Autowired
    private ISeckillVoucherService iSeckillVoucherService;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private RedisWorker redisWorker;

     private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

     static {
         SECKILL_SCRIPT=new DefaultRedisScript<>();
         SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
         SECKILL_SCRIPT.setResultType(Long.class);
     }

    @Override
    @Transactional
    public Result seckillVoucher(Long voucherId) {
        Long uid = UserHolder.getUser().getId();
        //1.执行lua脚本
        Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), uid.toString());
        //2.判断结果是否为0
        int r=res.intValue();
        if(r!=0){
            return Result.fail(r==1?"库存不足":"不能重复下单");
        }
         //3.为0,有购买资格,把下单信息保存到阻塞队列
        long order = redisWorker.nextId("order");
        //TODO:保存到阻塞队列
        //4.返回订单id
        return Result.ok(order);
    }
}

当我们测试一下后:

在这里插入图片描述
在这里插入图片描述

此时数据库并无变化,因为我们还没把消息放入阻塞队列,从而通知异步线程去处理


4.异步线程处理阻塞队列中的消息

代码语言:javascript
复制
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Autowired
    private ISeckillVoucherService iSeckillVoucherService;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private RedisWorker redisWorker;
    

     private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

     static {
         SECKILL_SCRIPT=new DefaultRedisScript<>();
         SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
         SECKILL_SCRIPT.setResultType(Long.class);
     }


    /**
     * 阻塞队列
     */
    private BlockingQueue<VoucherOrder> orderTasks=new ArrayBlockingQueue(1024*1024);

    /**
     * 异步线程
     */
    private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();

     @PostConstruct
     public void init(){
         SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
     }

    public class VoucherOrderHandler implements Runnable{
        @Override
        public void run() {
          while(true){
              //1.获取队列中的订单信息
              try {
                  //1.获取队列中的订单信息
                  VoucherOrder voucherOrder = orderTasks.take();
                  //2.创建订单
                  createVoucherOrder(voucherOrder);
              } catch (InterruptedException e) {
                  log.error("订单创建异常: ",e);
              }
          }
        }

        /**
         * 保守起见,还会再次进行判断
         */
        private void createVoucherOrder(VoucherOrder voucherOrder) {
            Long userId = voucherOrder.getUserId();
            Long voucherId = voucherOrder.getVoucherId();
            //创建锁对象
            RLock lock = redissonClient.getLock("lock:order:" + userId);
            //尝试获取分布式锁
            // 第一个参数为获取锁的最大等待时间(期间会重试)--默认-1,,失败直接返回
            //锁自动释放时间--默认30秒
            //时间单位
            boolean tryLock = lock.tryLock();
            if(!tryLock){
                log.error("用户["+userId+"]对"+voucherId+"优惠卷重复抢购");
            }
            //我们只需要确保下面这两行代码的集群并发问题被解决
            try{
                Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();

                if(count>0){
                    log.error("用户["+userId+"]对"+voucherId+"优惠卷重复抢购");
                }
            }finally {
                lock.unlock();
            }

            //6.扣减库存
            boolean success = iSeckillVoucherService.update()
                    .setSql("stock = stock -1")
                    .eq("voucher_id", voucherId)
                    .gt("stock",0)
                    .update();
            if(!success){
               log.error("库存扣减失败");
            }
            save(voucherOrder);
        }
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
        Long uid = UserHolder.getUser().getId();
        //1.执行lua脚本
        Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), uid.toString());
        //2.判断结果是否为0
        int r=res.intValue();
        if(r!=0){
            return Result.fail(r==1?"库存不足":"不能重复下单");
        }
        //3.为0,有购买资格,把下单信息保存到阻塞队列
        VoucherOrder voucherOrder = new VoucherOrder();
        long orderId = redisWorker.nextId("order");
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(uid);
        voucherOrder.setVoucherId(voucherId);
        orderTasks.add(voucherOrder);
        //4.返回订单id
        return Result.ok(orderId);
    }
}

基于jdk阻塞队列完成的秒杀优化总结

在这里插入图片描述
在这里插入图片描述

阻塞队列里面数据过多可能会导致jvm内存溢出,还有就是即便设置了阻塞队列最大元素个数上限也有弊端,就是如果元素过多,处理速度跟不上,会导致很多额外任务放入阻塞队列失败

还有就是数据都是存放在内存中的,一旦java程序出现异常,那么内存中的任务将会全部丢失,并且一旦出现异常,也会导致某个任务执行失败


Redis消息队列实现秒杀

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

基于Redis的List实现消息队列

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

基于Redis的PubSub实现消息队列

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

基于Stream实现消息队列

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

默认是非阻塞的,并且如果阻塞时长传入0,表示无限等待

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

基于Stream的消息队列之消费者组

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Redis-Stream详解

相信各位光看上面的介绍,应该对Stream还是一知半解,下面我来详细介绍一下它的用法:

追加新消息,XADD,生产消息

XADD,命令用于在某个stream(流数据)中追加消息,演示如下:

代码语言:javascript
复制
127.0.0.1:6379> XADD memberMessage * user kang msg Hello
"1651325244694-0"
127.0.0.1:6379> XADD memberMessage * user zhong  msg nihao
"1651325256282-0"
在这里插入图片描述
在这里插入图片描述

其中语法格式为:

代码语言:javascript
复制
XADD key ID field string [field string …]

需要提供key,消息ID方案,消息内容,其中消息内容为key-value型数据。

ID,最常使用*,表示由Redis生成消息ID,这也是强烈建议的方案。

field string [field string], 就是当前消息内容,由1个或多个key-value构成。

上面的例子中,在memberMemsages这个key中追加了user kang msg Hello这个消息。Redis使用毫秒时间戳和序号生成了消息ID。此


从消息队列中获取消息,XREAD,消费消息

XREAD,从Stream中读取消息,演示如下:

代码语言:javascript
复制
127.0.0.1:6379> XREAD streams memberMessage 0
1) 1) "memberMessage"
   2) 1) 1) "1651325244694-0"
         2) 1) "user"
            2) "kang"
            3) "msg"
            4) "Hello"
      2) 1) "1651325256282-0"
         2) 1) "user"
            2) "zhong"
            3) "msg"
            4) "nihao"
在这里插入图片描述
在这里插入图片描述

消息被读取后,并不会从stream队列中消失,这点需要注意

上面的命令是从消息队列memberMessage中读取所有消息。XREAD支持很多参数,语法格式为:

代码语言:javascript
复制
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

其中:

  • [COUNT count],用于限定获取的消息数量
  • [BLOCK milliseconds],用于设置XREAD为阻塞模式,默认为非阻塞模式
  • ID,用于设置由哪个消息ID开始读取。使用0表示从第一条消息开始。(本例中就是使用0)此处需要注意,消息队列ID是单调递增的,所以通过设置起点,可以向后读取。在阻塞模式中,可以使用 $ 表 示 最 新 的 消 息 I D 。 ( 在 非 阻 塞 模 式 下,表示最新的消息ID)。(在非阻塞模式下无意义)。

XRED读消息时分为阻塞和非阻塞模式,使用BLOCK选项可以表示阻塞模式,需要设置阻塞时长。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。

一个典型的阻塞模式用法为:

代码语言:javascript
复制
127.0.0.1:6379> XREAD block 1000 streams memberMessage $
(nil)
(1.07s)

我们使用Block模式,配合$作为ID,表示读取最新的消息,若没有消息,命令阻塞!等待过程中,其他客户端向队列追加消息,则会立即读取到。

因此,典型的队列就是 XADD 配合 XREAD Block 完成。XADD负责生成消息,XREAD负责消费消息。


消息ID说明

XADD生成的1553439850328-0,就是Redis生成的消息ID,由两部分组成:时间戳-序号。时间戳是毫秒级单位,是生成消息的Redis服务器时间,它是个64位整型(int64)。序号是在这个毫秒时间点内的消息序号,它也是个64位整型。较真来说,序号可能会溢出,but真可能吗?

可以通过multi批处理,来验证序号的递增:

代码语言:javascript
复制
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> XADD memberMessage * msg one
QUEUED
127.0.0.1:6379> XADD memberMessage * msg two
QUEUED
127.0.0.1:6379> XADD memberMessage * msg three
QUEUED
127.0.0.1:6379> XADD memberMessage * msg four
QUEUED
127.0.0.1:6379> XADD memberMessage * msg five
QUEUED
127.0.0.1:6379> EXEC
1) "1553441006884-0"
2) "1553441006884-1"
3) "1553441006884-2"
4) "1553441006884-3"
5) "1553441006884-4"

由于一个redis命令的执行很快,所以可以看到在同一时间戳内,是通过序号递增来表示消息的。

为了保证消息是有序的,因此Redis生成的ID是单调递增有序的。由于ID中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis的每个Stream类型数据都维护一个latest_generated_id属性,用于记录最后一个消息的ID。若发现当前时间戳退后(小于latest_generated_id所记录的),则采用时间戳不变而序号递增的方案来作为新消息ID(这也是序号为什么使用int64的原因,保证有足够多的的序号),从而保证ID的单调递增性质。

强烈建议使用Redis的方案生成消息ID,因为这种时间戳+序号的单调递增的ID方案,几乎可以满足你全部的需求。但同时,记住ID是支持自定义的,别忘了!


消费者组模式,consumer group

当多个消费者(consumer)同时消费一个消息队列时,可以重复的消费相同的消息,就是消息队列中有10条消息,三个消费者都可以消费到这10条消息。

但有时,我们需要多个消费者配合协作来消费同一个消息队列,就是消息队列中有10条消息,三个消费者分别消费其中的某些消息,比如消费者A消费消息1、2、5、8,消费者B消费消息4、9、10,而消费者C消费消息3、6、7。也就是三个消费者配合完成消息的消费,可以在消费能力不足,也就是消息处理程序效率不高时,使用该模式。该模式就是消费者组模式。

即消费者组模式可以让多个消费者协同合作,来共同消息队列中的消息,提高队列中消息的消费效率

消费者组模式的支持主要由两个命令实现:

  • XGROUP,用于管理消费者组,提供创建组,销毁组,更新组起始消息ID等操作
  • XREADGROUP,分组消费消息操作

进行演示,演示时使用5个消息,思路是:创建一个Stream消息队列,生产者生成5条消息。在消息队列上创建一个消费组,组内三个消费者进行消息消费:

代码语言:javascript
复制
# 生产者生成10条消息
127.0.0.1:6379> MULTI
127.0.0.1:6379> XADD mq * msg 1 # 生成一个消息:msg 1
127.0.0.1:6379> XADD mq * msg 2
127.0.0.1:6379> XADD mq * msg 3
127.0.0.1:6379> XADD mq * msg 4
127.0.0.1:6379> XADD mq * msg 5
127.0.0.1:6379> EXEC
 1) "1553585533795-0"
 2) "1553585533795-1"
 3) "1553585533795-2"
 4) "1553585533795-3"
 5) "1553585533795-4"

# 创建消费组 mqGroup
127.0.0.1:6379> XGROUP CREATE mq mqGroup 0 # 为消息队列 mq 创建消费组 mgGroup
OK

# 消费者A,消费第1条
127.0.0.1:6379> XREADGROUP group mqGroup consumerA count 1 streams mq > #消费组内消费者A,从消息队列mq中读取一个消息
1) 1) "mq"
   2) 1) 1) "1553585533795-0"
         2) 1) "msg"
            2) "1"
# 消费者A,消费第2条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-1"
         2) 1) "msg"
            2) "2"
# 消费者B,消费第3条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerB COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-2"
         2) 1) "msg"
            2) "3"
# 消费者A,消费第4条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA count 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-3"
         2) 1) "msg"
            2) "4"
# 消费者C,消费第5条
127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerC COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-4"
         2) 1) "msg"
            2) "5"

上面的例子中,三个在同一组 mpGroup 消费者A、B、C在消费消息时(消费者在消费时指定即可,不用预先创建),有着互斥原则,消费方案为,A->1, A->2, B->3, A->4, C->5。语法说明为:

XGROUP CREATE mq mqGroup 0,用于在消息队列mq上创建消费组 mpGroup,最后一个参数0,表示该组从第一条消息开始消费。(意义与XREAD的0一致)。除了支持CREATE外,还支持SETID设置起始ID,DESTROY销毁组,DELCONSUMER删除组内消费者等操作。

XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >,用于组mqGroup内消费者consumerA在队列mq中消费,参数>表示未被组内消费的起始消息,参数count 1表示获取一条。语法与XREAD基本一致,不过是增加了组的概念。

可以进行组内消费的基本原理是,STREAM类型会为每个组记录一个最后处理(交付)的消息ID(last_delivered_id),这样在组内消费时,就可以从这个值后面开始读取,保证不重复消费。

以上就是消费组的基础操作。除此之外,消费组消费时,还有一个必须要考虑的问题,就是若某个消费者,消费了某条消息,但是并没有处理成功时(例如消费者进程宕机),这条消息可能会丢失,因为组内其他消费者不能再次消费到该消息了。下面继续讨论解决方案。


Pending 等待列表

为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。命令XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息。演示如下:

代码语言:javascript
复制
127.0.0.1:6379> XPENDING mq mqGroup # mpGroup的Pending情况
1) (integer) 5 # 5个已读取但未处理的消息
2) "1553585533795-0" # 起始ID
3) "1553585533795-4" # 结束ID
4) 1) 1) "consumerA" # 消费者A有3个
      2) "3"
   2) 1) "consumerB" # 消费者B有1个
      2) "1"
   3) 1) "consumerC" # 消费者C有1个
      2) "1"

127.0.0.1:6379> XPENDING mq mqGroup - + 10 # 使用 start end count 选项可以获取详细信息
1) 1) "1553585533795-0" # 消息ID
   2) "consumerA" # 消费者
   3) (integer) 1654355 # 从读取到现在经历了1654355ms,IDLE
   4) (integer) 5 # 消息被读取了5次,delivery counter
2) 1) "1553585533795-1"
   2) "consumerA"
   3) (integer) 1654355
   4) (integer) 4
# 共5个,余下3个省略 ...

127.0.0.1:6379> XPENDING mq mqGroup - + 10 consumerA # 在加上消费者参数,获取具体某个消费者的Pending列表
1) 1) "1553585533795-0"
   2) "consumerA"
   3) (integer) 1641083
   4) (integer) 5
# 共3个,余下2个省略 ...

每个Pending的消息有4个属性:

  • 消息ID
  • 所属消费者
  • IDLE,已读取时长
  • delivery counter,消息被读取次数

上面的结果我们可以看到,我们之前读取的消息,都被记录在Pending列表中,说明全部读到的消息都没有处理,仅仅是读取了。那如何表示消费者处理完毕了消息呢?使用命令 XACK 完成告知消息处理完成,演示如下:

代码语言:javascript
复制
127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # 通知消息处理结束,用消息ID标识
(integer) 1

127.0.0.1:6379> XPENDING mq mqGroup # 再次查看Pending列表
1) (integer) 4 # 已读取但未处理的消息已经变为4个
2) "1553585533795-1"
3) "1553585533795-4"
4) 1) 1) "consumerA" # 消费者A,还有2个消息处理
      2) "2"
   2) 1) "consumerB"
      2) "1"
   3) 1) "consumerC"
      2) "1"
127.0.0.1:6379> 

有了这样一个Pending机制,就意味着在某个消费者读取消息但未处理后,消息是不会丢失的。等待消费者再次上线后,可以读取该Pending列表,就可以继续处理该消息了,保证消息的有序和不丢失。

此时还有一个问题,就是若某个消费者宕机之后,没有办法再上线了,那么就需要将该消费者Pending的消息,转义给其他的消费者处理,就是消息转移。请继续。


消息转移

消息转移的操作时将某个消息转移到自己的Pending列表中。使用语法XCLAIM来实现,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。演示如下:

代码语言:javascript
复制
# 当前属于消费者A的消息1553585533795-1,已经15907,787ms未处理了
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
   2) "consumerA"
   3) (integer) 15907787
   4) (integer) 4

# 转移超过3600s的消息1553585533795-1到消费者B的Pending列表
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
1) 1) "1553585533795-1"
   2) 1) "msg"
      2) "2"

# 消息1553585533795-1已经转移到消费者B的Pending中。
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
   2) "consumerB"
   3) (integer) 84404 # 注意IDLE,被重置了
   4) (integer) 5 # 注意,读取次数也累加了1次

以上代码,完成了一次消息转移。转移除了要指定ID外,还需要指定IDLE,保证是长时间未处理的才被转移。被转移的消息的IDLE会被重置,用以保证不会被重复转移,以为可能会出现将过期的消息同时转移给多个消费者的并发操作,设置了IDLE,则可以避免后面的转移不会成功,因为IDLE不满足条件。例如下面的连续两条转移,第二条不会成功。

代码语言:javascript
复制
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1

这就是消息转移。至此我们使用了一个Pending消息的ID,所属消费者和IDLE的属性,还有一个属性就是消息被读取次数,delivery counter,该属性的作用由于统计消息被读取的次数,包括被转移也算。这个属性主要用在判定是否为错误数据上。请继续看:


坏消息问题,Dead Letter,死信问题

正如上面所说,如果某个消息,不能被消费者处理,也就是不能被XACK,这是要长时间处于Pending列表中,即使被反复的转移给各个消费者也是如此。此时该消息的delivery counter就会累加(上一节的例子可以看到),当累加到某个我们预设的临界值时,我们就认为是坏消息(也叫死信,DeadLetter,无法投递的消息),由于有了判定条件,我们将坏消息处理掉即可,删除即可。删除一个消息,使用XDEL语法,演示如下:

代码语言:javascript
复制
# 删除队列中的消息
127.0.0.1:6379> XDEL mq 1553585533795-1
(integer) 1
# 查看队列中再无此消息
127.0.0.1:6379> XRANGE mq - +
1) 1) "1553585533795-0"
   2) 1) "msg"
      2) "1"
2) 1) "1553585533795-2"
   2) 1) "msg"
      2) "3"

注意本例中,并没有删除Pending中的消息因此你查看Pending,消息还会在。可以执行XACK标识其处理完毕!


信息监控,XINFO

Stream提供了XINFO来实现对服务器信息的监控,可以查询:

查看队列信息

代码语言:javascript
复制
127.0.0.1:6379> Xinfo stream mq
 1) "length"
 2) (integer) 7
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 1
 9) "last-generated-id"
10) "1553585533795-9"
11) "first-entry"
12) 1) "1553585533795-3"
    2) 1) "msg"
       2) "4"
13) "last-entry"
14) 1) "1553585533795-9"
    2) 1) "msg"
       2) "10"

消费组信息

代码语言:javascript
复制
127.0.0.1:6379> Xinfo groups mq
1) 1) "name"
   2) "mqGroup"
   3) "consumers"
   4) (integer) 3
   5) "pending"
   6) (integer) 3
   7) "last-delivered-id"
   8) "1553585533795-4"

消费者组成员信息

代码语言:javascript
复制
127.0.0.1:6379> XINFO CONSUMERS mq mqGroup
1) 1) "name"
   2) "consumerA"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 18949894
2) 1) "name"
   2) "consumerB"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 3092719
3) 1) "name"
   2) "consumerC"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 23683256

至此,消息队列的操作说明大体结束!


命令小结

  • XACK 结束Pending
  • XADD 生成消息
  • XCLAIM 消息转移
  • XDEL 删除消息
  • XGROUP 消费组管理
  • XINFO 得到消费组信息
  • XLEN 消息队列长度
  • XPENDING Pending列表
  • XRANGE 获取消息队列中消息
  • XREAD 消费消息
  • XREADGROUP 分组消费消息
  • XREVRANGE 逆序获取消息队列中消息
  • XTRIM 消息队列容量

JAVA伪代码实现Stream消费队列

在这里插入图片描述
在这里插入图片描述

Stream消息队列小结

在这里插入图片描述
在这里插入图片描述

综上比较

在这里插入图片描述
在这里插入图片描述

使用Stream作为消息队列优化之前的秒杀案例

在这里插入图片描述
在这里插入图片描述

lua脚本改造

代码语言:javascript
复制
-- 1.参数列表
-- 1.1 优惠卷id
local voucherId= ARGV[1]
-- 1.2 用户id
local userId= ARGV[2]
--1.3订单id
local orderId=ARGV[3]

--2.数据key
--2.1库存key
local storeKey="seckill:stock:" .. voucherId
--2.2订单key
local orderKey="seckill:order:" .. voucherId

--3.脚本业务
--3.1判断库存是否充足 get storeKey
local storeNum=redis.call('get',storeKey)
--3.2 如果redis中没有该优惠卷库存记录,返回3
-- lua中只有false和nil是假值, 其他都是真值
if(storeNum) then
else
    return 3
end
if(tonumber(storeNum)<=0) then
    --3.2库存不足,返回1
    return 1
end

--3.2判断用户是否下单--set集合的判断方法,判断某个集合中是否存在某个value
if(redis.call('sismember',orderKey,userId)==1) then
      --3.3存在,说明是重复下单,返回2
    return 2
end

--3.4扣库存incrby storeKey -1
redis.call('incrby',storeKey,-1)
--3.5下单(保存用户)sadd orderkey userId
redis.call('sadd',orderKey,userId)
--3.6发送消息到队列,XADD stream.orders * k1 v1 k2 v2
redis.call('xadd','stream.orders','*',"userId",userId,"voucherId",voucherId,"id",orderId)
return 0

代码实现

代码语言:javascript
复制
package com.hmdp.service.impl;

import cn.hutool.core.bean.BeanUtil;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisWorker;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

import static com.hmdp.utils.RedisConstants.*;

/**
 * <p>
 *  服务实现类
 * </p>
 *
 * @author 虎哥
 * @since 2021-12-22
 */
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Autowired
    private ISeckillVoucherService iSeckillVoucherService;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private RedisWorker redisWorker;

    /**
     * 建议放到yml配置文件中
     */
    private static final String LUA_FILE_PATH="seckill.lua";

    private static final String ORDER = "order";

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

     static {
         SECKILL_SCRIPT=new DefaultRedisScript<>();
         SECKILL_SCRIPT.setLocation(new ClassPathResource(LUA_FILE_PATH));
         SECKILL_SCRIPT.setResultType(Long.class);
     }

    /**
     * 异步线程
     */
    private static final ExecutorService SECKILL_ORDER_EXECUTOR= Executors.newSingleThreadExecutor();

    @PostConstruct
     public void init(){
        //如果指定stream队列关联的消费者组已经存在,则不进行处理
        if (!targetGroupExistsInStream(STREAM_QUEUE_NAME,STREAM_GROUP_NAME)) {
         //这里createGroup的mkStream为true,表示在创建消费者组时,如果关联的stream队列不存在,也会自动创建
        stringRedisTemplate.opsForStream().createGroup(STREAM_QUEUE_NAME,ReadOffset.from("0"),STREAM_GROUP_NAME);
        }
        //异步监听任务执行
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
     }

    private boolean targetGroupExistsInStream(String streamName,String groupName) {
          //如果stream队列不存在
        if(stringRedisTemplate.countExistingKeys(Collections.singletonList(streamName))==0){
         return false;
        }
        StreamInfo.XInfoGroups order_stream_groups = stringRedisTemplate.opsForStream().groups(streamName);
        Iterator<StreamInfo.XInfoGroup> iterator = order_stream_groups.iterator();
        while(iterator.hasNext()){
            StreamInfo.XInfoGroup xInfoGroup = iterator.next();
            if(xInfoGroup.groupName().equals(groupName)){
                return true;
            }
        }
        return false;
    }

    public class VoucherOrderHandler implements Runnable{
        @Override
        public void run() {
          while(true){
              try {
                  //1.获取队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
                  List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                          //消费组的名字和消费者的名字
                          Consumer.from(STREAM_GROUP_NAME, STREAM_GROUP_CONSUMER_NAME),
                          StreamReadOptions.empty().count(1)
                                  //我这里设置为一直阻塞,直到有消息可读为止
                                  .block(Duration.ofSeconds(0)),
                          //从哪个Stream队列进行消息读取,此处读取方式为>
                          StreamOffset.create(STREAM_QUEUE_NAME, ReadOffset.lastConsumed())
                  );
                  if (handleMsgFromStream(list)) {
                      continue;
                  }
              } catch (Exception e) {
                  log.error("处理消息异常");
                  //处理Pending队列中消息
                  handlePendingList();
              }
          }
        }

        private void handlePendingList() {
            while(true){
                try {
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from(STREAM_GROUP_NAME, STREAM_GROUP_CONSUMER_NAME),
                            StreamReadOptions.empty().count(1),
                            //从头开始消费pending队列中所有消息
                            StreamOffset.create(STREAM_QUEUE_NAME, ReadOffset.from("0"))
                    );
                    if(handleMsgFromStream(list)){
                        //Pending队列中没有错误消息,那么直接退出循环
                        break;
                    }
                } catch (Exception e) {
                    //处理Pending队列中的异常消息
                    //可以在这里做一些异常记录等
                    try {
                        //避免循环频率过高
                        Thread.sleep(3000);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }


        private boolean handleMsgFromStream(List<MapRecord<String, Object, Object>> list) {
            //2.判断订单信息是否为空
            if(list ==null || list.isEmpty()){
                //如果为null,说明没有消息
                return true;
            }
            //解析消息
            MapRecord<String, Object, Object> record = list.get(0);
            Map<Object, Object> value = record.getValue();
            VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
            //3.创建订单
            createVoucherOrder(voucherOrder);
            //4.确认消息
            stringRedisTemplate.opsForStream().acknowledge(STREAM_QUEUE_NAME,STREAM_GROUP_NAME,record.getId());
            return false;
        }

        /**
         * 保守起见,还会再次进行判断
         */
        private void createVoucherOrder(VoucherOrder voucherOrder) {
            Long userId = voucherOrder.getUserId();
            Long voucherId = voucherOrder.getVoucherId();
            //创建锁对象
            RLock lock = redissonClient.getLock(LOCK_ORDER_KEY + userId);
            //尝试获取分布式锁
            // 第一个参数为获取锁的最大等待时间(期间会重试)--默认-1,,失败直接返回
            //锁自动释放时间--默认30秒
            //时间单位
            boolean tryLock = lock.tryLock();
            if(!tryLock){
                log.error("用户["+userId+"]对"+voucherId+"优惠卷重复抢购");
                return;
            }
            //我们只需要确保下面这两行代码的集群并发问题被解决
            try{
                Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();

                if(count>0){
                    log.error("用户["+userId+"]对"+voucherId+"优惠卷重复抢购");
                    return;
                }
            }finally {
                lock.unlock();
            }

            //6.扣减库存
            boolean success = iSeckillVoucherService.update()
                    .setSql("stock = stock -1")
                    .eq("voucher_id", voucherId)
                    .gt("stock",0)
                    .update();
            if(!success){
               log.error("库存扣减失败");
               return;
            }
            save(voucherOrder);
        }
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
        Long uid = UserHolder.getUser().getId();
        Long orderId = redisWorker.nextId(ORDER);
        //1.执行lua脚本
        Long res = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), uid.toString(),orderId.toString());
        //2.判断结果是否为0
        int r=res.intValue();
        if(r!=0){
            //说明redis中没有当前优惠卷的库存记录
            //去数据库查询该优惠卷是否存在,如果不存在,说明是恶意访问
            //如果存在,就更新redis记录
            if(r==3){
                if(handleUnKnownVoucherId(voucherId)){
                    seckillVoucher(voucherId);
                }else {
                    return Result.fail("指定优惠卷不存在");
                }
            }
            return Result.fail(r==1?"库存不足":"不能重复下单");
        }
        //3.为0,有购买资格,把下单信息保存到阻塞队列
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(uid);
        voucherOrder.setVoucherId(voucherId);
        //4.返回订单id
        return Result.ok(orderId);
    }

    private Boolean handleUnKnownVoucherId(Long voucherId) {
        SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
        if(voucher==null){
            return false;
        }
        //更新redis记录
        stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY+voucherId,voucher.getStock().toString());
       return true;
    }
}

在这里插入图片描述
在这里插入图片描述

总结

本节最重要的地方还是在redis的Stream消息队列,并且也花了大量篇幅去讲解加实践,当然一切还是以官方文档为主,因此建议大家可以没事去看看redis的stream部分文档

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-05-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Redis进阶学习04---秒杀优化和消息队列
  • 秒杀优化
    • 秒杀优化的具体实现
      • 基于jdk阻塞队列完成的秒杀优化总结
  • Redis消息队列实现秒杀
    • 基于Redis的List实现消息队列
      • 基于Redis的PubSub实现消息队列
        • 基于Stream实现消息队列
          • 基于Stream的消息队列之消费者组
          • Redis-Stream详解
            • 追加新消息,XADD,生产消息
              • 从消息队列中获取消息,XREAD,消费消息
                • 消息ID说明
                  • 消费者组模式,consumer group
                    • Pending 等待列表
                      • 消息转移
                        • 坏消息问题,Dead Letter,死信问题
                          • 信息监控,XINFO
                            • 命令小结
                            • JAVA伪代码实现Stream消费队列
                              • Stream消息队列小结
                              • 综上比较
                              • 使用Stream作为消息队列优化之前的秒杀案例
                              • 总结
                              相关产品与服务
                              消息队列 CMQ 版
                              消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档