00:00
接下来我们说一下rabbit MQ的延时队列,我们可以使用这个延时队列的最终实现我们定时任务的效果,比如我们先来看一下我们延迟队列的这个使用场景,这个场景呢,我们系统里边首先第一个是下订单,我们正常呢是订单下成功以后,如果30分钟这个人没支付,那系统呢就会把这个订单关了,那关了以后呢,我们这个订单以前锁定的库存肯定得解锁,那我们先不说锁库存的这个事情,那订单30分钟以后关闭订单,那这个操作呢,们可以用定时任务来做,假设我们以前用定时任务,我们就可以每隔30分钟,比如呢,我们来扫描一次数据库,数据库我们来看哪些订单呢?现在还没有支付,如果呢,没有支付,而且它到期了,我们就把它给关掉了,那包括如果关了这个订单以后,我们肯定要解锁它的库存,所以我们这个下订单呢,需要我们30分钟以后关单,那在接下来我们说这个锁库。
01:00
存,那这个锁库存呢,我们现在往往是这样子的,订单下成功,然后呢,我们下订单会调用锁库存方法,还会调用别的方法,那万一是我们这个所库存成功了,订单呢,本来也成功了,但别的方法调用失败,订单呢自己会回滚,那就不存在关单的问题,但我们这个库存服务呢,如果我们使用西塔来控制分布式事务,它也得跟着回滚,但是呢,为了方便起间,我们说为了高并发,我们不用C塔控,不用西塔控怎么办呢?我们希望库存锁定了的这些库存也能自己来解个锁,所以呢,我们就可以设置一个定时任务,比如我们40分钟以后来进行一个检查,为什么要设置40分钟,因为订单的关单时间是30分钟,因为我们想要检查的话,那这个订单人家可能是成功的,所以呢,我们在确保所有的订单肯定都没问题以后,我们来检查这些订单的这些库存状态,然后呢,我们来看,如果我们来定时扫描我们库存里边的所有这。
02:00
一些状态我们发现呢,诶,我们刚锁的这个库存对应的这个订单根本就没有,那订单不存在,那就是上一次回滚了,或者我们这个订单呢,已经被人取消了,那说明我们这个订单呢,他没有支付,那么此时就要把我们之前锁定的库存呢,就得解锁回来,所我们这个锁库存到解锁库存,我们也可以来做一个定时任务,但是如果我们都是来使用定时任务,就会给我们带来很大的问题,首先我们定时任务消耗我们这个系统内存,并且呢,它增加我们数据库压力,那么定时任务呢,是每隔一段时间就要轮询去来访问数据库,相当于我们这个数据库呢,每隔一段时间就要做一个全盘扫描,全盘扫描扫这些订单,扫这些库存,这样呢,整个数据库的压力就会非常大,而且我们这个定时任务最致命的问题是它有较大的时间误差,比如我们来看一下这个场景。们来看我们的这个时间误差,如果我们使用定时任务,我们每隔30分钟,每隔30分钟来想扫一次,那现在呢,就变成这样,我们假设呢,第一次下单,这个订单呢,是在这我们下过来了,它呢是在第一分钟下的,但是我们这个定时任务呢,上一个定时工任务刚运行完,因为我们这个下单它是一个时间随机的,我们也不知道人家啥时候下,那我们这个上一次定时任务刚下完,那他呢,接下来一分钟他下了个订单,然后呢,我们隔上30分钟以后,我们定时任务扫,那扫它的时候呢,巧了么?这个订单呢,他还没有经过30分钟的这个煎熬期,哎,那么这个只有30分钟以后没支付才能关,所以他还差一分钟,所以我们这个定时任务呢,扫到他发现他不符合条件,那不给他进行关单,那接下来呢,结果定时任务刚一完,我们这个订单呢,自己相当于就超时了,在数据库里边,比如我们记录了订单的两个时间,下单时间,那接下来我们再来加上30分钟。
03:53
就是它的这个解锁时间,但我们发现呢,上一次呢,不符合要求,我们把它放过,放过了以后呢,结果我们刚一放过人家呢超时了,超时了以后啥时候能被扫到呢?那就得等下一次的定时任务,相当于我们再来等29分钟以后,那我们现在总共等了多长时间呢?订单的超时30分钟家我们再来等上29分钟,因为它一分钟以后在这儿过期了,那现在呢,59分钟以后呢,我们这个订单呢,可能才会被扫到,它这个已经过期了,没要锁解锁库存了,所以呢,这是我们这个定时任务会出现的时效性问题,那基于这些考虑,我们都可以不去在这个场景下来采用定时任务,那采用什么呢?采用我们MQ的延时队列,这延时队列呢,它是基于消息的TTLTTL就是消息的存活时间和我们这个死性路由来结合的,这两个我们下来会说,那有了这个延时队列,我们工作应该是这样工作的,比如我们这个下订单,订单一下。
04:53
成功以后,我们就给消息队列里边发一个消息,说我们哪个单咔咔咔下成功了,然后呢,我们以前是定时任务,30分钟以后扫描关闭,那么现在就不用管了,不用写定时任务,我们发的这个消息呢,发到队列里边这个队列。
05:10
他的消息最大的特点就是他们这些消息呢,30分钟以后才能被人收到,所以这样的话呢,我们如果有一个服务专门来监听这个队列,那接下来我们这里边存来的这些订单下成功的消息只有30分钟以后才会来到我们的监听者的这一块,那这样呢,监听者拿到这个订单再一查,结果这个30分钟我都到了,这个消息都抵达我这儿了,你还没支付,那么就给你关了。所以呢,我们整个过程呢,无需任何定时任务,我们让MQ相当于把消息暂缓存一段时间,包括我们这个锁库存也一样,只要我们库存锁成功了,我们就给MQ里边发一个消息,这样呢,我们MQ先把消息保持上一段时间,先别着急出发,然后呢,到了我们这段时间以后,MQ自己把这它发出去,发出去呢,我们解锁库存服务,一拿着这个消息一检查订单,诶这个订单呢,没支付或者订单呢,早都没有了,那我们就给。
06:10
啊,解锁库存,所以我们使如果使用延时队列,我们基本呢就能解决我们定时任务的这个大面积的时效性问题,那么这个延时队列呢,可能时效性差上那么一秒五秒,乃至于一分钟,我们都不可能差上20分钟,所以呢,我们整个基于考虑,我们应该使用延时队列的这个场景来做我们的下订单,关闭订单和锁库存以后的解锁库存操作,最终保证我们的事物一致性,也就是说我们的最终一致性。我们能引入它的目的,我们引入MQ的现在的第一个目的,我们就是来解决事物的最终一致性的,因为我们这个订单最终还是要关的,所以呢,我们使用MQ来暂缓存一段时间消息。不占用系统的任何资源,只是多架设一个MQ服务器,等时间到了以后,我保证我们最终的数据一致,那这是我们说的延时队列的使用,那接下来要怎么使用延时队列,我们延时队列呢,是结合消息的TTL和死性的exchange,这两个都是什么意思?还有什么叫死性,什么是t TL exchange,我们都来看一下。首先我们得知道消息的TTLTTL呢叫time to live,就是一个东西的存活时间,大家给red里边存数据,我们用KV存数据的时候,我们给red的这个K也能指定TTL就是它的存活时间,所以我们这个MQ的消息呢,也有一个它的存活时间,这个存活时间呢,我们可以对队列和消息都设置这个存活时间,而且呢,我们给队列和消息都能设置它的TTL存活时间,那无论给哪个设置它的存活时间代表的都是一个意思,什么意思呢?就是我们这个消息只要在我们指定的时间。
07:57
里边没有被人消费,那这个消息呢,我们就相当于没用了,我们就称为把它叫做死信,那么这个消息呢,相当于就死了,那死了以后呢,我们就可以把它做一个单独处理,所以我们现在如果是给队列设置了过期时间,那就是这样子的,队列里边的这些消息我们存到里边,如果这些消息呢,一直没有被人监听,如果我们一起监听,肯定就拿到了,如果我们这个消息呢,没有连任何的消费者,那么这个队列呢,没有连上消费者,队列里边的消息只要30分钟一过,那这些消息呢就成为了死性没人要的东西,那么接下来就可以把它扔掉了,服务器呢,默认就会把它进行丢弃,那我们给消息设置,我们之前说也是一样的效果,如果我们给单独的每一个消息设置,我们设置了30分钟过期时间存到这个队列里边,只要这个队列没人消费这个消息,消息30分钟没有人消费,它呢就自己过期了。所以我们设置消息的TTL的目的就是只要有。
08:57
一段时间,我们指定的这段时间没人把你这个消息取走,你呢就会被服务器认为这是一个死性,我们称为死性,就把它丢弃了,也就说我们这个消息呢就死亡了,那接下来我们就来说一下什么叫死性,也就说我们这个一个消息,如果满足如下条件,那他们呢就会成为死性,而且呢,死了的这些消息,比如我们这些是队列里边的消息,死了的这些消息呢,我们可以让它进入到一个路由,就是exchange买这个路由呢,我们可以称为死性路由,它就是一个我们普通的交换机,但只是呢,这个交换机呢,可以收到这些死了的消息,那哪些消息是死性,那就是这些消息第一个是我们被consumer拒收的,因为在说手动确认模式里边,我有一个叫reject的方法,相当于拒收,拒收这个消息,我呢,有一个人虽然监听了这个队列,我把你这个消息拿到了。但是呢,我相。
09:57
他拒签了,我把你这个消息拒收了,而且不让他再回归到队列里边,所以呢,就相当于直接要丢弃了。
10:06
那这个东西呢,我们就称为死性,这是第一种死刑条件,那第二种我们消息的存活时间到了,比如我们队列里边设置了30分钟过期,这个里边存的消息呢,只要过了30分钟没人取走,那他们呢就存活时间到了,消息过期就成为了死刑,这是第二种情况,包括我们的第三种情况。如果我们这个队列里边呢,我们限定了长度,我们消息呢,只要放进来,我们就会放到队列里边,那放到队列里边,如果我们这个队列满了,那么以前放的这些老消息呢,就会被人家服务器给丢掉,服务器呢就认为这是一个死性,所以呢,我们这个死性就是这么来的,但是呢,如果这个姓死了以后,如果我们把它丢掉,我们就没有延迟队列的功能了,那这个延迟队列呢,最重要的特点就是我给队列设一个过期时间30分钟,然后呢,30分钟以后,只要我们这个消息没人消费,我们就认为是死信,死信了怎么办?我们让服务器别乱丢,扔给隔壁的一个交换机,这个交换机呢,我们就可以称为死性路由,他们专门来收这些死性,然后呢,他把这个死性再路由到另外一个队列里边,别人呢,专门监听这个队列,注意上边的这个队列,我们不能让任何人监听,一监听消息就算。
11:29
置了过期时间,被人一拿,什么都没了。所以呢,消息只要一过期,让服务器别乱丢,丢给了exchange exchange再丢给另外一个队列。其他人呢,去来监听这个队列,那这个里队列里边的所有内容,他呢,其实都是我们这个队列里边过了30分钟以后的这些消息,他被路由到过来的,所以我们相当于就模拟了一个延迟,那30分钟在这儿呢,存一下,因为没人消费它存完了以后呢,又回归给你放到这个队列里边,那如果我们解锁订单的服务,一直来监听这个队列,订单一下成功,先把订单消息呢放到这一块,延迟上30分钟,延迟以后呢,交给我们这个交换机,交换机再重新路由回来,那么收到的这些订单消息一定都是过了30分钟时间的,所以呢,我们将刚才的这个东西,就是我们队列过了过期时间的消息放给这个路由,把这个路由呢,我们就称为死性路由,我们死了的这些消息我们就称为死性,死性路由呢,就是一个非常普通的路由而已,那只要这两者结合,我们就能模拟出延时队列,消息在这儿先保持。
12:41
IPHONE1030分钟这个队列呢,一直不被人消费,然后30分钟一过以后,消息被服务器认为是死性,再来丢给我们这个交换机,然后呢,这个交换机再丢给我们指定的队列,然后这个指定的队列才被人消费,只要这个人能从这个队列里边拿到内容,这些内容一定都是过了我们这个30分钟的内容,我就是通过它来模拟延时队列的,所以最终的效果呢就是这样。
13:07
我们延时队列呢,可以这么来实现,比如我们一个p publish,我们消息的发送者,还有一个c consumer,我们的消费者,我们这个发送者呢,我们发消息的时候,我们指定了一个路由件,比如叫de message,然后呢,我们这个消息先会被交给一个交换机,这是一个超级普通的交换机,然后呢,交换机会按照这个路由键把它呢交给一个队列,那这个队列跟交换机的绑定关系就是这个DELL message,但这个队列呢,它非常特殊,队列呢有这么几项设置,第一项设置叫X message TTL,叫我们消息的存活时间,它呢是以毫秒为单位,相当于我们300秒以后。也就是五分钟以后,我们这个消息就算过期,这个队列呢,除了设置这个参数外,它还设置了一个参数叫X date letter exchange,叫我们的死性交换机,就是死性路由,也意思就是告诉服务器,我们这个队列里边消息死了,别乱丢,扔给隔壁的这个路由叫delight exchange,所以我们隔壁呢,可能有一个路由名字叫deline exchange,所以我们这块消息只要死了,我们就不会乱丢,丢给我们的delay exchange,丢给它以后,接下来怎么办?那又告诉了我们服务器,你如果这个死性要往出丢,丢给那个路由用的这个件root key叫什么呢?叫de delay message,所以呢,我们死性就会交给我们这个普普通通的路由,然后呢,又会按照这个路由件交给这个路由,所以呢,这个路由收到了,我们这个死线,一看路由件是delete message,那呢他就会找。
14:50
啊,哪个队列跟他绑定的路由件叫delay message,然后呢,把这个消息就交给这个队列,以后呢,只要有人去来监听我们这个队列,那我们这个人收到的消息都是在这儿存了五分钟以后的过期消息,这是我们延时队列的第一种时间,我们可以给队列直接设置所有消息的过期时间,这个队列里边所有消息的过期时间,我们说这个过期时间呢,不仅可以给队列设置,我们给消息单独设置也行,比如第二种时间就可以是这种,比如我们这个消费者呢,发了一个消息,他中间过了哪些交换机,我们就不看了,他呢发的这个消息,他为消息呢单独设置了一个过期时间,比如它是300毫秒,300秒五分钟,然后呢,这个消息经过这个交换机被交给我们这个延时队列,因为我们这个队列里边呢,他说队列里边的消息死了,别乱丢,是丢给我。
15:50
那么这个交换机用这个路由件由我们这个队列消息存到队列以后,没有人永远去监听这个队列里边的内容,所以呢,这个消息就一直待在队列里边,什么时候过期呢,服务器就会来检查,那么这个消息呢,发现它是五分钟以后过期的,所以五分钟以后服务器呢,就会把第一个消息拿出来,然后呢,把这个消息按照我们队列的指定死了的消息交给这个路由,然后呢再交给我们这个队列,所以我们这一块呢,最终收到的消息也都是五分钟以后的过期消息,这是我们两种给我们这个消息设置过期时间和给队列设置过期时间,但是呢,我们推荐大家用的都要应该是给队列设置过期时间,因为给消息如果获设置过期时间,我们rabbit MQ它采用的是叫惰性检查机制,也就说呢,我们是这个蓝检查,什么叫蓝检查,那么这个MQ里边呢,假设这个队列里边存了第一个消息,第一个消息呢,我们指定它呢,是一。
16:50
啊,五分钟以后过期的,我们给这个队列连发了三条消息,第一个是五分钟以后过期,第二个呢是一分钟以后过期,第三个呢是一秒以后过期,然后我们难道按照正常情况应该是一秒过期的,我们优先弹出这个队列,但是呢,服务器不是这么检查的,服务器呢是这样,他从队列里边呢,先来拿第一个消息,第一个消息呢,他刚一拿,人家说要五分钟过期,然后呢,他又放回去了,行了,我不拿了,你要五分钟过期,那我五分钟以后再来拿,服务器呢,五分钟以后那会把第一个消息拿出来,那第一个消息呢,相当于就过期了,这是一个死性,然后交给这个交换机,最终路由到这儿,所以我们会发现,我们设置到一分钟过期的消息,得等第一个五分钟过期了以后,它呢才能过期,所以呢,第一个消息过期了以后,服务器来拿第二个消息,第二个消息呢,他说一分钟过期,当然服务器也不用等他一分钟,因为他发消息有一个时间,服务器一算,诶你这个早过期了,赶紧买。
17:50
那么然后呢,第三个他说一秒以后过期,那我们也给他扔了,当然呢,我们会发现扔这两个消息呢,就会在五分钟以后才扔,所以呢,我们应该使用给整个队列设置一个过期时间,这样队列里边所有的消息都是这个过期时间,我们服务器呢,直接批量咔咔咔全部拿出来给这一块来放就行了,那下一课呢,我们就使用代码模拟一下我们的延时队列,实现我们的定时功能,我们比如五分钟以后来关闭订单。
我来说两句