00:00
前面我们使用延时队列完成了订单的自动关单的功能,以及我们库存的自动解锁功能,特别对于订单的关单来说,只要我们关闭了这个订单,我们的消息呢还会发给队列,然后呢让它自动解锁库存,而且呢,为了防止我们在订单创建的时候,然后呢由于其他方法失败,我们库存已经创建成功,但订单回滚,那我们这个库存呢,也应该触发自动解锁逻辑,所以我们在这儿呢,还写了一个自动解锁逻辑,库存到时间以后呢,也会自动解锁,那么最终呢,就是利用MQ保证了我们的最终一致性,无论是什么时候,我们只要订单失败了,我们库存呢最终一定会解锁,那这个最终一致性呢,就是我们这个分布式事物里边的柔性事务方案,但是呢,我们想要使用最终一致性来保证我们这个分布式事务最应该强调的就是这四个字叫可靠消息,你们想要做最终一致性,我们呢全是靠发消息来完成的。
01:01
我们要发给MQ相关的消息,那MQ收到以后呢,我们其他服务监听到我们去来执行所可靠消息呢非常重要,如果由于不任何不可靠的因素导致我们消息出现了一些问题,故障,没发出去等等,那我们可能会面临很大的损失,咱们来说一下如何保证我们这个消息的可靠性,来看一下rabbit MQ我们这个文档里边,首先我们这个消息可能会导致哪些问题,我们常见的几个问题,第一个是我们消息丢失,我们这个消息呢丢了,第二个呢是我们消息重复,我们相当于一个消息多发了好多遍,还有我们这个消息的积压,我们消息好的东西没人消费,所以我们来看一下每一个这些不可靠因素出来以后,我们都怎么办?首先我们来看消息丢失,消息丢失呢是一个非常害怕的现象,特别是我们做这些电商业务,我们只要有一个消息丢失,比如我们这个订单消息的,我们发送到MQ里边,消息进行丢失了。
02:01
可能会影响到后边一连串的操作,比如我们的MQ的解锁库存,包括我们后边的物流等等各种信息,所以我们说我们一般要使用消息事物,也就是我们说的可靠消息加最终一致性,那我们呢就应该保证消息可靠性,首先保证消息不丢,那我们先来看一下消息在哪些场景下可能会丢。第一种场景,什么消息呢?发出去,但由于网络问题没有抵达服务体系,比如参照我们这一块的代码,我们这个订单服务。他呢想要关单,只要订单关闭完成以后呢,他会给MQ来发一个消息,但如果此时那么正在运行这个方法,结果呢,MQ服务器跟我们的网络突然出现了闪断,我们这个消息呢没发出去,现在网络连接出现问题,那怎么办呢?所以呢,我们现在就应该有一个try catch,我来尝试给他来发送消息,那消息呢一定要保证100%发出去,虽然我们目前写的这个业务逻辑,就算我们这个订单消息,关单消息丢了,我们后边也会触发自动解锁库存,但是呢,我们说一定要保证消息百分百发出去,这才是一个符合业务逻辑的好,我们现在来说第一个场景保证。
03:18
保证咱们这个消息呢一定会发送出去,那怎么保证发送出去,这我们说的第一个场景,如果我们这个生产者发消息跟服务器broke,我们的消息代理让MQ服务器这间网络出现了中断,没发出去怎么办?所以我们说可以做好容错方法,我们在这呢使用try catch,如果你出现问题,那这一定是网络问题,我们没发出去,那么就应该做的一件事就是出现问题以后。来写一个图录,出现问题我们怎么做呢?我们可以将咱们这个没发送成功的消息,将没发送成功的消息,成功的消息,然后呢进行重试发送,重试发送,比如说在下边呢,写了一个well循环,重试了几次,每隔几秒重试一次,试一试,但是一般情况下,我们这个网络段呢,可能短时间恢复不过来,咱们这个重试代码呢,不应该在这一个V循环,来在这儿测试尝试几次,所以我们应该要做的就是我们可以把这个发送失败的消息看我们这个第二步我们做好日志记录,只要我们每发一个消息,那我们都在这呢做好记录。
04:29
我们每一个发送出去的消息,每一个消息,每一个消息那都可以做好,咱们这个日志记录,相当于呢,告诉我们这个业务,我们哪个消息呢,他准备要发了,但是发出去,发不出去,我一会儿还得核查,那怎么做日志记录呢?我就可以给数据库里边创建一张相应的表,比如我们给每一个业务里边来创建上一张这个样子的表,我们就叫MQ的消息表,这个表里边呢,有我们指定的这些属性,好,我们拿过来走来直接运行来看一下我们这张表,这张表里边呢,有我们消息的ID,我们每发一个消息有唯一ID,还有消息的内容,我们将要发给出去的这个内容,当然我们给他序列换成我们这个杰森,然后呢我们再来发,然后呢,再来加上我们这个消息呢,要发给哪个交换机,它用的路由件是什么,包括呢,由于我们是把这个消息这个对象你要发出去的这个对象呢,我们转换成杰森了,所以我们。
05:29
要重试发送的话,还得转回来,所以我们还要标志一下它的类型,以为我们这个消息的状态,诶我们这个消息呢,刚新建来,我们接下来一已发送,那就是呢,已经发送出去了,那发送出去成了还是没成,那么就有两个状态位,一个是错误抵达,一个是成功抵达,然后呢,接下来是消息的这两个时间等等,所以我们就可以的做法,就是我们给每一个消息做好日志记录,说起来是日记记录,就是给数据库,给数据库保存。
06:01
每一个消息的详细信息,包括它的状态信息,那这样我们保存了以后呢,只要发送失败了,我们可以定期扫描数据库,把这些失败的消息呢拿出来再发一遍,所以我们定期扫描数据库,将失败的消息再发送一遍,所以这是我们说的第一种失败,确实网络连接不上,我们没法出去了,我们可以这么来做,那我们来看第二种失败。更麻烦的失败呢,是这样子的,我们这个消息呢,假设这是我们的消息生产者,消息呢,要发给我们的消息代理broke MQ服务器,然后呢,我们这个消息发给服务器达服务了,但是呢,服务因为我们现在消息都是一个持久化的,我们一定要持久化保存,要不然的话,服务器就算收到消息,服务器一开一关,消息又没了,这就很完蛋,咱们要让消息持久化保存,那服务器拿到消息以后,他们先得把消息持久化存到一个地方里边,然后呢,再由交换机交给指定的相关队列,而且呢,他最终交给队列才是持久化的,所以我们现在服务器收到消息,他是收到了,只有交给队列了才算持久化保存了,但万一就是他刚收到消息,还没来得及处理,服务器崩了,那么MQ呢,一停机以后,它再次启动,那这个没处理的消息相当于就完蛋了,所以呢,我们说这个消息丢失是最可怕的,我们虽然抵达了,但是呢,将。
07:33
消息要持久化,到磁盘的时候出现了宕机,那出现完蛋的事情,那这个事情怎么解决,我们之之前说呢,我们的这个确认机制,除了我们这个消费消息有确认机制,发送消息呢也有确认机制,来看一下我们这个消息的生产者,把我们这个消息只要发给博我们消息代理收到我们这个消息以后,那相当于服务器收到了,但是呢,只有服务器通过交换机把这个消息呢抵达送给队列以后。
08:03
才算我们这个消息完整的持久化保存到队列里边,所以我们就可以使用我们生产者的这个确认模式,只要我们这个队列里边消息存到队列里边了,那么就可以放心了,那这个队列里边确实就有消息了,就等人消费了,所以我们说解决这个问题,那我们的做法就是我们使用生产者发送者的消息确认机制,我们可以在他的消息确认机制里边,我们之前在订单服务里边,那还写了这个消息确认机制,来可以看一下。来到我们这个config里边,我们之前在rabbit conf里边,我们在配置rabbit temp里的时候,我们在这呢确定了每一个消息的这个con回调,以及return return呢就是哪些消息发失败了的回调,所以我们现在要做的就是只要con了,那说明服务器收到了,我们可以记录一下消息的状态,服务器呢已经收到了,这个是服务器收到了,然后呢,只要服务器收到了,没有给我们报错误,能回来到这个回调方法,那就是报错误了,报错误了,那只要报错误呢,我们相当于就应该修改数据库。
09:15
当前消息的错误状态,当前消息的错误状态,主要是张楠这个消息的状态,把它状态呢,变成我们这个错误了,我们相当于没收到。消息,我们这个队列里边没存储到消息,那这样呢,我们就应该定期重发,其实我们在这呢,只要confirm了,Confirm服务器收到了,那就行了,你只要一收到服务器就把消息的持久化,持久化以后呢,接下来他才慢慢的把消息传送给我们队列和交换机,所以我们呢,可以结合发送者的消息确认机制,每一个确认成功的消息,我们都去数据库这张表里边,我们重新呢,再来改一下,来看一下我们刚刚创建的这张表,我们重新呢,再来改一下我们这个消息的状态,由它刚才的新建状态,比如呢,我们现在变为我们这个数据库已经收到这个状态,所以呢,这是我们的第二个做法,那预防这个问题,我们再来加上这个机制,然后呢,第三个机制,那这种情况下丢失也很麻烦,比如呢,我们现在生产者发了一个消息,我们确认已达已经抵达我们这个服务器了,然后我们消费者呢,在这消费消息,他呢,刚把消息拿到,还没来得及消费,他给宕机了。
10:28
那宕机以后呢,我们这个消息如果是我们自动A开的状态,我们消费者是自动AC的话,虽然消费者只要一上线,一拿到消息,我就默认回复给服务器,我已经收到了,但是呢,我却没有消费成功,那这个消息相当于走了一遍过场,然后呢,我们什么都没做,所以呢,这个就很害怕。所以我们要预防这种情况,我们一定要开启手动A模式,只有我们这个消费者,他确认把消息消费成功了,才告诉服务器,你给我删掉,否则呢,给服务器只要一回复A,这个消息就被他删掉,一删掉以后呢,你再也就没有这个消息了,所以我们说为了防止消息丢失,我们就来保证上这三点,其实这三点呢,回顾起来就是一点。
11:17
首先第一点我们来做好消息确认机制,做好咱们这个消息确认机制,消息确认呢,两端确认publisher,我们这一端呢要确认,包括我们的consumer也都要确认,我们这个两端的消息确认我们都一直在用,特别是这个consumer,我们要让他手动ack,手动AC只有消费成功了才AC。否则呢,我们只要一收到消息,我们还没做事呢,宕机了,那这个消息就直接被删除了。所以我们保证了这个消息确认机制,都保证好了以后呢,接下来我们作为一个补偿,然后呢,每一个消息,每一个发送的消息都在数据库,都在数据库做好记录。
12:01
然后呢,我们定期定期将咱们这个失败的消息,失败的消息,你各种原因失败的消息,我们这边都能收到了。无论你是消费失败了,还是我们这个服务器没有抵达等各种问题失败了,无论你哪端的失败,我们都能收到,定期将这些失败的消息呢,我就再来发送一遍,再次发送一遍,因为保存消息的时候呢,都告诉你了,当时这个消息的内容是什么,他要发给哪个交换机,用哪个录音件,你拿出来把反序列化成对象,然后调我们的方法卡再发出去就行了。这是我们说的如何防止我们这个消息丢失,那防止消息丢失我们就这么来做,接下来我们来看如何防止消息重复。那这个消息重复呢,其实处理起来非常简单,但我们来考虑一下,什么情况下会消息重复,所谓的消息重复就是一个消息给我们这个消费者呢,发了两遍,我们相当于消费者呢,收到了两遍这样的消息,那这种情况下呢会重复,我们先来看第一种场景,假设呢,我们现在是我们这个消费者consumer,我们在这儿呢收到消息,那收到消息呢,我们一般专门写了一个监听器,来看一下我们的这一块接收消息的监听器。
13:17
我们来到我们这个监听器,我们监听器一收到消息以后,我们会调用业务逻辑处理,那假设呢,我们现在消息消费成功了,业务逻辑也处理完了,我们这个方法呢,只要一掉我们service,整个事物完成,它就提交了,但是提交了以后呢,接下来。正好走到代码这一行,我们宕机了,停电了,那么代码它下一行没有走了,相当于我们没有给我们的MQ broke回复我们这个消息已经成功消费,然后呢,我们这个消费者就跟它断开连接了,那一断开连接,哪怕不是我们这个宕机,是我们这个网线突然断开连接了,只要一断开连接呢,我们这个broke,我们的服务器就认为我们这个消费者没有把它处理成功,因为我们是手动A模式,所以消息呢就会由我们之前的这个on a状态,然后呢,On a状态就是呢,大家正在处理的,然后呢变成ready,然后呢,它又要重新处理的,那变成ready状态呢,接下来他又会发给其他的消费者,相当于其他消费者收到以后再调一遍解锁库存,那么这个消息呢,相当于同样一个解锁库存收到了两遍,这是我们说的第一个重复场景,那第二个重复场景,那就是我们这个消息消费失败了,我们这个消息呢,消费失败了,我们自。
14:33
你本身就失败了,然后呢,我告诉他,我给他拒绝了,让他重新回到队列里边,消息呢再发出来,想我们这个消息呢,还是发两遍,那我们这种两遍我们是允许的啊,因为我们这第一遍呢是失败了,第二遍回来我们可能就成功了,所以呢,这是我们的第二次我们这个消息的重试机制,然后呢,还有我们第三个我们消息可能会重复的场景,这个场景呢,其实就是我们说的第一个场景,你这个AC开时,我们这个宕机,或者我们这个ACK的时候网络中断,反正不管怎么样,我们呢,只要这个没有AC成功消息呢,就是on a c状态,这个状态呢,只要我们消费者跟他断开了连接,On a状态的消息又会变成ready状态,然后呢,又会发给其他消费者,所以我们会发现我们消息重复呢,就是我们同一个消息多收了两遍,那即使是我们多收了两遍,我们最害怕的呢,就是我们这个库存扣两遍,所以呢,我们的。
15:33
解决方案可能呢,面向的只有一种,就是我们的业务逻辑,解锁库存的方法,我设计成密等的就行了,而且呢,我们这一块确实是设及的是密等的,因为我们想要解锁库存,得判断你的这个库存状态,没解锁了我才解锁,那只要我们这个解锁了,解锁了呢,我们还会修改我们这个库存状态,只要我们解锁一遍,你第二遍再要来解锁,这个已经解锁了,那就不解锁了,所以呢,我们在这儿将业务设计成密等的,这才是最重要的。或者呢,我们可以使用防虫表,每一个消息呢,由于它都有一个唯一ID,只要呢,它被处理过了,我们可以在这个防虫表里面记录一下,诶,它被处理过了,那第二次来那就不处理了,其实跟我们来写接口的密是一模一样的效果,包括呢,我们还可以利用re MQ的这个消息的这个属性,我们会看到每一个消息呢,只要抵达过来,我们来给大家示范一下,每一个这个message抵达过来,它呢,都还有一个属性,它这个。
16:34
属性呢,叫做get re delivered,什么叫re delivered就叫新派发,相当他返回当前消息是是被重新派过来的,相当于第二次再派发过来的,当前消息是否被第二次及以后,及以后,也就是我们说的重新派发过来的,如果是重新派发过来的,但你可以直接是重新派发过来的,相当于我处理过了,那就不处理的,但这样做太暴力了,我们这个业务呢,应该判断我们这个消息呢,虽然是重新派过来的,万一上一次是失败,我们根本没处理成功怎么办?所以呢,我们也结合这种也可以来做一个判断,这就是我们说的,如果消息重复了简单业务设计成幂等的就行了,它消息发上1万遍,那都是我们执行一遍的结果。接下来我们再来看消息积压了怎么办,比如说呢,如果是我们这种场景,那生产者呢,会生产。
17:34
消息给我们的消息队列,然后呢,我们消费者会连上我们这个消息队列消费消息,但是如果消息队列里边的消息太多,我们这个MQ里边存了太多消息,那肯定会影响很多的性能,所以消息积压呢,会带来的是我们MQ性能的下降,所以我们一定要解决消息积压的问题,但是消息在哪些情况下会积压,比如我们这三种情况,第一种是我们这个消费者宕机,那相当于MQ呢没有连上任何一个消费者了,或者连的消费者太少了,我们这个消费者呢宕机了,没有人去消费我们消息队列里边的消息了,而且我们这儿呢,还源源不断的生产消息,那就会导致们一个队列里边可能百万的消息我们都没有人去来做,这是第一种,那消费者宕机了,第二种消费者消费能力不足,宕机其实就是能力不足了,我们假设呢,原来有十个消费者,那全连上我们这个MQ来进行消费的,结果呢,咔咔咔咔,我们九个全宕机了,就剩一个了。本来十。
18:34
那每一个人每秒消费1000,十个还能消费1万呢,每秒消费1万,但是呢,现在剩一个了,我们想要消费1万呢,就得十秒,所以呢,可能消息源源不断的过来,就会导致大量的积压,积压呢两种长期积压,第一种是消费端能力不足,无论是你宕机了,你断开连接了,或者你消费者上线数量太少了等等等等,造成了积压。第二种我们发送端,我们发送的流量太大。
19:01
比如我们这个秒杀业务下单流量太大,我们这个流量太大呢,MQ里边存了好好多的这个消息,那这个时候怎么做呢?我们就应该来这么来做,比如我们可以来限制我们发送端的这个流量,但要限制流量,这是我们后来说的,你肯定得限制业务,只要让业务不进来,业务不执行就不发这个消息出去,但是我们更多的呢,可以从消费端我们来解决这个问题,那你这个既然队列里边消息积压了,你可以有这么两种选择,第一种你可以上线更多的消费者,比如我们上线更多的库存服务,我们库存服务的这段代码,一号机器,二号机器,三号机器,四号机器,如果都有的话,那他们呢,都能消费消息队列里边,那他不行了,他他来消费,他不行,他来那我们就可以上线更多的消费者来进行消费,这是第一个,然后呢,第二个,如果由于我们这个业务量太大,数据量太大,因为我们如果上线更多的消费者,那么这个消费者呢,如果是我们正常业务消费来消息,他还要处理处理呢,可能。
20:01
排队一段时间,可能嫌太慢了,那么就可以上线一个专门的处理消息的消费者,那我们这个消费者呢,你现在有百万积压,直接从你的这里边将百万积压的消息全部批量拿出来存到数据库,我呢现在简单处理就是把消息拿来直接存数据库,这样呢,百万处理也很快,可能数据库呢,一分钟两分钟把百万的消息呢全部存完了,那存完了以后,接下来怎么办?你的消息空了,那你接下来慢慢的这个性能就不影响了,然后呢,我们接下来再来自己编写一个离线处理业务,我们从数据库里边呢,慢慢的取出一条一条一条去来处理这个消息,去来执行相关的业务逻辑。所以这是我们说的消息不可靠的几种常见因素,比如消息的积压,消息的重复以及消息的丢失。所以如果我们想要使用我们的分布式事物,并且呢想要使用最终一致性方案,那一定要保证可靠消息在这一块。
21:01
嗯。保证可靠消息,最核心的其实其他都好说,积压了哪怕慢也行,重复了我们写成密等的也行,那最害怕的就是丢失,所以我们呢,我们最终说来做这个分布式事务的最终一致性方案,就是要防止消息丢失,那防止消息丢失的办法我们也说了,我们只需要呢,做到我们的手动确认机制和我们这个消息呢做好记录,然后呢定期重试,这就是我们防止消息丢失的逻辑,所以我们会发现,如果我们每一个业务,每发一个消息,我们都要这么来做。我们写的代码呢,可能很多,所以我们可以专门将消息服务来写成一个中间价来,我们来自己写一个微服务,我们想要发消息了,调用我们这个微服务去来发消息,这个微服务呢,会自动在数据库里边存消息,然后呢,他发失败了,还会自动重试,将所有的这些功能都给我们考虑周到,那我们要做的就是呢,调用它发消息就行了,当然这一块的所有代码呢,我们就不写了,数据库也给大家放在这儿了,那每一次发送之前在这confirm确认到了就来修改,当前呢,这个消息有一个唯一ID,这个唯ID在消息里边我们也能取到,我们就可以在这呢修改我们这个消息的状态来,我们现在呢。
22:24
服务器已经确认收到了,然后呢,接下来我们自己再来编写一段代码,定期的从服务器里边拿这些失败的消息重新发送就行了,而且呢,每一次发送的时候别忘了给服务器来记录一下我们当前消息的这个日志,这是我们说的柔性事物可靠消息加最终一致性,可靠最重要,最终一致。是我们业务来进行保证。
我来说两句