00:00
呃,那么再往下呢,咱们来看一看关于我们现在的这个代码的实现,对吧,那么这两块呢,这个代码呢,不要了,我刚改一改啊这个代码。这个呢,是我们这个order in对吧,那咱们现在呢,把这个我要注释一下,对,注释一下,那我这说明对以下写法是错误的,错误的话我是不是应该把它干掉了,就不在这写了行吧,对吧,这个就不要了,这就不要了对吧?那么咱们现在呢,直接留我们这个正确的写法对吧?直接留正确的写法,那么如果留正确写法的话,那首先大家想一想,咱们现在是不是得先去开窗啊,对吧,先去得去我们这个开窗,那么怎么去开窗呢?那这里它有它的stream对吧,那这里stream,那接下来咱现在在这里对吧,这个先找谁,先找我们这个订单吧,对吧,这是order in inform stream,然后接下来点开窗的话,Window是吧,啊点window,对,然后接下来,那么咱们现在呢,在咱们这里边,那么应该传两个参数。过去第一。
01:17
个窗口大小对吧?窗口大小这个东西呢,是不是也是我三,比如说我这个东西,我这我这个20差不多了,应该用不了20对吧,然后接下来那么咱现在呢,来一个滑动不长,比如说五对吧,然后点2V2,那么这个时候拿到的什么呢?拿到的是我当前的它的一个stream对吧,这个呢是我们这个order in four,然后呢,Window stream啊然后接下来那么除了它之外,然后在这里啊来设置我们这个窗口大小啊,或者开窗嘛,啊开窗那么除了我们现在这订单之外呢,说订单明细也得开窗了,对吧,那咱们现在来在这里它呢,也是我们这个window来指定我们这个窗口它的一个大小啊,比如说呢这个上十,然后呢来指定对吧,我们当前的这个窗口,它的一个滑动不长,对滑动不长。
02:17
二这个呢,是我们这个。好了,那么咱们现在把窗开完之后呢,那这个时候咱们是不是不能再用我们原来这个去做正文了呀,你是不是应该用咱们那个开窗之后的数据来进行join什么意思,意思是我当前呢,这里边对我照的范围变大了,原来就一个采集周期,现在什么现在两个窗口之间的数据来进行照的啊,现在是两个窗口之间的数据来进行照的,对吧?就相当于我现在这个数据这个范围变大了啊,数据范围变大了,原来是一个采集周期,现在呢,是四个采集周期,对吧?那你要保证啊,你要保证你的电脑在这四个采集周期范围之内,他们两个肯定都执行完了啊,这样才行啊,这样才行对吧?哎,那我现在呢,那如果要做join文的话,那就来呗,同学们,首先在这里我们是不是得去干什么,得join用,但直接照应照用不了,是不是咱们得转换呀,对吧,所以说呢,咱们是不是得把它给带上我这K对吧,那如果转换的话,这里。
03:26
大家再去map一下,然后这个map呢,那么首先你拿到的是什么?拿到的是record,拿到record对吧?然后接下来那么咱们通过这个record可以拿到什么呢?是不是可以拿到我们现在它的一个value是吧?在这里啊,Record点我们这个value,然后接下来这个呢,其实是我们这个order in for PR吧。对吧,当前我现在可以拿到我们这个订单它的一个信息啊,拿订单信息对吧,只不过呢,这是接S字串的形式对吧?然后接下来那我可以干什么呢?我可以将Jason字串转换为我们这个对象吧,那这个呢,就是Jason对吧,然后点我们现在呢,要把杰字串转对象,那就是pass object,把咱们切词字串给我拿过来啊,然后接下来转换成什么呢?转换对应的对象,Class off,这个对象呢,就是order in inform对吧?啊导一下包,然后进来转完之后呢,那这里咱别忘了对吧,那我现在要把它整KV的形式,所以说呢,我定个变量对吧,来接收一下,然后呢,返回一个元组,这个元组呢,这个是order INF for,点这个就ID吧,然后这个呢是order in inform本身,然后接下来点V2,拿到的是我们这个order in for,这个时候with the key the stream。
04:45
和他上午名字一样,但是不一样的地方在哪?它不是一个采集周期了,而是一个窗口啊,而这个窗口数据,然后接下来,那么咱们现在呢,除了它之外是不是还有这个对吧?还有谁呢?还有我们这个order detail对吧?那么它一样的操作在这里呢,也是map一下,做一个结构的转换,那么你从我当前这里面拿的数据的这个也是原生的卡夫卡,所以说呢,我定义变量record来接受一下啊,Record再接受一下,然后接下来我通过咱record可以拿到什么,可以拿到它的value啊record.value啊点value,然后呢,这个value其实是我们这个order detail,然后呢,这个字符串形式,然后接下来咱们可以通过我的Jason啊这个类,然后呢,去调它的一个pass object pass object方法把谁呢,把我的order detail s tr,然后转换成我们现在的。
05:46
他的一个。转换成我们的一个order detail这样的一个样一个对象,然后点VR,这个呢,是我们这个order detail对吧,这order detail,然后接下来也分成一个元组返回去对吧?那么这块呢,那应该是order detail.order ID,然后接下来order detail对吧,然后呢,我定一个变量来接受一下对吧?我定义变量来接受一下对吧?那么这个呢,是我们这个order detail with对吧,Key。
06:19
Stream,好,那这样的话,咱们当前呢,这两个流有了对吧?开窗,然后接下来转换结构对吧,这个是我们这个转换为我们的KV啊结构啊,转到KV结构,然后转到K结之后再往下走,那么这个时候是不是就开始双流召唤了呀,对吧?双流照稳啊,双流照啊,那么在双流照开始之前,同学们,到目前为止,咱们现在做的这些操作能不能跟上。就目前做的这些操作能不能跟上啊,其实这个代码是不是咱们对这个东西对吧?这个比较简单对吧,那咱们现在记下来说这个中文,那么如果说这重文的话呢,那一个是我们这个订单的窗口这个流对吧,然后接下来那一个呢,是订单明细的对吧,那咱们直接去调它的重对吧,然后把它呢也拿过来,把它拿过来,那么join完事之后呢,来VR一下对吧,这个呢,是一个join的第streamam对吧,已经呢,合并到一块的这样一个流对吧,但是我们说这个流里边有数据重复吧,对吧,就你光转完之后这不行,还有什么呢,还得去虫呢对吧,接下来得去虫,那去虫的话怎么去除没思是吧。
07:38
怎么去除同学们?整不过你。
08:00
想想这个驱除怎么来做,根据ID去除,怎么根据ID去除呢?这个方式不唯一啊,大家可能想到的是算子本身对吧,那这会儿呢,拿到它了,然后接下来,然后呢,你看一看这里面是不是有这个重复的是吗。本身我现在这里边就想这个肯定,比如说这个是我订单的一个ID,然后进来看来这里是不是有两个订单对象对吧,然后呢,大家想到可能是诶这个驱重了,那咱们现在根据谁呢?对吧,根据根据这个订单对吧,这个ID对这不行吗?同学们。你根据订单ID去重,咱们现在一个订单里边可能有很很多订单项啊,你肯定你得在这里边来进行驱重吧,对吧,你光根据订单ID的话说,哎,我现在只要订单ID存在了,对吧,这个东西对吧,它就可就是再再往里放就不能重复了,这肯定不行,什么意思,比如说我现在这里边订单有一个一的,然后它这里边有两个订单项1001和10002,然后现在呢,我已经把101放进去了,那它会出现情况什么呢?幺,然后这个呢1001,然后这个呢,啊这这这个呢是我订单这是幺,然后这个呢是我1001对吧,然后假如说我再来一个这个订单ID是不是也是一,然后这个呢是一,这个是1002,那你说我这个东西不能往不能放了吗。
09:31
肯定也可以吧,对吧,所以说这个驱虫的方式呢,你光靠他这够呛的话,这个够呛能实现。两个是吧,啊用这两个是吧,用这个还他一个order有两个detail。哪哪个哪哪个,为啥一个order in,为啥对两对应两个这个all detail是吧,那因为这个呀,同学们一个订单里面不会多个订单明细吗。
10:16
他们不是一对多的关系吗?是这个意思吗?一个order in,大家想想,一个order in是不是代表咱们当前这里面一个订单记录啊,然后一个order detail是是代表咱们当前当前这个order detail里面一条记录啊,对吧?那你看一看咱们假如说我当前就就就能就拿他为例吧,同学们你看一看咱们现在我选中这三条记录,他的订单是谁,他的订单是不是都是3721对吧?那你说我现在这一个order是不是order,以后是不是对应三个order detail。驱虫啊驱虫怎么去呢?想想咱们以前咱们好像做过驱虫这个事儿吧。
11:11
啊,在咱们在前面日活呀,或者说咱们首单这块做个驱虫吗。咱是不是咱们做过类似的功能,比如说我现在在咱们这里判断的是不是存在呀或什么的,对吧,那其实呢,就和驱虫差不多嘛,对吧,所以说咱们的去虫的时候,大家注意啊,也可以借助谁呢?也可以借助一下我们现在对吧,这个red对吧,因为red它本身不有一个这个set集合吗?对吧,这个S的集合本身他们就帮咱们就可以过滤重复嘛,对吧,那如果说我现在这个S集合的话,那个K咱们怎么来设计呢?对吧,来一起来看一下,比如说啊,我现在在这里,我要想去完成咱们驱重的话,那这个时候呢,我可以借助我们现在呢,它的一个RA对吧,进入red,那么如果进入red的话,那么这块它的一个我的K对吧,大概的一个设计对吧,那应该这样,这会咱们学过了,对这学过了,然后接下来,那咱现在呢,这个type类型,那肯定是that对吧,然后接下来对吧,那么咱们呢,也是对吧,当拿到订单和订单明细之后,那么这个时候呢,我往咱们这个set集合里面去添加一个数据来看一看。
12:19
啊啊,看一看能不能够放进去,那如果放进去的话,同学们大家想一想,如果说能放进去,那说明什么?说明咱们现在这个数据从来没有出现过呗,对吧,但如果说放不进去的话,说明咱们这个东西是不是曾经出这个出现过了,对吧?那接下来咱们现在呢,在这里这个K长什么样呢?那完全可以这样,比如说我现在这个这个标记,然后这块呢,是我们这个order ID,对,就是说我现在啊,这里边我当前这个订单ID是否和咱们某一个订单明细,他们两个进行过关联。因为咱们现在最终我在我我再去判断的时候,其实是不是就是我现在订单啊,他的一个什么对吧,和咱订单明细他们之间是否进行一个连接做关联呀,对吧,那我先判断一下呗,如果说他俩做关联了,那我就往red它那个set集合里边来保存这样一条信息啊,来保存这样条信息,对吧?那么我现在这个东西我之所以敢放在raking里面去,大家想一想,我这个东西需要长期保存吧,对吧,这个不需要吧,对吧?所以说呢,咱们可以设立失效时间,这个失效时间根据你来设置,比如说正常情况下五分钟我觉得足够用,因为如果说咱现在采用这五秒的话,窗口大小是20的话,五分钟足够了,对吧,其实不用五分钟对吧,只要让它定一分钟是不是就够了,对吧,那咱们接下来,那我这里同学们,那么咱们如果要想这么做的话,那咱们现在就得把这个东西给写一下呀,既然你要连接red,同学们,既然你要连接red,那么这个时候咱们是不是得考虑。
13:58
把这个RA对吧,这个连接的时候,对吧,你不能说我每次我都去创建一个连接,对吧,那这不行,那怎么办呢?你是不是得去对吧,这个以分区为单位对吧,然后呢,一个分区来去编辑red呀,对,这是一个,那么另外一个咱们现在呢,要想去我们这个合并,那么合并意味着什么呢?意味着我要把订单和订单明细的东西是不是合成一个更大的宽表啊,那如果合并成一个更大的宽表的话,咱们这里最好是不是找一个东西来接收一下更好一点。
14:28
呃,比如说定样类来进入一下订单和订单明细,他俩合并的一个对吧,这样的一个更大的宽表,都更大宽表,那所以说呢,咱们现在要想的是去重,那么首先呢,第一件事咱们在这里先干什么呢?先建立一个样例类对吧,在这里这个呢是order外对吧?这个样例类它主要用于封装订单和订单明细的信息,这个呢,我直接过要过来了,CTRC。然后在我们B里边呢,直接去创建一个我们这个类对吧?这个呢,是我们这个order外的,它是订单和订单明细合并之后的一个我们的宽表啊,订单和订单明细合并之后的表,咱们来看一看这里边啊,它这个代码是怎么写的,首先呢,这里定一些属性啊,这里定一些属性对吧?那有订单的,有订单明细的啊,这几个呢,是金额,然后接下来那么你要想去把这些东西拿到的话,大家想想是这个分摊金额是我最终要求的吧,对,就每一个啊,每一个订单明细里边分摊多少钱,这个咱们要求的对吧?是否首单,这个咱们是不是已经计算好了。
15:34
是否首单这个标记,咱们在做订单的时候是不是已经把它个计算好了呀,对吧?然后接下来这个维度关联,咱们这个维度对他的一个省份和用户的年龄段这个关联是不是咱们也做好了,这个在我们的订单里面是不已经做好了,然后现在这些从表的关联这些东西,咱们在我这订单明细里边是不是也已经把维度关联好了呀,对吧?那么我其实在这里主要想达到谁呢?其实我主要算的就是这个。
16:02
就是当前我现在的这个每一个我的订单项里边,它的一个实物分摊金额是多少啊,我其实主要想求的就是这个对吧,那么如果想求这个的话,那你得把这些东西都给我关联在一起啊,那么怎么关联一起呢?你定这么多属性,同学们你定这么多属性,那么你到时候说得给的属性赋值啊,怎么赋值呢?对吧?它定义了一个这样的一个方法,这个方法叫什么方法?一看到我现在选中的这个东西对吧?DFZ这叫什么,是不是构造器呀,对吧?那么咱们SC里边它的构造器呢,那应该有两种,一个是主构造器,那么一个呢,是辅构辅构造器,那吧,咱们现在在这里这块我定义的是不是咱们这个主构造器,然后接下来这里边通过Z定义的是不是辅助构造器,对吧?那么在辅助构造器里边它要干什么?第一行是不是通过咱们这个调用其他的构造器啊,对吧?然后接下来你看他干啥了,同学们在咱们这里边对吧?然后呢,它接触两个参数,一个订单,一个订单明信,然后接下来默order定义for,这是干什么呢?我这些属性本身的值是不是都是空的,对吧?那我是不是得根据这个订单,然后把这属性值给我拿过来呀,所以说呢,这个墨order in inform和墨order detail,它其实呢,是把订单的属性值和订单名的属性值来复制给当前我们这个order we是订单这个宽表是吧,给它赋值的。
17:26
啊,其实在这里啊,就是把这个订单拿过来,然后如果说不等于空的话,那么这个时候呢,我去给他把这个属性做一个赋值,这个呢也是做一个赋值操作好了,那么这个完事之后呢,那接下来咱们现在呢,就开始来写我的去除代码。来选去除代码对吧?首先第一件事咱们呢,Join的stream,然后接下来我现在去重的时候,我是不是得依赖于我的red呀,那么red呢,咱们曾经对吧分析过对吧,在这里给我分析过,那么如果ready的话,那么你得给我分析你要用哪个类型,它的type是什么,咱们是不是已经分析好了,是S吧,然后接下来那么它的K是什么,对这K呢,我和它保持一致啊同学们,咱们当前在这里我这K什么join的对吧,我找一找。
18:13
在咱们这里,我把这个呢给拿过来这个吧,这个呢是我们现在他的一个K啊。把这个呢,K拿过来是吧,这K拿过来我的K呢,想这个是这种形式的,是这个呢是前缀,然这块呢,是哪一个订单ID,然后value的话,对吧,这个value是不是直接就是order detail它的个ID就可以了呀,对吧?然后接下来咱们还有一个pair失效时间,这失效时间呢,比如说对吧,咱们就让他这十分钟啊,比如说我现在有效期啊,叫他十分钟对吧?那么如果想做这个事的话,那咱们接下来在这里是不是得以map的以分区为单位进行处理啊,Map partitions,因为我现在要连接谁呢?我现在要连接我们这个red呀,对,我现要连接red对吧?那么如果以分区为单位进行处理的话,那么这个时候你拿到的是谁?注意看同学们拿到的是这个东西,迭代器,迭代器的类型是什么呀?是不是元组,元组里边K是什么,是咱订单ID,然后value是不是订单这个明细和订单这样的。
19:29
一个这个对象了,对吧,所以说呢,我直接这样就好了,Top我拿到的呢,是一个我们这个元组啊,我拿到的是一个元组对吧,而且呢,这个元组还是一个啊这个多个对吧,所以说拿这个元组叠在一起对吧,然后接下来那么咱们最终呢,你要给它返回什么呢?对吧?来比如说我现在呢,在这里我的习惯是是这样的是吧,咱们应该干什么?转例子集合呗,对吧,那这里咱们现在呢,应该是一个type list,然后最后把我这type list呀,在这里去给它转换一下,我就先不换别的啊,我先把这个东西按照咱们原来正常的思路对我说,如果我是叠的器的话,是操作起来不方便,先转成集合啊,先转集合,然后接下来,那么你现在在这里边干什么呢?要我是不是得获取连接对吧,来获取我们现在的这个jeice的客户端对吧?那么我现在获取客户端的话,咱们应该有工具内的my red YouTube点,然后接下来get je。
20:29
看第二位啊,对吧,那么这个呢,这些例子拿过来了,这些例拿过来之后呢,那咱们现在这里边是不是要它关闭一下呀,那接下来咱们现在开始进行操作,怎么操作呢?那我现在再去处理的时候,我是不是要JD点什么SAD啊s sad往你放一下,然后呢,才能判断一下,诶当前这个东西是否存在嘛,对吧?那么我现在要判断往里放的话,我需要谁,我是不是需要个K呀,这个K它和谁有关系,它是不是和咱们这个order这个东西有关系,那么如果和order有关系的话,同学们,那这个时候呢,那你是不是得把这个集合给我便利一下了,对吧,因为我现在我要拿我现在的订单,以及我现在的订单明细了,所以说呢,那你应该把它集合呀,给我便利一下了,怎么便利呢?那在这里点2FOR循环呗,啊缝数循环,那么每次遍历它得到的应该是一个元组吧,原组的话我可不可以这样,同学们这个呢是我们的o.ad这个呢是。
21:29
Order e这个呢,是我们这个order detail啊这块呢,再加一括号。对吧,这块呢,是一个五力括号对吧?那这样的话是不是咱们现在把每次得到的东西对吧?哎,每次遍体得到的东西,我是不是可以复制给咱们现在这个原组对象了,对吧?那么接下来拿到这个数据之后呢,那咱们现在呢,我就可以执行sad了,对吧,可以执行我们这sad了对吧,那么sad在执行的时候对吧,那它需要传参数,第一个参数谁K呗,咱们现在呢,VR去拼接一个K,这个K呢,是这个形式的对吧,这个K对吧,是这个形式的,然后再加上谁呢?加上order ID订单ID嘛,对吧,然后接下来,然后现在呢,这个对吧,我们定义变量,比如说叫咱们这个order key对吧,Order key,然后接下来把order key对吧作为我们这个参数给传过来啊CRLC啊CCRV,对,那接下来除了K之外还有一个吧。
22:35
是咱们这个这个值啊,这个值是谁?Order detail order detail,然后进来点ad对吧,那么这里呢,需要转换一下这个to string对吧?这个转换成怎么字串,然后接着那么这个方法推换成返回什么这个结果了呢?零或者一吧,对吧?Sad DD它返回零或者一,那么咱们第个判量来接入一下,那么这个如果返回一的话是说明什么。
23:03
说明以前没有是吧,所以说呢,我叫is not exist。对吧,是不是不存在,如果返回去的话,是不是不存在呀,对吧,那么同时我在这里我要干什么呢?Je.is pair,对吧,Fair,咱们是不是可以设置一下,它这个是这个需要时间的这个呢,就是我们这个order k然接下来对吧。600秒啊,需要时先设一下,然后接下来咱们再做一个判断,如果说呃,如果说我现在呢,这个is not exist,那么要是等于等于一的话,那等于等于一说明以前的不存在,那么如果以前要是不存在的话,那说明咱们现在这是第一次来加的倍,那是不是应该把这个数据给保留一下呀,这门数据保留一下,那么如果把它给保留的话,同学们,那咱是不是得有一个东西对吧?哎,这这这这这个东西怎么怎怎么对吧,我要把这个东西给怎么加一加,对怎么保留对吧,怎么把它过滤出来,其实大家想一想,我们一开始在讲我们这个是否对吧,这个对吧,这个第一次登录的话,我们其实呢,就用这种方式,最早的时候我们是怎么过滤,我们用filter对吧,但我说filter不好对吧,咱们是一个个处理的。
24:19
然后第二种方式呢,我们先实使用的它对吧?第二种方式其实我使用它对吧?那么咱们这里我是不是可以单独定一个集合呀,来在这里我定一个集合,因为你要往这集合里边不停的加东西嘛,所以说呢,我定一个例子,8UFF分什么类型的呢?大家想一想,我现在我是不是要把订单和订单明细给它合成一个更大的宽表啊,对吧?如果说我以前我没有关联过的话,它等于一意味着什么?意味着不重复,不重复意味着什么?意味着订单和订单名他俩从来没有关联过,对不对?那么如果没有关联过的话,这个时候我是不是可以把它俩关联在一起,形成一个更大的宽表,咱们已经生明类型了,叫什么?Order外吧对吧,叫order white对吧,叫order white对吧,来在这里咱们呢,去拗一个对吧,又一个。
25:07
点VR对吧,这个呢,是我们这个order white list order list对吧?那么接下来那么如果说啊,咱们以前呢,这个东西对吧,他把没有关联过,就订单跟明细没有关联过,对吧?那么这个时候咱们呢,去做一个我们拼接往我们的order外的例子里边啊,给它放一个我们的一个数据啊给他放一数据,放谁呢?是不是把咱们订单和订单明细给它合并为各大宽表,怎么合并,这个时候就得看到样例类了,同学们,刚才咱定义样类的时候,这里我提供了一个辅助过的方法,这个辅助过程方法是不是你可以给我两个参数了,这两个参数是不是你把订单和订单明细给我,我会给咱们当前这个order外的对象它的属性来赋值吧,对吧,所以说呢,在咱们这里我应该干什么?又一个我们这个order we,传讲参数过去一个是我们这个order in inform,那么另外一个是我们这个order detail啊,另外。
26:08
一个方程底。能理解吗?同学们看一看这块代码啊,同学们,其实这块呢,代码的业务和我们这个日活那块判断的是不是首次登录这个业务一模一样,来你看一看同学们,如果你没印象,我给你找证据啊来咱现在呢,来到我们的APP,然后接下来在咱们的日活这里边对吧?呃,日活呢,曾经咱们过滤重复的对吧,其实判断是否登录对吧,咱们应该呢,有两种类型对吧,第一种对吧,这个第一种方式对吧,Filter对吧,这个第一种方式filter,然后第二种方式呢对吧,然后呢,在这里以分区为单位进行处理,然后怎么做呢?对吧,在这里你看地集合,这思路类似八分对吧,然后接下来获取连接这个思路对吧,然后接下来咱们对着做处理,是不是往这个集合里边来放数据啊,对吧,在放的时候怎么放的呢?你看是不是根据我直点CDD来判断的呀,对吧,其实思路是一样的啊同学们对吧,思路是一样的对吧?好的,那咱们现在呢,回过头来啊,回过头来。
27:10
等它循环完之后,同学们,咱们这里面是不是有数据了呀,那你说我最后我再返回一个top list不太合适了吧,那就没有过滤的意义了,你过滤你现在是想干什么,你是不是想拿到一个咱们这个合并之后的宽表了,所以说应该把谁返过去order we list,然后给它转换成我们现在对吧它对吧,给它反过去。啊,把这个东西给他返回去是吧。好了,那么咱们现在反过之后呢,那接下来我呢,去抽取一个,这个时候它应该是order white stream啊order stream,然后接下来咱们呢,来把这个order stream的点去打印输出一下看看效果,那么这个效果是不是应该就是我把订单和订单明细给他合并之后,它的一个宽敞效果呀,对吧,好了,稍微休息一会儿啊,稍微休息一会儿啊。
我来说两句