00:00
同学们,我们刚刚呢完成了异步确认发布啊,预异步确认发布消息。那么完成之后呢,我们这里存在一个问题,就是什么问题呢?说如何处理异步未确认消息。我们刚才一步未确认消息呢,确实已经代码写出来了,但是呢,对于未确认消息的处理呢,我们没有处理。我们来一起看一下。咱们之前的代码是这么写的。前面都一样啊,就创建什么声明队列呀,完了开启发布确认的,完了打开开始时间的这些都不说了啊,这都不重点了,重点就是下方进行批量的一个发布,上方呢写了一个监听器,专门监听哪些消息,呃,得到了确认,哪些消息未得到确认,当然得到确认的消息呢倒好理解,我们就不做任何处理了,因为都确认成功了,就没有必要处理了。重点就是对于未确认的消息,你是不是应该想办法进行啊,重新发送,或者是将未确认的消息保存起来,以方便于以后。
01:13
重新发送对吧,至少你得把它保存起来。而且经过刚才程序的运转,同学们会发现这个监听器是异步的,什么叫异步的?就是当你把消息都发送半天了。这个监听器才会被处罚。对,也就是说,所以打印的时间你看就在众多发消息的里面,对吧,众多发消息的里面,所以消息都已经发送完成完了呢,才最后走了一个。用时多久对吧?这个是发消息的用时,但是等你发现用时写都打完了,后续竟然还有监听器在监听消息。所以这些消息啊,在打印这句话的时候,其实已经怎么的已经将1000条数据发完了。
02:04
对,已经前一条数据发了,那么后续这些其实就不是在发数据的时候是在什么,是在执行后续的监听器监听到。其他消息已经确认收到了。对,也就是说是属于监听器,后来执行。那么这就麻烦了,因为由于监听器是后来执行的,那我如何。对吧,找出哪些消息未确认呢?办法就是文档上提出的最好的解决方案就是把未确认的消息放在一个基于内存的并且能够被发布线程访问到的队列。比如说用的是并发链,链式的一个队列。哎,他就可以在发布确认回调以及发布线程之间消息进行传递。
03:02
哪哪两个线程啊,这是哪两个线程呢?这两个线程一个叫叫做。发消息的这个现场。复云文1000遍。另外一个是监听器的现场,例如监听确认的,监听未确认的。这个间都是两个线程了。凡是学过多线程的同学都知道,这个线程是从上往下走,对,走到监听器的时候,监听器执行了吗?并没有。但是这个线程不会等它会继续往下走,一直把消息发送完,执行最后的打印,即使到了打印了监听器执行完了吗?也没有执行完,所以这是几个线程啊,这是两个线程。一个线程负责监听,一个线程负责发送并打印相应结果。那么这是俩线程,两个线程之间交互,那么你只能用谁呀?并发链路式队列,哎,它可以在发布线程。
04:03
与确认回调直接进行消息的传递,所以用它比较适合。那么我们来用它怎么用呢?我们来简单的写一下。首先你在发消息的时候,你就要用到。哪个呀,用到这个队列,对这个队列可以将你发送的1000条数据都先记录下来。将来怎么的?将来哪些确认了,将确认的删除掉,剩下的不就是未确认的吗?所以相当于要写几步呢,就写三步啊,第一步。对,你要在这个位置此处。记录下所有发送。的消息。对,记录下所有要发送的。所有要发送的消息,也就是消息的一个总。总消息,哎,总和总和完之后呢,紧接着在上方确认部分怎么的删除掉,这是第二步。
05:09
删除掉已经。确认的消息,那么剩下的是多少?剩下的就是未确认的消息,对完了呢,未确认的消息这个位置啊,你可以这在这第三步啊,打印一下是吧,打印一下。未确认的消息都有哪些啊?当然了,要是没有,要是没有未确认的,那么就不用打印了,也就是说第三步呢,其实是可以不写的,重点是第一步要记录,第二步呢,要将已经确认的删掉,剩下的不就是未确认的?那谁来完成这个两个线程之间的信息的传递呢?谁来记录呢?谁来删除呢,才能剩下未确认的消息。
06:02
就由文档上提出的,谁呢,叫做并发链路对立。那也就意味着我们得准备出一个并发列入队列是吧?准备几个?那来准备一个。咱们代码写一下啊,首先呢,在开始时间的下面。或者是开始时间的上面都可以,那么。我们最好是在上面,因为开始时间呢,它记录的是什么呢?记录的应该是发消息的时间,所以这个开始时间尽量的往下,呃,移动移动到哪儿呢?移动到发消息这里。对,移动到这个发消息的位置,而监听器是另外一件事。对吧,所以呢,我们在上面。在这个位置开启确认的下面这个位置,你要准备出一个叫。叫县城。我们得用用这种注释啊,得用这种注释。
07:00
县城。安全。有序。安全有序的一个哈希表。哈希表其实就是一个map啊,其实就是一个map,适用于。适用于高并发的情况下。完了,我们可以轻松的去关联是吧?轻松的去关联这个未发送的消息,或者是已经确认的消息。那我们这个队列的名字叫什么名字写一下啊,就是文档的名字啊,叫高并发吧,叫康。Con,有没有联想?有联想,但是挺长的是吧,不是是吧,是一个。哎,他叫并发。List。对,就它完了呢,泛型呢是浪类型的,为什么呢?因为它这个东西是按照上方那个确认来的,就这个图来。
08:06
因为将来你把消息不是发到一个map里边嘛,完map这个不有K值和YYV值吗?K值不就是那个数字123456嘛,所以说就是整数呗,整数就是long类型呗,至于Y6值,Y6值类型咱们也不确定啊,应该是你要是发字符串就写字符串型,如果不确定就发object。所以呢,这个对象它其实按照这来的啊,按照这来行。咱们走一波是吧,首先我们发的是字符串吧,对这个我们本次是发的字符串啊,你看。发的这个消息不就是个字符串的行吗?来,再往下,我们给他准备一个名字啊,叫什么?对。斯坦丁康。FORM4对吧,这就是一个外部的一个确认的一个容器。你一个。
09:01
并发他。这就完成。这个容器准备出来呢,其实就是一个线程安全有序的一个哈斯表,对,确实这个是哈斯表啊,完了呢,适用于高并发情况下。对,他能干什么事呢?说一说功能,第一个它能。轻松的。轻松的。呃,边边将这个序号是吧,将这个序号与这个消息。形松的姜字啊。将序号与消息。进行关联。对,怎么关联的,不就是map嘛,是吧,就是map。麦克关联啊,因为一个序号作为K值,那个内容作为Y流值,对这就是它的功能,第二个功能呢,就是可以轻松的批量删除,因为我们将来是要删除的是吧,第二条你看要删除已经确认的消息,他可以完成轻松的批量。删除。
10:01
条条数是吧,或者是里面的内容啊条目吧。完之后呢,呃,只只要给到什么呀,这个序号呗。因为它本身是序号充当K完消息充当啊Y有值嘛,通过序号不就可以删除消息吗。啊,非常的轻松。第三个。支持高兵法对。一定要重点是支持高并发,支持高并发其实就是多线程的意思啊,支持多线程的意思好完成。所以这就基于这三条,我们就选择了这个容器啊,这个哈斯,所谓哈希表,这么一个容器就选择它,所以这就是选择它的理由。那么接下来我们用一用,看好不好用啊,说的倒挺好,但是能不能用呢?来试一下。首先第一步呢,我们去到下面来到发消息这个位置进行所谓的记录,记录呢,它由于是map是吧,所以这就可以点put将我们的消息全记录下来,而K值已经研究好了,打算用什么。
11:05
打算用这个叫。序号是吧,序号应该在这个信道里是吧。信道调取。序号呢,调取下一次发布的一个序号。是他吧,来确认一确认一下啊,应该是他对。完,同时Y6值就是咱们的message信息。是信息吧,完了,这个信息就是一个字符串类型就可以了,因为本身它就已经定义成Y流值是一个字符串类型。所以就家里了。完了呢,每循环一次,你发一次消息,你就记录一次。对,记录一次完了呢,我们可以呃自己打印一下啊,就是打印一下内容,当然在这写上C怎么打印。表示发出去了是吧,但是呢,我们也可以不打印,就让这样记录完就行了。完了呢,紧接着当你把循环完事之后呢,记录了多少条,1000条。
12:04
那么接下来。当你确认到消息怎么办?你再删除掉呗。删除掉确认的,不就剩下未确认的吗?来,拿着它到上面进行删除。怎么算?这么着?我们现在手上有什么东西呢?手上有的就是一个这个。消息的一个一个标题编号是吧,那我们点上。这个叫get吧。Get出这个编号。对吧,当然了,我们当然这个编号应该是在头上啊,头上这一个麦编号完了呢,你把这个。编号扔里面。好,扔完之后呢,紧接快速返回,拿到了一个呃,麦,真正拿到了这个麦。对吧,把这个map我们起名字就小写点吧,叫需要确认是吧,考form e。
13:00
已经。拿到的就是被确认。通过这个标识就拿到了被确认的所有的消息。将。确认的消息怎么办?进行清理?明白吧,当然了,所以说怎么还这么多呢?就是因为你发消息的时候有可能是批量批量发消息。所以呢,你一定要判断一下是不是P链。如果是批量的话,就是批量的删除,你看这呢批量,如果我是批量。就把它进行。批量的删除。所以这个位置。就调取,就将它全部清理掉。清理干净。清理干净之后呢,剩下的就是什么,剩下的就是未确认的,反之如果不是批量呢,对吧,如果不是批量的,那你怎么办?你就直接删就行了。就拿着它直接调取remove,删除掉当前的这个消息。
14:04
完了我们打印一下是吧,最终呢,再打印一下确认的消息的标识。这个就是批量确认或者是单个确认的。目的我们一般都是单个确认啊,批量确认,我们灾灾还没用过,因为考虑到消息不丢失嘛,因为你批量确认肯定会造成很多消息丢失,单个确认是没问题。到此完成。完了,我们最后呢,可以打印一下这里面到底是什么东西,当然这里面没有东西。对,没有东西,因为我们本次消息发送的都是成功的,并没有未确认的,所以也看不到任何未确认的,当然我们可以打印一下试试啊,如果未确认的消息要是有的话,应该是通过这个位置是可以取出来的,通过这个标记是吧。消息的标记,拿到具体的呃消息。完了呢,你就可以说了未确认的消息。的是谁是吧,这呢写上。
15:02
这个叫啊,未确认的消息是冒号,加上这个叫message。消息。那么未确认消息的什么,这个叫。标记。是这个。对吧,和中间呢,我们可以再用这个冒号冒冒冒隔开一下。完了我们再一起运行一下。就行了。好,完成这时候你会发现全都是确认的消息啊,没有一个是未确认的,因为我们本次并没有未确认的消息啊,这个经过这个代码的添加,我们的用时呢,依然是呃。四十来毫秒是吧。时间呢,依然是很少。这次运行是吧,47秒比上次运行62秒还要节省时间。所以呢,最终判定为是吧,异步批量这个确认性能是最高的。
我来说两句