00:00
我们对于处理函数process function,它的所有的特性就都已经讲解完了,这里还有一个测输输流没有专门的做讲解,其实我们也已经知道啊,就直接是里边不是给我们提供了一个ctx吗?一个上下文context嘛,它有一个方法叫output,那么我们用这个output方法就可以把数据发送到一个测输出流里面去,那当然了,如果说我们想要拿到测输出流里面的数据的话,怎么拿呢?呃,之前我们在window里边也已经接触过测试入流的用法,只不过当时是side output later data,只有迟到数据放到特殊入流那拿的时候其实是一样的,拿的时候调get set output这个方法,直接用这个方法就可以把它取出来。呃,这样的话我们传入的是输出的标签output tag,所以一般情况下我们会单独的把这个输出标签定义出来。这里我们输出数据到测出流的时候,要把它放进去,获取测出流里边的数据的时候也要传入它,那上面output还有第二个参数,那当然就是对应的数据了,到底把什么数据扔到这个测试里,要有这样一个用法,那这个一般用来干什么呢?
01:11
测输出流,可以做一些除了主流运转逻辑之外的想要输出的一些操作,可以把它扔进去,一个最常见的或者说最典型的应用就是可以用它来做分流操作。啊,这就涉及到了我们下面的第八章,第八章就是多流转换,所谓的多流转换,前面我们讲到的所有API大家发现了啊,不管是简单转换啊,还是聚合,对吧,或者是基于窗口去做计算,或者是直接process function,前面我们提到的操作都是针对一条流上的数据做的。那在实际应用的时候呢,有时候我们可能需要把一条流里面的数据拆开,拆成好几条流,也可能需要呢,把这个多条流的数据连接在一起,合并在一起去做处理,这就涉及到了一条流拆成多条流,或者是多条流合成一条流,分流合流两大类操作,这就是我们所说的。
02:10
多流转换啊,那根据不同的需求呢,调用的这个算子也是不一样的,调用的方法也是不一样的。对于这个分流操作,一般情况是测输出流来实现的。之前会有一个方法啊,叫做Li,基于一个data stream可直接调这个Li方法点Li,它就是一个标准的分流操作,但是呢,它的应用范围比较小,然后用起来呢也不够灵活,可以被测出出流全覆盖啊,所以现在基本上也就不提这个split了啊,要被弃用了,所以我们就直接是用侧输出流来做一个实现。而河流呢,分子就比较多了,常见的有union connect join啊,另外还有一个叫Co group啊,这些都是能够合并两条流的一些计算,那后面我们会讲一些典型的应用啊,首先给大家说这个分流,分流就是把一条流拆成多个对吧?啊,所以大家可能会想到就是说拆多个流,这个还用算子吗?这太简单了,比方说我们可以怎么做呢?大家看一下这个代码,这个代码我们就不实现了啊,因为太简单,他要做一个什么样的需求呢?就是比方说我们对电商网站收集到的这个用户点击的行为做一个拆分,根据类型不同分为。
03:30
什么类型呢?不同用户的浏览数据啊,所以就是其实就是按照这个user做一个划分啊,做一个分支,如果是Mary的话,那分出一条流来叫Mary的浏览数据,如果是Bob的话,分出一条流来叫Bob的浏览数据啊,那当然了,别的的话,我们比方说再放一个叫叫其他人的浏览数据,对吧,那大家看一下。最简单的方式是什么?那大家想,你既然是按照这个用户名去做筛选吧。这不就是一个filter吗?
04:01
啊,所以我们其实很简单,就是来了一个stream,然后filter filter,如果是Mary的话,得到一个marry stream。那大家知道我们这是一个dag嘛,我可以低于一个流来的一个流啊,读取数据源读进来之后我可以做不同的操作啊是吧,前我们讲的是那个分出来都是并行操作啊,现在我这个不是并行操作,我这就是直接针对这个流不同的操作可以不可以呢。大家想完全可以啊,DA不就是这个意思吗?可以做拆分的,也可以合在一起的,哎,所以我就干脆用不同的两个filter。来做一个筛。Mar的浏览信息,一个是Bob的浏览信息,这不就完了吗?那当然还可以有一个,另外有一个filter啊,那那提取那个其他人的信息对吧?那当然其他人的信息,这个就稍微麻烦一点,就是既不等于Mary又不等于Bob的时候,这个时候诶,我们把它筛选出来,进入到这个else stream里边啊,这个大家很容易想到这样的一个,呃,用用这个filter啊,做这样的一个实现,但是大家会发现这种方式呢,看起来是有点弱,它的问题就在于。
05:12
就这个底层是什么逻辑呢?底层是把我们原始的这条流复制了三份。啊,就相当于是这个流复制三份之后,然后用了三个不同的filter操作不同的任务,然后分别提取一个对应的流分出来,那其实大家想我们不应该这么麻烦的去做这样一个看起来本来很简单的一个分流提取分支的一个操作,能不能用一个算子就直接把这个分流操作搞定呢?啊,当然是可以的啊,我们知道之前有split方法,Split方法它有一个这个特点,它其实就是说先给这个数据盖戳,它其实也是啊,就是里边你给这个各种if条件。在一个算子里边直接if else if else之后干什么呢?给数据盖戳,盖了戳之后再根据对应的戳把想要的数据拣选出来,就可以得到拆分后的理由啊。所以这个split它是有两步操作的,就是先split做一个盖戳的操作,然后呢再来一个select,当然这个用法首先是它类型受限,就是能做的事情很局限,另外就是说它分了两步嘛,然后再做提取的时候呢,还是稍微有一点麻烦的,中间还加了一个特殊的这个数据结构speed stream,所以现在最新的版本01:13已经没有这个,彻底没有这个SP方法了。
06:38
然而代之的就是你直接用底层处理函数的测殊输流就好了,就完事了啊,所以大家从这个角度看的话,底层API process function你必须还是得知道的啊,要不然的话,你特殊流都不会做,不会分流,这个也不行,你不能一上来之后全是这种,呃,直接复制一个流,然后去filter这种做法啊,这种这种做法不可取啊,那所以用测输出流怎么样去做这个,呃输出呢?那其实这个思路啊,跟split那个盖戳思路是非常类似的。
07:10
它就是用到了所谓的输出标签auto t,那大家想这不就是一个戳吗?啊啊,这个其实是非常类似的方式,但是它的好处在于它的类型可以非常灵活。都可以不一样,对吧?啊,我当前的这个outut t指定的特殊流类型可以跟原始的这个数据啊,主流可以不一样啊,就是你像之前我们那个分流,大家可能认为我应该是本来一条流过来,然后这儿啊岔开分支,那现在呢,就有点像是一条主流顺着往下走,那中间。单独的差出来一样,好像是不像之前这个那么平等了,现在区分了主流和特殊流,其实大家知道你可以让他平等啊,就是这类型可以一样,也可以不一样,然后你可以把主流的这个内容当成主要要去操作的内容,也可以,他俩都是平等的地位。
08:02
拆出来之后就是分别都是一个data stream了啊,那所以他们就是完全平等的地位了。接下来我们在代码里边用测输出流的方式啊。前面我们讲过的这个啊,就按照不同的名字提取对应的这个数据啊,给大家再做一个实现,当前我们要测的是这个分流stream。PSVM,我们还是这个完整的写一遍啊,现在是新的一章,Rose section stream execution environment at一下,二,Env,现在这啊,为了测试起见,我们set paralyism,一,然后接下来还是读取数据源,我们ADD source new,一个click source。把这个读进来啊,大家知道这里ign time Sam,对吧,Auto mark strategy bounded out waterness,这个一一个operation zero。
09:02
里边来一个with time stamp a sign,这里大家知道,前面给一个范型的话,后面可以帮我们自动补全。又有一个step a sign对吧,这里,然后把这个element里边的time Sam直接提取出来,作为当前的时间,戳生成watermark就可以了。好,这是我们定义好的。然后接下来呢,哎,就是基于这个stream,我们就要做一个操作,然后去做分流了,你现在要做这个操作的话,提取这个特殊流做分流,那到底应该调用什么方法呢。自然想到了测输出流只有底层API process function才提供,那我们是不是只有process呀啊,这个是没关系的啊,定时际是只有KY之后才能用,我们现在呢,特殊流是都能用的,所以直接process没问题,然后接下来那自然就应该是要在里边实现一个process function了,我这里边直接就出来吧,那么在下面单独实现了啊,That function,然后这里边是主流的类型对吧?啊,我们现在干脆就不要变了啊呃,主流当前的主流当前类型当然可以也可以改啊,我们现在就就不给大家专门再去做对应的这个划分了啊,主流还是这个event就好了。
10:18
里边必须要要实现的是一个element的方法,然后呢,里边大家会想到我会提取测数出流,所以我先定义一个输出标签。特殊出流的类型可以跟主流不一样,所以我把这个特殊主流标签定义成不要event了啊,我干脆定义一个。我用一个不是T。把这个泛型奥泰啊引入进来。去定义成一个二元组吧。也不知道该定义什么的时候,我们就该干脆把所有信息都包在包在里边啊,所以这个可以定义对应的一个二元组出来啊,然后这里边就是。
11:04
可以string啊,这是user对吧,然后URL string。啊,那当然了,如果大家想要把最后的那个长整形的时间戳也写在里边的话,那干脆来个三元组呗,啊,一样的啊,好,另一个,然后里边定义的时候我们知道需要有一个标签啊,那比方说我们不是有marry吗?啊,对吧?哎,那我们这个定义一个标签是maryry,呃,定义的时候大家还记得必须要用这一个。这个是Mary啊,Mary t定义的时候后面跟一个花括号,我们用这个内部定类的形式把它做一个实现,因为这里边这个泛型不好提取啊,这样的话就方便一点,那与之对应的还应该有啊。啊啊,那另外还应该有一个pose对吧?啊,那这里大家会发现其实我只提取两个就可以了,为什么呢。我可以定义else就是主流,我就直接把else放在主流里边不就完事了吗?哎,这样的话其实就拆开了嘛,啊,所以这里边我定义一个Bob tag。
12:07
里的幺二是Bo。名称一定要不一样啊,要不然的话弗link会认为它是同一个标签,然后接下来我们就可以做操作了啊,在这里边的操作也非常简单,就一些基本的这个数据判断它的user。那么是否equals?如果是的话,那么我们这里边就特输出流c TX output。当前的这个标签是me tap。然后要把当前真正要写进去啊,数据要定义好,得是一个TEMP3这样的一个元组了啊,当然我可以去拗一个TEMP3,也可以temp3.up,这都是一样的,那这里面我们要传的是value.user value点。YL还有value点。
13:00
那就完事了,那当然类似的,其实是一样的啊,接下来else if,如果干脆直接copy吧,如果当前的user cos。的话。那么接下来我们这个给的就应该是fo tag,这条输出流有多条对吧,你多个标签就是多个流嘛,然后把当前的这个放进去,这就是报,那另外还有else,如果还不是的话,那干脆我放到主流,把数据放到主流叫做else得了,哎,那啊,大家会看到这样的话,我其实是用out.collect输出当前数据的,而当前数据输出的类型我定义成了event,哎,那这样的话我就只能是输出。直接把value流输出就可以了,大家看我这里的分流就非常的灵活了,他们三个可以平等也可以不平等,类型可以一样也可以不一样,那所有的都可以自定义啊,而且这个主流和测试出流啊,他们的这个只是调用的时候,一个叫out.collect一个是Ctx.output大家只要把这个定义好,做区分就非常的容易了啊,那所以这里边。
14:08
给它定义一个名称,我们把它叫做process stream吧。就是我们已经处理过之后的这个流,然后接下来。Asses stream,做一个print,知道这其实是主流,打印出来其实是else。然后如果想要去获取测输出流的话,At side output,好,Mary tag。那么这个打印出来。就是Mary。那同样的啊,如果要打印Bob的话,就获取tag,那就是以Bo tag标签作为参数传入获取测输出流,打印出来的就是哦,所有的浏览数据啊,最后不要忘记我们把这个execute执行起来。就是分流,完整的一个分流的过程,好,我们现在来运行一下看看。
15:03
得到什么样的一个效果啊。好,大家看到第一条Bob,然后是Mary Bob,诶,这是个Alice,那就是else k也是else。大家看它的数据类型不一样,这个不是原组了,是event对不对,Event to string之后的效果,哎,这就是我们完全分流之后的这种做法啊,你可以完全可以做什么事情呢?有了这个操作之后,诶,我们就完全可以,比方说诶,我们原始数据来了,然后呢,要做一个ETL,可能有一部分数据是我们想要的,然后一部分可能是其他部门同事还需要的,那这怎么办呢?我消费掉卡夫卡里边数据之后呢,别人不能用了呀,诶,那我们就把我们要的那一部分ETL提取出来,然后自己去做处理,而另一部分呢,分流分出去。用一个测试由分出去,然后再写入到卡夫卡另外一个标签,诶这样的话,其他部门消费另外一个标签就可以了,诶所以大家会看到他其实在实际应用的时候,还是有很多应用的场景的。
16:01
好,这一部分我们就讲到这儿。
我来说两句