00:00
大家要给大家讲解的,那就是弗link里边的三层API当中的最底层的API,传说中的process function啊,那大家会发现啊,前面我们讲到的这些data stream没PI,主要做计算操作,就是中间我们讲的那个transform嘛啊,我们讲过有基本转换啊,有这个K之后做这个聚合,分区聚合啊,另外呢,我们还有这个多流转换,分流合流啊,那大家讲到这个分区聚合这个操作呢,还有一种特殊的操作,其实是可以开窗对吧,开窗最后好像也是聚合啊,但是我们这个比较特殊,讲的内容也比较多啊,因为它还涉及到了这个不同的时间语义啊,所以我们可以这个KY之后,然后点入window开窗,再做增量聚合,或者是全窗口函数的一个应用啊,这个整个所有的这个功能,其实实际项目当中啊,大部分需求都能够搞定了,但是大家想想,在有一些场景下,如果说我们前面不是讲到有不同的时间。
01:00
语义吗?啊,可以生成automark吗?那假如说我在这个事件时间语义下,我现在处理的过程当中,我就想去拿到当前的时间是什么?诶大家想这个如果要是处理时间的话,这个简单就是,呃,我直接去获取这个system,直接去获取当前系统时间,是不是就近似可以认为就是当前的这个处理时间啊,对吧?这个其实是差不太多的啊,但是事件时间你直接获取系统时间是是一样的吗?这肯定不一样对吧?呃,就是这个有可能差别很大,所以当前的事件时间我能不能直接获取到呢?诶,有同学说,那这个也简单嘛,你当前不是有数据吗?数据你不是从数据里边要提取它的那个日志的时间戳吗?那你现在再提取一次不就知道事件时间了吗?但大家想想,我们现在的事件时间和这个是一个一回事吗?不是一回事对吧?我们当前数据里边的那个时间戳是当前数据发生产生的时间。
02:01
并不是当前我们系统处理操作要关哪个窗口的那个时间,对不对,我们要处理的这个,呃,时间相关窗口相关操作的时候,我们当前的时间是以谁为标准的,是water mark,哎,那所以这里边就得涉及到一个你你怎么样能来了数据之后,你知道当前的water是什么呢?这我现在就不知道了,对吧,除非我能像之前我们那个assign time Sam and automarks一样,我把之前的那个最大时间戳保存下来,然后呃,来了数据之后,我再用当前最大时间戳减掉那个呃,延迟时间,那相当于又算了一遍,对不对啊,那除非是这种,那大家想这种方式显然显然不合适啊,不应该这么去,呃,这么麻烦的去做操作,那有没有更。方便的方法,在某一个算子任务里边,直接可以获取到当前的watermark呢?当前的时间信息呢?哎,这些在一般的转换算子里边是根本获取不到的,大家想一下那个呃,Map function对吧,Filter function,甚至之前我们讲到的这个reach my function reach filter就是负函数类,它有上下文,运行上下文,但是那个运行上下文里边可以做这些操作吗?
03:18
诶,但当前好像也不行,对吧,那个运营商下文里边只是可以定义状态嘛,可以拿到一些比方说像我们这个分区子任务的编号啊什么的,可以拿到这些信息,但是也拿不到当前的时间信息,诶那所以如果我们要访问这些的话,那就必须要调用底层的process function API啊,那所以大家看这个t streamam API啊,给我们提供这个底层的process function呢,它可以做的事情主要就是可以访问当前的时间戳,可以访问watermark,另外还有一个非常特殊的功能,可以注册定时事件。什么叫定时事件呢?
04:00
就是说我当前要做一个操作,但是这个操作的就是不是我马上就要作的,我当前调了这个代码对吧?呃,我我执行这个一步一步执行这个代码,我现在呢,接下来我要我要注册一个操作,这个操作不是现在马上就要调的,而是比如说我过十秒之后再调用,我过一个小时之后再调用,大家想这是不是就相当于一个闹钟一样啊。现在不想我设一个时间,一段时间之后想之前我们所有的这个API里面有这样的功能吗。没听说过是吧?呃,就是关于这个,你像窗口窗口的话,大家想那个窗口结束的时间好像是一个定时规定时间的一个触发操作,对吧?我们用trigger好像能实现类似的功能,但是好像没有这么灵活的,你要定什么时间的闹钟就定一个什么时间闹钟啊,对吧?这个好像就是特别的灵活,特别丰富啊,这个应用场景就这只有在底层API才能调用,那另外还有一个功能,就是前面给大家提到分流操作,分流操作split select是不是要被弃用了啊,那现在我们用什么方式呢?
05:11
测输出流的方式对吧?那测输出流在什么算子里边可以直接调呢?之前我们接触过一个是window里边是不是可以测输出流,输出那个迟到的数据啊,那但大家想这只有迟到的数据,我可以差出一条流来输出,对吧?那比方说就像我们之前那个需求,我就是按照这个数据,高温温度的高低,高温流低温流就直接要分开,这个跟迟到不迟到是不是一点关系都没有?那如果我就是想做这样的一个分流,怎么做呢?也是测试枢纽,那这个测枢纽就必须在还是在process function API啊,就在这个底层API里边才能够实现,所以这个process function还是比较重要的,大家经常在有一些需要实现特定业务逻辑的时候,呃,就是前面我们讲的那些转换算子都搞不定的时候,就得用到它了,所以接下来给大家要呃详细的再来说一下啊呃,那这里边大家看到flink给我们提供的这个process function底层API其实是一个大家族,大家看这一系列的process function对吧?啊,那最基本的呢,就叫做process function。
06:21
然后另外基于它还有什么呢?Key的process function来从这个名字上就能看到它应该是什么样啊对,就是先分组之后KBY之后得到一个K的stream,然后再去调这个process function,是不是就相当于是一个k process function啊啊所以它其实就是基于不同的流,然后调process方法啊,就是大家看一下这个我们源码里边它的定义啊,我定义一下这个,看一下这个this stream。Data stream,大家还记得当前所有可调的方法,有一些map flat map,简单的转换对吧,还有这个比方说SP split s selectt这样的一些,呃,分流合流的操作,另外还有就是我们可以key by,对吧,然后分组,然后去做聚合啊,或者说开窗的话,它不能直接开窗,呃,不能就是直接点window,但是可以window or对吧?啊这这也是一类这个API啊,另外还有一些就是数据传输的API啊,Shuffle re balance global啊,那另外还有一个特殊的。
07:25
大家看特殊的API叫做process对吧?哎,那么process这个方法里边要传的参数是不是就是一个process function啊啊,所以这里边其实所谓的底层API啊,调用其实非常简单,跟我们普通的这个data stream API调用方式是一模一样的,你就是直接。基于一个data stream,直接调它的process方法就可以了,里边实现一个process方式,这就是我们的底层API。当然它里边会稍微有点特殊,大家看它里面必须要实现的方法是一个叫做。
08:01
Process element的一个方法对吧?啊,这个看起来也比较明确,就是处理一个元素对不对啊,所以当前它其实就是来一个元素,就处理一个调用当前的这个方法,那里边的具体的参数呢。哎,我们看一眼具体的参数,第一个是一个I类型的value,那I这是泛型对吧?Process方式有两个类型,一个I,一个O,大家知道这是什么类型吗?当然就是输入输出了,对吧?Input output嘛,所以接下来当然I类型的value,这就是当前输入的数据对吧?啊,来了一个输入数据怎么样去处理呢?后边还有一个context,有一个上下文啊,大家想到我们能做的那个更复杂的操作,是不是就应该是这个上下文给我们提供的呀?啊,只要是有上下文的地方,它肯定就能提供更多的东西啊,然后另外还有一个out out,大家知道这是不是就是收集器直接可以alt.CLA的输出结果啊。
09:00
所以直观上来看,如果不看这里的上下文,它就像一个什么一样。大家说这个一个输入,然后一个al collector做一个输出,这是不是就像之前我们讲那个flat map一样啊,对吧?Fla map就是有输入有输出,数据类型可以不同,然后呢,返回值类型是VO的,然后你这里边就是可以输出也可以不输出,对吧?可以输出一条,也可以输出多条,这不就是flat map吗?啊,当然这里边除了这个flat mapb之外啊,是不是还可以做更多的操作啊呃,那更多的操作就就在这个上下文里面去做了啊。这是这个process function啊,那当然大家看这个this stream里边做过KBY之后得到的k stream,这里边它当时我们说过可以开窗,可以做各种各样的聚合,另外它是不是也可以做一个process啊,大家看它的process的话,这里当然是你也可以直接传一个,诶,去哪了啊,我们看下边可以直接传一个process function,其实这个很好理解,它本身不是也继承了那个data stream嘛,对吧,所以当然他也可以直接传这样的一个process function,但是这个大家看被弃用了,对吧?
10:16
不建议大家这么用,建议大家用的是什么呢?啊,对kid process function,你还是单独去指定一个跟当前的key,当前的键相关的一个k process function就可以了,然后它的话里边大家看类型是不是就变成三个了,泛型有三个。前面是不是除了这个IO之外还多了一个K啊,那这个K是不是就是当前建的类型啊,所以KIO多了一个啊,那至于下边还是process element的方法,是不是还是value ctx上下文out呀啊,还是这样的一个过程啊,所以整体来讲就是这个process function这个家族啊,基本上都差不多,就是看你基于什么样的理由去调这个process方法。
11:02
好,那后面我们再看还有什么呢?还有Co process function。这个大家知道是基于什么流去调吗?对,之前我们不是讲过那个connect做了操作之后,连接两条流是不是得到是一个connected streams啊,然后它里边connected streams去调map flat map大家还记得对吧?调map是不是里边传的是一个Co map方式,Flat map里边传的是一个Co flat map方方式,那当时我们看到是不是还可以调一个process呀,它如果再调process的话,那就是传一个Co process方式对吧?啊,这就是这样的一个使用了啊,然后后面还有这个process drawing function,那大家知道这就是两条流,如果我们每讲一个操作啊,就是叫做draw,对吧?啊,这个比较特殊,我们可以结合后面的这个项目给大家再做一个讲解,就是如果两条流draw之后,里边我们再调那个process方法的话,要传的就是一个process drawing function啊,啊,那后面还有这个broadcast process function,大家知道这是干什么吗?
12:03
这是不是就是我们之前不是讲过可以可以有广播嘛,对吧,可以有广播流啊,可以有这样的一个广播流,那广播流里边要去调那个process方法,是不是就是一个broadcastcast process方式啊,啊,那与之对应的还可以key by之后再去广播,那是不是就变成了key process,呃,Broadcast process function,那另外基于那个window大家还记得吧?Window做操作的时候,全窗口函数是不是也可以直接调process里边传一个process window方式啊,这个是不是当时我们说的一个全窗口函数,所以它也是一个process方式家族里边的一元啊,就是它是一个window方式,也可以认为它是一个process方式,对吧?两边都都结合着啊,那当然了,还有就是你不做KY,直接基于data stream去开窗,那是不是就是window or啊,Window or之后是不是也可以做process操作啊,那要传的就是一个process or window function。
13:04
其实就是这样啊,基于不同的理由,基于不同的,呃,这个我们做的这个数据结构啊,去调process方法里边就要传入不同的process方式,它们的原理,底层的实现都是一样的啊,它们的用法也都非常类似。所以接下来呢,我们就给大家选取一个最常用的process function来给大家做一个讲解。啊,那就是process。
我来说两句