00:00
在上节课的这个实现过程当中,大家会发现我们处理两条流是用了一一种什么操作呢?是用了connect的方法,直接把它连接在一起,呃,然后去用了Co process方式,然后直接去操作这两条流同时操作,对吧?然后用状态编程的方式,让它们互相之间有有连接,有联系,然后又用了定时器触去触发我们判断的那个输出报警信息,这里边的这个过程大家会想到在实际应用当中,除了这种方式,看起来也不容易,对吧?啊,这么简单的一个逻辑,好像代码量还挺大的,那有没有更简单的连接两条流,就是所谓的双流join的这种操作呢?对啊,既然提到双流join,大家就知道在flink里边有另外的一种方法去连接两条流,什么方法呢?就是所谓的draw啊,这里边给大家简单的照着这个官网的介绍给大家提。
01:00
一下,那么在flink的这个draw过程当中啊,它主要有两种draw影,什么draw影呢?Window draw和后面还有一种叫。叫Joy。啊,所以顾名思义这个window join就是是不是要基于窗口,就是join之后要做窗口操作啊,所以大家看这里边它本身的这种方式是什么呢。大家直接看这个代码实现啊,代码实现的过程当中,其实就是直接点join当前的这个stream去join其他的一个另外的一条流,两条流撞印在一起,然后后边干什么呢。Where什么e to什么啊,大家看这个是不是很像CQ里边一个where条件什么等于什么对吧?啊,什么的ID等于什么的ID对吧,有点像这种感觉一样,所以这里边其实就是做我们的那个的选择对吧?就是选出对应的那个K来,要按照同样的K去做draw,然后接下来怎么做呢?啊,既然是window draw嘛,接下来直接点window开一个时间窗,这就跟我们开时间窗口的那个那个用法是完全一样的,对吧?你可以time window,可以开滑动窗口,可以开滚动窗口,也可以这个开session窗口啊都可以,最后再应用一个apply。
02:25
Apply这里要实现的是一个所谓的drawing function。那具体来看,这个window drawing里边分了又分了这么几类,首先啊,这个滚动窗口是不是可以drawing啊,啊大家看一看这个滚动窗口drawing是个什么意思呢?就是如果我们指定了这个数据里边的那个那个K的话,然后它的draw应是什么?当前划分的这个窗口里面滚动窗口嘛,呃,就是头连尾,尾联头对吧?呃,就是中间没有这个,每一个数据都分配到一个窗口里面去,那么它撞印的结果,上面这条绿色的流和下面这条橙色的流,当前这个窗口里边撞晕的结果就是。
03:05
大家看就是00011011,是不是相当于做了一个笛卡尔机啊啊,所以其实就是当前按照相同的T把它摘出来之后,只要收在这个窗口里边的所有数据两两配对输出啊,其实就是这样的一个draw做了一个笛卡耳机,那同样后边如果这个呃,绿色的绿流里边只有一个三的话,那这里的输出就是2333啊,所以大家看到这相当于是谁撞影谁啊。这个应该是橙色的drawing,绿色的对吧?所以得到的结果是不是就是这个橙色的数在前面,绿色在后面对吧?啊,所以最后得到的这个数据呢,就变成了这样的一组数据,大家看一下具体这个代码实现,比方说前面定义了一个呃城一个一个data stream对吧?然后绿流又是一个data stream,然后我们可以where,我们可以这是不是可以提取它的T啊,对吧?这个大家知道是那个呃,把这个元素直接写开的这样的一个提取方式啊,你如果简写的话,如果只有一个字段的话,可以用下划线啊,这个其实是一样的,然后e two,那前面这是城流里边的K的指令,后边是绿流里边K的指令,那相同的K是不是放在同样这里边,是不是指定了一个滚动窗口啊。
04:29
啊,这这是这个滚动窗口的那个完整的那个写的表达式,大家知道,如果这里边你直接写time window是不是也一样,对吧?Time window就是这个的缩写嘛,所以这里开一个滚动窗口,然后把每个窗口里边啊K匹配上的那些元素取一个笛卡尔机两两配对,最后再做一个apply,那大家看这个apply最简单的所谓的装方式是个什么呢?其实就是拿到对应的两个元素,是不是最后你指定输出什么玩意儿就可以了,这里边最简单的一个输出是不是直接给了前面这个元素逗号,后面一个元素连接在一起输出就完事了。
05:10
啊,所以大家看,其实就是这样简单的一个操作啊呃,这是这个滚动窗口,滑动窗口是不是类似啊,大家看滑动窗口就是你在不停的往后滑嘛,同样还是每一个窗口里边收到一起的,同样K的这些元素,是不是挨个做一个笛卡耳机啊,就还是这样的类似的东西啊,啊这个,所以大家就是知道这个原理,就知道它怎么用了,所以如果我们出现什么样需求的时候,可以简单的用用这种方式呢。就是两条流要做join,后边还要去开窗口对吧?做窗口的分析啊,分析这里边的一些统计的数据,那最简单的方式是不是就应该是做这个window drawing啊啊,那那跟我们当前的这个需求可能不太一样,我们现在是做这个就是两条流只要匹配上就算并跟那个时间窗口没有任何关系,呃,那但是我们没办法用window draw,但是可以用什么呢?下面给大家举一个例子啊,当然这里还有session window draw对吧?那同样的啊,可以用下边的这一类interval join interval的意思是间隔对吧?呃,隔一段时间间隔这样的一个drawing,这个又是代表什么含义呢?大家注意啊,这样的一条绿流和黄流,呃,这个城流要做状移的时候。
06:34
它可以传进来一个一个时间间隔区域,这个间隔区域就是一个一个头一个尾,对吧,一个区间相当于是区间draw,这是一个,那这个区间代表什么含义呢?比方说这里边这个大家看这里边这个城流相当于是指定了一个什么呢?指定了一个lower bound,就是一个下限,对吧,然后又指定了一个upper bound,指定了一个上限,这中间是不是相当于是一个区间,一个间间隔啊,如果指定了这个区间的话,城流照影绿流的时候,它的行为就是我去发散这一段,这个区间发散出去,只要被我这个区间包含在里边的数据就跟我一起合在一起输出。
07:21
合在一起变成撞起来的那个状态对不对?所以大家看现在这个状态相当于什么,它的upper bond,就是大家看这是负二对不对,减二,然后upper,呃,这个lower bound啊那个那么上线呢,Upper bound就是加一对不对,所以这两条流如果对应上的话,当前二这个数据能跟它匹配的数据是不是就是0123这些数据可以啊。对吧,呃,这里大家注意啊,这个这个上下限它都是都是B的左臂右臂对不对,都是包含的,呃,这个大家跟我们那个窗口不太一样,它的这个定义是B的time stamp大于等于a time stamp加lower bound小于等于a time stamp加upper b对吧?这样的一个状语的原则。
08:11
所以得到的这个状语起来的结果是什么呢?这里边啊,这里只有绿流,里边只有零和一,那它就变成了2021对吧,所以是这样的一个draw语过程,这叫interval draw影,有些地方把它翻译成间隔,呃,Draw影有有些地方间隔连接,有些地方把它翻译成这个区间连,我觉得区间连接可能好像听着更更符合这个实际情况一点,对吧?啊,这个大家就如果要是说有人问到的话,直接说这个inter转就可以了,呃,那那大家会想一想,在这个过程当中,不管是window draw也好,还是这个interval draw也好,他们的中间的这些这些在join语的过程当中,是不是都得保存一些数据啊,大家想想是不是都得保存一些数据,那这些数据放在哪里呢?这些数据是不是肯定都得当做状态保存下来啊,所以它的原理其实都是把这些东西当做状态存起来了,呃,所以在我们这个需求里边,大家看是不是可以用这个internal状语这种方式来啊。
09:15
相当于我定义一个区间对吧,当前我的那个order pay来的这个事件呢?呃,比方说它的前后都有可能就是另外一个那个到账流是不是有可能在前,有可能在后啊,我只要定义这个区间,是不是就可以把那边留数据里边的那些东西拿出来跟我这边做照应啊,啊,这个其实是比较简单的一种实现方式,那大家看看这个具体代码里边怎么写啊,具体代码里边其实也简单,就是城流orange stream.kby啊,这个是先做,先做这个分组对不对,按照哪个K来做,然后接下来是不是啊,它不是直接draw,直接是用了一个interval join这个方法,Join了一个green stream k板,你你得指定相同的K才能join,对吧?啊,所以一定要把这个K指定好,接下来它掉了一个between。
10:12
Between这里边代表的什么含义?对,这就是我们说的传进去的那个上下线,那这里边它传的就是下限是负二,上限是一,是不是代表乘流里边当前那个时间戳,减二加一范围内的绿流里边的数据都合在一起啊,都跟它合在一起,对吧?啊,所以这其实就是收集多,可以收集多个数据跟它做匹配这样的一个状态,呃,当然我们在当前这个order order pay的这个例子里边呢,我们只有一个数据,其实呢,好像就也没有必要用这种方式,对吧?那用这种方式其实也可以,那接下来我们就用这种方式做一个处理,大家看后面是不是他还要做一个操作啊,这里边做的是一个process drawing function,对吧?那这样的一个东西,那我们看看,如果要是用这种方式写一个代码,我们怎么写呢?
11:14
重新去扭一个object,我们定义一个叫transaction match,这个应该叫BY对吧?好,这里边样域类不用再定义了,然后接下来主函数里边其实是不是都能从这里边抄啊,对吧?两条流的数据还是直接从这里边拿过来,先把这个可以读到这儿啊,订单事件流和支付到账事件流都把它引入。呃,大家看一下这个是这个对了对吧?呃,然后接下来我们做什么事情呢。
12:00
接下来直接不要做connect了,而是做什么呢?这个做这个draw处理啊,定义一个process stream,是不是可以order even stream直接去,因为已经做过批了,对吧,而且这个字段确实是一样的,我们就是要去把它们连接的那个字段,然后直接就可以调一个interval join,对吧?啊,然后接下来里边要传的是一个receipt event stream,然后接下来是不是可以指定一个between,一个时间范围啊,那这里边还是啊,时间范围就要根据具体的。具体的数据的情况来去做调整了,我们这里就还是给一个比方说对负五减五加五对吧。点sentence。五哎,直接给一个给一个这样的一个区间,然后接下来对process这里边要去扭一个transaction pay match,我们说叫BY对吧。
13:16
然后接下来最后,诶,大家会发现在这个里边。但大家想到在这个里边我能还能输出测输出流吗?啊,这其实是这种方法的一个缺陷,就是什么呢?我是不是只能把符合这种条件的事件包括在一起,把这个事件提取出来,而并不能找到只有它没有对应数据的那个那种事件啊,对吧,这个draw是不是像那种情况就丢掉了呀,所以在这里边我们是如果只要符合这种条件的这种情形的话,是不是用这种方法就很合适。啊,在我们这个需求里边,如果还想要真正的报警是不符合的内容的话,其实是还是用之前我们connect的方法会更好一点,所以这一部分只作为一个扩展给大家做一个讲解,啊,大家知道还有这种处理方式,呃,那这里边我们最后的输出当然也是process stream,直接print输出就可以了。
14:16
AB。呃,这里边我们叫做transaction pay match by join对吧?那最后我们就要实现这样的一个transaction match by transaction pay match by这样一个自定义的process draw function对吧?好,我们看一下它需要去实现的接口是process draw function这个,同样这里传的是不是也是INPUT1INPUT2AL,跟我们前面那个Co process方式其实是很像的啊。所以这里边的input,一是order event input,二是receipt event,然后输出我们还跟前面一样的话,配对输出一个二元组,对吧?Receipt event,这里边需要去实现的一个方法,大家看还是process element,因为process function嘛,只不过它是process draw function。
15:24
那这里边我我们看到能拿到什么东西去做输出呢?是不是非常简单,一个left一个right呀,然后因为是process function,还可以调上下文,我们可以定义定时器之类的东西,对吧?拿到呃,时间戳water mark之类的东西,最后还可以out去做输出,那我们这里是不是直接out.collect输出就完事了,只要一个元组,二元组,那这个二元组是不是就是left和right呀,大家想想是不是这样?所以用这种方式看起来就就更加简单了,对吧?呃,所以大家可以掌握一下这种方式,但是在这个例子里边,用这个双流drawing的这种方式internal drawing似乎不是特别的合理,因为我们想要的并不是就是正常匹配的输出,我们想要的是没有匹配上的那些检测出来报警对吧?那大家想想这种,呃,这个cover join影在什么场景下?
16:24
应用的最广泛呢?其实比如说大家会想到之前给大家讲那个传感器例子的时候。什么时候做火警报警呢?可能温度要出现异常,另外是不是还有烟雾报警器要出现异常啊,那这个时候是不是,诶温度那边一条流来了,然后烟雾报警器是不是它俩应该时间得匹配上啊,得同时在一个范围内同时出现温度又又升高的很快,然后这个烟雾这边,呃,又又有很大的这个烟雾浓度产生,那这个时候才做火警的报警,那大家想是不是就非常适合用inter状,用这种方式两条流状一起来去做处理啊,所以在这种场景下,我们检要检测匹配的这种数据的时候,可能就比较合适,而我们现在要处理的是不匹配的事件的时候,呃,可能用这种方式呢,就会有一些欠缺啊,这是给大家对这一个模块做的一点扩展。
17:23
好,那如果讲到现在的话,我们整个项目就都已经讲完了,大家可以回顾一下,在整个过程当中,我们其实主要实现的这些指标和这些拈其实分为两大类,一大类就是统计类的,那统计类的话其实实现起来非常简单,往往都要开时间窗,对不对?呃,这个套路就是你先创建,当然前前面先创建环境,读取数据,做一些基本的包装转换,Map,然后filter,然后可能要按照某个字段做分组,之后开窗做聚合,对吧?这就是标准的流程,如果遇到一些比较复杂的,呃,这个相对来讲比较麻烦一点的这个需求,如果要排序,要做这个top n的话,那可能我们就再做一个process方式,对吧?把所有的数据收集到做一个排序输出,呃,这是统计信息的这个管理和计算,那么还有一大块儿呢,其实后面这部分。
18:23
就是我们要针对业务流程里边的一些状态做检测和输出警告,呃,那这里边我们讲到的包括这个,呃,从这个广告分析里边的黑名单过滤开始,对吧?当时我们一般情况都是要,呃跟时间相关的,那就要用到process function定义这个定时器,呃,如果要是正常的处理逻辑处理流程,那么我们肯定要用到状态编程,自定义一些状态,所以整体来讲的话,就是这样的一套处理规则。那对于统计这类的,统计这类的指标,如果大家不想用这个data stream API,想用更高级的API的话,我们可以用tableable API和flink c对吧?呃,把它直接提取出来,直接做一个聚合就可以了,而后边这一部分如果是对事件逻辑啊,对这个风控进行管理的话,往往我们可以干什么,是不是可以用CP这样的复杂事件处理去做定义啊啊,这是给大家讲到的所有的内容,大家可以做一个回顾和总结。
我来说两句