00:00
来,我们再来给大家讲解另外一个flink当中比较重要的内容,就是所谓的process方式API,之前我们在介绍的时候,Flink里边提供了三三种层级的啊分层API,大家还记得最底层的就是process function,那这个所谓的process function到底是个什么东西呢?其实之前我们在那个window操作啊,包括这个呃,Kid stream,它可以基于他们去调用的一些方法里边也看到了,里边都有一个方法叫做什么呢?呃,大家还记得就是我们做这个KY之后得到的k stream里边会有一个方法叫做process对吧?哎,那之前我们那个基于它开窗之后得到的这个window stream里边也有一个方法叫做process,当时我们还说了process里边传的这个process window function,这是一个全窗口函数啊,那所以这里边他们的这些process到底是个什么操作呢?
01:00
他之前我们都没讲过,这个process就是一个没有严格定义到底里边要做什么操作的,最底层的操作,它就是一个任务,一步转换计算计计算操作,但是具体做什么呢?全靠你自己来定义,所以对于这样的一步操作点,Process方法里边要传一个参数,传什么呢?就是我们所谓的process方式啊,那这里边我们所所谓的这个process方式API啊,其实就是调在不同的地方,不同的这个数据结构流流的这个结构里边去调它的process方法,然后里边我们要实现的是一个process方式,那这个process方式它我们说它更加底层,更加一般化,那难道就是说你在里边可以包装包装,再实现一个map包装包装再实现一个filter这样的一个功能吗?呃,当然,如果你想去做这件事是可以的,Process可以用来当成一个map,可以用来当成一个filter啊,甚至可以用来当成一个这个reduce这类的这样的一个算子都是没有问题的,但是function显然不仅仅只能做这些事情,你要只能做这些事情,这就完全被取代了嘛。啊,大家需要注意的是,如果说我们想要去做更多事情的时候,比如说。
02:20
之前我们不是讲过了这个时间语义的表达,然后讲了这个watermark吗?假如说现在我想要在事件时间语义下面获取到当前数据的时间戳,以及当前的系统里边的这个watermark,也就是说我们当前的事件时间这个东西怎么样去获取呢?你像如果要是这个处理时间语义的话,Processing干嘛这个简单,我就算不能获取到flink当前的这个处理时间,我也可以,这个就是system,对吧,直接去调它这个system方法去获取当前时间嘛,看起来好像这个比较简单,但是事件时间的话,那个water mark你这里边直接去调好像就调不到了,对吧?我们说它就相当于一种特殊的数据结构,放在数据流里边挨个去做处理,你这里边每一个转换算子只能针对数据做计算呀,我们当时说的这个map Fla map具体调用到的这个方法里边,每来一个数据就调这个方法,这里边这个数据必须。
03:20
是我们当前定义好的,真正的这种数据没有water mark来了之后调用的,对吧?哎,所以怎么样去获取当前water mark呢?这是一个问题。另外还有一个问题就是假如说我们还想去定义一些更加复杂的计算方式,比方说我定义的这个方式是什么呢?就是当前来了一个数据,我不想让他现在马上就做这些操作,我希望怎么样呢?呃,比方说我过十秒钟之后,或者说过五分钟之后再做一些操作,就相当于我要注册一个未来时刻的一个定时器,去做一个定时指定的操作,能不能实现这样的功能呢?
04:03
之前所有的算子大家回忆一下啊,不管是有状态的还是无状态的,不管是你是否实现一个瑞士方式,瑞士方式里边有这个生命周期,有运行上下文,但是它也没有办法去,呃,向这个注册定时器去,去对未来要发生的事情去做一个调整,对吧?啊,那另外也不可能去获取当前的这个watermark事件时间,假如说我现在就真的想要控制这些,那怎么办呢?用最底层的process方式啊,所以说整体来讲的话,Process function大家可以认为是最底层的API,也是我们做flink编程时候的一个。最底层的大招啊,就是说理论上来讲,它是一切需求都能搞得定,因为在这个层级上我们可以访问到所有的东西啊,就比方说你的数据当然能访问到,对吧,那数据上的时间戳可以访问到当前的watermark,当前的事件时间可以访问到,另外我还可以控制时间,就是说我可以注册一段时间之后去做什么操作,注册定时事件啊,这些就就非常非常的丰富了。另外关于这个process function呢,还有另外一个用途,就是前面提到过的。
05:15
分流操作之前我们说那个speed select不是已经被弃用了吗?那推荐大家使用什么呢?使用侧输输流,侧输出流到哪里去输出呢?哎,之前我们在那个window那里边,大家可能看到一个,呃,Get set output,呃,就是set output data对吧,它是只能把。迟到的数据,诶,如果说这个窗口没有收进来的数据,我放到一个测试数据流里面去,它并没有办法直接实现我们之前想要的那个分流操作啊,那如果说我要分流怎么办呢?哎,这就需要用到process function,它也可以直接针对这个做一个分流操作,所以process function真的是什么事都能干啊,啊,它就是特别的灵活,实现功能特别的丰富,当然了,呃,带来的问题就是它可能相对来讲更难一点啊,那接下来我们就给大家说一说这个process function到底来怎么来用,那首先给大家还是介绍一下process function这个家族,大家注意啊,它并不是只有一个函数类叫做process function,你实现它就完了,而是什么样呢?针对我们基于不同的啊流的这个结构去调它的process方法的话,你要传的这个参数类型还不一样啊,就是比方说有最普通的底层的process方式。
06:35
这个是哪里去传的这个参数呢?这就是最一般化的,我们这里边的data stream,大家看data streamam API里面啊,直接调这个方法里边要的就是一个最一般化的方式。哦,那如果说刚才我们看到了啊,你做过KY之后得到的k stream里边如果调process方法的话,那得传一个什么呢?诶这里大家看可以传一个,也传一个person方式,因为我们说kid stream底层它继承了data stream嘛,啊但是这种方式呢,呃,也要被弃用了,最好你不要这么用,那更应该用的是哪种方式呢?那就是这里边的传一个key process function。
07:20
啊,所以说这就是相当于一个process方式的一个具体化了啊,一个一个具体的细化了,那另外我们还知道这个,如果说做了这个开窗操作得到一个window的stream之后,他做那个window处理的时候也有process方法呀,这里边传的就变成了一个process window function,那除了这些之外呢,还有其他的一些操作,比如说前面我们给大家提到过的。就是做这个connect操作啊,Data stream里边大家还记得我们可以去做一个connect操作,连接两条流,然后得到一个connected streams,这个connected streams里边的,之前给大家说可以做map对吧,这里边传一个Co map方式,可以做这个flat map,这里边传的是一个Co flat map方式,那大家看到中间还有一个做process操作,对吧?哎,那这里面传的就变成了一个什么呢?Co process。
08:21
啊,所以这其实就是类似的一些操作啊,针对不同的流,你就可以这个传递,呃,不同的这个process function式,那这个process function式,整个这个家族里边啊,整体来看的话,总共大概就是这么几类吧,啊就是大家看就是最一般的process function,然后k process function k之后去调的process,然后呢,Connected streams去调的那个process,这是Co process function啊那另外还有一个呢,是process draw function,这是在哪里去调的呢?哎,这是就大家会想到两条流连接,还有另外一种方式叫join啊,这flink里边也有直接的join用方法啊,但是这个呢,相对来讲它的用途比较局限一点,这个我们放到后边讲项目的时候,给大家做一个再做一个回顾和统一的讲解啊,那他就是说你做完draw之后得到的那个流,然后再去process的时候,要传的就是一个process draw function了啊,那后面还有这个broadcast process function,那这就是广播流。如果要是说你。
09:22
去做一个这个process操作的话,那就变成这个function了,对吧,Process function了,那同样广播流还可以kid对吧,与之对应的有这个key的broadcast process function啊,那后面我们还有开窗之后呢,有process window function。那最后当然了,还有一个就是process all window function式啊,那大家知道这个是什么?这是基于那个data stream,我们不是说也可以开窗吗?直接window or得到的是一个all window stream,然后all Windows stream去调process方呃,方法得到的就是一个process or window方式,那就是传递的就得是这样一个process or window function,这是关于这个process function整体的一个介绍,然后它里边再去做使用的时候啊呃,我们一般情况接下来给大家举例呢,我们就以这个kid process function用的最常见嘛,对吧,我们一般做的操作不都是KBY之后做的操作吗?呃,所以k process function是用的最多的,然后在里边呢,我们知道它应该就还可以定义状态,对吧?啊另外除了这些之外,它还有一个特点是什么呢?Process function身里边啊,大家要注意一下,就是它里边。
10:32
有这个,呃,Open close这些生命周期,因为process function它是继承了rich function接口的啊,所以本身这个process方式你可以认为它就是一个瑞士方式,瑞士方式能做的事,这他所有的啊,当然这个Co方式就特殊一点了啊,它跟这个不太一样,对吧,它这个实线底层不太一样,那我们前面这里边。做的这个process操作啊,Process方它本身是继承自这个抽象类的,对吧?Abstract啊,所以这本身就实现了这个reach function这个接口的,那我们所说的负函数里边能做的呃,生命周期呃方法还有一些这个获取运行上下文,定义状态,做状态编程,这些是不是就全能做啊啊所以就是大家可以认为这个process function是一个加强版的reach方式啊,就是除了这些之外还能干干一些别的,那还能干什么呢?它独特的方法就是还有另外一个叫做on timer方法,On开ER方法说的是什么呢?就是你可以定义定时器,就是注册定时器,然后去哎on camera,就是定时器发生的时候到底要做什么事情。
11:46
然后他还有一个自己特有的方法,叫做process element啊,这个其实跟每一个算子任务是差不多的啊,你就像一个map map function里边不是有map方法吗?Flat map function不是有一个flat map方法吗?这里边的process里边必须实现的方法是一个process element,它同样也是每来一条数据都会调用到这里的这个方法。
12:10
然后这个方法有一个特点,就是大家看多传了一个上下文,有一个ctx context传进来,那么对于这个上下文而言,我们在使用的时候,它能干什么呢?这个上下文就可以访问当前的时间戳,哎,访问当前我们这个KY之后的这个K,以及还有一个timer service时间服务,或者说定时服务,那么这个服务它里边有一些特定的方法,就可以获取当前的处理时间,获取当前的watermark watermark不就相当于事件时间吗?对吧,就相当于获取当前的时间,时间都能拿到了,然后还可以干什么呢?注册定时器啊,注册处理时间定时器处理,呃,注册事件时间定时器,另外呢,能注册就能删除,对吧,还可以这个删除定时器,这就是这个timer service的这个特点啊,好,我们可以在代码里面给大家简单的把这个稍微的写一写,大家看一看到底是怎么。
13:10
不用的。啊,我们新建一个,在这个API test下边新建一个object,当前这个就叫做process。Function test。好方法,然后接下来同样前面的这些操作都差不多,对吧,我就直接把这个copy过来了。还是先把它读进来,然后转换成样DV,我们不要忘记上面还有一个这个下划线啊,把这个影视转换引入,然后从这个文件里边读取数据,我们就把它直接删掉吧,直接还是用这个流失数据的输入,最后不要忘记有一个execute,把它执行起来,当前是process function test,然后接下来呢,呃,我们如果要是想要基于当前的这个data stream直接去做这个process操作的话,哎,当然这里边你就直接调这个process对吧,里边要去传的就是自己自定义实现的一个process方式,或者还可以怎么样呢?我可以基于它。
14:15
就是我可以基于它先做一个K对吧?呃,比方说我现在还是基于这个ID先去做一个分组,然后里边呢,再去实现一个自定义的kid的process方式,My kid的process方式,哎,首接去这么去实现啊,那这里边我们看一下这个这个东西到底该怎么写extend key的process方式啊,然后里边肯定要有类型定义对吧?你诶这里写错了啊,Kid的process方式对吧?不要把那个我们自己定义的那个类名误以为是这里边我们想要实现的这个接口啊,呃呃,抽象类啊,这是一个继承了一个抽象类K的process function,然后大家看它本身也是一个,呃,就是继承了abstract function对吧?所以这里边有生命周期,呃,有我们能够调用的,能够获取到这个运行上下文,能够定义状态,而且能够定义k state对吧?啊,那那另外就是说还有process function。
15:16
能够做的那些事情,那大家接下来看一下,就是这里边我们要传的参数稍微的麻烦了一点,这里边要传什么参数呢?这里要传的参数是KIO3个参数这个类型对吧?呃,当前的这个类型,那K的话,因为是K的process function吧,K当然就是当前建的那个类型,然后IO大家就知道了,Input put嘛,我们相当于呃,你你就相当于一个特殊的map嘛,呃,就是你既然可以做这个类型转换,那我输入输出的数据类型可以不同,对吧?啊,所以这里边就是所有的这些都包含在里边了,比方说当前我以这个ID做这个,呃,K的话,它的类型当然是string,然后后边呃,我的这个输入的数据是sensor reading输出的数据,比方说我输出一个string,对吧,这完全没问题,类型一写,写对了之后,上面就不再报错了,对吧?啊,这个其实就是这样的一个调用的简单的一个过程啊,然后在里边必须要实现的,看到有一个。
16:16
Process element的方法啊,这个方法里边你看它本身也没有返回值,为什么呢?你想要输出数据的时候,跟我们之前那个Fla map一样,用这个collect输出对吧?可以不输出,所以如果可以不输出,大家想想是不是就可以起到filter的作用了啊,所以你你用这个process去去实现一个filter也非常简单对吧?你实现一个map Fla map也非常简单,它做什么操作都可以啊,这里边主要我们想说的是这个特点啊,第一个value,这是我们输入的数据,每一个数据来了之后都会调到他嘛,然后还有一个是做输出的这个collector,那中间还有一个特别的叫做ctx当前的上下文,对吧?啊,那这个上下文能够调用的就比较多了,大家看到这里边能够做什么事情呢?能够get current k对吧?啊,能够把当前的这个K拿到啊,当然有同学说你这个K太简单了,我直接从value直接去去提提取不也是一样的吗?啊那当然。
17:16
当然了,你再次提取也是可以的,或者你直接get current key啊也是可以的,对吧?那另外还有什么呢?直接获取当前的时间戳,哎,当前数据的这个打上的那个时间戳啊,这就省得你再去提了,对吧?就是你之前提过的那个时间戳就已经放在这儿了啊有同学可能觉得这没什么用啊,我本身我再提一遍,呃,也还能提出来嘛,呃,就是一方面就是说没有必要对吧,数据里边其实已经有这个信息了嘛,另外一方面就是他真正的大招不在这儿,真正的大招在哪呢?啊,前面你看到这里边有一个output对吧?啊,这个outp是干什么的?我们不是想到你这里边这个out不是要做这个collector去做输出的吗?out.collect吗?那这里边怎么又有一个上下文又有一个output呢?哎,这个是后面我们要说的测输出流,就是用这个来做,大家看这里边不是有个output tag吗?测输出流就会定义这个标签对吧?啊,这就是这个测输出流,我们现在主要给大家说的是这个timer service啊,这个timer。
18:16
Service就非常的厉害,它里边有这么几种方法,你看可以获取当前的时间,处理时间和water rack water RA就相当于这个系统当前的这个事件时间了,对吧?或者说还可以去注册一个定时器,那这里边你注册定时器的话,你看它必须传一个长整形,这长整形是什么呢?就是定时器要发生的那个。时间戳对吧?啊,所以我这里边可以怎么样呢?比方说我就用当前的当前的时间戳,诶当前这个数据带着这个时间戳,但是呢,我不去注册当前这个时间的定时器,我怎么样呢?我注册一个比方说一分钟之后,那就是60秒对吧,那就是6万毫秒,定义一个这个时间戳的一个定时器,那就相当于按照它的时间,然后一分钟之后到点儿的时候再去触发,那触发的时候到底做什么操作呢?做的操作在这呢,大家看有一个我可以去重写的一个方法,叫做on timer,这是我们真正要就是注册的这个定时器,触发的时候要做的操作都在这里定义就可以了。
19:32
啊,这就是这个,呃,当前我们这个process function啊,能做的事情,他能做的就极大的扩展了我们这个能够处理的场景啊,那当然了,就除了这些之外,还可以怎么样呢?还可以有这个生命周期嘛,对吧,可以有这个open close方法,然后在这个里边我可以去啊,我们在在外边可以去定义出某个状态来,对吧,比方说my state,定义一个value state。
20:02
呃,比方说当前这个我就随便给一个。这写错了啊。State随便给一个int类型,呃,然后我们这里边外边先给一个空值,然后在open生命周期里边可以去给它赋值,对吧?呃,为什么可以去给它赋值呢?因为当前它本身是负函数嘛,对吧,本身就是负函数,所以我可以获取当前的这个运行上下文,然后去,哎,给它获取当前的这个状态距离我去拗一个value。State,大家还记得script对吧?把这个定义出来啊,这里面是一个my state。然后类型是int,呃,这个所有可以做的这个状态编程的操作,我们在这里边都可以做,另外还可以获取这些东西,对吧,获取当前的时间watermark,然后注册定时器表示之后可以做什么操作啊,另外还可以做测速输流的操作啊,所以能做的事情真的非常的多。
21:06
哦,另外大家可能有一个问题,就是说当前我们这个注册定时器的时候,上面是这里只注册了一个对吧?啊,那假如说我这里边要注册多个可以吗?当然是可以的,你就不停的去注册就完了,Register对吧?继继续去注册,那这里面怎么去区分不同的定时器呢?就是按照当前的时间戳去区分,如果你注册同样时间出的定时器的话,他认为就是一个啊,那有同学可能就会说,那你这里面注册了不同的定时器,那执行的时候到底怎么执行呢?它都会调到当前的timer方法里边来,大家注意啊,就是定时器它的触发的这个,呃,操作就是在on timer里边。定义的执行,这里边并不做任何的区分,所有的定时器出发的时候都调到这里来,那所以这这里边就得怎么办,你如果要想区分不同的这个定时器的话,哎,大家看这里边有一个参数是叫time stamp嘛,这个是什么呢?这就是当前定时器触发时候的那个时间戳,也就是说你定义什么时候出发,现在的这个time就是什么,所以呢,你可以把这个时间出拿出来判断一下,现在这是哪个定时器出发了,对吧?几点钟的定时器出发了,它就相当于一个闹钟一样啊,我去判断一下,诶,现在是八点的闹钟响了,还是九点的闹钟响了,然后你在逻辑里边if else,你去做处理就完了,对吧?或者说你分不同的这个case去做处理就完了,这是呃,就是平常在使用的时候啊,定时器是这样去注册和使用的,那另外还有一个就是可以去删除,如果要是删除一个定时器的话,同样还是调用timer service delete对吧?Delete,注意啊,就是你当时定义的是什么类型even,看这里边删除也得是什么。
22:48
形,然后里边传什么呢?传的还是一个长整型啊,那所以这里边就是这个长整型,就是因为我们说可以注册多个嘛,所以说你不能直接说哎删除就全删了,那必须得指定你删除哪个,那这里边我们定位这个定时器的时候,就是按照那个时间戳嘛,就是按照闹钟响的时间来定的,所以这里边你就要把当时定义的你要删的那个时间戳传进来。
23:14
啊啊,所以这个大家就是可以先做一个整体的了解,后面我们给大家做事例的讲解。
我来说两句