00:00
呃,那么咱们呢,现在已经是吧,可以从我们这个red里边来获取到我们这个偏移量,对吧?那么接下来咱们呢,再从卡夫卡里面消费的时候呀,那这个时候是不是就是得从咱们这个ready来获取了,对吧?所以说呢,咱们现在来到我们这个daup,就是咱们日活的程序里边,然后接下来呢,找到我们现在这行代码啊,这行代码原来干什么呢?原来是不是从啊我们这个啊吧,直接是吧,就是从我们这个最新的位置啊开始读取数据了,现在不行对吧,现在干什么呢?现在啊,在消费卡法数据的时候,那么首先它要获取一件事,对吧,是不是得先从我们red里边来获取便移量了,是吧?所以说呢,咱们现在第一件事,从RA中来获取我们现在的这个卡卡,它的一个偏移量啊,或者分区偏移量啊,偏移量,那么怎么来获取呢?咱们有一个这个工具类啊,咱有个工具类叫outside。
01:00
啊,Measure u,它里边有一个方法,这个方法呢叫get outside,传两个参数进去,一个呢是topic,另外一个是我们的格白D啊,另外一个是我的格鲁白D,那么把它两传去之后呢,那么接下来它返回的啊,是一个我们这个脉部集合,短反mapb集合,这个呢,就是我们现在它的一个opposite map。啊,这个其实我们天量的一个mapb集合,然后接下来,那么如果说啊,同学们,咱们现在第一次的去从卡卡读数据啊,以前我从来没读过,那你说从re里边会有到的偏移量吗。没有吧,就你以前从来没读过,没有消费咱们这个主题,对吧?那咱们现在专利肯定不会记录的,对吧?所以说呢,那这个时候咱们拿到的这个P量的map集合是不是其实就是空的呀?所以说呢,那咱们需要对它做一个判断,如果什么呢?如果说我们现在这个outside map,它不等于空,对,那么并且呢,咱们现在这个oppositeset map,点我们的一个size,它的大于零说明啊,咱们现在这个从red里边已经拿到便宜量这个信息了,对吧?那么这个时候咱们呢,再去读数据的时候,是不是可以这样把MY卡卡YouTube,它其中有一个方法叫什么呢?叫get cover stream,然后传这么一个参数进去,一个呢是主题,你要从哪一个主题里面消费数据,那么另外一个呢,是我们当前SPA stream的这个stream content对象,然后接下来那么第三个是不是咱们可以把我们这个outside map作为参数给传过去啊,对吧,把outside map作为参数。
02:43
传过去,那么第四个格式排D对吧,把这个东西传过去,然后接下来,那么它传上去之后呢,那么咱们在这里定一个变量,比如说这个叫record record dream。
03:04
好了,那么咱们现在呢,这是关于我们现在啊,这个如果他ready是不是有啊,那么咱们通过这种方式呢,来获取我们数据,但是大家想一想,那如果说咱们现在软没有,你第一次去从卡发消息数据,那怎么办?你是不是还得从咱们这个最新的位置开始读啊,所以说咱们现在这行代码是不是也不能给它丢掉了,所以说呢,在这里加上一个else对吧?所以如果说咱们red里边有偏移量,那么这个时候呢,从指定的偏移量位置开始读取,注意啊,如果咱们RA中存在当前消费啊,消费者组对该主题他的一个我们的偏移量信息,那么从我们这个指定的偏移量位置开始消费啊,开始消费。
04:04
对吧,那么否则的话,对吧?那么如果说啊,在咱red中没有当前消费者组对于我们现在的这个主题的偏量信息,对吧,没有是吧?那么这个时候呢,还是从我们这个最新的位置开始消费是吧,还是是吧,按照我们的配置,从我们这个最新啊位置开始我们这个消费是吧?那么如果从最新位置开始开始消费的话,是吧?那么咱们现在呢,把这段代码都拿过来,大家想一想啊,这个名字呢,我改一改,把这个名字我改一改,你不管是我们从指定的偏量位置开始消费,还是从我最新位置开始消费,那么其实你的目的是不是都是从卡卡拿数据啊,那也就说我现在打的其实都是一个我现在这个地嘛,对吧,那你是不是应该下边,如果要用的话,你把它放在这,放在咱们这个局部变量是不太合适,所以说呢,应该把它给提取出来啊,在这里我们应该去把。
05:04
把我们当前的地子tri给提取出来是吧,一开始呢,是我们的空的,然后接下来,哎,如果说我现在呢,在我们这里是不是有,那么咱们给它来做一个赋值,对吧,因为我现在要改变它的值,所以说你VR修饰不太合适啊,把它改成VR对吧?然后下面这块呢,那也是一样的,对吧,下面也一样的。好了,那咱们现在呢,从卡卡里读取数据,这个操作对吧?和原来不太一样了,对吧?那么咱们再判断一下啊,Ready有个保没保存偏移量,如果保存偏移量的话,那我要从指定的偏量位置开始消费,如果没有保存偏移量的话,那么这个时候呢,我才从最新的位置开始消费,那这段代码大家能不能理解对吧?那么如果这块可以理解的话,同学们,那么接下来咱们继续啊,再往下走,那么再往下走呢,这块的测试呢,那我不做了,对吧?这个测试呢,不要干掉了,然后接下来,那么咱们现在呢,要开始对它做一些结构的转换,那么在结结构转换之前,我问大家啊,这个东西,你这个数据是不是相当于从卡不卡读到了,那么读到数据之后,其实你在干什么,是不是相当于这数据已经消费了呀,对吧,那么如果消费了,假如说同学们我原来卡不卡里边呢,有这么几个内容对吧,有这么几个数据的这么几个数据,这个数据的偏量分别是的0123,然后接下来你当前采集周期里边一下子把这几个数。
06:26
句,然后读到哪了,读到我们SPA stream里面来了,对吧?那我问大家对吧,那你说我当前读到的东西,我能不能知道我是从卡不卡哪一个,我们现在它的批量开始读的,那么读到了哪结束的,应该可以吧,就你现在你看咱们要从卡卡拿数据对吧?那么接下来我现在想干什么的,我想知道当前咱们现在采集周期中对吧?那么你读到的卡法数据对吧?那么从什么位置开始,以及呢,到什么位置结束。应该可以获取这个内容,那么你获取这个内容的目的是什么?大家想一想,从什么位置开始,还好到什么位置结束,是不是其实就是我们现在新的篇音量,你不光要从专里读,那么等你下面这些逻辑处理完毕之后,那么你是不是还得把咱偏移量给更新一下呀,对吧?所以说呢,那咱们现在在这里我要做一件事情,注意啊,这一件事你必须在读到卡法数据之后,然后呢,当它里边类型是它的时候来做,为什么我一会儿给大家说原因,注意啊,接下来我要干什么呢?我要获取啊,获取当前我们这个采集周期啊,从我们这个卡夫卡中消费的数据,它的一个起始对偏移量,呃,起始偏移量以及呢,咱们这个结束偏移量啊,它的一个值啊,偏量值,那么怎么来获取呢?要注意咱们其实呢,完全可以通过我们这个RDD来获取。
07:56
啊,通过RD来获取对吧,那大家想一想,我们现在从卡不卡对吧,这里边来拿数据,那本身呢,拿的数据的封装成以什么呢?说封装成一个叫d stream这样的一个离散化流啊,那么离散化流它的内部是不是对RD做了一个封装,对吧?你们都接触过哪些RDD。
08:17
你们肯定学过Spark对吧,然后你们自己创建RD,那么常见的RD你们都借助哪些呢?Map part对吧,就咱们现在比如说我要执行什么map呀或什么的,那应该底层创什么mapd,如果有沙的话,肯定创建什么沙,对吧,然后还有什么我现在GDBT操作什么GDBCRDD,如果说我现在呢,是我们现在是我们的一个Spark的话,对吧?如果说我现在Spark读谁呢?读卡夫卡的话,那么这个时候它里边什么呀?它里边是我们这个卡夫卡B对吧?这个不需要吗?同学们在这里RDD对吧?那接下来它这里是不是一个抽象内啊来CTRLH对吧,有这么多呢。
09:01
对吧,哎,有这么多RDD的对吧?这原来你们在操作的时候,我现在想通过咱们RD直接操作my circle口,那就可以JDBC,对吧,然后接下来那么咱们现在呢,在咱们这里边对吧?诶我在创建的时候对吧,那有时候卖partition,就现在做做业些转换的时候,其实这个然后还有什么呢?还有一个是我们现在这个沙法D对吧?那如果说我现在呢,要读取我们的卡法数据的话,他本来数据呢,其实底层啊,它封装的是一个卡塔D。啊,是个卡法RDD对吧?哎,那么咱们现在呢,注意我要想去获取这个内容的话,那怎么获取,那我要通过咱们现在的这个RDD来获取,而且这个RD只能是卡法D才有,别的没有,你想想我现在如果自己随便创建一个迈法的RDD,我想去拿什么偏移量,拿什么起始位置,拿什么起始位置,有这些消息吗?没有,比如说我GDBC,对,哪有什么偏移量这个这一说法,对吧?比如说我就是一个普通的咱们现在这个粒子集和转化RD哪有种偏移量这个说法没有,是不是卡法才有咱们消息的偏移量啊,对吧?所以说呢,那么你要想获取这些东西的话,那么你必须得在这里,就是还没有做转换的,如果说你先把它转换成这种结构了,主要是金层了,那么它底层封装RD你是获取不到的,对吧,只有咱们刚才读数据的时候,它底层是我们那卡卡RD,那么这个时候咱们才可以获取数据对吧?那么你获取数据当家想想对你后面的业务应该是没有影响的吧,你是不是只是把它当前从卡卡里读到的数据,对吧。
10:31
那么咱们现在它的一个起始和偏量对吧,这个达到,但是我该怎么走流程是吧,它怎怎么走流程啊,也就是我不会去改变的RDD的一些什么数据的这个值什么的,对吧?所以说这个时候呢,大家想一想,我要想不改变成它的值,然后呢,还得把这个数据获取到,那这个时候咱们应该去调用算子谁呢?就是我相当于我只是把这个RD的值取出来,对吧,但是呢,我并没有对做一些什么改变什么的,对吧?那你说我现在用哪个比较合适呢?卖吗?诶注意啊,你想首先第一件事你是要把RD拿到呀,如果拿到RD的话,你肯定选transform,或者是我们这个for r DD等,那么这两个咱们现在还不到提交的时候呢,那首先我是不是选的谁,是不是咱们这个transform呀,对吧?那么如果transform的话,那么这时候它需要给你传到一个参数,这个参数谁是不是就咱们转换RPD呀,那么你给他返回谁呢?我给他返RD,其实我不对你做什么操作啊,其实我就拿东西对吧,这个RDD该是什么结构,你还是什么结构,我还给你转回,还给你返回,只不过在咱们这个返回之前。
11:33
啊,在反回之前,那我想从咱们这里边来获取一下咱们起手的结束偏段的位置对吧?哎,那么怎么获取呢?那RDD点大家看我现在这里边啊,想获取起始和结数偏移量对吧?你找钙的如下的方法呗,对吧,发现这里边没有,这就是RDD的操作,这个东西都没有对吧?那怎么办?注意啊同学们,我说这块呢,一定要使用我的卡法RD是不是才可以啊,那么这卡法RD长什么样呢?那么咱们现在来看一下卡卡RDD它长那样,那我发现它本身在咱们这里边是什么修饰的,Private private修饰的咱们内说明别人地方他听一下访问还访问不了对不对,什么私有的只能在咱们内内部就访问对吧?然后接下来注意看同学们咱们现在卡不卡RDD啊,它里边继承了,它是不是继承了RDD,那其实相当于混入我的RD特质吧,混入log的特质,同时混了一个这个特。
12:34
是这个特质叫什么呢?看看同学们叫什么叫做has of outside range对不对,对吧,咱们现在一看,哎有什么有偏移量的范围,我现在想拿什么,我是不是就想拿起车起色位置是不是就是偏移量的范围啊对吧?所以说呢,那这个时候,那你那得对想办法做处理了,注意啊,咱们现在呢,在这里我之所以可以处理,为什么?因为啊因为啊我们当前呢,Record d stream啊stream它底层封装的是我们这个卡卡RDD对吧,那么咱们当前卡RDD的它混入了咱们这个特质啊这个特质那么可以的,那么提供了什么方法呢?这个特质中的,这个特质中提供了可以获取我们现在的这个什么可以获取偏移量的范围的方法。
13:39
啊,可以获取偏量范的方法,那么怎么来获取呢?那你看一下特质里边是什么呗,对吧,来它或者这个特质点进去,这里面有个叫什么,有一叫opposite range对吧,这样的方法对吧?那么咱们现在是想,诶,你现在你提供的方法了,咱们这里原来方向错了啊这个东西不是get,而是什么呢?而是我们现在off outside range,但是发现也没有。
14:01
因为RDD,你现在卡卡的D啊,你访问不了,对,那怎么办?同学们对吧,在这里它访问不了,没关系,咱们呢,可以这样,As instead of,对,因为你看我现在在咱们这里边卡法他去找什么呢?你在这里去找咱们现在刚才我这方法ET range对吧,但其实咱们现在这个方法呢,它本身是咱们一个属性对吧,你的属性的exce有的你访问不了,那你现在访问的方法怎么访问啊,对吧,没关系啊,咱们现在转换一下,谁有呢?它没有,咱们现在复里是不是有啊,对吧,Has outside range。对,就相当于把这对象呢,给它转换了对吧,转换了这个复类对象了,注意啊,同学们,假如说我现在呢,在咱们这里边对吧?哎,比如说我现在对吧,这个复类定的方法对吧,但是呢,我现在在拿子类中对吧,在拿子里边你直接访问其实是可以的,正常。等于我现在是不相重复里就记录下来了,但这个东西呢,那我发现咱们当前没有访问它这个组对象的权利啊,因为什么,因为它是私有的嘛,那怎么办呢?哎,我现在访访问他呗,对,你就告诉咱们复类一呗,等到具体他怎么做对吧?那么它在实现的时候呢?哎,那么咱可以看到我现在其实访问的谁,访问的是我们has outside range,对,然后接下来它里边有一个什么,有个叫outside range这样的方法。
15:26
啊,相当于咱们的到时候在访问的时候,对吧,访问的是我们这个has often range,它里边呢,有一个这个方法,其实真正实现啊,其实真正实现在他实现的时候,他还是谁,还是在读局卡不卡的时候,对吧,他来去帮他实现。啊,还是他来帮咱们去做的实现。啊,这个主卧实现,咱们现在把这内部给封装了,对吧?因为你想一想咱们现在这个实现的过程,同学们他并没有说,诶我在这里边我记住它了,然后呢,这里边马上我这里有一个方法对吧?它没有对吧?它是什么呢?它是在咱们当前我的卡夫卡对象在创建的时候,它其中有一个属性啊,其中有个属性,这个属性呢,叫off outside range的outside range,然后呢,通过咱们这个属性可以拿到我们现在它的一个偏移量的范围,所以说这个呢,是我们在创对象的时候呢,它内部实现的啊,内部是线,你没有办法直接去调啊,你把法直接调你怎么办?你只能去,诶我现在呢,把它转换成负类,然后呢,调咱们这个负类,它里边的一个方法对吧?那么你再去真正执行的时候,对吧,它底层其实还是卡卡做的,这个稍微注意一下啊,这个时候注意一下,然后接下来那么当你执行这个操作之后呢,那么它的返回值,大家看它的返回值啊,那这里应该是一个看类型啊,在这里我们呢,把这设置一下没有。
16:48
是是吧,把log加上是吧,但是我注意啊,我不知你们版本是不是这样对吧,就我现在这里面有两个这个STEM可以选,一个是default,一个是我们project,对吧,我的default给设置呢,这个其实不起作用,我一般我都把这个东西给它切换project里面,我不知你们的版本是不是这样对吧,就是我现在想显示类型的话,是不是应该是把这给我勾上对吧?把这个勾上对吧,然后接下来OK一下对吧,OK一下。
17:20
来,那么咱现在呢,在这里对吧?啊outside rain啊outside rain好,那这个outside rain里面有什么呢?对大家看啊,它本身这是不是数组啊,数组里放的什么,放的什么奥赛润呀,那么这里面到底是什么东西,我也不知道,点进去看一看,同学们在这里点进去,点进去之后发现这里面放的什么主题。然后分区从什么地方开始,然后到什么地方结束,也就是说啊,咱们现在我是不是从卡卡读数据呀,你读的是哪个主题,哪个分区,您从这个分区什么位置开始读的,到哪结束,在他当前那里边是不是都记录一下呀,都记录下,其实我比较关心的什么,我比较关心就这个对吧,我比较关心的是我们现在到哪结束,因为到哪结束,是不是我下一次我便宜量一个起始啊,我最终我是要把这个东西我要保存ready里去啊对吧?哎,所以说呢,到这啊同学们,咱们现在呢,可以获取到它了,但是获取到它之后呢,同学们,那么你最后要保存的时候,你肯定要用到它,所以说你把它定义成局部变量呢,不太合适,对吧,那我同样把这个东西呢,也往外提一下对吧,把这东西也往外提,那么它等于什么呢?咱们习惯性的对吧,这是空对吧,然后接下来在咱们这里对吧,把它往这一放,改成我们这个VR。
18:37
对吧,这个呢,是比较low的一种写法啊,这是比较low的写法,说第一个这个空的这个定义空啊,就相当于生为一个变量,对吧,然后接下来咱们现在呢,在这里。然后呢,给它赋值对吧?还有一种写法什么呢?还有种写法,咱们这里是不是可以用我们这个area对吧?然后接下来这里给它指定一个对吧,就是我相当于我现在创建了一个数组对象,但是数组里边呢,放的什么对,现在什么也没放对吧?这数组现在什么也没放对,那最终放的什么元素,什么类型元素呢?就这个类型的呀,对吧,这个呢是比这个low呃稍微高一点点的写法对吧?然后如果要是稍微呃再装一点写法呢,那应该是这样的对吧,Every点什么,咱们这empty对吧,然后接下来什么类型的呢?是吧,咱们这个outside range对吧,S表示的是我当前呢定义了一个空的我这个数组对吧,具体怎么写的,点进去看一看,对吧,就是咱刚才那个写法啊,就是说咱们现在在写的时候呀,注意定一个什么控数组,一般咱们可以直接什么呀,就接就帮你就封装好方法了,对吧,封装好方法了对吧,然后接下来。
19:47
这里注意啊,有一个坑啊,就是我现在这么讲,你们晚上呢,也有可能再去做练习的时候,也会犯这个错误,有也会犯这个错误,那么咱们现在是不是已经把这东西拿到了呀,拿到之后咱们是不是可以继续往下走,开始转化了呀,注意同学们,那么如果你要这么直接转化的话,你这个东西拿不到,它不起作用。
20:08
对吧,因为什么?因为咱们现在transform它是什么,是不是转换算子呀,那么转换算子什么叫转换算子,你给一个stream,他拿到的是什么,是不是还是一个我们这个stream,对吧?那你说我现在这个东西你真正在执行的时候,你现在你往下走,你是不是走的还是record stream,这个执行了吗?没参与到预算中来呀,没参的预算中来,所以说呢,咱们现在得定个变量点,我们这个V2去接收一下,比如说这个叫我们这个off set,对吧,然后呢,Stream outside stream,然后接下来把这个outside stream往这一放,然后才继续就往下走,注意啊,晚上在做的时候别忘了啊,别忘了对吧,好了,那这样的话,咱们现在呢,对把我们这个。根据指定偏量啊来读取数据啊后,并且呢,来获取咱们这个偏量值,对吧?那么这个操作拿到了把代码呢,稍微咱们再看一下,首先呢,咱们再去我们这个从卡卡读数据的时候,那么这个不是上来直接读了,而是咱们先从我们red来获取我们偏移量,如果说有偏移量,那们这个时候呢,咱们根据指定的偏移量位置来读数据,如果没有的话,咱们从最新来开始读,对吧?这是咱第一个处理,那么第二个处理的话,那我从卡卡里读数据,那我当前读数据是从哪个分区读的,从哪个主题读的,当前从分区的什么位置开始读,到哪结束了,我是不是可以获取,获取到了,怎么获取到呢?那么咱们现在得通过它RD来获取,那么因为我们现在呢,这个第卡这个刚拿到的卡组刀数据对吧,那么它内部呢,它封装的什么,它封装的是我们这个卡夫卡地对吧,那么在咱们卡法地面呢,它混入了我们现在这个特啊,它混入我们现在这个特值对吧。
21:56
然后接下来那么这个特值呢,它里面提供了我们现在相关的我们这些方法啊,提供了我们现在相关的方法,对吧,那么这块呢,大家要注意一下对吧,但是呢,这块其实也不太好理解,导说老师你看我正常情况,原来我再去我们这个Java或者SKY语言里边,如果我当前某一个类对吧,他会了我们的特值的话,那这个时候这个特指里边的方法是否应该有啊。
22:22
我是不是应该是可以调用的呀,对吧?哎,那么这块大家注意,那相当于什么,相当于这块它本身你可以这么来理解对吧,它本身呢,这是一个我们的RDD对吧,但这RDD呢,它封光的或者声明呢,它并不是卡不卡RD对象,比如说我现在这里边我定义什么呢?我定义了一个我们这个RDD对吧,比如说我现在VR,然后呢,接下来RDD啊什么类型的呢?对吧,RDD,然后这什么类型的呢,在这里对吧,就是RDD类型的,对,就是RD类型的。对吧,然后接下来我你用了一个卡RD啊,比如说卡法RD可以访问啊,假如不是私有的同学们不是可以不是私有的,对吧?然后接下来那么咱们实际这个R的对象是不是卡卡的地啊,但是你说这个里边它能调用咱们现在它的一个我们的奥赛方法吗?
23:12
肯定定文不了啊,因为你看咱们在编的时候,他认为咱们当前RD什么类型的是不RDB类型的呀,RDB里边是不是根本就没有这限制方法呀,对吧,所以说那你应该怎么办,那你是不是应该对吧,把它先转换一下呀,转化成什么呢?对吧?那这个时候咱们其实就是把当前RB对吧,转换成我们现在的看在rich对吧,你想强转什么时候可以强转,是不是它随着竞争关系才可以强转对吧?我现在这个是子,这是负,把咱们RDD是转化成我们现在这种类型,然后接下来去调它里边的装outside range这个方法。啊调offet这个方法,那么这块呢,还有一个问题就是什么呢?说老师你看我在咱们这里边啊,我看到卡卡中,然后呢,去找我们的opposite outside range对吧,去找它opposite outside range,那么其实呢,咱们现在opposite outside range,注意它本身它并不是一个方法,而是什么呢?而其实这样一个属性对吧,那也就是说咱们现在这里边其实对吧,他是在创对象的时候,是不是其实已经帮咱们把这个东西给实现好了呀,对吧?哎,所以说你直接拿过来用就行啊,直接拿来用就行好了,然后最终呢,咱们这里直接把什么呀,直接把我们的RDD对给返回,本身我并没有对RD做什么操作,你读的值是什么样的对吧,还是什么样的,我只不过呢,从你这里获取点内容,然获取点内容,千万别忘了,咱们现在呢,它是一个转换算子,转换之后呢,这边接收,然后让咱们当前地继续参与运算,否则的话,对经常啊。
24:38
会有这种情况对吧,获取完之后直接就完事了,对吧,直接就完事了,然后呢,咱们现在发现这个偏量是不没变化对吧,那这块呢,可能就是你没有要让我们现在这个参到预算中去好了,这块呢我们稍微停一下啊。
我来说两句