00:00
好,接下来呢,我们来看一下指定up进行一个消费。那其实呢,哎,我们有这么几种消费方式啊,一个呢是a list latest a none这么几种模式,那么默认的方式呢,是latest,那都代表什么含义呢?哎,这个a list这个非常简单,就是从头开始消费,之前我们学过一个参数叫杠杠from beginning。哎,就从这。对吧,从最原始的位置,从最开始的位置开始进行一个消费就OK了啊,那么late呢,就是从最后一个up这个位置开始进行消费,你启动之后再来新的数据,我可以消费了,历史的数据我就不消费了啊,在生产环境当中呢,用这种方式也比较多了。那看一看它默认的是不是这种方式啊,大家养成习惯,来到官网。看一下你看这里面呢,就是latest采用的是最新的off位置开始消费,那下面呢,有三个值啊。行,那这个呢,是属于常规操作啊,那在生产环境当中啊,我不想从最开始消费,也不想从最后一个位置开始消费,我想从中间的某一个位置开始消费。比如说从三。
01:07
那这个能不能做到呢?当然也是可以做到的,那下面呢,我们写一下代码来实现一下对应的这个功能啊。好,复制一下。创建一个类,那下面呢,我们再来写一遍。呃,没方法。那我们。写一遍吧,第一遍啊,第一步叫创建。消费者。第二步,订阅主题。第三步叫消费数据。再来一个第零步。就是配置信息。好,那下面呢,我们来创建第一个消费者。又一个卡普卡很传去对应的词,对。死罪。这个我明天写过很多遍了哈,我就快一点,那传一个purpose。
02:02
有一个parties。小包啊,拿到它之后给它添加进去。点吧。补充过来好,那下面呢,这里面有几个必配的信息,那就是连接。呃,点put,那我们传递一个consumer config。点点什么呢?Boat server好。那传一个hi doop1029092。HIOP103。9092。那还有一个呢,就是反序列化。点put。很很点key反序的话,那传进去对应的这个string。反叙的话,嗯,然后点class.get name。那再补上对应的这个Y流值。
03:01
第二。Y6反区的话。There class.get name。成,别忘了还有一个参数必须配置,就是对应的主ID必须得给。点food。Consumer fig,点主ID,那就是group ID。给他一个test。二吧。行,那下面呢,我们来开始订阅对应的主题,那就是consumer.subscribe啊,订阅它,那需要一个集合,传入一个集合list。死罪类型。加班。那这个呢,是TOPIC4OK。帕斯点ad。First。上。把它拿过来。饭去,哎这样呢就OK了,那下面呢,消费数据well出。
04:00
开始消费星猫砍点铺拉取。那我们是一秒钟拉取一次。点好拉回来一波数据之后,那我对这波数据呢,进行一个循环遍历打印点。System out可清文啊,Record这样呢就OK了啊,那OK之后呢,我们下面呃来验证一下,看它行不行。消费者打开,那下面呢,往里面发送一波数据。好检查一下,哎,这边呢,目前呢,消费者能够消费到数据说明正常,那下面在这个代码基础上,我再进行一个优化了,优化什么呢?我要指定位置进行一个消费。那志豪啊,要在这个订阅主题下面写上,这地方是指定位置。
05:02
进行消费。那就是卡consumer点,哎,指定那是这个S。是它,那这个里面有一个分区和upset,比如说哪一个分区从哪一个位置开始往后消费。那这个分区信息我如何获取呢?非常简单啊,它给你提供了对应的API点。啊,Assignment,通过它呢,你可以获取到对应的分区信息。拿它之后,你看这里面返回来一个分区的集合,那既然是一个集合,我就便列对应的这个集合点。拿到所有的分区信息,把它添加进去。分区给过来,这就所有的分区信息都有了,那下面再补一个upset,那么比如说从100这个位置开始,往后进行一个消费啊,这样行不行。真的是?指定offset。
06:00
行,那我们运行一下看行不行啊。行,那现在我启动之后啊,那下面呢,我们往这里面发送点儿数据,看看能不能进入到。好了。我这边是不是已经发送完毕了,但是你看到这边是不是没有拿到相关的数据啊,那这是为什么呢?大家思考一下,为什么这边我消费不到数据呢。差在哪儿了?关掉。大家想啊,回忆一下消费者主的一个初始化流程,初始化流程的时候,是不是多个消费者跟对应的coordinator进行一个汇报啊。汇报说我要加入到一个组里面,然后呢,他会选择一个consumer leader啊,然后呢,发送所有信息给到他,他呢制定对应的分区分配方案之后呢,再给到这个coin coin呢把方把这个消费方案进行一个分发,是这样一套流程,比如说这里面启动的时候需要大量的时间交互。
07:02
那好,那你看这个代码。代码这是单线条了,启动之后就正常往下走,往下走往下走,走到这儿啊,说我要订阅这个for的主题,然后呢就来获取对应的分区信息。你订阅这主题,你能拿到对应的分区信息吗?能这么快吗?是不是需要点时间反应啊好,那你这块有可能拿的就是空的分区信息,那你这个指定也就没有任何意义。那我怎么能说,哎,我拿到分期信息之后再往下走呢,那我怎么办呢。这地方你要保证。分区已经分配完了,分区分配。方案。已经制定完毕。制定完毕,那怎么获取?这样啊,你来一个assignment.set也就是说它的大小如果一直等于零,说明它没有获取到任何分析方案。如果想加快它的一个获取到分区分配的方案,那怎么办呢?哎,这里面我们帮他一把看清吗?点poor,我我来拉取对应的数据那点。
08:07
哎,这样我去拉起数据。那好,那么如果说他真的获取到分区分配的方案之后,那我怎么更新这个值啊,那这里面是不是有一个更新的一个过程。那你就再来一遍,那这样的话,这里面是不是就是哎,获取到分区分配方案之后,那我就能够跳出对应的循环,然后往下走,是这样过程吗?哎,是这样一个流程啊,那这样呢,就可以正常的一个执行了。试一下行不行?好,那这个呢,先把它干掉啊,干掉之后我这边在生成数据。走。再来看你看现在呢,就能够消费到数据了,那你看我现在offet从哪个位置开始的啊,这个offet数都大于100呀。嗯,两百五百六百,那我再来一个大一点,我来一个600。看有没有600以上的,有吧,有600以上的啊,我来一个600,再进行一个充消费,看这回行不行。
09:03
直径。没动啊,有同学说没动,这是不是有问题呢?哎,不是有问题啊,你看你上次这个你这个消费者主啊是TEST2是不是已经消费到600的位置了,哎,那这个呢,你可以给它改一下啊,比如说改成一个T3,哎这样呢,改变一下主这个就可以重新来。那现在呢,我是改了一个消费者主你看。你看这个off outside这个位置是不是全部都是大于600级以上的。最大多少,最大是631啊,这种方式呢,就是指定off进行一个消费啊。
我来说两句