00:00
好,那我们上午是讲到这个自动提交off这个代码是不是通过一个属性可以开关它off是否自动提交呀,对吧,当时我们也测了啊,你不自动提交的时候,你你把它关掉,重新启动的时候,它又接着从上一次就是说。第一次开启的时候,怎么消费的一个内容,你从一开始像跟之前没有消费过一样,又重新从那个地方开始了啊,就是重复的消费那十条数据啊,对吧?啊是这个意思,好,那既然它可以关闭掉,那也就是说既然你可以选择关闭的话,那也就可以肯定可以干什么。手动提交吧,因为你把自动的关了,那你不提交那也不行,你就手动提交,那我们看一下手动提交就比较方便一点啊,就是这样。第一个。自动提交啊,他有个问题在里边。什么问题呢?呃,因为自动提交,我们是根据一个延迟来的。根据一个延迟来的,呃,这边说了,开发人员难以把握off提交的一个数据。
01:02
那什么意思呢?是这样吗?我数据假如说他不是一秒钟提交一次吗?啊,一秒之后就去提交,一秒之后就去提交,假如说我读了100条数据过来了。100条数据过来了,那我如果说在处理过程当中。啊,因为它是我们说的自动提交的啊,过了一秒钟他自己就去提交了啊,自己就去提交了,OK,那你在提交的过程当中,他把这个东西给他提交了,但是我没有处理完。挂斗。就提交了自动提交的嘛,对吧,过了一秒钟过了之后,他自己提交了,好这个时候挂掉了,再重新启动的时候,它从哪个地方开始消费。一百零一开始消费对吧?啊,假如说后面一条数据是这个意思啊,从这开始消费,那么。这个数据刚才还没有处理完呢,产生了什么?丢数据吧,啊丢数据,注意这个丢数据跟我们生产的那块丢数据不一样吧,这个是消费这边丢数据。
02:05
不是说你卡不卡里边存的数据丢了,而是你消费的过程当中给他丢了,能听懂这个意思对吧?啊做的这个事儿啊,这个丢出去,那有的人可能说那简单点呗。你这边就发丢数据吗?我等那个什么十秒钟我再去提交。对吧,我十秒钟之后再去提交。是你可以保证不丢数据吧,这太慢了吧,啊,也就是说这个到底是一秒钟还是十秒钟,对于我们开发人员来说,它自动提交的什么。不好把握,不好把握时机就不好写这个什么延迟,你到底延迟多长时间去提交这个赛对吧?啊,你不知道这个意思啊好,那接下来是他肯定提供了一个手动提交,手动提交呢,提供了两种方式,一个是同步,一个是异步。
03:02
一个是一步,而且这个提交啊,你要放在哪。就这是两个方法啊,你选择这种或者这种。那你告诉我调用这两个方法的时机在什么时候?处理完数据之后吧,哎,处理吧,什么时候处理完对吧,你有时候数据一次读取的多,有时候因为数据量小的时候,它前面生产的少,那你读的时候不是少处理的就很快,对吧,很快那我。等待什么?处理完之后啊,做这个事啊,都是,那这里面呢,提供了两种方式,一个是同步提交,一个是异步提交,那首先第一第一步就是在这把我们那个关掉吧。关闭自动提交,这是第一步,第二步这个代码都跟我们之前一样,在我们拉取完数据这个地方,是不是我们处理逻辑。嗯。对吧,啊,我们的处理逻辑,因为这我们只是简单打印嘛,正常的你要对他说写到什么的,其他这个地方去做运算啊,啊或把这个数据写到买啊等等这些都可以,都是你的一个处理逻辑,对吧?啊,那之后在这去干什么?
04:11
提交一下就OK了啊,提交一下,而且呢,这种叫什么,你看这个叫commit sync是不是同步提交。对吧,啊,它是一种同步提交,那同步提交是这样的啊,就是说他一直在这提交,如果提交不了,那同事直到提交成功,你再去拉取什么。新的数据。那明显的能感觉到同步这种方式。效率低对吧,啊,他比较效率低啊,效率低啊,所以呢,他提供了一种另外一种方式叫异步提交,异步提交的一个方式,异步提交方式呢,是这样,前面同样的你还是要关闭,要自动提交的,因为是手动提交了,手动提交了,然后之后呢,你在这去调用它。它里面同样的有一个。回调。啊,同样的可以写一个回调,回调呢是这个意思。
05:04
你可以去打印一下他这个错误信息,诶这次提交失败了,你相应的可以做一些处理,对吧,做一些处理,这个是异步提交,注意。走到这块的时候。有专门的人去提交我这个主线程照照要干什么去拉取新的数据,这个叫异步。对吧,如果说还是一样的,等到你提交提交完了之后再去拉取,那不是跟同步一样吗?对吧,这个异步一定要注意,这个异步跟同步这种效率会高一些啊,生产环境当中像这种用的会多一些啊,用的会多一些,因为自动提交我们刚才说了。你无法把握提交的时机,对吧?啊,你把握不了啊,所以呢,他提供了两种方式,一种手动提交,一种一步提交,好那接下来我们想一下这个问题啊。对,不管是自动还是手动。有没有问题呢?有没有问题呢?自动的时候,如果说你提交的延时比较少,就比较短,容易什么。
06:08
丢失数据。对不对。想一下这个问题,是不是因为刚才所说的,你处理的数据过程当中,你就已经把提交了。但是我还没处理完吧,我此时挂掉了。我是不是接着读的时候,从你刚才提交完之后的那个数据开始读吧,那你刚刚读的那100条数据还没有处理完,就不会再读进来了,对吧,因为奥赛已经提交了,不会再补进来了,好,那这个是不是我们所说的丢数据啊,丢数据,那接下来啊,有的同学讲为了不丢数据,我把这个延迟长一点呢,十秒钟,十秒钟会带来什么问题,不是效率低,重复。为什么会是效率低呢?你十秒钟提交一次跟一秒钟提交一次,那那有效率问题吗。没有效率问题啊。没有效率问题啊,没有效率问题对吧,没有效率问题是会重复,重复在哪啊。
07:05
我读了100条,这100条已经处理完了,假如说我要把这100条呢,做一个什么计算,把它写到MY,搜狗这个数据都已经写到MY,可能我五秒钟就已经完成了这个逻辑,是不是,然后你要等到十秒之后才去提交,在这个过程当中处理完写到麦速过程当中,它挂掉了呢?他接下来再去启动的时候,他从每个地方开始读数据啊,从最开始的那个地方吧,啊从假如说啊,第一次我读的是100条数据,100条数据处理的时间,假设它是五秒钟,你是十秒才去提交赛,是不是把第100提交啊对吧?好,那五秒钟它的第六秒的时候不好意思坏掉了。那你想想看,这个没走吧。没走是不是,也就是说它里面奥萨还是维护的什么零,假如说我们从零开始读的,是不是还是维护着零啊,对吧,那这个时候你去启动,是不是又把这零到100G的数据又拿回来了,会产生什么?
08:04
重复问题吧,啊重复问题,那同样的,你这个同步提交异步提交,你手动的是不是也同样的存在这个问题。他不会丢数据啊,就是说这种方式呢,不会丢数据,因为它是处理完了之后再去干什么。提交的,但是他一定会怎么样,有可能会重复出去,那你看这个代码都走完了,走完了之后,恰好你在调用这个方法的时候挂掉,是不是有这种可能性。对吧,啊,恰好走这个代码的时候挂掉了,那这个时候不都是重复数据吗。能听懂对吧,啊,它都是重复数据,所以呢,无论是自动还是这种啊,都是重复数据,所以对于卡帕官方又提供了另外一种。四比存储。那自定义什么意思呢?跟之前那个我们看到的有什么点不同呢?是这样的。
09:00
之前无论是自动提交还是手动提交,我们是不是专门调人家官方提供好的一个方法。对吧,而且自动提交还不需要表方法,你只要把有一个属性改为出,而且默认就出,你那个书信你都可以不写对吧?啊,那如果说你手动提交,你是是自己手写的一行代码呀,Commit sy c或者ay c对吧,一个是同步,一个是D步提好的,那你想想看,刚才我们所说的那种方式,Off step提交到哪去了呀。我们说了提交提交到哪了呢?对吧,提交到哪啊,卡不卡本地对,因为我们写的是新API啊,没写老的,老的连租K可以提交到ZK啊,那无论怎么样都是官方帮你选的吧啊官方为你选的,而且呢它有问题,所以呢提供了一种自定义存储的。自定义程度就是你可以自己维护,你可以维护在本地文件,你也可以维护在MY搜狗。对吧,等等这些地方你一定要维护在一个非意识存储环境嘛,你不要维护在内存,当然维那维护在存也可以。
10:06
对吧,你搞一个专门的一个map对吧,你记录了哪个topic哪个消费到哪了,哪个topic哪个消费到哪,但是一旦挂掉了,这个就没了吧,啊那就有问题了,你还不如不存呢啊,还快一点,对吧?啊是这意思,那这个地方呢,维护起来比较麻烦,而且呢,你用这种方式,就目前最好的是什么样的一个业务场景啊,是这样的。如果说你最后通过这个卡普卡里边读,在数据做过计算之后,你要把它放在。买搜狗。这是特别好的一个应用场景,因为我们可以把数据处理完,存到MYSQL这一步操作。和我们保存赛搞一个什么事?因为前面我们所说的,无论谁先走。
11:00
你数据先执行后提交,是不是可能会导致重复数据,如果说先提交后走业务逻辑,是不是可能会丢数据,那也就是说只要你不是事故,这两个人只要一个先一个后。都会有问题。对吧,那就用什么,就用事物啊,就用事物那这块他官方提供了一个方法,问们提供了一种可能啊,提供了可能来看一下,嗯,是这样的,说009版本之前呢,是这个,我们把这个快速过一下啊。存在CK这个没问题吧,我们大家都看到了0.9之后呢,又将内置的一个topic叫下划线小线cons of里边对吧,然后除此之外呢,官方提供了自定义储。那自定义存储就不是说代码就像我们刚才直接那样写的嘛,它不一样了啊,它不一样了,来看一下啊这块呢,呃,其实它那个维护比较忙,麻烦在哪这。叫消费者的re balance,其实他讲的是一个什么东西。
12:04
这个再平衡讲的是什么事情?就是。和实际消费的位置。这个再平衡叫消费者的再平衡,讲的什么是?讲的其实就是分区分配原则重新出发这个逻辑。讲的是分区分配原则出发的那个逻辑,那你看啊,因为你写的代码,假如说你写的ABC。三个消费者正常的消费,一人消费一个分区对吧。一人消费个分区,注意现在是你自己维护这个,那你A假如说这个是地位的是T0第一第2012是分区,那你A之前的代码里边是不是维护。
13:00
T0这个分区的赛道,你维护T1分区赛你呢?第二了啊,假如说都在卖存着呢啊都在啊,现在如果你自己写的代码C挂掉了。那你自己写的代码当中,你是不是应该也要考虑这种问题啊。你是不是应该将这个T2给A,或者说给什么给B啊,这个给谁不是你做的。是不是系统re balance做的,但是你接下来的事情。假如说给他A了,你现在不仅要维护T0,那个还要维护什么。的对吧,啊是这个意思啊,是这个意思啊,讲的这个re balance balance讲的也就这个点。发生变化的时候,触发分区的重新分配,因为一旦重新分配,你消费的分区就会变化了啊,就会变化了,所以呢,这个里边要用到一个类叫这个。Consumer re balance listen啊,监听器来监听它的一个变化,不是说你自己真的自己写什么代码,去感知它有没有变化,然后去写他接触一个类就行了,因为它系统提供的那个什么自动提交也好,还有那个手动提交也好,没有说提提交到哪了,对吧,它里面有没有自动的这种机制。
14:17
有吧啊,他自己一定是实现了这种方式的啊,所以呢,它里面最重要的一个关注点就在这个,但代码不难啊,代码不难我们看一下啊,就在之前这前面东西。跟之前完全。一样吧,啊,完全一样,但是。区别来了。订阅订阅到这儿。订阅到这儿。啊,订阅的时候,因为我们之前提过一个点,就是无论是它自动保存也好,还是系统你帮他保存也好,这个offset这个东西。
15:03
你假如说保存在MY搜狗,它什么时候会去读啊。什么时候会去赌这个?嗯,什么时候会去读这个值啊。启启动,启动的时候会去读吧,啊启动的时候会去读,也就这意思啊,也就这意思,也就是说它是放在哪个地方的订阅里边,然后呢,我们要实现一个类啊,这个东西叫consumer re balance。啊,这个内容,然后这里面呢,要复习两个方法。一个是之前就没有做啊,他直接干什么调用的啊,直接调用的,然后这个里边你看他在做什么事。提交一个当前的一个啊,直接就正常的去提交就完了,正常提交就完了,但是你看这个方法它有一点区别了。
16:06
就是这个是re之后,因为之前整个这个内容不是你消费的,对吧,不是你消费的,现在呢。你要接着人家的消费吧。你seek我之前也提到过C的方法,就定位啊,就定位定位到哪啊。至于你是哪个分区发生变化,有可能给你分配了两个分区,对吧,之前是你自己的A分区。啊,A呢是T1T0 B呢是T1 C呢是T2,之前呢,各自维护各自的,无所谓了啊三个代码维护自己的,那现在C挂了,T2给你了。那你是不是应该从C上一次消费的地方开始消费呢?那你这个时候是不是要加载一下C上一次保存的那个地方,对吧?啊,保存的那个地方在这get。Seven。
17:01
啊get到OK,这个就是你要定位到这个点,那之后呢,这款东西逻辑用什么呢。又一样了,一样之后呢,去提交啊注意的点,大家这样看来是不是感觉这个commit offset跟这个get offset是系统的。你这样看是不是以为是系统的。对吧,那你如果说觉得是系统的话,那我想保存到MY搜狗,保存到文件系本地文件系统怎么做。做不了啊,这两个方法是要你自己去干什么。去写的。啊,这两个方法是你自己额外要写的两个方法,因为你想保存到每搜买搜狗,那你提交的时候是不是应该把这个数据给他提交到MYSO里边对吧?啊买里边好,那你要去这个get那获取某一个分区,你提交到买。那你接下来读,你是不是从白色L把数据读出来能听懂啊,这个就是我们所说的自定义,自己来维护这个奥赛的时候,主要的是这两个方法。
18:09
啊,主要是这俩方法,而且就是我们刚才所提到的一个点啊,就是说你要实现的那个什么cons。Re balance的listen的那个内容,那个内容是因为一定会遇到消费者在消费过程当中,某一个消费者给挂掉了,对吧?假如说某一个消费者挂掉了,你自己写的代码当中,你是不是要考虑这个事情?对吧?啊,所以呢,之前订阅的时候,不是说你简单的在这个后面去做一个什么异步提交,做一个什么样的一个内容,而且呢,他调的方法。调的方法还是这样的一个内容吧,啊,还是这样的一个内容啊,去提交的啊,整个这个提交这个,至于这一块就涉及不了那个我们所说的啊,那这块呢,你可以写一个MYSO的那个代码,JDBC代码,那你想想看,JDBC代码存在一张表里边,那表的。列应该怎么设计啊,假如说我真的要保存。
19:01
你要把什么东西放在一个列表?就是位置。时间。还是一样的,那这样吧。你可以做一个联合组建,或者说把。多个列给它分开,你还是跟那个系统一样,按照主来一个什么列,然后主题来一个列,然后呢,分区来一个列,最后一个列就是我们要的,你是不是应该也是按照这种方式存储,因为你不同的消费者组变了,每一个组里面是不是消费者。不一样对吧,两跟组相关,然后接下来每个主题或者哪个分区消费的位置都不一样嘛,因为它是区内有序的,对吧?啊,你还是一样的,在MY当中呢,也要通过这种方式来维护啊,这种方式来维护啊,后期我们做项目的时候,会有这一块的一个代码啊,会有这块一个代码,就是跟MY做一个事物啊,做一个事物,现在呢,你说把那个东西你就保存在ZK里边,其实官方提供了一个djpi可以保存在ZK啊等等一个地方啊,但是你现在在这里面写又保存到ZK里边,没有什么意思,对吧。
20:11
因为本身你连的,你如果说你用的是老API,它就是保存在那。保存在里边的啊,后期我们做项目的时候,会有类似的一个内容啊,会有类似的一个内容,就是这一块自定义存储啊,自义存储这一块呢,主要就在于如果说对消费者这块。啊,希望呢,完全的不丢失数据。啊,完全的不丢失数据的时候,那你可以这样去做啊,大家再做啊,这里边一个点啊,这里面的点要注意一下,就是自定义存储。啊,租定义的一个存储。啊,这个呢,是我们所讲的,其实代码呢,呃,没有多少,就类似于我们看到的那个什么同步发送异步发送,对吧,后面调一个get啊看着好像应该它差距比较大,但实际上。整个的自动提交,手动提交差距就在哪呢?
21:02
首先第一步你要关注的是这个点吧。你要关闭自动提交的一个啊这个内容,那接下来就是最后你切记要干什么,你不能光关了,你要手动提交,因为一旦你不手动提交,你这个奥没有保存下一次。又从头开始消费吧,啊这样就不好了啊,这样就不好了,是这意思,那还有一补提交那两个方法不一样吧,一个是考的SY,还有一个呢,Ay syc对吧?啊异步提交啊,异交一个同步,一个异步啊,同步异步,而且你要知道这两者无论是我们所说的自动也好,还是手动也好,它。或多或少的都存在一些。数据漏消费就是。丢数据对吧,或者叫重复数据啊,或多或少的都存在这个问题啊,或多或少的存在这个问题,那我们就可以去干什么,自定义存储of啊,自定义存储of是这个内容,后期我们做项目过程当中会有具体的这块的一个详应的代码,因为我们现在没有更多的一个视频去处理啊。
我来说两句