00:00
所以我们接下来我们准备在程序当中把我的数据呢,给它来采集过来,然后呢,给它消费一下,保存到当中,那既然是要在程序当中的话,那我这里的创建一个新的项目了,这个项目呢,就叫做消费者,所以点击new,然后创建我们的module。然后呢,点击下一步,那这里我就写上叫CT,我们叫consumer,所以啊把这个呢拿过来拷贝,点击下一步,然后在这里斜杠我们的它点击完成,点击完成之后,那我现在啊,我就准备要去什么呢?来完成我们的操作,那首先你得跟卡夫卡有关系啊,所以我们找到咱们之前卡夫卡的这个项,这个项目当中会有依赖关系,这个咱们就不用去找了,记住如果以后你要是没有找到老师呀,这个去哪找啊,没系你就去网上找找卡夫卡的ma的依赖就完事了,那我们这里事先是有的,那我所以呢,就直接把它拿过来了啊嗯,这个呢,我们就直接拷贝,其实我们这两个是不需要的啊,为什么呢?因为我们这里是做流失处理和那个什么看给大家看源码和用的,那也就意味着我真正用这两个就够了吧,啊,他俩就够了啊,所以我们又不做流失处理,也不用看源码了,所以我们这里呢,就直接在这里啊来写上。
01:15
把这个呢给它加上啊来,那并且你的这个consumer它应该也可能会跟有关系吧,所以在这种情况下,我们是不是还应该把那个common给它准备好啊,所以在这里呢,再来一个啊,我们的依赖关系,这里我们写个CT叫common,诶把common也给它加进去,加进去以后你不要忘记这边书的刷一下啊,之前咱们就没有刷就没出来,所以我这边刷一下,刷一下以后如果没问题的话,会多一个我们的consumer出来,诶就可以了啊好了,那现在呢,我们当前的这个里面就可以去写我的程序了,所以src,那么然后呢,我们这里的Java和这个resource啊,那Java里面我们点击new,然后创建package,我们叫com.at硅谷,然后点啊,嗯,我们叫CT啊,这是我们的项目的名称,后面呢,就是我们的consumer了,嗯,好,那么我们这里既然是消费,那肯定也得有一个main方法来执行它呀,所以我们new啊,咱们new这个我创建一个类,那我们之前那个。
02:15
用的是那个叫P,我们这也这么用,所以我们叫啊点击OK,嗯。然后把这个呢,我们就给它增加,诶来我们写上叫它增加焖方法,在上面加注释,嗯,咱们叫启动我们的消费者啊,那么启动消费者的目的是什么呢?很简单啊,我们写上叫做使用啊,放前面使用卡夫卡的消费者啊来获取什么呢?我们的嗯,Flu啊采集的数据,采集的数据,那你获取以后有什么用呢?诶,将我们的数据啊,将我们的数据直接存储到我们的h base中去,诶这就是我们的目的,很简单啊,那么我们这个呢,我们就需要考虑考虑该怎么做了啊,那么首先你的这个类有了,那我得有相应的对象啊,咱们面向对象编程,那所以我这里呢,再来加上new,我创建一个包,这个包就叫bin啊,一个我们的对象,那这个B里面我们应该增加一个叫consumer,那既然是consumer的话。
03:22
我记得咱们在common当中好像增加了一个接口叫producer,那这个producer呢,表示生产者的意思,那我的consumer呢,就是消费者的意思了,所以我在这里呢,也创建个接口叫做new,然后呢,我们这里给他来啊,我们创建一个它,咱们叫consumer啊,然后呢,点击它选择我们的interface啊,咱们的接口,那么然后呢,我们的producer大家还记得吗?我们这里怎么了?是不是有一个可关闭啊,那咱们消费者你消费完了也得关闭它呀,所以点击OK啊,然后接下来我们这里呢,生产者他因为涉及到一个数据的来源的问题,所以我们这里有个in,有个out,但咱们这个consumer呢,就没这个事情了,那你消费其实就是获取嘛,那这就不管了,那我这里呢,就直接把它拷贝过来了,嗯,拷贝过来以后,这里我们写上咱们叫做消费数据啊来咱们叫消费数据,那这个呢,就改个名叫consume,嗯。
04:16
好了,然后呢,在这里呢,我们就写上啊,咱们叫消费者接口啊,都是面向接口编程了啊,那好,那我这么写完以后,那我得在我当前的程序当中创建它的一个实现呢,诶,所以我们来我们写上new啊,我们写上呃,我们的加class,我的目的就是为了去消费那个通话的日志,我就叫诶我们的的嗯,OK,然后呢,点击OK,点完之后呢,去实现我们的这个啊,咱们叫做consumer啊。好,就我们自己的就可以了,那写完之后,这里呢,把它里面的方法呢,给它重写一下,然后呢,这里写上咱们叫消费数据啊,来消费消费数据,然后呢,这个呢叫释放资源,如果你有的情况下啊,来咱们写上叫释放或者关闭吧,嗯,咱们叫关闭资源啊来关闭资源OK啊那这个呢写上,嗯咱们叫做通话,嗯,咱们叫通话啊。
05:23
通话日志的消费者啊,消费者对象,诶OK行了,那你这个有了之后,那么我们的consumer呢,就可以在这里面呢,给他准备一下啊来我们写上,咱们在这上面写吧,嗯,咱们要创建消费者。创建啊消费者OK,那这两个我们我想想还是放上面去吧。把这两个放上面去啊,把这两句话,嗯,然后呢,把这个放上面去,这个呢放下面来啊,好,然后呢,我们的consumer这边呢,就等于new啊,我们就要polo consumer啊,把它准备好,那你创建消费者之后,那么不用说了,你要消费我们的数据了啊,就是这样,然后呢,接下来这边啊,我们接着来,那我这里呢,干嘛呢,我们写上叫关闭啊来咱们叫关闭资源诶就可以了啊,所以我们的consumer呢,把它拿过来啊,咱们点嗯。
06:19
我们的consume啊,然后呢,这里呢,就是我们的关闭资源了,那么OK,我们把这个呢就拿过来了啊,然后呢,我们的第二我们的close,诶好了,那这里呢,会有异常发生,这个异常呢,我们给它来po咱们的exception,嗯,好,那这是呢,我们一个大概的一个架子啊,把架值搭建好了以后,那具体的消费数据呢,就是由这个方法它来完成的,所以我们的方法呢,就在咱们当前的对象当中就它了,那在这里呢,我们消费的话,那么我们的逻辑就要清楚了,首先干嘛呢?我们要去采集数据,我们要去得到我的数据,所以我们写上叫获取flu啊,采集的数据你得把这个拿过来,那这个时候我们该怎么办呀?同学们,我们怎么获取啊,那首先我们是不是应该构建自己的那个卡夫卡的consumer啊,对不对,所以我们写上叫卡夫卡的consumer啊,就是他了,然后呢,里面就是我们的string啊和我们的string了。
07:19
然后呢,这里呢,我们写上叫consumer,他等于new啊,咱们的卡夫卡consumer,诶好了,我们这么写,但这么写是不是报错了,为什么报错了,因为它没有无参的构造方法吧,是不是啊,那所以在这种情况下它报错啊,报错以后那我们是不是应该传参数,那传参数我记得给大家看过,这个参数当中会有很多不同的类型吧,其中有一个对于我们来讲是比较容易的,叫property,对不对,那所以呢,我们这里可以在上面咱们给它构建啊,写上咱们叫创建我们的配置对象,OK,那你创建配置对象的话,那不就是我们的property吗?嗯。所以把这个呢,给它准备好,咱们PP等于new我们的property啊,就是这样好了,那么写完之后,然后呢,我们把这个pop呢,给它放过来,诶就可以了啊,那然后咱们该怎么办来着。
08:14
就是我们这个地方,我们暂时可以先不考虑对吧,我们说了先不管它,就是有一个对象保证它不出错嘛,那我后面该怎么办了。后面是不是应该干嘛呀,是不是应该关注那个主题呀,对不对,你消费者你要想获取数据,你是不是得去那个主题上获取啊,那你是不得先关注他呀,哎,先给他关注啊,所以关注我们的主题,那么关注我们的主题,咱们应该是consumer干嘛呀,Consumer点是不是有个叫呀,这个东西,然后呢,我们写个叫点,我们叫at least,把多个数据呢,变成个集合啊,就是这样,然后呢,这个集合里面它的那个topic的名字是不是应该叫CT啊,但是呀,我们最好呢,不要写成固定的这个值啊,我们写成这种值呀,以后再扩展的话就比较麻烦,为什么呢?可能多个地方都有CT,那你光改一个别的地方可能就会有变化,所以在这种情况下呢,我们最好呢,我们用常量来表示啊,因为你这有个固定的常量嘛,所以咱们之前有个common common当中,我们当时给大家准备了一个常量的东西,叫names,就是名称,那这个就是名称的情况下。
09:21
那我这个就叫topic了,那就叫topic啊,我这写的还不对啊,应该放在上面啊,我想想啊,写个逗号吧,嗯,放到这边啊,然后我也写个什么东西啊,CT,记住啊,上面这个叫做什么东西啊,是我们的命名空间吧,是跟我们的什么表各方面什么有关系的,那我这个呢,是跟我的卡夫卡的主题有关系的,所以在这种情况下,那我们就来取了,来拷贝,拷贝以后在我刚才的这个地方,我说了不要固定写时,所以点点了以后呢,我们有一个叫做什么topic,对吧,然后呢,我们再来点有一个叫做什么呢?叫做get value。
10:00
你这样的话可以把它拿过来啊,嗯,但是我看看啊,咱们这里面,我们这边好像。嗯,它这里是集合,我这个是不对是吧?点一下get value哦,这个还不对,不太对是吧,我们这边我想想啊,Name set,那我这边把它返回吧,把这个我们的name返回一下,嗯,你返回以后,那这个时候呢,你传的是CT,那么这个name就是CT,那么这个值就直接返回了,所以我这边呢,应该就是对了啊,所以我关注这个主题,那你关注以后,然后怎么办,然后该怎么办,然后是不是应该获取我们的数据了,对不对,或者叫做采集或者叫消费版啊,咱们写上叫消费数据了啊,哎,咱们写上叫消费我们的数据就可以了,那你消费数据大家还记得该怎么做了吗?这里面是不是应该不断消费啊,因为你的生产者在不断生产,对吧,那你就应该不断的去消费,那么我们写作V处,你v to以后,那这个时候呢,我们的consumer应该有一个方法,这个方法呢,点我们叫做什么啊,给个啊这个这个东西写完以后。
11:07
这个铺方法呢,是有返回值的,这个返回值叫consumer record啊,就是返回的一个结果,其实就是采集的一个结果,所以在这里呢,给它拿过来,我们叫做consumer,嗯,Record,诶就是这样啊,好,把这个呢,我的泛型给它加上,这里呢是我们的spring啊,我们的spring这个其实都是咱们写过的啊,就没有什么好说的了,那行,那这个咱们写完了之后,同学们想一想吧,那我现在是不是应该对我的一个结果应该循环便利啊,所以我们的consumer record DR,因为我们讲过它肯定是实现了那个叫可迭代吧,所以我们这里是可以直接用for循环的,那么直接用否循环的话,我们的record就有了,那你有了record之后,那你的记录不就有了吗?所以我们这里就直接来打印我们的record,点它里面会有value,有有topic,这些东西是不是都有啊,那我就把这个value给他拿过来,那就看看我的这个值我取没取到,那你。
12:07
那如果渠道的话,那你后面就可以完成我们的什么具体的操作了,那不就完事了吗?这样我们通过这种方式把那个卡夫卡的那个消费者呀,给大家写出来了,咱们已经写过了,只不过呢,我们这里再写了一遍啊那么你光这么写的话可能不行,为什么呢?因为你还没有配置,那不行,那咱们就要想一想,咱们当时都有哪些配置,还有印象吗?啊,可能印象不深了,那所以呢,我们也不用管它,咱们这里呢,直接找我们的卡夫卡啊,找我们的这里面,我们当时在写代码的时候,同学们看我在程序当中这个consumer啊,咱们点我们这里有两个不同的消费者,那么大家想想我当前写的这个是高级的还是低级的,高级的对吧,因为低级的话,它力度可以更细一些,对吗?哎,我这里用的是高级的啊,那么高级的情况下,咱们打开这个,打开这个以后,你会发现它是不是有这样的一些配置呀,这些配置我相信同学们一定能够什么明白吧,对不对,没有任何的问题啊好了,那我现在。
13:07
那我如果这么写的话,是不是等同于还是固定写死了,那我能不能想办法给它放到一个配置文件当中呢?也就意味着我现在举个例子,比方说我想放在哪里呢?Consumer,我在咱们当前的这个地方来,我们的resource里面,我加一个叫做new,然后呢,我这边来创建,创建一个文件啊,呃,创建一个我们的文件,我想想,嗯,我创建一个叫consumer,点我们的,嗯,来practice文件啊,那我创建这个文件之后,那么也就意味着我的这些配置信息是不是应该都应该有啊,诶所以拷贝拷贝以后呢,我放到这儿,然后呢,把这个呢,我们也拷贝过来啊,反过来,然后呢,放到里面去,还有我们的这个,嗯。给它拿过来啊,放过来,嗯,好了,然后接下来我们再把这个group ID啊,然后呢,一样啊,就是我们以前配置的东西呢,我们自己都给它拿过来用一下就行了啊,然后呢,放过来,诶把这个呢给它放过来,嗯,好,然后呢,这边呢给他来啊,这个呢是我们的1000啊,这个呢给他个true啊group ID我们就叫艾特硅谷了,嗯,然后这边等号啊,这边等号上面呢是等号,那这个服务器啊,就是我们当前集群的地址,这个可以多写几个也可以,什么呢?哎说就写一个也可以啊,那我这里多写几个吧,诶,好,然后再来一个,嗯。
14:29
好,那写个三啊就行了,那接下来是我们的那个反序列化啊,我们这里因为是消费者嘛,你就得反序列化了,如果是生产者的话,你就得序列化,嗯,所以呢,这里呢,给它拿过来两个应该是一样的啊好了,那我这边呢已经可以了,那可以了之后跟我之前的代码呢,就有所变化了,那我的程序呢,就稍微改善一下,就意味着在我的程序当中,我得读取这个配置文件,我该怎么做。这个我该怎么做,同学们想想,那么我们的prop点它应该有一个叫做什么东西啊,叫做漏吧,对不对,那么这个漏的话会有个什么方法,就有需什么东西啊,需要个留吧,OK,那这个我该怎么做,你一个有一个fair是吧,好的,然后么,同学们。
15:22
然后怎么写,那你你直接就写上了,是你直接就写那个叫做是吗?是不是这样的。Consumer点啊,咱们的点啊,咱们点啊,啊加拉帕是吗。啊,是这么写吗?嗯,首先记住啊,你们学乱了是吧?啊,学乱了就是你们什么都会是吧,但什么都不懂是吧?啊就这意思是吧?就这个你们学那个流流头,你们是这么学过吗?你们这个是不是在学spring的时候应该有这个东西啊对不对?学spring配置文件有这个玩的吧?那我们你们学那个I的时候哪有这玩意?
16:21
IO流里面哪有这个东西,这个是不是文件file啥意思啊,这文件系统啊,它叫文件系统,什么C盘呀,D盘呀,E盘啊,什么斜杠这样的东西,哪有这玩意儿啊,没有,这是我们特殊的框架当中,他做的特殊处理,他会对这个路径啊做一个特殊的操作,那我们这儿可没这个事情,也就意味着大家能够想到这一点,说明你们意识到了我们现在的这个文件放置的路径,你应该不可能直接读到吧,对不对,因为你如果大家看想想,如果你直接这么写的话,可能是读不到的,对吗?为什么?因为它是文件系统,可是你文件系统它默认指向的是那个Java的什么,那个B的那个目录,所以并不是我当前的什么类环境,那所以这个可能就读不到,那我该怎么办?
17:08
那怎么办?这个时候啊,这个时候咱们之前就讲过一个叫类加载器,类加载器可以在指定的位置来加载我们的类,对吗?但是告诉你它不光能够加载类,还可以加载什么东西啊,还可以加载我们的什么东西啊,加载我们的来配置文件啊,所以呢,我们可以这样啊,咱们叫thread点啊,咱们叫card thread,然后点有个叫什么东西啊,咱们叫做get啊contest。然后呢,大家看有个这个东西叫做get resource这啥意思啊。你这句话能不能明白什么意思是读取一个文件,把它作为流来使用,艾不是作为什么吗?对吧,那么叫获取资源,把它作为流来使用,对不对,那所以呢,我们这里呢,来添加什么呢?我们的consumer的那个叫啊。
18:06
所以啊,也就意味着我们这种方式应该是可以的啊,然后呢,我们这里呢会有异常发生,给他来啊,我们这里呢给他catch一下,嗯,然后呢,我们这边就写上一个exception啊,然后呢,给它一一点啊好了,然后呢,我就把这个呢给它挪进去啊来把这个代码呢给它挪进去啊,所以把这个呢给它放进来啊好了,那首先我们就来想一想,同学们看看这个代码你们能看能不能看明白啥意思,能不能看明白什么意思,叫当前运行的线程获取它上下文的类载器对吗?然后呢,获取之后从上文的类加载器里面去加载我的文件,把它作为流来使用吗?那现在重点就在于这个类加载器是什么类加载器了,这个什么,这个类加是什,你当前运行的线程,是不是当前的这个消费者在运行。
19:07
啊,那这个消费者在运行,其实是靠的是不是主线程在运行,那所以其实就是主线程吧,主线程,当前主线程是谁来执行的,是不他来执行的,那所以说这个类是被谁加载的,被应用类加载器对不对?首先它不是这DK的,他也不是扩展类的,他是我们自己写的,那我们自己写的话,一定是我们的应用类加载器,对不对?那么应用类加载器去加载的话,那不就从我当前的类环境当中去取吗?那这个在不在我的class下面,在不在,那当然在了,对吗?否则怎么可能运行这个类呢?它一定在我们的class part下面,所以就能够找到它,所以我们讲类加载器,不见得只是讲一个类从哪里去读,还有它的配置文件的那个路径啊,还有这样的一些事情,所以这个如果你能明白的话,那我换一种写法能不能行呢?比方说我这。叫做点什么的class点什么这么写呢,是不是一样的,因为你这个类肯定也是被当前的应用类加起加载对不对,那所以也是可以的啊,就是这样,所以我们怎么写都可以,但是我们最大比较常见的写法呢,应该是用这个啊,用这个来去操作一下啊,好了,那我现在我们这个写完了之后,只要没问题的话,我们这边呢就可以呢,把数据给它展现出来了,好,那我现在呢,对不对我就不知道啊,那所以我这边呢,给他来重新我们操作一下,那我这里呢,咱们就别用控制台了,因为我们是用Java程序了,就不用控制台了,那我这个是不是还应该去采集一下,但你光采集的话,如果数据不生产这个也不合适,哎,所以我们这里呢,再去生产一下啊,来这以样,我这边我看看啊,我记得诶我这边有一个生产呢,那我有生产的话,我就回车,回车以后看看啊哦,他咱们这样是不是不太对呀,应该是我们的model,然后呢,我们的date是吧?诶。
20:59
好,然后呢,再来。
21:01
我们这边是不是在不断生产呢?你不断生产的话,那么我们的辅助就可以不断的采集了吧,诶就是这样,所以回车啊,咱们这边那你采集的情况下,那我在这边的话,我们是不是可以直接运行了,所以呢,我们去运行一下打开,打开以后呢,来我们运行,运行以后,反正生产在生产,采集再采集,那我们这边就看能不能消费到不就行了吗?啊看一看。大家看是不是可以啊,他得等一会儿吧,你看是不是这样的,所以他是周期性的,记住他不可能一次性把数据全拿到他的周期性的去取出去,那也就意味着只要我们生产在不断的生产,那我这边都开着的情况下,也开着,你的消费者也开着,那数据就源源不断的就过来了吧,哎,就是这样啊,所以我们通过这种方式呢,告诉大家我们的消费者如何通过Java程序来进行消费啊好了。
我来说两句