00:00
好,那接下来呢,我们要做这个驱虫,由由level join产生的驱虫,而我们要用到U表的数据,所以我们要取最后一条,我们怎么说的?状态面诊。对吧,啊,取最后一条好。那这个地方我们还要有定时器。对吧,所以用哪个API呀。因为防止说你后面没有数据来呢,对吧,你不能一直等着呀,你有可能数据不来了,没有了,对吧,它只有左边一条数据,右边没有数据,它就一条对吧,它没有这个重复重复数据,那我们这个地方调用的算子应该用什么。我们要用状态变成也要用这个定时器。我们要用什么?用哪一个算子啊,告诉我。
01:12
哎,强总说了process对不对啊,其实这个地方咱们只能用这个process,因为我们用到了。定时器对吧?啊,应用状态平整,起码得是一个负函数,但是负函数里边它也写不了什么定时器对吧?所以咱们只能用这个process啊好,那kid。爹。Process,你有一个key啊,Key的process function输入数据,输出数据,那输出数据呢,还是。杰森。Object的,其实我们可以直接在这个里边写成什么,转化为招聘对象,其实可以的,对吧,我直接写,但是呢,我已经注意写好了,我就原封不动的输出,好吧,把这个杰森对象输出,然后呢,再转成招聘对象,实际上这五六两步可以合并了,对吧,我在输出的时候直接写一个招聘输出出去吧就好了。
02:07
啊,但是呢,我终身已经写好了,这样更好理解一点,我们分开写好吧,呃,那这里边呢,我们就有一个状态了,对吧,那还记得我们之前说这个状态里边存什么东西。存什么?可还记得。我们提到了我们这个状态里边要存什么内容啊,当时分析的时候还记得吗。对了,存这个数据本身吧,啊,存时间啊。存时间有什么用啊?你好,你你要存时间,你能比较大小,那有用吗?比如说你。
03:04
第一条。更大呢,你最后输出的时候,你输出啥呀。肯定是存数据本身吧,因为这个东西是要到定时器里边去输出的呀。第17里边输出,输出的状态里面的数据啊,能明白吗。杨总。不能存时间啊,以前去重存时间,这个地方不存时间啊,因为我们新来的数据要跟老的数据进行时间比较,把时间大的放进去,对吧。能明白吧,所以呢,存这个数据本身也就存一个接对象对吧,好private。Value state。那就是杰森object对吧,好,那这个呢,就是我们就直接叫state对吧。在里边写它的一个初始化。诶,我这个没写。
04:02
点get state,你有一个value state对吧,这里面呢有一个value。State。杰森object点。OK吧,啊,咱们得到是一个杰森奥比亚class。呃,那么接下来我们到这边来怎么写,首先获取状态里面数据对吧,在这。它有可能是第一条它状态为,那对吧,有可能吧,好所以呢,我们就直接获取线啊value state点。Value啊,那这个是last value对吧,上一条数据我们认为上一条数据呃,If last value等等于,那那如果状态为那。你那个网不确定不稳定是吧。啊,那没关系没关系,呃,那这边如果他上一条数据也就状态里边数据为,那他之前没来过数据,那我就把当前这个数据怎么样存到。
05:12
状态里边。我就把当前这个数据存到状态里边。然后注册一个定时器。对吧,啊,注册一个定时器好,那就value.update把这个value呢扔进去,然后同时在这注册一个定时器。对吧,我们去注册一个定时器啊好,那这个定时器的话。我们就用处理时间,好吧,我们用处理时间就够了啊,因为这个定时器其实就是说你在做这个left join过程当中,它可能会有这个数据稍微来晚一点,所以右表数据来晚了,本来其实你想想看下单的时候我们的订单。订单明细,还有这个订单明细活动订单明细购物券,本来他都应该是同时到的,几乎都是同时到的,对不对。
06:07
没毛病吧,但是呢。他有可能怎么样?来晚了导致有这个数据,那你来晚了,那无非就几秒钟吧,那我按照这个处理时间是不是就可以了,对吧,这块反而不需要用这个事件时间,我们用处理时间就好了。OK吧,用处理时间,呃,那我们要处理时间的话,那就先从这个ctx点。获取time service对吧。呃,先获得当前的这个。时间current ts对吧,然后接下来呢,注册定时器type service.processing time对吧,把这个TS加。比如说我们规定五秒可以吧,啊,我们认为在五秒,那我们给一个五秒钟的。时间。啊,咱们给一个五秒钟的时间就可以了,对吧。
07:02
好,那这个就搞定了,Else。否则呢?如果它不等于钠。那我们要做什么事啊?如果它不等于,那我们要做什么事啊?对吧,如果不能呢,对,它不是第一条数据了,对吧,当前这个value不是第一条数据状态里面已经有数据了。那我们应该干嘛?我们应该做什么事?想一想。对吧。最重要的,那大家都知道,肯定不是第一条数据了,对吧,那我们要做什么事呢。
08:07
对,获取时间比较大小,对吧?啊好,那时间字段在这个里边叫什么呢?叫这个。叫肉opera ts这个东西啊,就是它对吧。它在弗Li里边,诶,那这样啊,我们给大家看一下。找一个,诶不带YouTube啊,这有个test对吧,这里边儿呢,我们写的有很多的这个。这是个draw对吧。呃。给大家看一下啊。这边有。第二或者说呢,我们在自己来写一个啊。茯苓零三,好吧。
09:07
就叫type step吧。哎,不对,这个应该是零四。好,那PSM啊,嗯,这边呢,我先这样把这个环境拿过来。好,这个就不要了啊,跟他没什么关系,对吧,Table in叫query select。诶,这个啊。塞他。能不能直接来的。应该可以啊。我试一下试一下试一下啊,还没有这样写过呢,是不是必须要要要一张表啊,应该不要吧。
10:01
哦,不行,还必须得搞一张表,这个就比较麻烦了啊。嗯。呃,我想想看啊,或者说我直接给大家看文档吧,这边有啊,这边有我是记录的,有他还得搞一张表,我就不想写了,这个东西比较麻烦,对吧,他搞一张表啊。直接搜,嗯,对在这儿。哎,完了这。在这个地方对吧?好,呃,那其实在我们flink当中啊,它提供了有很多的这个方式可以获取这个时间啊,叫local time step对吧,还有这个car time step,那啊与这个相同,还有这个road time step都可以啊,就是时区可能不太一样,对吧?但是我们只是为了做比较大小,只要大家统一就可以了,无所谓了啊,然后呢,这地方就可以。查询,哎,我直接查询为什么不行,我记得可以。
11:02
哦,我刚才是没加括号吗?不对吧。他们三,哦,不对,我神经病了,这个是类型。我说呢?这个东西是类型,哎哟,这个东西类型,那我们取的那个函数在哪啊,对我要说为什么,我记得好像可以的,对吧?呃,我们取的函数在这个地方。对对对,是类型啊。这个运行一下对吧,我们看一下啊。好,那这里边呢,就是这个到毫秒。到毫秒。对吧,但实际上它经过这个卡夫卡之后呢,它后面会多一个Z,但是无所谓嘛,所有人多多一个对吧,那既然你是这样的一个格式,那我直接按照字符串的比较各位啊。
12:00
完全可以对吧,因为你这个数据,你你写数据的时候需要去考虑一下,诶我会不会从这个。2022年比较到什么1万年?只有在这种情况下来说,他才可能会出现错误吧。对吧,啊,我现在是2022年啊,只要你是什么99994位年份还是四位数,它按照字符串的比较方式。是不是就可以了。对不对。有没有问题,按照字符串的比较方式是不是就可以了?对吧,啊,所以呢,我就直接按字符串啊,就不写那个工具类了,那工具还比较麻烦,其实文档当中有啊,但是实际上这个地方呢,可以简化,直接按照我们刚才说的。比较就行了,Else if,对吧,那我们这样规定啊,如果后来的数据就是当然谁时间大,那就存谁,这个没有问题对吧?那如果后来的跟刚开始的一样呢?假如说时间就是这么凑巧,那我就用后来的对吧,但时间上到毫秒的话,它不不会出现这种情况,其实不会出现这种情况,但是呢,你总得把等于这种情况写一下。
13:14
对吧,要不然他没法做了啊好,那怎么写呢,就是如果当前的value对吧点。Get stream,呃,谁呢?刚才我们的字段字段名啊叫肉operator ts对吧?好获去这个,如果说它大于等于字符串直接比较对吧?大于等于谁呢?呃,Last value.get stream还是它。哦,这个不能这样比。应该是什么?呃,不是大于等于啊,封了对吧?呃,应该是compare to,对,应该是compare to啊。
14:00
诶,少了括号了。等会儿我重新写一下啊,我把这个拿出来。这边括号捋不清了,还。Come to这边呢有个括号对吧。跟他进行比较,对好,那这个东西呢,跟他进行比较,然后呢,要。呃,大家想一下这个比较之后,它是零一跟负一对吧?嗯。大于等于零对不对。就是你还想一下这个compare to,如果是这个大。一还是零啊?Yearly。正数还是负数啊。正数还是负数啊,它它这个不是一零啊,是正数负数对吧,正数还是负数。
15:03
它是正数吧,前面大啊,或者说这种东西呢,很简单对吧,如果这种东西啊记不清,我估计啊有同学记不清,那没关系啊。写一个TEST02是吧,简单的test啊,这个跟flink没关系。好,那我就二。点compile to对吧,因为如果两个相等,那肯定是零,那没关系对吧,好,那我们就看这个。点so,呃,我们做一下这个拍掉不就好了吗。那这个返回这几啊是一。对吧,是一啊,所以呢,我你看我这怎么写的,呃,我用的是当前数据的时间,对吧,那大于等于零说明情况,说明前面的大于等于后面的有没有毛病。没问题吧,是前面的大于等于后面的,也就是当前这条数据比之前数据来的要大,那怎么样要更新。
16:04
状态吧,啊,这里边要更新状态啊,好last。Value state啊。点update把Y6更新进去。对吧,那如果反过来这个大呢。本来就是状态里面这个数据较大呢,那是不是就不管呢,是不是就不管你大的先来对吧,那你后来的比我这小,那无所谓啊,我就不管我状态不用动了。没毛病吧,是这样子的。OK吧啊,所以我直接用字符串的比较刚才冯了,我直接写大于等于了,对吧,在S个里边可以写这个大于等于啊,咱们在这儿不能得写comp to啊compare to好,那这块呢,咱们就搞定。对吧,好,那之后呢,我们要考虑输出啊,那什么时候输出。在这边外面啊,Process这个。函数对吧,外面有一个on。Time方法在这儿是不是输出数据啊?
17:02
对吧,要先把这个数据取出来做一个输出,那就是value state点。Value看加V得到一个Y对吧,好,那。把它清空掉。现在。啊,当然你可以先输出也行啊out.connect把这个value的走一个输出,最后呢,注意这个东西呢,你要清一下。OK吧,你这个状态呢,你最好最后把它清一下,因为没有用了嘛。对吧,这个状态没有用了啊,要清理掉,这样呢,咱们状态就不至于特别的大,因为订单这种数据的状态没必要永久保存了,是不是OK吧?好,这一块的逻辑大家有没有问题来看一看啊,如果有问题可以提出来。有什么不能理解的地方吗?如果OK了,给我扣个一好吧。这就是由left join产生了重复问重复数据问题的一个去种。
18:08
哎。呃,那都提到了这一步呢,咱们去开个窗,然后用。全窗口函数筛选窗口中的一个最大值啊,有同学提出了这种方案,对吧?呃,那也就是说你要开一个五秒的。窗口,比如说对吧,还是说你要开什么窗口,你告诉我。淘淘,你说一下你要开什么窗,因为我们有滚动、滑动和绘画。你要开哪一种窗口,好吧,既然你提出来了,我就来说一说,这也没关系,对吧,就是让大家提出问题,我给你解答嘛,对吧。啊,不错啊,还能想到窗口很好。滚动好,滚动不行,Pass,为什么滚动不行,我告诉你啊。
19:03
你看既然你是level join,你的数据是不是有可能一前一后,那有没有可能第一个窗口这样对吧,因为你滚动窗口是不是这样,第一条数据在这儿,第二条数据在这儿。不是好无情,我跟你说不行,我会给你解释。能懂吗?那如果你是滚动窗口怎么样?不好。去中不亮。啊,继续继续。准备用什么窗口?既然考虑到这种问题,滑动窗口行吗?也不行,对吧,哎,对了,其实绘画可以。啊,你要聊的话只能是会画滑动,是不是也有这种问题啊,或者说它会有大量的重复窗口,窗口本身重复了,那你白做了对吧,所以我们就不聊这个会滑动了,那绘画。
20:05
对吧,好,我搞一个什么五秒的间隔,因为我敢在这写五秒,证明五秒钟数据全部都到了,对吧?好呃。绘画也还有超时的问题,绘画没有啊,跟这个一样啊,你这个地方敢写五秒,绘画也写五秒啊,对吧,间隔时间五秒,那为什么不用绘画呢。规划延迟会高,对对对对,姚总可以啊,姚总很nice对吧。杨总呢,想到这个点,绘画呢,它比我们当前这个状态编程的延迟要高。啊,比我们状态变点的延迟要高,为什么呢?我跟你说啊,因为当前呢,我们是不是用了两次let join。我们用了次left drawing,对不对?没毛病吧?好,那你再想啊,既然咱们用了两次left drawing,对吧?好,那比如说A。B。
21:01
C它延迟会不统一对吧,比如说啊ABC对吧,A1B1C1啊还有一个呢是A2。B2。还有一个A3,这是三种不同的数据对吧?好,那你看啊,A3你给个五秒,它是不是到五秒就输出了,而这个A2这条数据呢,诶到五秒,假如说到三秒的时候来了一个B2,他是不是会话窗口要怎么样,要在这个没有数据来五秒才可以输出啊,他是在什么时候输出,他是在八秒时候输出对吧?而这个是三秒,这个是六秒,比如说啊,他在加五秒,在11秒才输出,也就是说这个A1到他输出要11秒第一条数据对吧?而这个呢要八秒,这个呢三秒。啊,五秒五秒就够了,对吧,到五秒输出了,他延迟可能会更高。呃,能不能听明白。就是因为有的数据有右表数据对吧,那这个两个右表都有,这个只有一个右表数据,这个没有右表数据,其实你看啊,我们用状态编程。
22:07
对吧,我们如果用状态编程,是不是所有的数据都延迟了五秒触发,不管你是一条两条三条对吧,都是五秒钟触发好,但是你要用会话窗口,其实可行啊,会话窗口能解决咱们这个事情,但是它延迟会高。这样感觉绘画好像挺难用的,绘画就是在什么时候用的,就是呃,当你想要根据绘画去计算什么指标,但是没有绘画ID的时候,我们会考虑绘画窗口听懂。啊,我们会去考虑用会话窗口,只有在这样的情况下,我们才会去用这个会话窗口,OK啊,这意思好吧。呃,那这块呢,咱们就。
23:01
搞定对吧,这个去轴,那现在这个。有没有理解啊,扣个一啊,还有没有什么不理解的地方,或者说有什么新的方案,都可以提出来去聊一聊,其实其实淘淘不问这个方案我也会说。啊,这个窗关于窗口的方案我也会说,但是呢,我没有放在咱们班上个班,其实我是直接在这儿。我是直接在这儿去分析这个,呃,怎么做,我是直接带着分析的,上个班的时候对吧?啊在麦呢,我就准备在后面再去讲这个无所谓嘛,不重要啊,本来淘淘不问我也会说,所以我就说问的挺好的,对吧?啊,刚好问到这上面,我也会跟你说有这种方案,但是呢,他会它比这个状态面诊呢稍微要。差一点对吧,差点意思啊。现在有没有问题,没有问题,我就把这个截掉好吧。淘到功劳都没了啊,我不应该把这个话说出来,对吧,应该是淘到的功劳,对对对对。
24:04
我的我的我的啊。嗯,这个是。驱虫叫left join。或者叫filter。好,那我们就直接叫这个felt level join DS OK吧,啊,叫这个名字好吧,它就是去重这个由产生的重复数据问题嘛,对吧。
我来说两句