00:00
好,刚才呢,我们看了几种那个生产者什么带回调函数的呀,还有自定义分区啊,啊都生产那个内容,那接下来我们看一看这个消费者,消费者这边的OK,来直接在这吧。建一个包com.I硅谷点。怎么会里面呢,写了一个那个customer consumer。Consumer。可网好,然后同样的它应该也是PSVM对吧,方法可执行的,可执行的多数据的,那同样的它也是有一个什么卡夫卡。Consumer啊,也是他这个案例呢,也在他这个源码里边,案例也在源码里边,大家进来看一下。嗯,往上找同样的,它都在前面那个注释里边。说这里面这边呢,也会提供一个小的案例。
01:04
嗯。在这。对吧,一个一个什么,这是一个案例,一个案例来把这个拿过来。看谁给他拿到这边来。替换掉,那这里面也要替换很多内容吧,提换很多内容control。R是把这个心啊直接replace,然后这个东西。这个东西是什么?是一个双引号对吧,双引号CTRL,把它变成一边的一个双引号二啊这样就好了,那还有一个什么LT,这个干什么呢?这个我们就不用了,我们把这个干掉,消费者这块就不用了,我们主要拿了解一下它的一个配置吧。了解他的一个配置,OK。拿一下好,嗯,这个地方是什么,所有的一个配置信息是不是啊配置信息OK,然后这个同样的是卡夫卡啊集群。
02:10
然后我们改成自己的吧,好多102 OK,然后这个地方是什么。Group念ID,消费者组ID吧,消费者组IDIDOK,然后这两个东西。这两个也是一套的。这个叫什么?是否自动提交,是否自动提交,那这个地方指的是。它处理了这个自动提交,大家猜一猜,这个自动提交的是什么东西呢?像想看消费者这边消费数据,他还要提交什么,那只能提交什么呀,提交off赛的,哎,对提交赛的就是这个意思,提交off的。设置自动提交。
03:02
我在喂,然后还有一个这个是。自动提交的一个什么延时,就是说这个是干什么用的呢?你获取到数据之后,你消费过来数据,你一般不是说你在公司生产环境当中,你不可能做什么打印,打印一下这种操作吧,你是不是要把这个数据做一个相应的一个业务处理啊。对吧,哎,就是过来数据,这个延时指的是你读到数据,就是说你从那个topic里边把数据拿出来了之后,过一秒他再提到三等。过一秒之后,他再提交奥复赛的,那这一块呢,给生产环境当中会有一个小小的问题,就是干什么呢?如果说你数据是读过来了,但是提交是不是过一秒啊,对吧,在这个过程当中挂了。或者说业务处理到一半。挂了,那就产生了什么。
04:00
这一段数据会产生,或者说这样吧,呃,他整个过程样呢,先读数据读过来,然后再业务处理,然后再提交吧,再提交,假如说在提交之前这挂了,就是说业务也处理完了,这挂了,那你。在启动数据的时候,它会出现什么现象,重复消费了对吧?啊重复消费了,重复消费了,所以在公司当中呢,我们对这块也是用低级API去管理的,低级API去管理的。就是说做成类似的,就类似一个等这个数据完全处理完了之后,我们再提交这个offet,保证这个数据处理跟opposite要保证它这个一致,保证它那个一致啊,是这样的啊,这块要注意的一个点就是它可能就是由于这个时间的一个设置,可能会产生那种重复数据的一个读数读取,当然正常的你一个流失处理的框架都正常的,在运行的过程当中,它不会有这种问题吧,那你晚一秒晚一秒提交,你晚多少秒提交都无所谓,对吧,还都行,因为你整个是流失的一个处理流程一个处理,但是当它挂了之后,你重新启动的时候。
05:11
你要重新读上一次的所在位置吧,也就上一次提交的一个结果,他可能这一次你消费过了,而且已经处理完了的数据还没有。我们现还没有提交,没有提交是有可能这样的啊,这个地方是提交延时。一二验室,那下面两个就不用说了,对吧,KB的什么。反序列化了,反序列化了,因为它是什么D来吧,还D反序列化,哎,这配置文件好了,然后我们就。创建。消费者对象。OK,你有一个卡卡consumer啊,它里边的一个参数看一下也可以传一个进去吧,对吧?哎,把我那个给它传进去。
06:02
生产者消费者,那消费者是不是同样的,来看一下。你这个地方指定那个的时候有指定topic吗。对吧,没有指定topic,那看一下哎,那源码当中它是怎么来指定topic的呢。是什么意思?订阅的意思对吧,啊订阅的意思,那也就是说他呀,要指定一个你要消费哪个topic,用这个方法来。指定。Topic是consumer.subscribe这里面要一个集合对吧,要一个集合来放这个。To,也就是说你看啊,这个地方就说明了什么问题啊。
07:01
我可以消费,同时就一个消费者可以同时消费。多个topic的数据吧,哎,多个topic数据来看一下,如果说多个的话,他用的是a.at least来做吧,来看一下,如果说我只想消费一个呢。这个里边放一个。放一个。放一个,那这块idea呢,提供了另外一种写法啊,就是说他不是爆黄了对吧,爆黄了就说他有优化空间,或者说有提示的一个什么内容,那你可以看一下。columns.ss。啊是这样的,你可以用这直接,如果说你仅消费一个主题的话,你用这种方式,那我们就想消费多个。At least,然后我里面再放一个什么first。
08:03
范。2SD,然后我还消费。是的。我我有色的那个主题吗?没有对吧,我就写一个这个东西啊,写一个这个东西。啊,看他能不能消费是吧,看他这个消费者能不能创建,消费者能不能创建,是这样的,OK,然后我主定了主题之后,是不是应该去获取数据了呀,哎,他获取数据是。Yeah。这什么意思啊?或拉的意思吧,哎,拉取的意思,那就是说消费者是主动的从topic里边来获取数据的,OK,那它里边传的一个什么。延时就是说隔多长时间我来获取一次吧,哎,我来获取一次,所以说这个东西呢,是一直要循环的,再调用的,隔一段时间就会调一次,隔段时间掉一次,它返回值是什么。
09:04
这是。拉取拉取,拉取。你看一下它也有这个意思,然后这个返回值是什么。Consumer record。Consumer record对吧?Consumer record,刚才我们生产者生产的叫什么producer record吧,一条一条的数据,那消费者消费过来的也是一条银行数据,但是他加了什么?S加了S,这说明它一次拉取过来的数据会是多条啊啊,假如说100毫秒拉一次。好,Consumer because,那object呢,是不是不好处理啊,如果你想改,是不是最核心的地方,你在这改把它改了。
10:00
下面也得改一下吧,STEM,然后把这个。把这个改一下,把这个改一下,OK,那接下来我们要对这个获取过来的数据干什么。先打印一下吧,哎,要不然正常的你去处理是不是这个地方我们就只能打印一下consumer.for循环。啊,For循环,那这个for循环出来的就是一条一条的数据,对不对?哎,我们看一下这个一条一条数据里边它有什么内容点。值是什么?Offset是什么?Partition had key这些东西topic都能够拿到吧,都能拿到,那我们打一下这个topic点。So,除了这个topic呢,我们还想看一下。他是。哪个分区的对吧,也在这个的点。
11:00
Partition再加上我们要看一下。值对吧?哎,我们最重要的是获取它这个值加上一个什么record.value record.value这样的。你可以把这个。写成这种样子,简单好看一点,好看一点,那就是打印下,打印下OK。好,那consumer这个东西啊。你想想看,你正常的这是一个慢方法对吧,那他往下走往下走。走到这去获取了一次数据,打印了,接下来呢?接完不就?接下我们就退出了吧,也就是说你你当年这个消费者只能够。获取一次数据是不是啊。只能够获取一次,所以说呢,我们应该来个什么。没有。处处,然后呢,再个这个胸。
12:00
在这个什么循环里边,我们就不断的。来获取数据吧,啊,来获取数据,不断的来获取数据,是这样的。啊,在循环里边来获取数据啊整个的,要不然因为咱们自己写的代码对吧,你没办法干什么。不让他退出吧,他自己走完了,打印完了之后,他GM就退出了呀。你想想看,你什么100毫秒获取一次数据,它能够获取吗?获取不了啊啊获取不了,所以说我们只能在这个用一个什么死循环来让它一直在这。来调用啊,循环一直调用循环OK,然后接下来我们来启动一下。启动一下当年的那个。消费者消费者。这就启动了啊,这就启动了它外要处,因为这一个外要处对吧,它就不会退出的,这个是GF不会自己关闭的,不会自己关闭的,OK,然后我们来一个什么呢?生产者生产点数据。
13:06
我们就拿这个控制台的一个生产者吧,把这个关掉。B卡不卡?然后杠brook对吧。杠list,然后是哈多102问号9092啊9092杠杠你要往哪个没有发送出去,发送出去啊走一下。好,这个东西有了,然后我们还来一个什么。Second吧,啊,Second,好,在second发一个hello,走。来这儿。Second的零号分区的啊,零号分区的一个hello,然后再来这个地方呢,我们也来一个什么艾特硅谷。
14:00
爱德博士。First这个来获取到的吧,来获取到的,所以说它可以一个消费者同时。消费。多个。而且你填了一个不存在的。报了错,报了错,他报了错,它里边说这个东西不存在了,就是说因为我们没有logo缩件,但是。他他只是报了一个那个异常对吧,就是说你这个你你订阅的有一个topic不存在,不存在,但是不影响其他两个,从其他两个topic给他获取数据吧,就是说他报错只是抛了一个一查自己给他处理掉了,而没有说整个的业务就挂掉了吧,哎,不会这样的,不会这样,因为我们这边正常的能干什么。进行一个消费啊,进行一个消费是这样的,OK,这是它的一个高级消费者,你看这个过程当中,我们有设定它的一个什么off等等这些值吗?根本都没有的对吧,哎,根本都没有的。
我来说两句