00:00
对吧,那其实大家想一想,我们现在日志模拟生成到卡夫卡主题这块,咱们还可能出问题吗?这个这个其实这个出问题的几率不太大了,对吧,其实出问题几率不太大了,对吧,说老师那我现在对吧,这个卡不卡的,这个这个outside的对吧?那我一直一直这么生成,我有一天我忘了关了对吧,是不是会有问题,是不是太多了呀,对不用担心的同学们,对吧,就咱们现在本身呢,是不是有维护偏移量啊,这个偏移量大家想想它什么类型的,是不是咱们定的浪类型的呀,叫浪类型的取值范围是多少。浪类型的取值范围啊,二多多多少,但是我现在啊,咱们现在在大圆里边对吧,这不有一个浪这个数据类型吗?它的取值范围是多少。哎呦,这这个问题特别人告诉我是吧,如果面试官给咱们聊的话,聊什么呢?比如说聊我们现在它的一个基准一致性,对吧,那么这个时候呢,那你应该知道关于我们的卡不卡它的一个基准一性的话,那其实是不是应该是从我们的两方面呢,对吧?一个呢,是我们这个生产者生产数据,然后发送给我们的卡斯卡broker对吧?那么这块的过程,它的一个基属一次性,那么这个金属一次性它是怎么去保证的呢?
01:19
怎么去保证的这些句子性,其实就是幂等性嘛,对,其实就是幂等性,然后接下来人家如果再跟你聊这会的过程的话,比如说哎,我现在这个数据,对,那么我怎么确定我现在生产者对吧?这个这个发送的这个数据被你现在就接收到了,那你这里是是给你聊什么a cck啊,对吧,然后接下来,那我现在如果要聊a cck的话,那么你是不会来聊ACK它几种我们的机制啊,对吧,如果说ACKS它设成我们零是怎么样的,设成我们一是怎么样,设成负一是怎么样的,那这个你要跟你聊,然后另外一个呢,那么咱们现在呢,在这个设置的时候,我们应答机制除了零一啊,还有负一这几个选择之外,那么还有一个对吧,就是说我现在呢,要把咱们这个副本啊,同步完毕之后,然后才给你我们的ACK对吧?那么如果说同步副本的话,那这里是不是都有一期我们副本同步策略对吧?一个是我们现在过半对吧?副本同步完了,那么这个时候呢,我给你ACK,那么另外一个呢,是全部同步完了,我给你ACK,咱们卡卡选的应该是我们的全部对吧?哎,那么如果全部的话。
02:19
其中有一个节点,始终咱们这个follow同步不了,那么这个时候那怎么办呢?咱们是不是有个isr队列呀,对吧?我会把我当前的follow呢放到我isr队列里边,然后接下来,对吧?如果长时间这个I这个follow没有反应,那我会把它从我的is里边给踢出去,对吧?这个时间呢,可以咱们自己来设定,然后接下来,那么如果follower和leader他们之间在同步的时候,那么我现在follow挂掉了,那怎么办?如果说我现在follow和leader他们之间同步的时候,我leader挂掉了,那怎么办?是不是有咱们现在你你这会要跟聊聊什么,是不是就聊咱们的高水位,以及我们现在它的一个Leo了,对吧?哎,你就得聊这些了,如果说我现在要是我们这个这个follow对吧,在同步的时候挂掉了,那怎么办呢?对吧?那它启动之后它会记录一下对吧,原来我这挂的时候,它这个这个高水位对吧到哪了,然后接下来我在启动的时候是把我现在高于我高水位的这部分内种给它截掉啊,然后接下来再向leader对吧进行发起同步请求,对吧?那么如果说我现在这个leader挂掉怎么办呢?那是不是也是在起的时候,他肯定会选取新的leader,你在起的时候,你现在就作为fo来跟一了,对吧?那你是不是也应该把咱们这高于高水位的这部分内容给截掉了,对吧,所以说有这个东西。
03:37
啊,有这些东西对吧,然后接下来那么除了我们现在啊这个啊,它的一个就是我们生产者到我们这卡不卡的对这个精准仪器之外,那么我们这里主要在开发的时候呢,我们主要考虑什么呀?我们主要考虑的是我们这个对吧,消精准的消费对吧?因为我们生产啊,它的密等性,大家你想一想,就是我们现在本身呢,得过一个属性,对,如果你要想保证密等性的话,想保证咱当前这个分区对吧,它的一个密等性的话,那么其实就设立一个属性对吧,维处开启密等信就可以了,对吧?但是这个东西你靠谱吗?也不靠谱啊,对,因为我现在如果要重启的话,这个咱们是不是变了,你还得加什么事物对不对,但是一般咱们没有加事物,一般都干什么,在下游对就是我现在你把数据给我对吧,数据呢,给我之后,那可能什么呀,可能重复对然后在下游我就过一个重复就行,对吧,但有些呢,我这个数据呢,这丢两条没关系,对吧,那我就把这个咱们的AC级别呢。
04:33
给它改为我们这个对吧,这个零啊,对或者改成其他的,对那丢点数据没关系,我能接受啊,我能接受,对那重复数据呢,那没关系,我现在虽然说不允许重复,但是我现在呢,我可以在下游进行处理,就是你把这数据拿过来之后,我读你的数据,然后我接下来我对你进行处理,这也没问题,对吧,然后接下来那咱们在处理的时候呀,大家注意我现在那么这里就涉及到什么呢?就涉及到我们这个啊精准一次性消费了啊,涉及到精准一消费了,那么我们现在呢,大家想一想,再讲精准一次性消费的时候呀,那么我们给大家呢,列出这么几种情况,对吧?列出这么几种情况,一个呢是我们的数据丢失的情况,一个呢是我数据重复的情况,对吧,那么这两种数据重复,数据丢失,那其实呢,是不是根据我现在这个修改偏量时机不确定,然后呢,出现不同的一个情况呀,如果说我现在处理的时候先去修改偏量,然后数据呢还没处理,那么这个时候数据有可能丢失,如果说我现在这个处理咱数据的时候,先把数据处理了,但偏移量还没改,那么这个时候呢,是不是数据会重复呀。
05:33
对吧,也就是说啊,之所以我们现在呢,会出现我们这个对吧,这个不是精准一次性at least one或者at我们这个most one这样的情况,这个就是我们现在这个偏量提交的时机呢,我不好把控,默认什么呢?默认自动提交,自动提交呢,其实是我们这个根据时间对吧,他现在是五秒钟对吧,然后呢,他帮你去提交一次,那你知道我这五秒钟的处于什么情况了,你确定不了对吧?所以说呢,我现在要想保证密等性的话,对吧?那么咱们这里我得想,那么我现在是不是把这偏量别让它自动提交了,那么如果偏量不自动提交的话,那么这个时候,那咱们是得手动去管理偏量对吧?手动管理偏量那什么时候去提交呢?这个时候你先把数据处理完,数据处理完之后,然后我再去提交偏量,对吧,但是这个时候大家想一想,如果这种形式的话,假如说我现在数据处理完了,然后你这个偏移量呢,还没提交,这个时候吧唧挂掉了。
06:32
对吧,那么是不是有可能会数据重复呀,没关系,咱们再做一个去重对吧,也就说做一个密等性的一个处理对吧,就是你不管给我拿多少张数据来对吧,然后接下来我现在往咱们这个对吧,这个后续处理的时候,它始终呢,得到的是相同的一个结果,对吧,就针对于我现在往ES里面插入的话,你哪怕够出100条这个ID一样的数据,那么最终咱们现在插入到我ES就是不是只有一条了,对吧,做一个密能性的一个保证啊,做为密能性保证,所以说这是我们现在的处理,我们现在基础仪的消费啊,这第一种方式,那么除了这种方式之外呢,那么还有一种方式。
07:09
是不是把它俩放到一个整体对吧?哎,把咱们这两个东西啊放到一个整体,放到咱们这关系数据库里边去啊,放到关系数据里边去,对吧?哎,那么如果放到关系数据库里边都有什么要求呢?第一个咱们现在呢得支持事务。第二个,那么如果关于数据库的话,这支数字的性能都不会太好,所以说你的数据量呢,不能太大啊,你的数据量不能太大,对吧?哎,那么咱们如果是我现在这这种方式呢,那一般呢数据量比较大,对吧?还有一个就是什么,就是我一般呢,可以实现咱们这个密等性啊,可以实现密等性对吧,一般我用我们现在这种方式,那我们现在先给大家介绍这种方式,到后边呢,我们会把上面这种方式也给大家介绍了啊,也给大家介绍了,对这两种方式也没有说说老师哪种U或哪种劣,对吧,这个呢就是适合,就包括咱们现在呢,就是有些同学说老师我现在你往ES里保存ES对吧,处理我们现在大数据量的数据的话,那肯定不如我们这个h base对吧,那其实注意同学们,那你看什么样场景,如果做分析统计的话,那你想h base肯定要略胜一桶,但如果说我要做咱们什么呀,我如果做咱们这个就是检索的话,对吧?哎做咱们那个什么全文检索,那这个时候我这个H被我这个ES,我是不是要更好一点,对吧。
08:24
就是而且呢,还有一个就如果说你现在呢,要使用我们这个,呃,这个ex search的话,那么他对接的我们现在这个皮班的做做展示其实比较方便的,做展示比较方便,所以说呢,这个东西不是说谁优谁虑,找一个适合你们的啊,找一个适合你们的,然后还有个同学有些想法什么的,说老师啊,我现在要实现咱们的精准一次啊,手动提交偏移量,然后去实现精准一次消费,那么这个时候呢,咱们这个偏移量放在哪对吧?那么大家第一个想法对吧,原来最早的时候0.9之前放到组K去了,对吧?那咱们现在再把组keep放的行不行,可以,但是不太合适了,人家好不容易从咱们这组keepper给拿出来,然后你再去在主K创建一个东西,然后再存放便宜点是不太合适啊,对吧?哎,那时候如果说主keep不合适的话,那咱们现在放到consumeret里边呗,他们有一个内部主题吗?对,咱们自己要这这提要偏量,那么也对,往这里边放呗,注意啊,同学们如果往这里边放的话,注意这个东西这个主题啊。
09:24
其实我在操作的时候是比较麻烦的。对吧,假如说我现在想看一看我当前某一个主题啊,我当前某一个主题,然后某一个分区,它的片量是多少,对吧?那么你对于我们现在这个consumer outside的话,那你想一想,它对你应该不像我们现在自己维护这么透明吧,你是不是得在这里边,那我这里你想所有的主题的分区的便宜量都在这里存着,那你是得查,对吧,把所有都查一遍,便利一遍,对吧,这个不太好,还有一个其实最重要的,其实最重要的,那么这里呢,有一点对吧,这是一点,就如果说啊,我现在要去使用我们现在的这个consumer of的话,那么去维护我们偏移量,其实还有一个的比较重要的一个问题,对吧,什么呢?在这儿。
10:13
在这里对吧,如果这种方式管管理偏量,它有个限制就是什么呢?就是你现在啊,要提交偏移量的时候,对吧,你不能说像我们这维护这个在red里边,那你调ready方法,那么如果你要是放在它呢,它提交偏移量的时候,它有它固定的一个方法来提交偏移量,对吧,叫什么commit什么什么什么把这些片加物拿过去对吧,但是你在调这个方法的时候呀,这个方法呢,它有限制,就是你当前这个Dis stream,它的结构也不能发生变化。啊,你必须什么,你必须是consumer record这种结构,对吧?你必须是这种结构,如果说你发生变化了,那么你就没有办法呢去提交,这是一个限制,大家想一想啊,这个一会我们其实我们现在就算是我维护的这个red里边,那其实呢,我们也有这个限制,但是呢,相比较他在讲我们限制更小一点,对吧,这个是什么?大家想一想,这个是不是我们现在刚从卡卡里拿出来这种结构呀,对吧,就你拿完之后呢,你马上短期的把这偏量呢去处理一下啊,然后呢,咱们现在如果red话,那我可以自己去写对吧,相关的一些对吧,这个提交方法或怎么怎么样对吧,但是呢,如果是他的话,那么这块呢,那你必须得在这里马上就把它个提交,对吧?但是你想一想,如果说你现在刚拿出来,你就把天线提交了,那是不是咱数据还没处理呢,那是不和咱原来对吧,这种情况。
11:36
是不是就一样了,对吧?在咱们这里边,你上来先提偏量,然后这还没没梳理,是不是有可能会丢数据啊,对吧,所以说这块呢,对吧,其实这是它的一个限制,对吧,这是它的一个限制,对吧?好了,这是关于咱们前面呢,对吧,这讲东西我们现在呢,要自己来留偏移量,然后自己维护偏移量,我们现在呢,把它放在ready里边,那么这个时候大家想一想,那如果说维护偏移量这个ready的结构是什么样的,咱们昨天是不是已经这个从ready把这个数据取出来了呀?对吧,虽然说现在瑞力还没有,但是我假装里面有,我要从这里面取了,对吧?那取的话这个结构是什么样的呢?哈希对不对?那么如果是哈希的话,那咱就得知道了,哎,这个东西它的key是什么,它的一个我field对吧,是什么,然后它的value是什么,K是什么。
12:26
Set,然后接下来topic对吧?然后接下来group ID,对哎,这个呢是我们K表示的是我现在哪一个消费者组,然后呢,对我哪一主题进行消费的偏移量,那么大家想想偏移量的话,因为我现在在消费的时候,对吧,那我当前我的消费者他可能消费的是我当前不同当前这个主题的不同分区吧,那么如果不同分区的话,那么这块这个fair的是谁,是不是咱分区对吧?Part,然后这个是不是就是我们的off,对这个呢,就是我们的off。
13:01
好了,那么具体咱代码怎么实现的呢?具体咱们代码怎么实现的,注意咱们是不是在这里我用哪里写了一个工具类呀,对吧?然后呢,在咱们这里注意啊,同学们,这些工具类大家一定要自己去写,对吧?因为咱们现在工具类呢,我们之所以把它封装成工具类是因为什么?是不是因为以后我会经常用它呀,对吧?哎,我现在呢,你想我再去获取咱们那个red或什么客户端的时候,你想咱们获取客户端的时候,我经常会用它,对吧?所以说我的封装的工具类对吧,我把它封装工具类对吧,那以后你在用的时候呢,会出现这种情况,同学们到时候这工具用的很爽,但是这工具类怎么写的,过两天我就忘了,过两天忘了,所以说这个东西呢,一定要自己写出来对吧,然后接下来那么咱们在获取偏量的时候呀,那大概的需求是这样的,首先呢,定义方法,这个方法呢叫get s,然后接下来这个方的返回值,这个方法的返回值对吧?你们昨天在写的时候有分析这个过程吗?同学们。就我当时的方方法返回值为什么是他啊,我当时你们看,那你们在写这个代码的时候,你们当时这返回值怎么写的,一看我的代码,一看讲义,哎哟,返回值麦不集合里面放这个东西,把这个东西E1写是这样的。
14:08
不应该这么来做吧,想想咱们当初在想写这个方法的时候,这个返回值为什么是脉部集合M部经理放的,为什么这两个东西?为什么?因为你想一想,我现在获取偏量的目的是什么,我到时候从卡卡里读数据,我是不是要根据偏量去读取啊,对吧?那么曾经呢,咱们封装了一个工具类叫什么叫麦卡卡U,其中呢,在这里提供的一个方法,这个方法呢,可以根据我偏音量,然后来去读取数据,这个方法是谁给我提供的,不是我们自己写的这个方法是不是我们现在这个卡夫卡u to,也就是说卡夫卡和咱们这个什么呀巴streaming结合的零杠幺零这个包里边,他给我提供的,提供这个方法,那么这个方法如果你要想从指定的便量来读取的话,那么其中呢,那你得把咱们现在这个偏移量给我拿过来,偏移量拿过来。
15:09
而且咱们现在这里除了它之外不一样,这里这这写的有有有点问题吧,这这在咱们这里边啊,这个这这个指定了是吧?哎,这指定了,然后现在呢,咱们现在除了它之外呢,同学们,那么我是不是得把这个东西给我传过来,你可以看到咱们现在呢,这个参数,这个参数什么类型呢?是个mapb集合对吧,表示的是我现在哪个主题的哪个分区,它的偏移量是多少,对吧?而且大家想一想,这个麦部集合的类型你有的选择吗?你没得选择对吧?然后昨天有同学问说老师,我现在呢,对也这个有一个对,就是在写代码的时候,我不太明白什么呢?什么时候该用我们Java的map啊JA集合,什么时候呢,该用我们现在这个skyla它的集合,如果说咱们自己写代码的话,同学们,那你不用想,因为我现在写的是不是skyla,所以说你在创建的时候呢,你应该去想,首先首选咱们SKY的集合对吧,所以说你看咱们现在这里边我这个东西对吧,包括在调的方法是该集何,但是呢,同学们有的时候呀,有的时候对吧,你看我在调什么JD h get。
16:10
的时候,他返回的就是Java了,说这是为什么,这不是选择的同学们,这个东西不是你选,老师,我现在我不想用这个,我想用SKY,不是你选的对吧,为什么?是因为这个J是不是人家Java提供的,咱们这个和ready连接客户端工具啊,这个是不是人家提供的一个API啊,人家这个方法本身返回的就是一个Java的map集合,你用人家你是不是得人家怎么说你得怎么干呀,对吧?所以说这个东西到底用什么类型的对吧?一般的不是咱们选对吧,就是我现在如果说我自己写代码的话,要写SKY代码,那肯定优先使用SKY的集合,对吧,但如果说我现在呢,对吧,这个是我们调Java的,有写我API,那人家给你返回什么,你得用什么来接收,对吧?那么具体的过程什么样的呢?对吧?诶返回值是这个,然后接下来那我先获取我们这个red它的一个客户端工具对吧,然后接下来封装我们现在呢,操作的一个我们的个key对吧,然后接下来对吧,那么根据我们这个get对吧,它有一个方法叫盖套。
17:10
这get造什么?是不是把当前这个哈希结构的K中的所有的这个这个预值对都要给拿出来,对把这个东西所有拿出都给拿出来,那么拿出之后放在哪了呢?放在一个mapb集合里边,这个mapb集合里边的K代表什么?是不是代表的分区,这个value是不是代表的当前这个分区,这的便移量了对吧?哎,那你现在拿到这个东西之后,但是最终人家返回的可不是这个,那怎么办呢?咱们对它做一个处理,因为我写我该的代码,所以说呢,那我现在操作的时候,那肯定我的转的together集合更方便一点,对吧,所以说呢,我去做了一个转换,那么转换完,转换完之后呢,那么这里最终我返回的是一个这样内容,对吧?把我们这个主题和分区啊封装成一个topic牌的对象,然后呢,把这个偏移量对吧,然后呢,这个封装啊也他们俩组成一个元组,然后最后呢,咱们转换成一个集合map集合给它反过去,那这样的话,就把咱们当前这个每一个主题的啊,每个分区的它偏移量封装一个map集合。
18:10
啊,给它返回去了,那么你现在有这mapb集合之后,大家想想,相当于我现在偏移量是不是有了呀,那我再去从卡卡读数据的时候,我是不是应该从指定的便量开始去消费的,对吧?那么具体怎么做,稍微休息一会,回来开始写代码啊,休息一下。
我来说两句