00:00
那么读取咱配置文件工具类完成之后呢,那么再往下走呢,咱们再进写工具类,这个工具类呢,是读取我们这个卡普卡的消息啊,读取卡卡的消息对吧?呃,那么咱们现在啊来看一看,那么怎么样去从卡法里读取数据,对吧?然后接下来你想一想,你读取卡卡消息的目的是不是要通过咱们Spark training来就已经行处理了呀,对吧,那怎么样去读取我们现在卡卡消息呢?对吧,咱们这个阶段第一天对吧,给你们讲其实对吧,我带他回顾一天嘛,对吧,主要是读去卡不卡嘛,其实这个对吧,那咱现在来看一看呗,在这里啊,我再去创建一个工具类,那这个呢,比如叫麦卡夫卡U艇,那这个呢,是读取我们这个卡夫卡它的一个工具类,对,然后接下来在咱们这里写一个主方法啊吧,先把这个东西呢,咱们先想一想怎么来写,然后一会呢,我再去封装咱们。
01:00
这个方法到现在这个主办法里边来写一下啊,那么如果说啊,要是从卡萨里读数据的话,那么这个时候其实是不是就用到了我们SPA streaming,然后卡卡集成的一个我们零杠幺零这个包里的类的呀,对吧,那是不是有一个叫什么工具类嘛,叫什么叫卡夫卡YouTube是吧?有一个叫卡夫卡YouTube对吧,U这样一个工具类对吧?那咱现在呢,UUU是吧,那这样一个工具类,然后接下来,那么咱们这里面呢,那应该有一个我们这个方法叫create direct stream对吧?那么咱们现在呢,在选择的时候,肯定是S开始的啊,不应该是我们这个GSS啊,这个SSSC,这个GSSSC表示Java啊,Java对象对吧?那我现在呢,那如果要需要它的话,那么这个时候首先第一件事是吧,你得给我查参数,那么这参数都有哪些呢?一个是SSC,这个是不是就我们SPA stream context啊,Stream contact对象啊,对吧,那这个呢,我也。
02:00
创建一下对吧,New一个stream contest对吧,Stream conest,然后接下来那么咱们在new的时候,这里是在指定一些东西啊,一个是我们的康吧,对,那么另外一个是不是有个周期啊,对吧?那么咱们这个呢,是我们这个second点对second对来给他传一个短的时间,对,然后接下来到这里呢,没有我现在先用一个Spark com对这样的一个配置,对吧,然后接下来在这里点VR对吧,这个呢是配置文件,然后咱们需要指定什么呢?两个东西,一个是APP内,那么另外一个呢,是我们这个master是吧,另外一个master,那么咱master呢,Local给了几个线程呢?四个就行是吧,行是吧,也行是吧,四个就行是吧,因为我现在我目前我的卡卡,它的一个分居数是不是四个呀,对吧,那我现在呢,开四个线能处理了对吧,四个线处理了对吧,然后接下来那么咱们现在呢,在咱们这里这个名字。
03:00
啊,这随意对吧,叉叉叉,对,我现在这个写代码我都是不运行,我是封装方法的目前对吧,然后接下来那么咱们在这呢,我就创建一个我这个啊这边刚才对象,然后接下来咱们现在把它传过来了啊S有了,那么第二个参数叫什么来着。叫做一个我们的位置策略啊,叫位置策略对吧,那么咱们的位置策略,当时我我给大家讲的时候,对吧,咱们这会怎么指定的。是不是通过它的静态常量调调它的一些我们静态常量去指定的,这里面应该有一个叫location,然后呢,然后咱们的策略对吧,然后这个G后面是不是得把Y变I加ES呀,对吧?然后接下来这里边那有一个咱们用的是哪个了呢?就是第一个是吧,那么咱们现在再带大家去回顾一下,对吧,就是我当前呢,在指定咱策略的时候,对吧?那分别表示什么含义,那么如果说我现在呢,这个是prefer broker的话,那么这个呢,一般是不是咱们现在在同一个节点上呀,对吧?然后接下来就是什么时候用咱们现在这种策略,就你能够保证啊,当前卡不卡和你现在isor他俩能够在同一节上,在这个东西咱一般保证不了,除非你就一台机器,对吧?这个咱可以保证对吧?但如果说我现在集群的话,这东西咱不好保证吧,对吧?那么我现在呢,除了它之外,那么这块呢,咱们这prefer consist它呢是大部分情况在使用这种方式,那么它的策略是什么样的,是不是按照咱们这个距离呀,对吧,按照咱们的距离对吧,来对吧,那看一看是不是同一个进程对吧,然后呢,是不是同一个我们的节点对吧,是不是同一个机架对吧,是这样对吧,然后再往下呢,这个是不是相当于固定的呀。
04:50
对吧,哎,哪一个啊,我们现在对吧,这个主题的哪一个分区交给哪一个机器上给个进行处理,对吧,直接给指定好对吧,这个呢,一般咱们也不怎么用对吧,那咱们现在呢,直接就使用往下这种方式,然后接下来除了我位置策略之外,那么再往下这里是不是有个消费策略,对吧?这个消费策略是什么来着?来咱们现在呢,在咱们这里啊,然后接下来这里咱们再选呗,哪个。
05:26
就咱们这RA是不是可以啊,对吧,要不然你现在呢,我指定对吧,那个我来消费谁,要不然呢,咱们就订阅一些对吧,来看一看我消费谁对吧,就咱们S呀,就是表示什么,表示我当前的我可以消费咱们某一些咱们这个主题的数据,注意啊,不是一个对吧,你看连接的参数对吧,你不管是aable还是咱们这connection是不是其实都是一些呀,对吧,所以说我现在呢,我完全可以把咱们现在这个的数据放在哪呢?放在一个数组里边,对吧,来放数组里边,比如说这里边有什么呢?对吧,比如说我当前有个主题,这个主题呢叫叉叉,是我现在去消费它,然后另外一个,那么咱们的消费数据的时候,你的卡卡数据它的类型是什么,K是什么类型的,得Y什么类型的,这个得指定一下,然后除了这个之外呢,那么在这里啊,它还要传参数,这个参数什么,是不是咱们卡夫卡呀,对吧?哎,咱们需要呢,去传一个我卡卡派,但是我这卡卡派目前没有吧,到这里放的什么是不是就放的时候卡夫卡配置信息呀,好。
06:27
好吧,那这样的话,咱们现在啊,在这里关于我们现在呢,它的一个我们这个从卡卡里读取数据对吧,其实是不是就咱们这几行代码呀,对,就是通过SPA streaming来从卡卡里读取数据,对吧?其实就这几行代码对吧?那么如果说把这几行代码当没问题的话呢,那么接下来我们就开始封装工具类,这个工具类的封装呢,这个我就不一个人写了,对吧,咱们直接呢,到这里把我这个代码拿过来,首先呢,第一个对吧,那我现在呢,在咱们这里边对吧,我把这个东西我封装了一个我们现在这个方法,这个方法作用干什么呢?或者把这功能什么呢,叫get卡stream对吧,你给我传一个我们topic,这个topic放在哪了,是不是放在咱们刚才这里边呢,我要去的消费哪一个主题,然后呢,另外你给我传SSC,对这个SC就是我们SPA streaming contact stream contact对象也需要你给我传过来,对吧,然后接下来,那么咱们现在这里边需要什么呢?我是不是需要卡它的一个参数啊,那目前没有,没有的话呢。
07:27
啊,咱们在这里把我们的卡夫卡它的一个消费者的一些参数给它拿过来做一个我们这个对吧?这个配置我在这里定义个什么呢?我定义了一个我们现在它的一个mapb集合,大家注意当前mapb集合的类型,这是什么,是Java的还是SC的SC的,而且我定义的什么,我是不是定义的是可变集合呀,对吧?可变其意味着什么?我是不是意味着我可以通过咱们这个map集合,然后进下来key的形式对吧?来对它呢?对我这个这个这这个集合里的Y就直接修改呀,对吧?那么另外一个呢,同学们,我这种方式啊,其实写的不太好啊,就咱们现在在这里,你先把这东西导进来卡复卡对吧?那目前呢,我这种方式其实不太好,对,就咱们我在讲咱们SPA training的时候,我呢写这种方式是好的,对吧,那么如果说你现在要写的话,那就可以这样什么呢?咱们这个consumer对吧,Consumer有一个conig对吧,它里边。
08:27
有一些配置,比如说这里面有什么呢?有我们这个boot stripp server con,那这块呢,那其实你应该放的是它啊,这块呢应该放在它,然后咱现在呢,这个k server,那你应该从咱们这配文件中把这个东西拿过来,对我就不一个个改了,对你从咱们这里边你可以拿到咱们现在比如说你看key对吧,说这这这个是我序列化方式啊对吧,然后呢,还有比如说group吧,这个呢也有对吧,它呢到时候对吧,你看它指定的东西就是格点ID嘛,对吧,你看咱咱咱咱们现在这里面对吧,这个东西是不格ID啊对吧,像这会其实我这个写法不好,同学们。
09:07
对吧,如果要什么的话,你应该通过常量的方式对来写的,对吧?从常量方式来写它啊吧,我就把这个给大家说一下,对大家说一下,然后接下来那么像这些啊,大家看咱们这里都设置哪些我们现在这个属性的呢?哪些参数的呢?第一个这个是我们连接卡不卡它的一个的地址,对吧,地址,那地址咱们现在这里在哪可以获取,是不是在配置文件里面可以获取啊,那么如果在配置文件里获取的话,咱们是不是应该从配置文件里面读出来,所以说呢,那么在咱们这里边,我呢要去调用我刚才写的这个工具类对吧,这个呢,我直接拿过来了,同学们对吧?首先咱们现在呢,这个方法其实咱们刚才我已经调过了,那你想刚才在测试的时候,我说我这个大码已经写过来了,对吧,所以说呢,那么在这里我通过我这工具类,然后呢,去加载我配置文件,得到一个property对象,然后通过property对象它里边的get pro方法来拿到卡broke list它的一个配置,那么拿到它之后呢,那么咱们把。
10:07
它作为值来复制给我们当前的这个的这个属性。对吧,然后第二个呢,就是K和value,它的一个我们序列化方式对吧,其实我都是字符串对吧,然后这个呢,是我们的个ID对吧,默认呢,咱给他一个值,对,就是你消费的时候对吧,你的一个消费者组是谁对吧,比如说叫寂贸0523,这个是我的默认的消费者组,对吧?然后接下来那么这块呢,它有一个什么呢。叫做all to outside re receive什么意思?它表示的是我在消费的时候呀,那么咱们从什么位置开始消费对吧?那么它的默认就这会你可以不用配同学们,它的默认值什么默认值是不是就咱们这个latest到默认值是不是就咱从最新的位置开始消费啊?如果说你要想把咱们当前主题的所有数据给拿过来,这里取值应该是earnliest对吧?应该是earnliest对吧?然后接下来,那么这还有一个什么呢?还有一个是我们这个enable all to commit,这是什么?这个是我们当前这个消费完之后,它是否会自动提交偏移量对吧?默认呢,是处自动提交,但是我们这个提交呢,注意并不是说你设置处它这个消园就马上提交了,它是什么呢?它是过一段时间等于如几秒钟之后,然后呢,它就有消的偏移量啊来进行提交,我们现在把它设置force,注意啊,我暂时先不回顾的偏移量,因为我们现在重点在我的项目里边,要给大家讲咱们的偏量都是怎么来管理。
11:35
因为我们这里是不是有一个精准一次消费的问题啊,对吧,你要想实现精准一次消费,那么咱们现在这个偏移量,你就不应该让咱们这个系统去维护对吧?应该是你去维护这便宜量对吧,你现在你做完你这个处理了,到时候你把便宜量给改一改,对吧,应该是你去做的对吧,应该是你做的,所以这块呢,这两个首先你暂时呢,设不设无所谓,对吧,我也把它放在那默认值的就是它,然后咱现在呢,这个偏移量把我设force对吧,到时候咱们手动去维基维护我这个偏移量好了,那这样的话,咱们现在呢,第一个啊,这个方法OK了,那么我这块呢,我没有带他整体把这代码一行写过来,对吧,然后呢,这块大家能不能接受,对就整皱现在啊,到目前为止,这些这个方法的封装能接受吗?
12:19
有没有什么问题,同学们?行不行,现在对这个这这个没问题吧,没问题的话,大家注意啊,那为了去扩展我们现在的当前的工具的功能,对吧?有的时候啊,那我现在在消费的时候,对吧,我不想用你现在这个消费者啊,我不想用你当前的消费者组,对吧?那么这个时候呢,咱们可以对我当前的功能呢,来做一个扩展,所以说呢,提供了第二个方法CTRLC,然后呢,接下来CTRLV,注意啊,咱们现在大家看一看这个方法和咱们刚才上面这个方法比较多了一个什么,对吧?来看一看对吧,和咱们这个上面这个方法比较对它的差别在什么地方。
13:05
多了一个什么东西,是不是就是多了一个消费者组啊,对吧?如果说我现在只把主题和SSC传给你,那么这个时候呢,你在消费的时候,你用的是谁?你用的是我们的默认的消费者组吧,对吧?但有的时候我现在呢,我消费不同的一个主题,我希望呢,他有不同的消费者组,是吧?那这个时候呢,你也可以把消费者组传给我,那么如果把消费者组传给我,大家想一想,咱们什么时候会用到消费者组,是不是在我们这个卡卡的拍参数里边呀,对吧?那么这个卡卡拍是什么呢?我特意是把它定义成我们现在的一个可变集合,对吧?那我是不是可以把当前这个mapb集合里边它的一个消费者组给它改一改,所以说如果你给我传咱们消费者组的话,那么首先第一件事我干什么了,我就是把咱们当前这个消费这个这个参数中,他这个消费者组对吧,给他改一下,改成谁,改成你给我传过来一个值。改成啊,你给我传回这个值对吧?然后接下来那么咱们现在呢,调工具类,然后调它的方法,那么最终给你返回了一个对吧,这个离散化流对吧?那么咱们如果消费卡卡数据,它返回离散化流,注意啊,它里面放的是什么?是不是咱们封装的一个consumer car对象对吧?那么consumer卡对象它其实对我们的卡卡数据的一个封装,大家想一想,其中这里边有建设,对,有一个K,有一个value,我们真正的值在哪的呀?是不是在value这里边,K是用来干啥的呀?就是帮咱们用来做分区的吧,对吧?哎,其实没啥用对吧,咱们到时候这个操作的时候,我现在从卡里取的,其实我这值在哪了,就在value里边,对吧?这是我第二个方法,比第一个就多了一个格YD,那么接下来那么再往下走,同学们对吧?这个方法呢,对吧,大家呢,也先去看一下,到也看一下这个呢,到咱后面会详细再说对吧?这个方法其实呢,也容易理解,大家看它和我们现在呢这个第一个方法比对吧,那么它又。
14:59
又多了些什么呢?大家看这个是不是有格白D,好像多了个这个东西对吧,我们这个呢,去对吧,啊就导进来了,大家看一下这多了一个啥呀。
15:13
你说这整体这这这一个这个类型的数据啊,同学们你看一看整体这个类型的数据,我们说啊,我要想保证咱们精准一次性消费,那么这个时候呢,我得去自己维护我们这个便宜量,对吧?那么如果自己维护便量的话,那么你下次再读取的时候,你是不是应该从你自己维护的偏量的位置来开始读取啊,那我在留偏量的时候,那这个偏量读包含哪些内容呢?是不是应该我现在每一个主题,它的每一个分区,现在的偏量到哪了呀?所以说大家看一看,咱们当前我现在传一个参数是mab集合,Mab集合里边呢两个内容,一个是key,一个是value key是什么?Key你看这名字啊,是人家给我封装的一个类,这个类叫什么呢?叫top partition,那你想一想,它这里面肯定包含两个内容,最少包两个内容,一个是主题,那么另外一个说分区啊,然后这个浪类型是什么呢?就是这个主题的这个分区它的偏移。
16:14
到哪了?如果说我当前topic a里边有一个零号分区,现在偏量已经到30了,那我下次再去从卡卡读取数去的时候,我是不是应该从那30开始读取,对吧?所以说你现在可以把谁给我传过来了,把偏量给我传过来,那么如果给我传偏量的话,同学们,那么咱们现在在这里我呢,再去读取的时候,除了我要传我们现在这个主题,除了传递参数之外,那么还要传递什么呢?传偏移量。表示的是我要从指定的偏量位置对吧,来读取数据啊,表示的是我要从指定的偏量位置来读取数据啊,在咱们这里它表示的这个啊干什么呢?要从我们这个指定的偏移量啊位置来读取我的数据,这个偏移量到底怎么维护呢?这个应该是咱们自己维护啊,在讲咱们这个精准仪的消费的时候,我还给大家说对,到时候我会把这个卡卡呢也给大家回顾,对吧,这个呢,对吧,是在咱们这个对我们这个卡卡啊,这个在对卡卡数据来进行我们这个消费的时候,那要干什么呢?要指定啊,指定咱们这个消费者组。
17:33
指定消费者组对吧,那上面这个呢,就是最简单的是不是使用的默认的咱们这个消费者组啊,对吧,它其实使用的就是我们这默认消费者组对吧?使用默认的消费者啊,使用默认消费者组好了,那这样的话,关于我们现在啊,这个卡卡的U完事了啊,关于我们现在卡法工具的一个封装这个就完事了。来,停一下。
我来说两句