00:00
接下来是获取数据啊,真正的去那获取数据是不是此时我们要连到那个leader去获取数据啊,对吧?哎,同样的也是用的那个simple consumer啊simple consumer啊这个类来去具体的获取数据,而且调用的是那个什么fetch方法吧,对吧?OK,那既然说这个地方我们是不是要先要来。在获取数据的时候,是不是先要找一下他这个领导啊,然后连上他这个领导啊,那找领导他里边要这些参数,这些参数是不是你必须要在这个方法外面给他传进来呀,哎,所以说像找领导的这个。CTRC,我们是不是每一条数据每一个都要啊,啊,一个都漏不掉,OK,除了除了这些之外,要注意我们还要添加一个什么。我们要获取数据是获取指定topic,指定分区,而且要指定。Offetle,对吧,Offet,所以说你要获取数据的时候,你要把off传进来吧,浪漫型的一个offetle。
01:03
啊,这个值要传进来,这个值要传进来,OK,然后find leader布斯port,然后这个什么掏一个,还有这个partition,真的返回值是这个leader,这个leader。这个leader可能为空吧?假如说他全挂了,可能会过,那我们如果说他什么leader。等等云档的时候,我们直接就返回来,就不不去获取什么数据了,可以吧,做了一个小小的一个安全因和校验是这样,这个是获取内。分区leader,分区leader OK,那有了这个leader之后。我们是不是同样的区域用一个什么simple consumer,然后里边同样的要传什么,它传一个的host。但是这个leader是什么?
02:03
是broke and point类型不对吧,你不能直接把leader传进来吧,所以说呢,我们要把这个leader要进行一个转一下,转一下它有一个host点。Leader点我看一下啊,这个host它的一个返回值是对吧。呃,除了这个host。Host啊,拿到这个host返回值是一个string,这个应该叫read leader host,然后我们第一个参数是什么。就是你像此时我们是不是就找到零号分区那个leader,然后把它传到传到这个simple里边,那你接下来连的是不是零号分区那个领导啊啊零零号分区那个领导,OK,然后接下来第二个参数port。第三个什么一个延时对吧,1000,然后还有一个1024乘以四,然后还有一个这个客户端的一个ID啊,一个唯一ID,那这个地方我们就叫get。
03:08
Data get data OK?那这个一个消费者就是获取数据的消费者对象啊,获取数据的一个消费者对象啊,接下来我们是调用那个之前所说的调用什么方法呀。饭吃吧。调用这个fetch这个方法啊,调用fe这个方法OK,那这个fetch里面你看一下它也要一个什么。也要一个什么请求,那他同样的也是要发送一个请求过去,然后呢,获取啊类斯的阿类似的OK,那这一块呢。还有一个请求,你来,你直接扭一下这个请求看看。对吧,加假如说加篇。里面东西一个都看不懂对吧,就是之前一个都没接触过吧。
04:06
一个都没接触过。而且它也是。SKY拉啊,这一块呢,它提供了另外一个方法。它返回值,它通过这个什么fetch request build来构建的。Request build build点。爱fetch啊,爱fetch就添加你要抓取的内容,你看一下这个这个参数能看懂了吗。就比刚才好多了吧,哎,还有另外一个方法,Topic,然后是partition。然后那传进来吧,最后还有一个什么。Fetch size,也就是说你一次抓取的大小对不对啊,在这个地方定义的啊fe size,那它是一个int类型的,那大家猜一下,它应该是指的是。
05:00
它是一个int类型的,是不是大家一一方面想到的,可能是抓取了多少条啊。对吧,抓取了多少条这块不是的,它其实还是那个缓存大小。缓存大小,所以说这块不要不要看它的一个参数的一个什么类型误解了,那这样我们可以试一下,首先我们second零号分区,等会我们可以再往零号分区多插入一点数据吧,我这个地方写什么呢?写100。我写100,等会我们可以测一下,看能不能把所有数据抓出来吧。就目前来说,我们什么。Second的那个零,Second的那个topic零号分区里边有100条数据吗?应该没有吧,啊,没有,等会我们可以从零开始消费看一下啊,总共有多少条数据啊,我们这个地方先写100啊,先写100,因为它是int,因为int这个东西给我们的感觉就是。它就是一个什么,就是条数啊,就是条数,但实际它不是条数,还是字节数啊,这个地方实际还是字节数,就是类型一般呢,像这种它字节数大小一般都是用long类型定义的,对吧,我们看类型就可以,但是这块啊,就比较一个特殊的一个地方啊,需要注意的要注意的OK,然后这个地方是用这个build来构建的。
06:15
你看它返回的是一个什么。Request request OK,把这个request给他。啊,它返回的就是一个什么response response,那这个就是创建。获取数据的对象,创建获取数据的一个对象,OK,然后这个地方。就是获取什么。数据返回值。啊,返回值OK,然后接下来我们就解析这个返回值了吧,对吧,解析。这拿着这个饭吃。Response,调用一下,看有没有什么方法。
07:02
它里边有什么。什么e code等等不太像对吧,第一个是什么。就是message的一个set集合,对吧。啊,Message和set集合,OK,这个就是真正来解析这个数据的啊,解析数据的,那这个里边还要传什么。就很奇怪,是不是就明明前面我创建那个什么获取对象的时候,我传了topic。传了帕,你在解析这个值的时候还要传这个东西。对吧,这个地方就很奇怪。其实不奇怪,是这样的,来看一下啊,这个地方还可以点爱。泛起。还可以干什么?点at的fe,就是说这个东西啊,这个request请求当中是不是可以包含多个partition,多个topic呀,那你真正在获取数据的时候,你要指定你要解析的是哪一个,你传的哪一个吧,那这个地方你传一个topic,这个地方传一个TOPIC1 topic2可不可以啊,就是一次性呢,这是不是挖取了很多很多的类型的数据啊,哎,不来自于不同的topic,来自于不同的part,所以呢,你在真正解析这个返回值的时候,它还需要你做一个什么?
08:24
啊,Topic partition,你具体要要的是哪一个主题,哪一个分区的数据啊,是这样的啊,所以说单独如果说只看一个at的时候,是不是觉得它很奇怪啊,啊其实是他同时这个请求当中呢,可以放很多的一个主题,很多的一个partition啊,很多的partition OK,那这个地方呢,我们就把什么topic。Part啊,给它传进去,得到它的一个返回值message and offsets,那我们就知道了,那这个message里面呢,有value有offset吧,对吧,因为它的名字就叫message and offet,那解析的话,我们到时候打印打印OK。
09:05
好,然后这个我们看一下message and点。It特对吧,那就正常了,因为我们一次就获取的时候,不可能获取一条数据吧,有多条,所以呢,它也是一个集合啊,它也是一个集合,那我们调用它这个迭代器,或者说直接用负循环看能不能行,用那个增强或循环,对吧?啊,这个也是可以的,它这个也是可以的,那这个东西才是真正的一条一条的数据是不是。对吧,哎,到到这个地方我们才真正拿到了一条一条的数据,OK,我们看这里边有什么东西啊,Message and outside点。Of message是它的一个信息吧,Offset呢?是它的一个平移量啊,是它的一个平移量,OK,那平移量放在这。好的一。因为我们之前自己传了一个什么O塞进来,对吧,所以啊O31,然后我们看这个它这个Y6怎么的点应该是在这个message里边吧,对吧,应该是在这个message,因为它的名字就叫message点。
10:13
那这个message或者说直接看一下吧,它什么类型类型。很奇怪的类型,对吧,你之前没有处理过这个类型,你直接打印。应该不行吧,对吧,啊点。嗯,然后看一下它里面还有什么呢。转化成了一个什么?一行一行的数据,还有b check sum做校验,对吧,哎,等等。还有。Is,哎,可不可以用啊,可不可以用等等啊,这地方怎么解析呢,是用这个东西,它要通过。Bed buff bed buff啊,这个东西来做解析的啊,来做解析的,然后这个地方你没办法直接调用它的一个buff对象,然后给它转成4G,你只能用这个,然后干什么呢?又一个。
11:11
Bad的数组,这个bad的数组的长度呢,就是这个。点。Limit啊limit它这个长度获取一个自间数组,然后拿着这个字点干好啊,传一个空的字间数组进去,它就把这个数据啊放到了你这个字节数组,然后这个东西。这个BY数组转成string会转吗?当前这个摆的速度会转成4SP吗?可以转吧,然后so,然后我们打印一下什么呢?这个off set。一然后加上。嗯。然后加上又一个什么。STRNGC,然后把这个best数组给它扔进去的数组给它扔进去啊,把它这个BA的数组转化成了,转换成了那个是因为这样的,因为它底层是一个序列化,正常的,我们写高级API的时候,是不是传了一个什么序列化的类进去的呀。
12:16
对吧,哎,它里边是怎么做的呢?也类似的一个,哎,类似的啊,源码里边,因为我们这个地方没有自己,我们是不是要类似于自己做它的一个反序列化的一个过程啊,啊,我们用这个be数组来做到将它的值取出来,哎,放到这个BA数组里边,最终呢,我们将这个BA数组形成了这个three啊,形成了个string,这块你要注意一下,你要注意一下。啊,这块整个的一个操作操作。要注意一下啊,OK,这个地方才是,嗯。没事是吗?有什么事咱们下课聊。啊,这块之后就是对于每项数据进行一个打印吧,啊进行一个打印这样的,那如果说现在啊,我们这个获取完了啊,获取完了你要是要想类似于之前的我们高级API一样,不停的就获取怎么办?
13:12
外销处,但是我们这个地方外销处没有什么意义,为什么呢?因为我们每次都是从那个什么,我们传进来这个opposite来获取的吧,哎,传进来ET,那你真正要想做成跟那个什么。高级API一样的一个效果的时候,那你在这个地方把这个拿到以后干什么事。给它保存下来吧,给它保存下来,那下一次在创建这个请求之前,是不是先把那个off set获取到,是这得添加一个获取off,然后在这儿得添加一个什么保存off,能理解吗?啊,保存off,至于这个offet,你保存到z kable,保存在本地文件,保存到什么其他的一些HDFS都无所谓啊,保存在啊都可以,只要能存数据的地方都可以帮你保存这个off。
14:06
对吧,那你这样就比较灵活了,不不一定就是说这个consumer不完全的依赖于什么cable了吧,你可以选择随便你要用其他任何一个保存数据的一个框架都可以啊,都可以OK,那我们再把这个。保存数据。就是获取数据这个东西啊,梳理一遍,这个应该是便利。并打印啊变成打印OK,然后首先呢,我们在获取数据之前,先拿了一下我们这个分区的一个leader是谁吧,然后针对这个leader干什么。创建了一个获取数据的一个symbol symbol哎,它的一个消费者对象,这个消费者对象是连到具体某一台机器的,某一台机器的,是一个很底层的一个类,之前也提过,那此时我们连的是Li这台机器是不是啊?然后我们去获取数据,获取数据之前分析过。
15:02
要用那个范这个方法。Fe,这个方法OK,这个Fla呢,它里面又要一个什么。Request,那我们是通过这个在request builder来构建的,它整个的,而且我们提了一下它这个里边是不是可以添加多个。Fetch对象,哎,At fet你进去啊,添加多个这个at fet at fetch配值,那所以导致我们在这个地方去解析这个值的时候。解析这个值的时候干什么,他还是要你干什么。传一个topic给一个partition进去,就是因为这个地方我们是可能写多个的啊,写多个的,所以说这个地方它才要求你传一个固定的值,然后你对这个值进行一个解析啊,进行一个解析,那最后的一个解析呢,是我们这个是一个bad b啊,这个类型我们不好处理,我们把它转成了那个带字节数组来给它进行处理的啊,转成字节数组来进行处理的啊,就是因为我们没有那个序列化的一个类啊,所以说这个序列化的一个类似的过程,我们只能模仿它那个反序列化的过程吧。
16:07
哎,只能模仿它反序列化的一个过程,是这样的,OK。这是获取数据的这个方法。
我来说两句