00:01
好了,我们接着讲了啊,同学们,呃,上午的话,我们已经把这个基本的环境呢,给他这个搭好了。好,那打好以后呢,接下来就是通过这个代码来去实现我们想要去做的这个事情。对吧,那这个最基本的功能就是呃,开始去做这个数据的一个消费,然后呢,然后再去做数据的一个分流操作。对吧,我们就要开始去写这个Spark streaming啊这样的程序啊,嗯,这个写之前的话呢。我们先来做一个简单的分析啊呃,目前啊,我们是要从你的卡夫卡中呢啊,通过这个Spark streaming,然后呢,去往出消费数据。对吧,消费出来以后呢,我们做一个分流,然后呢,最后呢,再把这个数据呢,再次写到这个卡卡中。好,那么这个过程中我们就会有一个消费,然后呢,有一个生产这样的过程,就对应你的卡不卡来讲啊,就是我要从你那边消费数据,然后呢,再把数据呢写出去。而且大家想想啊,我们把这个数据分流完成以后呢,写到这个DWD层。
01:02
那接下来我们是不是还得有一个这个Spark程序,然后呢,再去消费你的数据。然后再把它消费出来,然后再去什么做这个后续的处理。好,那我有可能呢,还要再去写回到你这个卡不卡中。所以说呢,我们在这个整个过程中啊,我们会频繁的用到,诶,基于卡夫卡的一个消费和生产。能理解吧,啊,那既然这个事情我们是能够这个提前这个分析到的啊,那我们就考虑啊,把这个卡不卡的一个消费和这个生产的话呢,给它做一个封装啊,比如说我们写上一个工具类,然后将来只要你用到这个消费数据了啊,只要你用到这个生产数据了,那你直接去调用我的工具类就可以了。要不的话,你想想,你每次是不是都得重新写这个消费者,重新写这个生产者呀?基本上都是固定的代码,那你就没必要这么去做。对吧,所以说我们在这个具体写这个功能之前啊,我们先考虑。写一些工具类。能明白我的意思吧,哎,我们先考虑先写一些工具类啊OK,好,那我们先写一个就卡不卡的工具类吧,就把我们刚刚分析的这个生产和消费啊,给他这个先封装出来,封装出来以后呢,我们再去写这个具体的这个分流的这个代码啊好,所以说你看接下来工作的话,我们是先去准备这个工具类了啊同学们。
02:18
好,那我们就回到我们这个开发工具里面好,进来以后呢,我们。来到我们的这个慢下面啊,因为现在我们写的是Spark的这个程序,所以说我们肯定用的是这个skyla对吧,那我们就按照以前的习惯,呃,我在这个麦下面,我再去新建上一个目录啊,我叫这个盖,然后呢。把它呀标记成一个。好,Source route,然后这样的话呢,我就可以在它里面写代码对不对。好,接下来是创建我们的包啊呃,包的话我就叫。呃,Gmail的一个real time可以吧,就是这个实时处理啊行,然后呢,呃,我们现在要写的是一个工具类,那我在它下面我再去创建一个包,我叫有跳这个包。
03:10
可以吧,那将来的话呢,我们除了这个工具类之外,呃,我们还会写这个具体的程序。是不是啊,比如说我们再去规划一下啊,我们将来会写这个具体的程序,呃,那我就这样啊,我再去新建上一个包,又上一个package。好,准备好点,不要time,比如说我们写的是这个应用啊,那我就叫这个APP吧,对吧,我们的处理过程啊,我就把它称之为是一个APP啊,这样的话你看一下我们就可以分到这个不同的包里面,然后呢去做这个事儿。啊,那比如说将来我们还有可能会涉及到一些,呃,需要去声明一些什么这个病类啊,那你可能还会有什么病包什么的,对吧,你比如说啊,我们再用上一个package。啊叫这个什么病对吧,这里面我们可以什么放一些我们的这个,就是所谓的这个实体类什么的。是不是啊,大概的规划是这样子的啊,行,现在的话我们先去写这个有跳啊,那我们用上一个呃,这个SKY类啊,呃,就这个环境,大家这个需要自己看一看啊,因为我这个是做过全局配置的,所以说我把这个呃工程创建好以后呢,它里面就已经默认帮我加上那个,就是默认上这个SKY的那个什么呃框架的支持了啊,如果说诶你在这个new的时候呢,发现我这个new不了这个SKY相关的东西啊,那就说明你还没有把这个SKY的支持把它加进来,那你需要在这个地方去右键,然后呢去挨着什么framework对吧,这里面你就往下翻,然后呢,在大概是这个位置啊,有一个SC,你需要把它勾选上。
04:37
对吧,勾选上以后呢,你就可以什么正常去使用了啊好来接下来我们就开始写了啊,我们就上一个盖类。好,那现在我们写的是一个工具类,那我们要把它生命成一个半生对象啊,我们就叫做MY卡不卡U。可以吧,来写个注释啊好,这是我们的这个卡夫卡工具类啊,啊用于什么,用于用于这个生产和消费啊,用于这个生产数据啊。
05:11
和消费数据啊,OK行,呃,那按照我们这个目前的这个规划来讲的话,它里面最起码得有两个方法啊,一个是我们的叫做这个消费对吧,一个是我们的这个生产。对不对啊,一个是这个消费,一个是这个生产啊,呃,那我们需要这个具体分析一下啊,像这个生产的话呢,应该还简单啊,就是你把这个数据呢,写到你这个指定的这个topic里面啊,然后就完事了,但这个消费啊,现在这个稍微会麻烦一些,呃,因为我们现在这个消费数据啊,我们再来看这个整体的架构啊,就是我一直说过,如果说你想不清楚了,你就回头去看看这个图啊,我们现在的消费是通过这个Spark streaming,然后呢进行这个消费的。对吧,并不是说我自己写消费者对吧,我们自己用上一个什么叫卡布卡,什么consumer啊,不是的,我们是什么直接把卡不卡呢,作为一个数据源,然后呢,通过我的这个Spark STEM呢,然后去对接这个数据源,然后进行这个数据的消费啊所以说呢,这个地方我们写的消费呢,应该是什么,基于就是什么呀。
06:15
基于什么这个Spark streaming啊,然后呢,进行这个消费。理解我的意思吧,啊,所以说你在写的时候,你要去考虑了啊,好来吧,那我们就呃先写谁呀,先写这个。先写消费吧,啊,因为它这个稍微会麻烦一些啊,那我们需要定一个方法,呃,那你这个消费出来的数据呢,呃,将来我们是要什么在我这个实时的处理中呢,去进行这个处理的,所以说呢,我这个地方的话,我只要什么获取到一个就是呃第stream就可以了,对吧,就是基于我这个Spark stream,然后呢,去连上你的卡夫卡,然后呢,我拿到一个这个stream,那我拿到这个stream以后呢,我就可以什么正常的去做后续的处理,所以说这个地方我不做这个具体数据的一个处理,你能听懂吧。
07:00
因为我们只是一个工具类,那我就把这个能够获取到这个stream。就完事了,然后呢,接下来就什么交给你的这个APP里面去做这个处理就可以了啊,所以说这地方我们就是什么就获获取啊,获取到我们的这个叫卡布卡迪dream啊。对吧,获取到它就可以了啊好,那我们就来写个方法吧,那这个方法的话,比如说我们就叫什么get卡不卡啊,写成大写的啊,卡不卡,然后呢,Stream啊行呃,那将来肯定会有一个返回值啊,这个返回值的话,我们先不写吧,对吧,让他自己去推上啊一会儿OK,那我们在这个获取的时候呢,你就要去知道一个点,这个点就是什么,就是你的SPA streaming是怎么基于你的卡夫卡。对吧,来获取这个stream的这个过程应该怎么写。这个首先你要知道。OK吧,呃,这个大家应该之前在学这个Spark的时候就已经讲过了啊,就对于这个卡夫卡来讲,就是卡夫卡的这个数据源来讲,人家这个呃,Spark是封装了一个工具类的,就是不需要你这个一步一步去写,你只要什么调用这个工具类啊,调用一下这个工具类,然后呢,就可以什么获取到这个stream啊,那这个工具类的话,我们是这么去调用的啊同学们,它叫卡不卡UTS,我不知道大家还有没有印象啊。
08:21
好打个包啊,你看一下就这个包卡卡跳S,然后呢,它里面有个方法,我们叫做create。叫director STEM。还有印象吧,同学们create direct stream来,那我们就什么啊,写了好我提示一下啊。Director STEM OK,然后呢,这个方法里面的话,我们需要传一些参数啊,大家来跟着我分析一下,我们传什么参数呢?首先啊,现在我们用的是这个stream contacts啊,说一会我们要给他传一个这个stream contacts进来,然后呢,还要再去传,呃,我看我们用用下面这个吧,啊传上一个stream contacts,然后再传上一个这个位置的一个这个策略,然后再传上一个就是就是你这个消费者的一个策略,就是这几个东西,你要把它准备好,准备好以后呢,你再给他这个传进来就OK了。
09:11
好吧,那这个呃,Stream contacts的话呢,我们这里面肯定是没有的,所以说呢,诶将来我就什么希望你通过参数给我传过来,对吧,那我就什么先声明一个参数啊,叫这个SC吧,它是一个streaming contacts OK吧,然后这个有了以后来吧,那我们就写一写了啊,那这里面写什么东西啊,就是SSC先把它放到这好,第二个参数的话呢,它是一个location storage啊,就是一个位置的一个策略,这个的话不需要我们自己写啊,它有一个呃。Ti ts吧,呃,它有一个这个我我提示一下啊。他有一个类,然后可以直接获取到的啊呃,诶怎么没有了呢。
10:01
呃,TGIS啊,应该这个OK,就它啊,对吧,它有一个办事对象啊,然后这个身对象里面你点过来看一看,它就有这么几种策略啊,这是一个,然后这是一个啊,应该是两种吧,我记得。对吧,应该是两种啊,然后这两种的话呢,我们用哪一个呢?啊,我们一般用的是呃,一般用的是这个。对吧,你看他在解释啊,他说呃这种策略是什么意思,就是如果说呃,你的这个ex对吧,就将来你这个之前的那个什么ex啊,然后呢,它是跟你的这个卡卡的这个blocker是在相同的这个节点上面的。对吧,那你可以采用这一种策略。能听懂我的意思吧,好,那正常情况下啊,你的这个实时任务,就你这个斯巴克跑的这个程序,跟你的这个卡夫卡肯定是不在一个节点上面的,你的卡夫卡肯定是什么专门这个卡不卡的集群,你不可能在这个卡不卡集群里面再去跑你这个斯Spark任务。对吧,所以一般情况下我们都不是这种情况,那我们就使用第二种就是什么呀,这种是属于就是大多数的这个情况下,我们都可以去使用它。
11:04
明白吧,啊,所以说我们用的就是它啊来,那我们就把它调出来叫。呃,Constant对不对?好什么constant。能看到吧,就用它就可以了啊。好呃,然后呢,再来看啊第三个。这个第三个是什么,它是叫做consumer storage,这是一个消费者的一个这个策略。那这个消费者的这个策略,我们要写什么呢?同样啊,它也是提供了一个类,然后我们可以直接去使用。好来写一下啊,好也是有这么一个类,然后呢,它里面然后有一个什么叫做subre啊,就是你去订阅,那你想想啊,你从这个卡卡顿消费数据,那你不得去做这个订阅吗。对不对,那我们订阅的时候呢,诶,你看看它里面需要你传什么东西啊来我们就提示一下这个方法啊好,这个方法里面需要我们传的东西是这样子的,首先那你得告诉我你的这个主题是谁。
12:06
对吧,你说你从卡卡消费数据,那你不得告诉我你的主题是谁吗?好再一个的话,你还得告诉我什么呀,你还得告诉我这个就是卡夫卡的一个什么P,就是你这个消费者的这个参数。能不理解啊,你得告诉我这个消费者的这个参数啊好。呃,我们用就用这个吧,对吧,消费者这个参数好,那同样的话呢,我们还有一种情况啊,大家需要了解的就是你从卡夫卡消费数据呢,你涉及到消费者以后,那么你一定要去考虑一个。这个问题。那明白吧,一定要考虑这个opposite这个问题啊,那我们之前大家这个从卡夫卡消费数据呢,一般你们都是基于什么呀,默认的opposite进行消费的,就是这个oppositeet你们没有自己管过。对吧,是卡不卡自己帮你维护的,那你每次消费呢,就从卡不卡里面去读出来这个消费这个opposite,然后去做消费了啊,但其实卡不卡也支持我们自己去指定一个opposite进行消费,就比如说我给你传上一个opposite,你就拿我这个opposite进行消费,这也是可以的。
13:06
啊,所以说你看这里面,它可以让你传一个什么呀,传一个这个opposite,就是你可以什么,自己决定你要用哪一个opposite。能明白吧?同学们好,那暂时的话,这个我们还涉及不到啊,所以说我们什么直接使用第二个方法就可以了啊,就简单一点好,那我们首先需要有一个topic,呃,那这个topic的话,因为你写的是一个工具类,所以说这个topic的话,你就不能把它写死啊,如果你要把这个写死的话,那你这工具类只能从某个topic消费数据,所以我的要求是你给我传过来,能理解吧?Topic啊,这是我的要求,你给我传过来。啊,那你传过来以后呢,这里面我们就可以去写了啊,你看一下它是要求我们怎么着啊,写一个什么it就说白了啊,我们是支持你从多个topic消费数据的,那你需要把你的topic呢,封装成一个it,呃,这个it啊,这不就是一个迭代器吗?对吧,只要是可迭代的啊,那你就给他一个数组就可以了啊,那我就把它封装到一个数组里面,然后呢,把这个套杯给它放进去。
14:04
能看明白吧。好,然后接下来,然后我们这个第二个参数呢,它是一个卡夫卡的一个什么PA啊,这个他要求我们传一个这个map过来啊,那我就把这个呃,Map给它这个定义一下好吧,呃,那我们定义到外面啊,定义到这个外面。好定一下啊,这是我们的消费者的这个参数啊,消费者的这个配置啊,他要求是一个map,那我就写上一个,呃,我就写一个map吧。可以吧,我就写一个map啊,然后map里面的话呢,呃,我看看它是什么类型的哈,他说这个字符串和object对不对,是不是啊字符串object,那我就什么生命成这个字符串object啊呃,泛型来写一下,诶好字符串,然后呢,Object好后面我就可以去写了啊,我先把它接收回来,接收回来。
15:00
好,这是我们的叫做consumer cons那行吧,然后接下来这里面你就可以去写你这个消费者的这个什么配置了。OK,那到了这个地方你就要去想了哈,我们消费者都需要加什么配置。对吧,赶紧想想你们之前在讲卡不卡的时候。跟你们说的啊,这里面我们需要加加什么东西啊。好,这个有知道的同学,有想到的同学,你就在这个公屏里面打一下啊,我看看你这个能不能记得起来啊。好,这个有同学说GOID好,这是一个来。好,给我发ID来,还有什么呀。赶紧想啊,赶紧想这个其他同学也是啊,好,KV的反序列化,诶,这个很对啊。反式氯化类型对不对,反式氯化器吧,好,还有什么。
16:07
连接服务器端口OK啊,这应该就是你的一个什么卡夫卡集群位置对不对,好,我把它写到上面吧,来写到上面啊,好来这个卡不卡的什么呀,集群位置好来再想啊,同学们再想。还有什么?啊,我看看这个还能不能想到啊,这几个应该都是最基本的啊,嗯,必须得去写的,对吧,还能不能想到这个别的呀。没了吗?还有吧。啊来,我们一起来这个说一下啊,啊还有同学说这个分区分配的策略啊,对,这也是啊,分区分配策略啊,这肯定也是对吧,但现在我们对于这个消费者来讲,我们这个呃,暂时还研究不了这个分区分配,好吧,但这个你可以写啊,这没毛病,你可以写啊,行,还有什么东西呢,就是这个。
17:07
Oppositeset的什么这个提交。对吧,你要配自动提交还是手动提交啊。是不是还有一个什么呀,这个upset的一个什么重置,对吧,这不都是你这个消费的时候,你要遇遇遇到的东西吗。对吧,你消费的时候,你都会遇到这些事情。好吧啊,当然还有别的啊,就是等等吧,就是我们最核心的啊,应该是有这么些个,然后呢,当然这个别的配置有很多,你都可以什么自己去配OK吧,来这个地方我就不多说啊好,然后接下来我们就开始来写了啊,首先第一个叫卡夫卡集群的位置,呃,因为我们是一个map,所以说我可我就可以这么去写了啊,首先你的KK是什么呀。嗯嗯,这里面放的不就是一个KV吗。对吧,你放的是一个KV对吧?好,那你的K怎么写啊,K是一个字符串。那你要去写的这个配置,那你就要写什么,写配置项的名字对不对啊,大家还能记住那个配置项的名字吗。
18:02
能不能记得住?啊,同学们这个是不是不好记啊,好,那不好记怎么办呢?诶有的同学说那我可以到官网找啊,对没毛病啊,你可以到官网找,比如说我们来过来啊,我们找到这个卡不卡这个官网啊,你到了这个官网里面以后,你可以什么直接去做搜索对吧,比如说我要搜什么呀,搜你的这个什么卡不卡的什么集群的位置,那应该是什么blocker对吧,Block的什么list吧。是不是找一找啊。是不是这个东西啊,呃,不是它是吧,那叫什么来着,呃。应该是什么BOO是不是。不对不对。Bootrap叫这个server啊,应该这个吧,是不是啊,你是可以怎么自己去找的,但是呢,你这个,呃,你在找他之前的话,你肯定得对这个配置呢有一点点印象,如果说你都完全不知道它叫什么名字,你找起来不也很费劲吗?对吧,所以说啊,大家注意呃,这个写配置的时候呢,你们在讲卡夫卡的时候肯定讲过啊,消费者的这个配置啊,我们其实有一个配置类的。
19:09
对吧,消费者的配置有一个配置类,当然这个生产者也有啊,一会我们再来说这个生产者的,你看了哈,在这个consumer理念啊,应该是康菲克斯啊,哎,不对啊。卡就叫卡啊,好,我打个包啊。OK,在它里面啊,大家注意你点一下,你可以点出来各种各样的配置。对不对,你看你点出来以后你再去找,这不就很好找了吗。对吧,比如说我们那个bootop server,那你写一下这不那个什么bootop server config吗?对不对,你点过来,你点过来以后,你看看这个东西,呃,它引用的是另外一个配置,OK,你再点过来,好,你看这个配置是不是就是我们的不service啊。能看到吧,同学们啊,说这个你要知道啊来,如果说你知道这个类的情况下,OK,那就好办了,这就是我的这个配置好,那我的Y6怎么写呢。
20:00
VALUE6不就你自己指定的吗?哈多1029092103,然后这个9092,然后呢,哈多104,然后呢,9092。对吧,这不就写好了吗。能理解吧,好,那你这个会了以后下面就好写了呀,你看啊看con讲叫什么叫key的一个叫什么Dis of class。对吧,Config这不就是那个什么K的那个K的那个反序列化吗。对不对,OK,行,那这个我们配的是什么呢?注意啊,呃,这个就按照你这个将来这个所消费的数据的情况来了啊,那我们的数据的话,就是一个普通的什么日志数据,对吧,其实就是一个字符串能听到吧,其实就是字符串,所以说我们这个KV的这个反序列化的话,我们都写成字符串了就OK了啊,那这个类叫什么名字呢?那你就去找一找了啊,这个你肯定记不住我们叫字符串的D。This of leather啊,应该就是这个吧,对吧,就这个类啊,然后呢,我们把它全类名copy过来,好放到这好,那你key会写了,那你的value是不是一个道理啊,Can con.value的一个什么value conflict啊,我把它写到这,这不就OK了吗?
21:12
对不对,同学们好,来再往后啊,呃,再往后是这个group ID啊,注意这个groupd的话,我需要这个说一下啊,呃,因为我们写的是一个工具类,所以说将来我们从这个只要你是基于这个卡不卡做这个做消费的,对吧,或者做生产的,那我都可以使用我这个工具类,但是并不是我们每一次消费呢,或者生产的都用同样的消费者组,对吧,我们可能想换一换啊,就跟你这个topic是一个道理,我们每次呢,不可能都从同一个topic消费数据,我们想换一换啊,那这个怎么去换呢?那我就要求你把这个GOID呢,给我动态的传进来,我就不在这里面写死了。好,但是呢,因为我们这个只是写了一个配置哈,他是没办法给你往这去传的,但是我们在获取这个消费者,就是获取这个stream的时候,我们是不是要把这个东西给他传过去,在里面去指定的吧,那我是不是可以考虑你在这个方法中,你不仅要给我传一个topic,那我还要求你啊,把这个group ID也给我传过来,这不就OK了吗?
22:12
能不能听明白,同学们好,那你把这个group ID传过来以后呢,其实我在用之前啊,在这个位置,我在用你的这个配置对象之前,那我可以做这样的一个操作,什么操作呢?就是consumer config对吧,然后呢,呃,我看看啊。嗲。呃,这个还不行啊,因为我现在给的是个啥东西啊。我现在给的是一个map对吧,这玩意儿还不能动,你能不能看得到。是不是这玩意儿还不让我动啊,不让我动,呃,因为它是一个不可变的,所以你不可变的话,你就没有办法再去往里面去,这个这个这个加东西对不对,所以说我要把它改成一个可变的哈。对吧,改成可变的啊,叫做beautiful。
23:00
啊,来挡一下包啊行,那我改成这个可变的以后啊,然后呢,他说这个,呃,那我前面也得改一下是吧,对不对,这样就可以了啊,然后你看一下啊,那你将来传过来以后呢,那我就可以什么consumer con点什么put,诶在这啊怎么写呢,Cons con叫什么put,好put的时候,那你的keep怎么写呢?Keep就还是一个consumer con con,它里面有个什么叫做group ID的con,然后你再把这个什么group ID给它传到这。那不还明白,是不是相当于你动态传进来以后呢,我就给到你这个配置里面了,给到配置里面以后呢,将来我在这个地方再去用,不就相当于用上了吗。对吧,好说这个group ID我们就搞定了啊行,搞定以后呢,再往后啊,下面是我们这个opposite的一个提交,那这个opposite提交的话呢,呃,我们有什么有这个自动提交对吧?有这个手动提交,好那你看我想写了啊consumer讲叫做呃自动提交叫什么叫。叫凹卡密是吧,诶诶错了啊,我就就提示不出来了,大写的啊。
24:06
好叫什么叫这个,呃。Auto。我看看啊卡吧。对吧,叫这个啊,就是允许自动提交你的什么opposite啊,应该这个配置来我们写一下啊,叫做啊enable啊好点过来看一下啊,这就什么呀,Enable or to commit,就是是否什么允许你自动去提交这个opposite啊好那我们这个之前我们用的话呢,一定这个基本上都是这个都是什么呀,是不是都是这个自动提交的呀。对吧,那我就给个处啊,现在我们还是这个自动提交,如果说将来你不想自动提交了,你想改成手动的,那你就把它改成first。对吧,好行,那你知道了这个自动提交以后,那你还得知道另外一个事儿。啊,另外一个事儿就是,那你既然你是自动提交了,那你多久给我提交一次呢,对吧,你多久给我,诶这什么东西啊,你多久给我提交一次呀。
25:05
啊,我就光写个逗号啊,我说了提示啊,不对的,来写一下啊,肯砍点叫做什么呀?找Val Ms这个值。这什么意思啊,这个就是你自动提交的这个时间间隔。对吧,你不自动提交吗?那我这个怎么什么时候给你提交呢?它是一个周期性的提交啊,它会什么呀,通过这个配置来去配它多长时间提交一次,呃,我记得这个应该是五秒钟哈,默认这里面它没有默认值,你这个只能到官网里面去找啊,对吧?它默认是这个五秒钟5000毫秒的,那不就五秒钟吗。对不对啊,说这个配置的话,我觉得你也得知道一下啊,那这个我就不写了啊,就你要知道的,这个是我们的自动提交的这个时间间隔OK吧啊,这我就不写啊,行,然后下面是这个outside重置,那这个也是比较重要的一个点啊,我们来说一下,它是consumer conig讲叫做呃,Reet,就是autoet conig。
26:10
对吧,就这个东西。这叫opposite outside重置,好可以,那我估计我说到这样,可能又有同学这个想不起来了,什么叫opposite重置来着,对吧?啊,你这个看一看吧,他都解释的啊。说。当我们在做操作的时候,如果说没有一个初始的这个什么opposite就卡不卡里面没有你这个之前的这个消费记录。对吧,或者说呢,诶,你当前的这个opposite呢,在你的这个集群的服务器里面已经什么呀,不存在了。能听懂吧,那么这两种情况下,我都会怎么进行这个offset一个重置,好那我再来说一下啊,你记把它记清楚了啊,第一种情况就是你的卡发卡中的没有初始的一个offset,就说白了,你现在这个消费者主呢,它是一个新的,就之前是没有任何的消费记录的。
27:00
理解吧,好,还有一种情况就是什么呀,我不是一个新的主,但是呢,我之我之前做过消费,但是我所记录的这个,比如说1000啊,我所记录的这个oppositeet,现在说来我要从1000开始去做消费了。但这个卡夫卡说,对不起,这个opposite已经没有了。诶,那什么情况下就没有了呢。就是默认超过卡不卡的配置,超过七天以后我的数据会做删除,那有可能你是十天前消费的,对吧,那你这个十天后呢,你又来了。他可能诶你这个数据呢,正好在这个什么十天,就在这个七天内已经什么给你做做了删除了,那你再过来消费的时候,这个数据就没了,那没了以后就找不到你的这个opposite,那找不到这个opposite怎么办呢?我就给你重置一下。对吧?那我总得让你消费吧,你过来一个新人对不对?那我总得让你消费吧,你的没了,那我总得让你消费吧。对吧,这个时候它就会重置,那这个重置的话有几种情况呢?我们有这么几种,这么几种情况,第一种是ear,第二种是latest,这两种是我们用的最多的。
28:01
理解吧,当然后面还有啊,后面的话就一般不用这个是,那这个是会给你抛异常的啊,还有一个叫什么叫这个anything else,对吧,这也是会给你抛异常的,这两种我们不常用,我们主要用两种,一种是来往下翻,一种是这个earliest,一种是latest,这个是重置到头,这个是重置到尾。那什么叫头,什么叫尾呢?比如说当前我的这个topic。对吧,假设啊,我就只有一个分区可以吧,那现在我的这个数据,假如说啊,我前面的数据呢,我有过删除啊,有过删除,那现在我的数据是从这儿开始的,那比如说你的opposite是100,对吧,前面100的已经被删掉了,好,那假设这个地方是3000可以吧,如果说你让他重置到头,OK,那么。卡不卡就会让你从这个位置开始消费,那么现在已有的数据你你是能够消费的到的,如果说我让你重置到尾了,那对不起,你只能是从这个位置接着往后消费,现在已有的数据你是消费不到的。
29:01
好吧啊,我把这个再来强调一下啊,这个你们讲卡夫卡的时候肯定都讲过啊,像它的默认值是什么,它的默认值应该是我记得应该是latest啊,我们可以去看一下,呃,这个叫什么叫做呃,我把这个配置项复制过来啊。配置项复制一下。好复制一下,然后呢,拿过来放到这儿。好找一下,你看它这个默认是不是一个latest呀,对不对啊,它默认是重置到尾的啊,好,当然你你希望它重置到什么地方,那你就按照你实际情况去定啊呃,那我还是把它拿过来吧,我也写个latest得了啊。好latest OK,那么我这放一个,然后呢,再放一个就是我们常用的哈,它叫做这个A,它叫做叫做list啊在这。好,放到这儿。OK吧,这是最常用的啊,行,呃,当然哈,像这个配置,这个配置就目前来看,你不配也行,因为它默认就是这样子的,但是为什么我花时间要给你强调这个东西呢?说明它对于消费者来讲很重要。
30:08
明白了吧,就是你只要你是消费者,那你就会涉及到这两个东西啊,因此我又花了时间给大家去强调了一下,那我就希望你把它记住啊,你不管这一次你记没记住。对吧,或者之前你有没有记住,但是我这一次说完以后呢,你就不能再记不住了,以后我估计啊,真的没有人再去跟你说这个东西了。好吧啊,把它记住啊行,那除了这些之外的话呢,这个其他的配置我们就暂时就先不涉及了啊,当然你还可以再去加别的东西,好吧,来那我就把它写好了啊呃,把这个写好以后,接下来我们就回到这你看了哈,他不是要一个这个配置吗。对不对,你看是不是要这么一个配置啊,好,那我就给你传过来不就完了吗?叫consumer。给你传过来,这不就完了吗?看到了吧,同学们好,穿过来啊,行,那么这么写好以后来我们接收回来讲这个VR,呃,接收回来。
31:04
好呃,但这个接收回来以后稍微有点问题啊,大家看一下它这个提示的说我消费到这个record里面都是一个nothing。啊,这什么原因呢?这个因为我们没有给泛型啊,它是推算不出来你这里面到底是什么类型的啊,所以说你看一下我们在做这个,我看看啊是做订阅吧,来你看这个方法啊,这个方法人家是有什么有泛型的。对吧,所以你要把范型给上啊,那我们的将来我们所听好了啊,将来我们所消费的这个消息的它的KV啊,都是字符串,对吧,所以说把这个泛型指定上,指定上以后来再接收回来。诶,这个时候就不是nothing了啊,这就是我们的叫做卡不卡stream啊好,虽然说它它不叫这个卡不卡,它没有叫成这个卡夫卡stream,它叫的是一个input stream,但是呢,其实本质上你要知道,它就是从你的卡夫卡去拿数据的。能理解吧,行,那我把这个拿回来以后呢,我们再把这个就是卡不卡stream给他这个返回回去就OK了。
32:03
能看到吧,行,那么到目前为止,我们的这个方法就把它这个写好了,这就是我们的一个诶消费的一个方法啊,当然我们这个方法呢是呃,基于这个Spark stream消费获取到这个卡stream啊,然后呢,使用这个默认的off啊。对吧,就是我没有自己去自己去指定,我们使用的是默认。那其实我们在后面写的时候呢,我们会有一个场景,我们是需要手动去指定upset的,那么到时候我们再来去加吧,现在我们先不考虑那么多,好吧,先把这个方法写好啊,OK。行,先停一下啊。
我来说两句