00:00
前面呢,我们使用filter这样一个基本转换算子,非常容易的就实现了分流操作啊,那这个过程确实是非常简单,但是如果我们仔细思考的话,就会发现这里的filter其实本质上来讲不能叫做一个分流操作啊,我们这里其实是针对同一个data stream进行了多次转换处理,它的本质上呢,其实是把这个data进行了复制,复制成了多份,然后针对每一份做了一个筛选操作。所以它的处理效率显然是不够高的啊,那我们自然就想了,能不能直接基于那STEM调一个方法,然后里边就可以定义好我们的筛选逻辑,就可以实现分流操作了呢?当然是可以的。在早些版本的flink当中,Data stream有一个方法就叫做sweet啊,顾名思义它就是切分嘛,就是把当前的一条流拆分成多个,这就是一个标准意义上的分流操作,那它的原理是什么呢?呃,本质上就是在里边啊,我们就要实现一个if else的判断,对当前的数据进行筛选分割,诶,那这里边筛选出来之后要干一件什么事呢?就是要对当前的数据。
01:17
给它盖一个戳,打一个标签啊,这个标签当然就是一个字符串类型的一个值了。对应的标签如果盖上去之后,那就相当于诶,这些数据可以拣选出来,提取出一个相应的硫啊,那所以我们就会发现呢,就是盖一个戳,对应的就提取一个流,这样的话就可以得到实现一个分流操作,这个逻辑本身还是比较简单的啊,不过呢,哎,它相对来讲不太灵活,而且跟我们平常所使用的flink其他的API风格不太一致,所以说现在这个方法已经要被弃用了啊,那现在我们真正意义上的分流操作应该怎么去做呢?那就是我们之前讲到的process function处理函数,哎,我们知道啊,Process function它是flink当中最底层的API,我们说它是大招,什么事情都能做,本身process function它其实就继承自abstract reach function啊,它本身就是负函数类嘛,所以负函数类能做的事情他都能做。
02:20
那其他它还有哪些是别的函数类啊,负函数类做不到的事情呢?一个就是前面我们提到的定时器啊,这个是它的独门绝技,只有K的process方式里边可以使用定时器,可以获取我们当前的时间相关的信息和操作,那另外还有一个就是我们说处理函数里边可以去定义测输出流,哎,这就是我们说处理函数的上下文当中。还有一个可以调用的方法叫做output,这个方法呢,就是直接输出数据到测试出流里面,那它输出的方式也很简单,之前我们已经接触过测输出流了,那要给他传递一个输出的标签,表示这条流到底是什么样子的啊,当然了,后面还得跟上对应的数据信息。
03:12
所以接下来呢,我们同样可以使用process function的特殊梳理去实现一个分流操作,这其实是现在link当中推荐的啊,标准化的分流方法。好,那同样还是这个例子,我们要按照用户去对于当前的数据流进行一个划分,那既然是有Mary Bob这样两个用户要单独的提取出来,我们干脆就定义这样的两个输出标签吧。定义输出标签。哎,那首先我们定义一个na ta。那这里就需要去new一个output type了,哎,那其实output type我们可以直接用它的伴生对象去获取它的一个实例,哎,那这样的话里边所要传入的啊,首先这里得有泛型,我们当前想要去获取的。
04:03
本身就是原封不动的数据类型啊,那当然了,直接给一个event就可以了,那假如说我们不想直接把event拿出来输出,我们想把它做一个转换,其实也是可以的啊,比如说我们event里面不就三个字段吗?我这里边直接把它包装成一个三元组也是完全可以的啊,那最后一个是长整型的time。里边需要给他一个标签,比方说这个就叫Mary ta。那同样我们还可以定义一个Bob tag。同样的定义方式output,哎,里边这里边类型可以跟Mary一样,也可以不一样单独定义嘛,诶那这里边我们就把它叫做还是三元组string string了。同样给一个ID。叫Bob泰。好,已经定义输出标签之后,接下来呢。我们这里是使用测书出流进行分流操作。
05:07
那上边这些我们就直接注掉了。接下来我们要做的操作是基于stream,哎,那既然是想要使用特殊出流,那只有在process方式里边才能够使用这个接口,所以我们就直接process,直接做一个处理函数的定义。在这里我们直接实现一个匿名类吧,哎,那所以这里就是process function,那我们知道process function里边需要给两个分型输入和输出,当前的输入是event。那输出呢,输出其实可以跟输入一样,也可以不一样,因为process function并没有这个限制,另外呢,可以跟分流之后啊,其他的流一样,也可以不一样,这个是完全没有关系的,这是我们当前相当于主流里边操作的输出。那这里边我们就可以定义成还是没有问题。
06:03
接下来必须要实现一个process element的方法,在这里面呢,当然就是各种判断就可以了,比如说当前的value.user如果要等于。Mary的话。那就把它输出到Mary对应的这个测殊出流里面来,哎,那我们看现在怎么样输出这个数据呢?使用的是Ctx.output这个方法,哎,这就是处理函数当中进行测殊出流发送数据的方法,那里边呢,首先传入一个me tag,然后后边就是我们包装好的三元组啊,那得是。value.user然后value.url以及value.time step包装好输出就可以了,那当然了,接下来我们还可以继续l if。如果value.user等于。Bob的话。
07:01
那同样道理,这里边也是把数据拆分成一个三元组直接输出,只不过这里给的标签就是Bob tap,这就是另外的一条,第二条流。那最后,哎,其实我们看现在就不需要像上面这样啊,单独的再把这个逻辑重复一遍了,我们直接else不就完了吗?哎,那剩下的情况都输出到第三条流就可以了,第三条流我们没有定义测试流标签,那怎么办呢?那其实我们还可以当前的主流,这也是一条流嘛,哎,我们完全可以就是定义一个主流的操作线,然后再定义测输出流岔开,其实他们都是完全平等的啊,所以这个分流操作,这感觉就是真正意义上在主流上岔开了分支。所以我们现在的主流干脆就是else吧,那主流的输出out.collect我们需要的是event类型,所以直接把value做一个输出就可以了。得到的结果,我们可以把这一条流就叫做l stream啊,那接下来我们就可以做一个打印输出了。
08:06
Re,直接做一个print。得到的就是除了Mary和Bob之外的所有用户的点击事件,那对应的Mary和Bob他们的数据对应的那条流又应该怎么样获取呢?哎,我们知道现在他们都是在测输出流里面测输输流的获取呢,调用的就是当前data streamam这个主流,它要调一个get set output方法啊,这个其实跟我们之前在讲到window API的时候啊,其他API里边我们可以把十道数据直接放到一个测殊出流啊,那这个时候如果要想去获取到当前的数据的话,那使用的其实就是基于。窗口计算得到的data stream去调一个get set out put方法,哎,所以这里边其实是完全一样的,只要获取测殊出流都是调用这个方法里边当然就是传入对应的output tab了,啊,那我们获取的是Mary的数据,那就传入Mary ta这里边打印Mary。
09:08
另外,我们还可以get set output Bob tag。打印就是Bob对应的数据,哎,这就是完整的流程,我们相当于就是通过一个process function里边的测输出流的方法,把一条流分成了多份。接下来我们可以测试一下,看一看效果跟之前是否还是一样。啊,那当然了,现在我们的数据呢,如果是主流里面的数据啊,Else的话,那不变还是event,如果是Mary或者是Bob的话,我们就会看到啊,现在是已经把它转换成变成了一个三元组进行输出了,那依然还是分成了三条不同的零。这就是现在比较通用的啊,推荐大家使用的分流的操作。
我来说两句