00:00
呃,那么接下来啊,咱们就来看一看对吧,我现在呢,对我们这个这个消费的这个程序啊,来做一个我们的优化什么呢?就在这里,我为了去保证它的一个精准一次性,对吧,这样的一个消费对吧?那么咱们呢,在这里我采用的是手动提交偏移量对吧?然后加上咱们这幂等性,呃,今天呢,那咱们就手动提交偏移啊,这个时候提交偏移量对吧?那么如果手动提交偏移量的话,大家注意啊,那你想一想咱们呢,那肯定对吧,那么再去消费的时候,对,那你得知道我现在的偏量呢,在咱RA里边是不是有个地方给咱们存着的呀,对吧?然后接下来,那我现在呢,当我去创建从卡卡里读数据,创建咱们这第trip的时候,那你这块呢,不是直接从我们这个latest对吧,开始消费了,是不是应该从re读我应该从哪开始消费呀,对吧,然后接下来我消费完之后,那我肯定呢,要把咱。
01:00
当前的这个偏量的位置再保存过去,对吧?再保存过去,那也就是说你不管怎么样,你肯定呢,要做两个操作,一个呢,是从我red里边来读取我P的位置,那么另外一个呢,你要把P量位置是保存到W去呀,对吧?所以说呢,那么咱们为了实现这些功能,那接下来我在这啊把这都关掉了,再写一工具类是吧,再写工具类这个呢,比如说对这个是关于我们的偏移量处理的,那就是outside manager u这个呢,是关于我们现在偏移量它的一个工具类的,唯有偏移量的工具类,然后接着在咱们这个工具类里边,那至少呢,应该不会有两个方法对吧?一个呢,是从我们这个RA中来获取我们这个偏移量。
02:00
啊,一个同时去获取我们这偏移量,那另外一个呢,是将我们这个偏移量的信息保存到我们这个red中,对吧?是不是应该有这么两个方法呀,对吧,这个跟上同学们对吧?能跟这就能理解这两个方法对吧?然后接下来那么咱们现在呢,先完成第一个从red里边来获取我们偏移量,那么如果从ready获取偏量的话,我们前面分析过呀,对吧,这个type我们选择的是什么?哈希,对,就我在维护我们的偏量的时候,是不是哈希类型啊,对吧?然后接下来那么是不是应该有它的K对吧?这个K呢是什么?比如说前缀是outside,然后接下来这里面是不是也有个topic代表哪个主题,然后是不是有一个group ID是代表了消费者组啊,然后接下来,那么如果是哈希的话,除了key之外,它是不是有fail value,那么咱们这个fail它表示什么呢?Part吧,对吧,表示的哪个分。
03:00
区,然后接下来那么咱这value那是什么呢?是我们这个偏移量啊,这个分区的偏移量啊是多少,那么我们它现在还有一个什么呢?还有失效时间,这个失效时间需不需要设置一天把它给它失效,这个不需要啊,对吧,因为我现在这个和我们上午对吧,它的一个是是我首次登录这个不太一样,对吧,它不需要失效啊,它不需要失效对吧,得一直在这保存着,所以说这个失效时间呢,我就不设置了,对吧?那比如说我在这里定一个函数叫get outside啊叫get outside,然后接下来那么咱们现在返回值手。你们可以到某一个地方把这个返回值类型给我拿到,但是去哪拿呢?
04:00
去哪拿呢?我去什么地方可以把这个东西,哎呀这个这这个对吧,这把这个流程啊,注意咱们现在前面在写代码的时候,对吧,那我们现在呢,其实我们写了各种各样的工具类,怎么样把他们给联系起来。来大家想一想,咱们现在的这个消费在哪,消费是不是在低于APP里边的消费呢?对吧,那目前咱们现在呢,再去创建或者读取卡卡数据的时候,是不是直接上来是这样,然后呢,我用一个MY卡卡YouTube,然后呢去调这get的卡a stream的方法。然后把咱们什么主题呀,什么分这个这个消费者组啊,是给传过去了,对吧,这是从哪消费,是不是从咱们lucky最新的位置开始消费啊,对吧,但是呢,那我们现在如果说你要自己留偏量的话,你还能从最新消费吗?你是要从指定的偏量位置开始消费啊,如果从指定的偏量位置开始消费的话,同学们你在这里就不应该教这个方法。我记得当时咱们在碰装的工具类的时候,是不是有一个指定我们这个outside,指定偏移量位置的一个方法呀,对吧,那个方法什么类型的来着,点进去看一看对吧?在咱们这里其中呢,有一个方法对吧,叫咱们这个啊,这个get cup stream,然后记得它需要传一个接触偏量过来,这个偏量什么类型呢?是不是咱们这个mapb集合呀,那么当前的mapb集里放的是什么?放的是某一个我们的主题的某一个分区,它的偏移量是多少啊对吧?只要你把这偏量给我,那我就可以从指定的偏移量来进行消费,对吧?也就是说我现在啊,我的卡法工具类里边,对吧,它呢可以根据我的偏移量来进行消费,那如果说你要想调这个方法的话,你是不是得把这偏移量给你传过去啊,你现在写什么呢?你不就写获取偏移量的方法了吗?那你说这个方法返回的是什么?是不是人家要求你传的这个外部集合呀?
05:57
啊,所以说注意啊,咱们现在要获取偏移量这个东西,最终返回什么,返回mapb集合mab经理存的什么呢?存的是每一某一个我们这个topic中它的某一个分区,它的偏移量多少,对吧?然后最终呢,我要根据咱们现在的这个偏移量到卡卡里边去拿数据,对吧?但那么咱们这方法有什么参数没有,那你可以给我提供的,大家想一想,你在这里啊,你要想去把这个获取偏移量的话,那么你是不是可以把我们这个对吧?在这里啊,你是不是可以把我现在当前我这个topic给我对吧?这个topic也可以给我格数ID,你是不是也可以给我呀,对吧?所以说呢,在这里我传两个接触两个参数,一个呢是我们的topic对吧,这行,那么另外一个呢,跟ID对吧,也是我们现在呢这个词string行,然后接下来对吧,那我要从red来获取我们这个偏音量对吧,那么具体怎么来获取对吧,具体怎么来获取对吧?首先第一件事获取什么。
06:57
啊获这就是客户端链接呗,对吧,第一个获取我们的客户端链接啊,获取客户端链接,然后接下来咱们的关闭客户端啊关闭客户端,那接下来在这里我可以用MY,咱们这个red YouTube.get get client来VR拿到什么呢?拿到咱们这个jeice啊拿到jeice,然后接下来咱们现在呢,Jeice点我们的close对吧,把客户端关掉,然后接下来咱们现在呀,要拿数据了,对吧?那么如果拿数据的话,大家想一想,你这个K是不是这种形式的呀,所以说呢,那你肯定得给拼接一个咱们的key对吧?干什么呢?来拼接我们的一个操作red它的一个key,这个形式什么样的,这个形式是这样的是吧?Offset。
07:48
啊,这个K长这样,那我们现在呢,定义一个吧,这个呢是我们的off outside k,它就等于那个是吧,前面呢,这个是字符串啊,前面呢,这个是我们的字符串,然后接下来,那么咱们现在这里是不是得把topic打过来,然后进来topic和我们的格白之间是不是有一个冒号了,把这个东西拿过来对吧?那么这个K拿到之后,咱们呢,是不是就可以根据K来获取数据了呀,怎么获取数据?
08:17
比如说啊同学们,我现在在这里啊,我就创建一个我们现在这个哈希对吧,那么怎么样对哈希来进行操作呢?那么这会你得知道来我们现在呢K星对吧?然后进来把这个拉烧一下对吧,发烧一下,然后我现在如果哈气的话,好像都是什么H什么什么对吧?那么咱们现在呢,比如说啊h side的,呃,咱当时这里注意h side呀,然后接下来这个呢,是我们K,比如说我现在这里有一个topic a的主题,它的group是什么呢?GROUP001,然后接下来这个fair的的这个字段对吧,这什么这是某一个分区啊,比如说零二分区偏量多少,偏量十。对吧,那么咱们现在往里边放了一个数据,然后接下来那么再往下,比如说还是啊,咱们当前这个主题对吧,然后接下来只不过呢,这是一号分区了,偏量多少呢?偏量是20。
09:13
就是我现在在咱们这里,我是不是可以做这种形式,对吧?来看一看我当前这个消费者组,然后呢,对你这个主题的一些消费的时候,那么每一个分区它的偏量是不是消费到哪了呀,就记录一下我现在消费到什么位置了,都消什么位置了,然后接下来H啊S是不是h get呀,对吧?哎,这h get它只能获取一个,我想把这个都都过去看一看叫啥来着。A get的什么?是不是a get到啊?这什么?我现在想把这个东西我都给他对吧,查看一下对吧,这个是H对吧,这个是that,然后接下来那么咱们现在呢,可以通过它h get对吧,然后接下来我可以去查看对吧,那么某一个咱们现在它的一个这这这个什么某一个我是分区的内容对吧,比如说我现在呢,把这个拿过来,然后接下来这个是K,然后这个呢,零二分区对吧,那你可不能拿到,然后接下来那咱们现在这里边对吧,是不是有一个H黑会所有的这叫什么来呢?It get on是吗?来h get哦,然后接下来这个把这个拿过来。
10:30
对吧,然后拿过来,那么咱们现在呢,是不是可以把当前零二分区的也拿过来,一二分区是不是也拿到了,对吧?那你现在其实我需要的对吧?在这里我是不是应该去调一下它h get off来获取咱们呀,那某一个我们这个主题对吧?然后呢,某一个消费者他所对应的这个分区的便宜量的情况呀,对吧?所以说呢,咱们接下来对吧,来获取咱们这个当前我们这个消费者组啊,消费的主题中啊,消费的主题啊,对啊,对应的分区以及我们的偏移量情况啊与偏移量得到怎么获取呢?那应该是DD点咱们这个h get off,注意看返回的结果类型。
11:24
对吧,返回的旧类型是什么?是一个map集合呀,而且应该是一个Java的一个map集合吧,对吧,那把咱们现在呢,这个outside k给传过去VR对吧,那么这里呢,是我们这个outside的map对吧?它呢是一个这样的一个集合对吧,是一个这样集合对吧?然后接下来那么咱们现在呢,拿的这个东西了,那你拿的这个东西意味着什么,意味着我现在是不是我们最终啊这这这这这个东西拿到了这什么,这个应该是我分区,这个是不是我当前分区对应的一个便移量啊对,因为你看你看咱们现在拿直接的命令两个东西,一个是分区,一个是分区对的偏移量,分区,分区对应的偏移量我拿到了,但是最终你返回的是什么,你返回的是碳。
12:09
对,你返回是他。那这个东西咱们是不是还得再去处理一下呀,对吧?怎么处理呢?那肯定对吧,对我们这个卖集合呀,来进行便利对吧?那么如果卖就行便利的话,大家想一想这个东西本身是不是扎va麦法,好像操作起来不是特别方便吧,转换一下是吧?那么转换的话,那是不是得咱们去做个导包吧,将什么呢?将我们这个Java啊,将我们的Java的map转换为我们的SKY。Map对吧?那么这里导包import,然后接下来应该是SKY下边有一个connection对吧,下面有一个Java cameras点下划线,然后接下来那么咱们现在呢,这里opposite map点我们现在对吧,带个scale转换完了,转换完之后呢,那么这个时候咱们呢,可以去通来做一个处理对吧?怎么处理呢?比如说map一下。
13:07
啊,比如说卖了一下,那么当前咱们的麦的时候呀,同学们,那么这里它拿到的是什么呢?拿到的是不是一个我们的元组啊,那么这个元组其实对应的我们应该是两个内容,一个是我们这个partition啊,一个是我partition。对吧,那么另外一个呢,是我们这个off outside对吧,另外一个是one,然后接下来,那么你现在拿到这东西之后,对吧,你现在啊,拿着那西之后,那怎么办呢?那我是不是要想办法创建一个这样的一个这这这麦把这个东西给它打过去啊对吧,那看一下呗,在咱们这里对吧,那我现在呢,拿到的是一个对吧,这样的一个分区,以及呢,我们这样的一个数据,对吧,但是最终啊,那我应该要返回这样的内容,对吧?那么如果返回这样内容的话,那来呗,在这里对吧,往这里一放。
14:02
当然立放,然后接下来,那么咱们这个本身它是一个mapb集合,Mapb集合里放的是什么呀?放的是我们现在它以及啊我们现在的一个偏移量,那这里是在对数据做一个我们这个封装啊,那这块new一下呗,用什么呢?Topic,然后呢,Tradition。Topic法继,然后接下来这里需要传参数,一个呢是我们的主题,这个主题咱们是不是通过参数可以拿过来,那么另外一个什么呢?那么另外一个是我们当前呢,它的一个我们的一个分区,这个分区咱们便利出来了,这个数据咱分区呀,对吧?那么除了这个之外,大家注意,那么它要求啊,咱们这个分区呢,那应该是我的in的,那你拿的是不是字串呢?所以说咱们需要to印一下对吧?那么除了这个之外呢,还有一个对吧,什么呢?就是我们这个偏移量,这个偏移量呢,那么咱们可以通过ET对吧来获取,好了,那这样的话,咱们现在啊可以拿到一个对吧这个东西对吧,就是我现在拿的一个新的元素,这个元素里放的什么呢?一个是我们现在这个topic part,那么另外一个呢,是我们这偏移量,然后接下来点VR一下,注意啊,点VR之后,那么拿到的是什么,拿到的是一个咱们这个对这个multiple map a。
15:28
但是咱们好像最终需要的不是这个,你需要的是什么,你看同学们,你需要的这个东西,咱们返回的结果是什么?可变的脉部集合吧,但是人家这个咱们这个要求的是不可变的脉部集合吧,而且咱们现在的偏移量,人家是不是浪类型,咱们现在是不是我们现在这个string型,所以说还不行,同学们在这里,那我需要资源处理,怎么处理呢?先找简单的来做第一个点to long。对吧,这个是不是变成咱浪费行了呀,然后接下来在这里我默认情况下调咱那个方法,它给咱返回的是一个对吧,这个可变,但是它不可变,那怎么办呢?在这里to map一下对吧,然后点V2,这个时候它会拿到一个outside map对吧,其实这个呢,会拿到一个我们现在o map吧,对吧,然后接下来对吧来看一看。
16:20
那么咱们现在呢,是不是返回一个这样的一个map集合呀,对吧,然后把咱们现在的这个map集合对吧,它返回回去啊,其实这个关闭客端呢,那你可以往上点对吧,咱们用完了之后呢,其实就可以关,对吧,在这里你用完了之后,你就可以把它给关掉。啊,把这个鬼关掉,然后下面呢,其实主要就对它来做一些处理了,对吧,下面呢,主要对他来做处理,最终呢,把咱们现在这个麦给返回回去啊来看一看同学们啊。
17:07
这是关于什么呢?这是关于啊,我从我们现在red里边来获取,我们现在呢,它的一个对吧,这个对应的一个偏移量啊定偏量,那么具体怎么来做的呢?咱们来看一看对吧,这个代码呀,它的具体一个实现啊具体实现那么首先呢,咱们先呃去确定我们在获取偏量的时候呀,那么这个方法它的参数以及呢,我们的返回值,那么参数的有两个,一个呢是主题,一个是我们这个消费者组,对吧?那这两个东西是不是你在调的时候也可以给我传过来,那么另外一个这个方法的返回值,这个怎么确定的呢?对吧,大家想一想我获取偏移量的目的,我是不是要根据这个偏移量,然后到卡卡里边去查数据呀,那么咱们在卡卡工具类里边,我是不是封装一个方法,你可以把偏移量是不是给我传过来呀,对吧,那么这个方法它要求你,你要想传偏移量,你给我传一个什么参数呢?是不是一个map类型的这种格式参数了,所以说人家要求咱们。
18:07
他是不是就给人家返回一个这样的map集合过去啊,所以说呢,那我们最终要返回一个这样的一个集合对吧?然后接下来,那么咱们现在怎么样去把这个东西给拿到呢?那么首先获取这些客户端,这个不用说了,这个关闭不用说了,那么接下来咱们呢,有一个叫get的报率方法,这个get多少方法,它根据你的key对吧,然后呢,取red里边对吧,那么把我现在这个数据也给拿到,因为我现在使用的是我们的哈希对吧,这样一个类型,那么如果哈希类型你一定要get到的话,那么它返回的什么呢?它返回的是一个Java的mapb集合对吧,扎个mapb集合,这个mapb集合里放的是什么呢?这个mapb集合里边放的是当前这个主题的对应的这个消费者,它里边的分区的偏移量情况吧,就咱们这里放的其实是我偏移量情况,对吧?这个分区的偏移量到哪了,这个分的偏移量到哪了,对吧?它其实放的是这种间对,然后接着你把它拿到之后呢,那我需要对它做一个处理,怎么处理的呢?
19:07
因为你最终你需要的是这个,所以说呢,我对我当前变的map部集合呢,我做了一个我们的结构的转换对吧?那么怎么转换呢?先把它从我们Java转换成我们的一个skyla,然后接下来对它做一个映射对吧?调到map盘,调到这个map对吧,这个算子对吧?在这里我们呢。把这个东西啊拿到之后呢,我们自己来封装啊,我们现在呢自己对吧来封装对吧,来封装一个这样的一个结构对吧?这里面两个内容,一个是topic position,那另外一个呢是我偏移量,那需要注意对吧,这个ID呢应该是in类型,偏移量呢应该long类型对吧?这块咱们也可以做一个我输处啊,比如说我现在呢,在咱们这儿对吧,你每次在处理的时候呢,咱们对吧去读取一下什么呢?说我现在要从我们这卡卡是它呢,就从我们red来读取我们现在的这个偏移量了,这偏移量应该是从哪是开始的啊吧,这篇应该从哪开始的,你可以做一个网站输出对吧?好了,这是关于我们现在啊从red读取偏移量的操作。
我来说两句