00:00
Link里边提供了两种drawing,一种是window drawing,另外一种是interval drawing啊,那么interval drawing翻译一般翻译成间隔连接,它主要是用来处理什么场景呢?啊,这其实就是像我们前面提到的,比如说我们在做这个connect操作的时候,给大家提了一个。实时对账的需求,哎,那大就会想到实时对账这种状态的话,如果说我们想把对应的这一个功能啊,之前的这一个功能,想要把它换成一个join的方式,直接把它实现,能实现吗?啊,因为我们可能会觉得就是这种connect的方式,你得自己声明状态,然后两条流,一国两制,另外还得自己定义定时器,这个太麻烦了,窗口本来就是定时器啊,哎,我们能不能用窗口把它包含在这里边呢?哎,这个大家会发现显然是不行的,比如说你如果要是定义一个滚动窗口,那就不用说了,对吧,滚动窗口其实也是一样的啊。比方说你定义这样的滑动窗口的话,那么如果说我在一个窗口里边,第一条流的这个数据来了。
01:06
那么在另外的一个位置。当然在这个位置,它就都不在这个窗口范围内了,那如果说这个一它可以在这儿这个位置的话,这相当于是没有包含在第二个窗口内,对吧?诶然后二的话有可能在这儿。因为大家想到啊,如果说我们去创建一个窗口,比方说我们要求它这个五秒钟之内,你必须找到这两个事件,那自然我们就想到了我当前的这个窗口长度,我就开一个五秒钟的窗口,从头到尾这是五秒钟,只要我们这个在五秒钟范围内能够找到对应的两个数据,它俩匹配上了,前面我们不是窗口照应吗?照应上了,那么接下来我们就可以。把他们输出说正常匹配,那如果要是每一个窗口,哎,我当前就是找不到的话,那就我就直接说他不匹配不就完了吗?啊这个其实不对啊,大家看到就是如果说我当前这个是五秒的窗口啊,然后比方说。
02:06
你你如果要是滚动窗口的话,那就更不用说了,我两个数据可以跨窗口啊,跨了窗口的话,这个就没有意义了嘛,那大家自然想到我可以窗口重叠,我来一个滑动窗口,比方说在这里我滑两秒钟来一个窗口。那这里比方说是零到五秒一个窗口,那自然第二个窗口就是二到七秒了。好,现在一个问题就来了,如果说我第一个数据是1.5秒就来了。然后第二个数据是5.5秒就来了。自然我们可以看到,第一个数据不属于第二个窗口,只属于第一个窗口,第二个数据呢,只属于第二个窗口,不属于第一个窗口,它俩不会在任何一个窗口里面匹配起来。但是我们知道他俩的时间间隔只隔了四秒钟,完全符合我们的需求啊,所以这个其实就做不到嘛,哎,那你这个是间隔时间滑动的距离不够短,滑动的距离太长了,如果说我要是隔一秒钟就来这样一个窗口的话,那是不是就会更好一点呢?
03:09
大家注意啊,这个隔一秒钟,即使是你把它调小也没用,因为我们知道这个长度是五秒,那我总能找到一个位置,就是刚在第二个窗口还没开始的位置和第一个窗口刚刚结束的位置。这两个位置取两个数,然后他们之间的间隔是小于五秒的,对不对,那总能这样取到,除非除非怎么样,你隔一毫秒就开一个窗口,那大家想这个就太复杂了是吧,隔一秒钟开一个窗口,窗口太密了,这这个对于性能来讲,这是一个灾难,哎,所以正常来讲,我们一般不会用窗口连接来处理这样的事件。那大家就会想到了,那我也不想就是每来一个数据就定时器啊,这个process function,总觉得这个大招太麻烦了,还得自己定义那么多状态,那有没有更加简单一点的,是可以基于数据,然后去后边去,就像追加一个定时器一样,然后去等一段时间呢?能不能这样呢?不要把这个窗口结束开始和时间把这个窗口确定死,不要这样这样确定,我们只是要一个基于数据后面一段时间的一个触发而已。
04:15
那现在给大家提到的间隔连接,其实就是这样的一个东西。所谓的间隔连接interval嘛,大家看这张图就知道了啊啊,这张图其实就是官网上的一张图,它就是针对我们这里的每一个数据,一条流里边的每一个数据。他呢?相当于前后去打开一个区间。啊,就是这个时间戳这个位置啊,前后打开一个区间,在这个区间范围内的另外一条流上的所有数据都跟他做一个一一的配对,他们都是匹配起来的,这是连接起来的一个状态。啊,当然了,对于这个间隔连接呢,也要指定K对吧?啊,也要有这个就是K相等的,然后再连接,这才有意义,要不然就没意义了嘛,啊,所以它是在K相等的前提下,而且还以一条流上的数据为基准,开了一个时间区间,然后去做这样的一个匹配连接,所以它就叫做间隔连接。
05:14
Total join啊,那这里边的这个区间定义呢,很简单,就是一个下界,一个上界,Lower pond和upper pond,那具体来讲匹配的条件大家看就是这个不等式。比方说A要。Interval join b的话就是如果要A去join b的话,那么A的A里边元素的这个时间戳我们叫a.time stamp,然后B里边的流里边的这个数据的时间戳叫b.time Sam,那匹配成功,匹配的要求就是。b.time b上的这个元素啊,它的时间戳范围要在。A上的时间戳加上下界到加上上界之间。
06:01
啊,这就是所谓的间隔连接,就是下界到上界之间啊,从这个图上看的,大家看的就非常明显嘛,下界就是从二这个位置减一个数减过来,然后上界呢,就是从二这边加一个数加过来,这样的话就可以把这一段范围比方说啊,这是两毫秒,两毫秒前后的一段区间内的数据全。查询一遍,然后匹配起来。大家看到就是我们这里设的这个下界,因为它都是加嘛,所以你如果要是在他之前的数据也能匹配的话,那应该设置负二对不对啊,所以现在就是下界上界是负二一啊,然后接下来我们就看到了零这个数据,负二一的话,那它可以。截取的这个上面这条流里边的数据应该是负二到一的时间圈都可以,它匹配到的是零和一,所以是0001啊,这是两组匹配,那同样后面二这个数据它匹配到的是。
07:01
零啊2021,它的范围是零到三,零到正三,那同样后面这个三这条数据它的范围是一到四啊,区间长度是三,那么它匹配只匹配到一个数据一四没有匹匹配到数据,那这就没有了,五的话,诶是在它的这个上界这里啊,后边来了一个六,匹配到了五六啊,那同样七匹配到两个数据7677。这就是间隔连接的匹配模式。好,那接下来我们看一下这个间隔连接在代码里面怎么调啊,跟端口连接非常类似,间隔连接也是固定的调用方式啊,我们直接在源码里面看一下啊。还是data stream,大家要注意data stream这里边的drawing啊,只有这一种方法,就之前我们那个window draw那种方法,只要它调draw,大家知道就必须一条道走到黑,只能是window draw了,对吧,后面必须开窗,那如果要是这个internal draw,怎么draw呢?哎,它是先。
08:00
必须先基于一个ittream,然后去掉。大家可以看到。目前有一个join语方法叫做interval join啊一看这就是间隔连接的专有的方法,对吧?啊,那这里边本身是k stream调用传进来的也必须是另外一个kid stream,这种调用方式是不是很像我们之前connect?大家还记得我们当时定义这个KBY的时候,有两种KBY方式,可以你先连接起来之后再去KBY,这就有点像interval drawing的时候,先去drawing,然后再去where e two,再去确定这个K的连接。或者另外一种方式呢,就是先k by k stream,再去connect,另外一个k stream。那我们现在internal drawing的方式呢,就有点像这第二种internal drawing,它只能这么写,就是先基于k stream,然后去internal drawing,另外一条stream。得到的是一个interval draw内部类,这个inter draw呢里边它可以去。
09:02
确定它到底是因event time还是因processing time啊,当然默认肯定是time事件时间啊,就可以指定当前的这个时间语义,然后当然这个只是指定而已啊,它的真正意义上的调用,那当然就是。这里between只能接下来做这个操作,Between指定当前区间的上下界,Lower bound upper bound,得到的是一个interval drawing的,大大家注意啊,现在是interval drawing,然后它经过between之后就是interval drawing in区间啊,变成了inter drawing的,又是一个内部类,我们来看一眼。它里边又能调什么呢?这里还可以有一些设置啊,大家看lower pound exclusive,然后upper boundund exclusive,这是什么意思呢?之前我们看图的时候大家不是看到了。就是大于等于小于等于啊,就都包括这个点的,只要是在这个点上的都可以匹配,那exclusive大家知道意思就是不包括对吧?啊,所以就是下界可以把它刨除掉,就可以开区间不是闭区间了,原先那个都是闭区间啊啊这两个是可以刨除掉的,然后另外呢,能掉的就只有只剩下又只剩下一种方法了。
10:15
Process。好,这里面的process啊,哎,这样的话,我们就又看到了process function处理函数家族里边的另外的一个处理函数,就是传说中的process drawing function啊,我们看一下这个东西到底是何function社啊,它是一个obstract rich function对吧?这个跟process function的呃特点是一一样的,它是一个抽象类,然后里边的方法呢,只有一个process element,然后还有一个context啊,这这跟这个处理函数的特点是一样的啊,然后里边呢,它就是一个left,一个right啊,第一个流里边的数据,第二个流里的数据,这跟那个draw是不是很像啊。对吧,它只是比那个drawing多了一个collector,用这个al输出的话,这就是flat draw function,那它比flat draw function呢,又多了一个上下文而已。
11:01
然后在这个上下文里面,你看我们又是这些可以获取left和right的时间戳,然后可以获取当前join之后这个对的时间戳,哎,另外还有一个output,哎,大家看这里也没有type service对不对?哎,没有相关的这个再去定义定时器的这些操作了,所以他能做的事情其实也是有限的,比较像process function,但是呢,不像真正的kid process function那样可以去呃定义各种定时器什么的操作啊,所以我们说它更像一个drawing function啊,有一些process function功能的drawing function。那在具体调用的时候,大家看这里的文档当中的这个调用啊,啊也是固定套路,就是这样的,STEM1KBY之后啊,然后inter drawing stream2也要K2个k stream inter drawing,然后指定between它的区间,然后第process传一个process draw function就只能这么干啊,所以接下来我们就可以给大家看一看是不是我们之前的那个Bill check啊,我们如果想要判断。
12:08
他这个对账的话,是不是可以用这个间隔连接来实现呢?哎,那确实是间隔连接明显实现这个需求更好一点,我们当时不是一这边啊,APP event要等他五秒钟,这边又不等吗?哎,那大家其实就知道了,那不就是说第三方支付平台的那个event那个事件可以时间戳比我们APP event这边要大五秒嘛,所以如果我基于这个APP这条流的话,去interval join另外一条流,那开的时间范围其实就是零到五秒嘛,区间就是零到五秒,然后去做一个interval draw,但是。这个有问题,为什么呢?因为这里大家也看到了我们输出的结果。只有正确匹配上的数据。那所以也就是说我们这里的这个process join function里边只能拿到匹配的,对,然后你输出一个匹配成功。
13:03
如果要是不匹配的话,反而这里就什么都没有输出,这个其实不符合我们要求的,这里边这个join引它也确实是就像我们两个表做内连接一样啊,就是不匹配的那些,它直接就给你扔掉了啊,所以大家会想到,那那你这个如果要是不能把它拿出来的话,那我们关键就是想要报警啊,我们做风控,其实你真正匹配成功的反而不是那么重要,我们更关心的是不匹配的啊,所以之前的这个例子,大家看我们用connect实现还是有道理的。啊,那间隔连接它还有什么样的场景可以用到呢?啊,比如说我们这里又可以给大家举一个例子,就是说在电商网站里边啊,啊经常可能会有这样的需求,就是说一旦用户有一个用户下了订单之后啊,下了一单之后,然后我们就得看一看到底是什么原因促使他下了单,诶那我就要去找一下最近一段时间内啊。他浏览了哪些页面,做了哪些动作,把其他的数据跟他合并在一起,那这样的话就可以连接在一起之后,可能就能分析出来一些这些用户的行为习惯,就能做用户画像,就能给他做这个实时推荐啊这这些都是比较有用的一些信息,所以接下来我们就看一看这个例子啊,怎么样用这个internal draw来实现,因为大家知道最近这个一段时间范围内浏览的数据,有时候我们可能关心他之前,可能更关心他之前浏览的,有时候可能也会关心他,诶你下了订单之后,呃,还会再看什么呢?诶,所以这个可能是一段时间范围呢,之前之后的一段时间范围,所以接下来我们来测一下这个in her to test。
14:42
这整体来讲也非常的简单。我们先把这个创建出来啊。就这两个流先创建出来。然后下边是env execute。好,先把这个主体流程放在这儿,然后我们现在要定义的呢,其实稍微有点不一样,就是我们其实一条流啊,不需要这个。
15:05
就是我们只要有event就够了,就他的那个有它那个点击浏览信息嘛,Click嘛,那所以这里边我们也是直接用之前的那个click数据啊好,我们这里可以给一些click数据。好,这然后这里当然也要改成。后边把这个改一下。New,哎,把这个写在这啊,然后elephant are time Sam把这个提取出来,这个我们就改一下,不叫STREAM2了啊,我们这个叫呃,Stream。其实就是click对吧,Click stream。然后接下来上面应该还有一个order stream就下了一单,这个数据我们就没有了啊,所以这里边我们干脆也是啊,定义一个对应的这样的一个二元组吧,啊,那这里边我们给一个比方说Mary,对吧,Alice。
16:00
呃,这个Alice丝又下一单carry,好,我们有一些这样的一些数据,然后接下来就考虑一下你该怎么做这个操作了,好,接下来我们看一看,其实我们如果要是想测的更真一点,你也可以把这个数据就变成啊带几分钟一小时的那个数据,只不过就是把这个时间戳调大一点而已啊,因为这里是这个订单流,我这儿就不再加其他的辅助信息了,那你肯定就是单独把那个订单日志提出来构建的一条流嘛,啊,那这个就没必要再说它一定是一个订单了啊,这样的话我们就可以看到对应的信息。然后接下来我们就把两条流做一个连接。All the stream,然后去,哎,大家注意啊,要先KBY啊,那就KBY,当前的那就只有user喽,我们的F0是user,然后去interval join,另外一条流叫做,呃,后面那条流我们叫click。一拜。
17:02
当前的user。每一个用户接下来特之后只能between,哎,那between这个范围这就由我们自己来定了啊,比方说我们是之前十分钟的和之后五分钟的啊,那现在我们数据肯定没那么大,这个order之前的话,这都没东西了啊,我们稍微大一点吧,给这个比方说这个是从五开始啊。那我们这个从二二十个51,那我们这里边也稍微大一点啊,后边我们这个就是三,从这个爱丽丝的最后一个数据开始吧,36000对吧。好,我们把这个写在这啊,然后接下来。Between的话,比方说我们就给一个之前的。五秒钟到之后的十秒钟,哎,这样的话也是可以定义一个范围的。尽管实际好像没什么用,只是给大家做测试啊,定义这样的一个范围,那大家会看到这个意思的话,其实不对啊,这样的话,这相当于是加,那这就变成什么了,变成下订单之后,之后五秒钟到之后十秒钟这个范围了,我们是要他之前五秒钟,所以那是负。
18:13
然后再后面大家看到只有process了,然后里边我们来一个process drawing function啊,那最后得到的是一个资讯,我们做一个输出,大家可以看一下,这里边我们写这也是process function,就像这个process function一样,然后里边是一个process element啊,那我们这里就直接可以用out.collect做一个输出了,做一个stream嘛,那这个也简单啊,我直接把这个left和right做一个,做一个匹配就完了呗。啊那比方说啊,我们可以就是之前有一次点击嘛,所以应该是right啊。匹配到的这个,然后我们说他这个行为。可能会导致。After,我们这个订单。就可以得到对应的一个做一个print打印。
19:02
好,那我们来运行一下看一看啊。可以看看就是呃,我们这个随便定义了一一组这个时间范围啊,看看它能检测到什么样匹配的行为哦,大家看到这个爱ice丝啊,他这个浏览商品100的这个行为,零三秒的这个行为啊,导致他五秒钟下了一个单啊,那另外他还浏览了ID200的这一个这个product,哎,导致3.5秒的时候这个浏览有点快啊,0.5秒一个同样也是这个下五秒钟单之前做的一个事情,所以这两个浏览可能跟他的这个下单行为有关系啊,那另外就是你看这个B啊啊Bob这个他看了一下这个home这个页面啊,30秒的时候,然后呢,呃,20秒的时候是下了一个单的啊,然后30秒的时候。回到了这个home页面,可能会有一些关系,然后他23秒的时候看了这个product ID是一的这一个页面啊,所以大家看这个就能把它们之间的一些关联关系啊都能找出来,诶这就是这个interval join的一个应用。
我来说两句