00:00
好,同学们,我们接下来呢,给大家讲一下咱们卡夫卡作为数据源,咱们该如何采集数据啊,在咱们工作当中,咱们的SPA这命呢,一般是跟卡夫卡做对接,由卡夫卡那边作为一个消息的数据源,咱们给他采集过来啊,是这样的,那这个呢,所以啊是面试和开发的重点,嗯,但是呢,我们的卡夫卡呀,他在做我们数据源的采集过程当中,它分两种不同的版本,那么一个叫receiver API,一个叫direct API啊那么这个receiver是什么意思呢?顾名思义,它需要一个专门的ex来当我们的接收器,来采集咱们的数据。那么这样的情况下,它的那个采集和计算的那个速率啊,可能不会相同,那不相同的情况下,就可能会导致呢,数据的积压,内存的溢出,所以啊这种方式现在已经不怎么使用了,在早期版本中有这样的一种方式,但现在的版本中是不实用的,你去增加我们的美美依赖,根本找不到对应的一些找不到,那么我们的API是什么意思呢?它是由我们的计算的节点来动,消费卡卡的数据,说的简单一点,就是我们的计算节点呢,它同时又计算它有什么消费,那这样的话呢,你的怎么说呢,就是你采集数据的数据啊,就跟你消费的数据呢,可以做一个均衡啊,速度由自身来控制,这个我们称之为叫direct API啊就是这样,那么我们下面呢,就区分了一下啊,比方说早期版本零杠8RECEIVER模式啊,就是这样,这个当前版本是不适用的,所以啊,咱们课件当中有这个内容,但是我们这里是不讲的啊,然后呢,接着往下这个。
01:38
也是一样叫direct模式啊,接着往下,下面呢,还有一个咱们往下走啊,往下走来来来这边呢,是有一个零杠10DIRECT啊,这个呢是我们现在需要给大家讲的啊,那么首先这里面我们需要增加依赖关系,来吧,把这个该关的关掉啊,打开咱们的依赖,然后呢,在我们这个位置给它增加上咱们拷贝。
02:02
拷位以后放到咱们的下面啊,咱们拿过来好,当我们的这个依赖关系增加好之后,那接下来咱们要做的事情就是要想办法去读取卡夫卡的数据,但是你前提条件你得把卡夫卡给他启动好,对不对?诶咱们一会儿来说这个事情啊,回来以后我们开始要写代码了,写代码以后啊,这里面会有一些我们需要注意的地方,由于在工作当中,咱们这个SPA stream主要就是跟卡夫卡做对接,所以咱们这个相关的类当中啊,其实它提供了一个工具类叫卡夫卡U,这个呢,我们一点一点给大家写上啊。来把这个去掉,然后去掉以后呢,我们拷贝咱们写上叫零四,然后这个呢,我们写上叫卡夫卡,然后呢,点击OK,那好了,那这个里面的它咱们就不要了,把这个去掉。然后呢,这边呢,我们也都不要了啊,咱们都不要了,不要了之后,接下来咱们说过了,卡夫卡它有个工具类,所以这个工具类呢,叫卡夫卡U,然后呢,我们导一下,导一下以后点它里面啊,咱们卡夫卡us啊,然后点一下有一个叫create,咱们direct stream。
03:14
那这个里面会传很多的参数,这个参数呢,我们就直接给大家拷贝过来了,在咱们的这个位置啊,有一个它,而且呢,它这里面要加上泛型啊,那这个泛型呢,咱们写上咱们叫做string,咱们叫做string什么意思呢?就是说我们要采集的数据啊,那个卡夫卡传过来的是什么样的key,什么样的V,那咱们这里呢,其实就是字符串的key,字符串的V啊,这里咱们说一下,然后呢,这个SC就是我们的上下文的环境对象,这个没问题,这个呢是一种我们的位置的策略,这句话是什么意思呢?它所描述的概念就是说我们的那个采集的节点跟计算的节点该如何做匹配。那么原则上来讲就是我们会有一个什么呢?比方说哪的数据我们怎么采集会更优一些,对不对?在咱们SPA个块当中,不会有个首选位置的概念吗?咱们这里呢,可能不叫首选位置了,但是其实恰恰也是一种我们的位置的关系,但这个关系让你来决定不是很方便,所以啊,咱们这里呢,干嘛呢?就是让它自己来选择,自己匹配,由框架来匹配,你不要去管它,就是这个意思啊,所以这个呢,我们就不说了啊,它其实取值啊,大家可以看到除了这个以外还有什么呢?我们叫brokers对吧?那其实就是说我们自己来选择咱们的服务器节点,这个呢,我们一般咱们不干这个事情啊,咱们让它自动选择,咱们别做这个事情啊,这个叫混合的嘛,对吧,就是这个东西。
04:38
好了,这个呢咱们就不说了,下面这个呢,叫做consumer,那就是我们消费者策略,消费者策略的话它来干嘛呢?做订阅嘛,那么订阅的话,这边会有一个我们的艾特硅谷这什么东西啊,咱们来提示一下看看什么东西。它里面呢,会有个叫topics啊,就是我们的主题啊,咱们主题呢是艾特硅谷,这个倒也没什么问题,然后后面呢,是一个卡夫卡的,这个呢,我们称之为什么呢?就是一个卡夫卡的配置,对吧。
05:07
这个配置呢,我们课件当中是有的,所以我们直接参考了,咱们直接拿过来拷贝,拷贝以后在我们的这个位置,诶拿过来大家可以发现这就是我们卡夫卡的per,然后呢,里面会有一些内容,咱们叫consumer config。好了,那么这个地方大家可以发现LINUX1还有二三是我本机的地址,9092是卡夫卡的服务器集群地址啊,所以这么写没问题,这个呢,我们称之为叫做group啊,就是一个消费者主对吧?当然我们这里呢,其实主要呢,都是干这个事情的啊好了,那这个呢,我们去掉啊,咱们去掉,那这么写完以后大家会发现呢,我们现在其实就已经连接到咱们的卡夫卡了,对不对,那所以呢,我们这边写上来一个咱们叫做卡夫卡,或者咱们这样吧,咱们给个VR提示一下,咱们点叫VAR回车。回车以后,大家会发现你拿到的是一个叫consumer record啊,就是消费者的那个记录啊,啊那个数据啊,那这个呢,我们就写上卡夫卡啊,咱们的date。
06:10
他们叫DS,但是其实你这么拿到以后啊,其实好像意义并不大,为什么呢?因为我们想拿的其实是它的value什么意思,就是看看你传过的是什么样的值,你叫什么key什么的,其实不关心,所以大家可以看到来,咱们在这里点,点了以后给他个map,这个map写的下划线,点它里面有个叫这个value,就是传过来的那个值,你从卡夫卡当中把数据给它传过来,对吧,那这个时候呢,我们直接点叫print就可以了,诶咱们就这么写啊,那你这么写的话,那么一旦数据过来了,我们在控制台上是可以看得到的,行了,那我们的卡夫卡的这个程序啊,咱们基本上参考了一下咱们课件中的内容,咱们给它写上了,但是有前提条件,你要把topic准备好对不对,然后呢,包括这些服务器都启动好,这些东西没有那是不行的,所以我们这里来看一看啊,咱们现在给他来啊,咱们现在呢,已经有我们的卡夫卡和主per已经启动好了。
07:11
现在我们来看一看。来咱们写上。呃,咱们写上咱们就叫做卡夫卡,诶,放过来,然后呢,我们现在想干嘛呢?首先我们先看一看咱们的topic都有哪些内容,所以呢,我们写上叫卡夫卡,嗯,咱们叫做topics,然后呢,在里面呢,我们写上一个叫做杠杠,哎,咱们叫bootstrap,嗯,好,咱们叫server,那么这里呢,给它来一个,咱们叫LINUX1记住啊,这是我自己的机器,我们叫9092。然后呢,有个叫杠杠list,表述的是我通过这个我们的集群地址来访问咱们所有的topic,就是这个概念了啊,来回车回车以后咱们来观察一下啊嗯。大家可以看到我们当前呢,其实就有这样的一些topic啊,就是这样,但这个topic好像已经有艾特硅谷了,对吧?那所以我觉得还是换一个新的会不会好一些呢?咱们这个叫做new行不行啊,咱们叫at硅谷,New就是诶或者说咱们叫做new at硅谷呢?算了,就这么随便写吧,咱们就意思意思得了,那我现在就想创建一个新的topic,那我要创建一个新的topic,我们该如何创建呢?对吧?那这里呢,我们来把这个放过去,咱们把这个历史的去掉,咱们叫create,叫做创建,那么这个创建以后呢,有一个问题,你想创建什么样的topic呢?诶,我们就写上叫艾硅谷。
08:33
艾特硅谷咱们叫the new对吧?哎,咱们就这么写,写完了以后,它这里面会有分区的概念,所以我们写上杠杠,那咱们叫partition partitions加个S表示多的意思啊,然后写上一个我们的写个三吧,然后呢,杠杠啊,咱们再写有一个叫副本对吧,咱们叫replication,咱们叫factor啊,副本因子给个二,那么这样的话它会有六个对不对?哎,就是这样的啊,好回车。
09:01
回车以后,如果创建成功的情况下,我们这里会多出来一个我们的topic啊,所以来我们给他一个杠杠list,嗯,好了,咱们这里就多出来一个我们的new了,那行,那现在呢,我们有了这个new以后,我们现在准备啊给它来消费数据,所以我现在点击运行啊去运行,但是由于你这里根本就还没有发送任何数据,对不对,所以呢,你这个是控制台上肯定全都是空的,那怎么办?诶,我们现在还要去通过控制台来生产数据,这边好消费,如果生产没问题,消费也没问题,那么这个卡夫卡就是联通的啊。那这里呢,我们来看一看吧,在我们的这个地方这边正在,诶我先看一看,他把那个控制台的时间戳先打印出来啊,咱们先看一看,没问题了,咱们再去生产呢。好,已经出来了,出来了以后,现在呢,我写上一个B啊,咱们叫卡夫卡,然后呢,我们写上咱们叫做cancel啊,咱们叫cancel,然后呢有一个producer,那么这里呢,我们写上什么呢?来写上一个杠杠,嗯,杠杠我们叫做broker list啊来broker list,嗯,咱们写上一个LINUX1,然后9092,咱们的集训地址,然后杠杠咱们的topic,这个topic呢,我就写上了,咱们叫做艾特硅谷new,诶好,回车,回车以后咱们稍等一下,同学看,我们这边呢,给它来啊清空,清空以后呢,我写上叫嗨,回车以后再写个skyla,再写个Spark回车,回车以后大家看一下,Hello出来了,然后Spark盖LA不也出来了吗?这就说明你的卡夫卡可以呢发送数据,我这儿是不是也可以消费数据啊,那你能消费数据,你就接着往下执行你的需求不就够了吗?OK,这个就算是完成了啊。
我来说两句