00:00
接下来给大家讲这个底层API process function啊,那大家其实已经知道,就是我们前面给大家讲过很多了啊,讲了data stream很多的这些转换算子对不对啊,Transform的很多算子啊,API的一些调用,然后我们又讲到了window API,后面我们又讲到了时间语义,讲到有时间戳和水位线对不对啊,有water mark这里大家注意。我们既然已经讲到了时间相关的这些东西,大家想回忆一下我们之前讲的那些算那些API能操作跟时间相关的东西吗?除了前面有一个那个a STEM and water mark,分配那个时间戳,指定时间戳,然后确定怎么样生成water mark的时候,这个是涉及到时间了,别的地方是不是从来没涉及到时间过啊?就是type里边我们设置一个时间什么时候关对不对?你的那个窗口多大啊,这个可能涉及到一点时间,但是如果我们想更灵活的去控制时间,去控制里边的一些逻辑的话,是不是你根本无从获得他的时间信息啊?
01:09
呃,之前我们根本拿不到他的时间信息,所以这里边我们可能就会想到我们有这样的一个需求,就假如说在有些场合里边,我就是想要在代码逻辑里边,我得知道现在的时间是什么。啊,那人说那你还不简单吗?我直接system直接直接对吧,调现在系统时间不就完了吗?啊,那你如果要是processing time的话,那可能还简单对吧,我们现在是even time even time,我们现在的就是当前的时间应该是什么,大家想一想,我们想知道现在的even time进进进展到哪个程度了。呃,大家就会回忆起来,我们当时不是说watermark水位线其实是衡量even的time进展程度的一个标志嘛,大家想想是不是这样,我们当时说,当时说那个watermark在任务之间传递的时候,对吧,是不是这个水位线其实是代表了当前这个任务,它的那个事件时钟是到了哪里,对不对?哎,所以这个水位线的上涨其实是代表了事件时间的推移的事件时间那个时钟,那个clock就是用水位线来标志。
02:24
所以大家会想到,那我其实是不是在有些场合里边,我要想知道现在这个任务,我当前的事件时钟进行到哪了,那是不是得获取到water mark,哎,所以这个需求就就提出来了,我们怎么样能获取到water mark呢?哎,所以我们之前讲的比较基本的这些data stream的API就已经搞不定了,那什么API能搞定呢?那就是底层的。Process function API可以帮我们搞定这件事情,在process function里边大家看到他可以干什么事情啊?可以访问当前的时间戳,可以访问当前的water mark,另外还可以注册定时事件。
03:13
那这是干什么呢?就相当于我们可以自己设一个闹钟,设一个定时器,然后我指定说等多长时间之后,你这个闹钟响,然后我定义我要处理什么什么事情。大家想这个操作是不是就非常非常灵活了,那相当于我们甚至可以自定义延迟对不对,对吧,我自己就可以定义,比方说我定一个两分钟之后的闹钟,然后我定义两分钟之后做什么操作,哎,那大家会看到就是它自然而然就是在我们这个呃,水位线慢慢的上涨对吧,涨到两分钟那个时间的时候,自动就回来再处理我们当时定义好的那些东西了,这个就非常灵活对吧?好,那我们来看一看,就是呃,Flink里边给我们提供了哪些process function呢?呃,大家看到这个在flink里边它提供的process function其实就是一个函数类了,大家知道一看这个名字对吧?这其实就是我们之前讲的那个函数类,所以呃,那么这个函数类就是一大类底层的API啊,所以我们往往又说这是底层的API对吧?呃,同时它也是一一个函数类。flink提供了八个不同的process function最。
04:28
底层最基本的就叫process function对吧?这个大家一看就知道啊,然后接下来还有什么呢?还有key的process function,大家看名能能知道它干什么吗?啊,这个其实很简单,就是就是一看名就知道肯定是KBY之后,然后再去调这个process function是不是就。就就应该是一个K的process方式啊啊对吧,所以它能够操作什么呢?就能够操作k string就可以针对我们分区之后以key分区之后的对吧,每个那个呃一部分的,那那那个流数据就可以操作这些了,呃,另外还有什么呢?还有Co process方式,那这个就是干什么。
05:13
对,大家就会想到这是我们做了connect操作,对吧,做了这些操作之后,我们当时不是说connect之后可以在,呃,Co map Co Fla嘛,对吧,所以这其实就是处理这个connect connected strip,那另外还有什么呢?还有这个process join function,就是处理我们join之后的那种情况的,呃,这join我们没有展开给给大家讲啊,这是也是一种操作,大家感兴趣可以去查一查API啊,然后还有什么呢?还有broadcast process function,广播的这个process function,还有什么呢?Key的pro broadcast。Process方对吧?啊,这就是就是KBY之后的这种广播的process方式啊,最后还有什么,还有process window方,这个之前是不是跟大家提到过一句,大家还记得吗?我们当时讲那个window的window function的时候,定义窗口处理操作的时候,是不是有两类啊。
06:11
一类是不是叫做对增量聚合函数,另外一类就叫做对全窗口函数,当时我们说什么就是全窗口函数?是不是就是process window function,就是就是一个全窗口函数啊,啊,所以这里边它是一个process function啊,就是它当然也是一个全窗口函数,也是函数类对吧,也是一个底层的process function API,好,那当然了,有那个window function,那就有process or window function对吧?啊,这个就是这么八八类啊八个我们一般平常用的最多的啊,用的最多的可能就是前两个process function和kid process function。啊,当然了,就是有时候可能也会用到这个Co function啊,对吧,然后包括这个draw function也有可能要用到,所以就是我们这里重点给大家讲的呢。
07:03
是key process。就是KBY之后做的操作对吧?啊,之前我们所有的那些主要的API都是基于k stream来做操作的,所以我们这里面重点就讲它啊,这里给大家看一下这个kid process function,它到底是怎么个回事呢?其实很简单,就是这么一个function,定义好了之后呢。它里边就可以每个元素,每个数据,每条数据来了之后,诶就到这里来处理一下,因为大家想它是process方身嘛,就是过程函数,处理函数对不对啊,所以它就是要每个数据来了之后都要调调过来,都要来处理啊,所以这个操作就会非常非常灵活,呃,这里边大家注意啊,就是所有的process function,当然包括k process function对吧,它都继承了一个rich function接口,我们当时给大家讲过那个复函数,大家还记得吗?复函数有个什么特点?对,它是不是里边可以有各种各样的生命周期啊啊,另外可以有那个状态的一些操作对不对?哎,所以大家就会看到,既然process方式它还继承了reach方式的接口,那它同时还是负函数对不对?所以它里边的可以做的操作就非常非常多。
08:18
既可以啊,这个有生命周期,还可以操作状态,另外还可以前面我们讲的还可以获取到时间出water back对吧,定义,呃,这个闹钟定义,定时器之类的操作都可以做最底层最灵活的这个方式啊。好,那这里面给大家看一眼,就是除了这个生基本生命周期之外,另外比较重要的两个方法给大家介绍一下,一个是它必须要实现的,叫做process element。Process,大家从这个名字上来讲来看,能能知道他讲的什么东西吗?是不是就是处理每一个元素啊,啊,所以这个方法是必须要实现的,那么它的作用是什么呢?就是流里边的数据来了之后,流到了这个任务的时候,所有的数据来一个数据就调用这个方法process element处理一下,对吧?啊,所以大家想这就是是不是核心的一个处理逻辑啊,就应该放在这里,好,这里面大家注意啊,它传传的这个参数大家看一下,首先有一个value,它的类型是印,那就是代表我们的传入的数据对不对啊,当前的那个数据来了,然后后面还有一个out,这是一个collect collector啊这个呃,大家不管这个collect是什么,一看这个out就知道。
09:40
这是我们处理完了之后包装好的那个输出的数据结构,对不对,对吧,数据是放在这儿的啊,在这个out里边啊,当然它是个or我们输出的,肯定得调用collector的方法,然后中间还有一个。上下文context对吧,所以这个我们不是说它里边能够获取到这个,获取到那个吗?能干这个事情,能干那个事情吗?是不是这个名堂就都在这里了,都在上下文里边了,大家注意这个contact就可以访问当前元素的时间戳time sample元素的K。
10:16
然后还有什么呢?还有time service时间服务,这个时间服务它就可以去访问我们的watermark。另外还可以去干什么?还可以注册定时器。啊,那当然了,既然有注册定时器,那是不是定时器就还应该能够,我们我们注册定时器之后,是不是应该还得定义你那个闹钟响的时候到底干什么事儿啊,对吧?所以大家看这个方法on timer。就是定时器触发时候的操作,我们在这里定义它是个回调函数,哎,大家大家之前那个有同学做过这个前端的开发吗。呃,或者说有同学了解这个前端的相关的一些东西吗?啊,大家没做过是吧,如果有同学要是接触过一些前端开发的话,大家可能就知道在那个页面上你写一个JS,我们那个鼠标一点击对吧,或者说你那个鼠标一滑动或者怎么样,他在那个对写的时候都会有一个什么都会有一个回调函数。
11:22
他要捕捉我们的一个一个动作,对不对,捕捉一个事件,而他一开始定义的时候,他并不知道我我们什么时候做啊,所以说他必须是一个回调,那个回调,比方说我们点击操作,它就是一个叫on click事件,大家可能听说过这个对吧,On对吧,这个很有名啊,所以这里面其实差不多的啊,就是在这里边就是一个回调。所以就是相当于我定义的时候,并不是直接要让他就去,就去按这个逻辑去做,对吧,按这个逻辑直接去去过程去把它执行一遍,而是等到某个某种情况,某个事件触发它的时候,我们再去调用这里面的逻辑,所以它是个回调。
12:04
好呃,那接下来再给大家详细的说一下,就是这个time service里边的一些东西啊service我们不是说它是时间服务嘛,这个时间服务里边到底能干什么事情呢?啊,大家看啊,它有这个对象,它有下面主要有下面的这些方法。有哪几个方法呢?一个叫current processing。啊,这个就是所谓的返回当前的处理时间啊,这个我们不要在这里干,说了,我在这里给大家直接定义出来看一下吧,对吧,前面我们有data对吧,这里已经弄好了这个data stream,好,Data stream,然后我是不是可以KBY啊,KBY可以以这个ID k by,这就是一个kid stream,大家看现在我是不是直接可以点process,大家看啊,调用的时候就直接点process,里边是不是就可以传一个可以传process function,也可以传一个key的function,对吧?哎,好,这里边我就用一个my process。
13:07
嗯,大家会想到这是不是一个函数类对吧?所以在下边我们实现一下这个东西啊,My process,我要实现一个什么呢?是不是kind的process function这么一个东西啊,啊,刚才大家其实看到了,它里边有有什么样的一个类型呢?要定义什么样的类型呢?要定义key,要定义I,要定义o in是input对吧,In用output对不对?所以就是你的key是什么类型,然后input是什么类型,Output是什么类型对吧?啊所以这里边,呃,我们看这里边这个,呃,我们的K是什么类型呢?呃,这里边我们KBY的时候已经用了这个下划线的这种方式,那是不是这个K就是我们的string类型啊,ID就是string对吧?所以这里边大家看直接写进来。
14:03
然后我的输入是不是应该是那个sensor reading啊对,输出比方说比方说我就输出一个ink是不是可以啊,或者输出一个string都没问题对吧?好,输出no nonono类型啊,输出一个string,好,这就是我们的一个key process方式,当然大家可以看到k k process function,它是不是实现了一个obstract reach function啊,啊大家看是不是这样OB抽象负函数对不对啊,所以它就有各种各样的这个这个抽象负函数,它是这个,呃,实现了这个瑞士方式的这个接口的,所以它就有各种各样的生命周期函数啊好,所以这里我们就来看了,它必须要实现的方法是什么呢?是不是就是process element啊,大家看是不是就是这样,所以这里边我们已经拿到了这个value,然后在这里边ctx,这是上下文对不对,上下文里边是不是我们说有一个,大家看能直接拿到当前的时间戳对不对,然后还能拿到当前的current key,因为这是key外之后的对吧?Key process方式,所以能拿到当前的key,然后是不是还有一个时间服务啊好,时间服务我们看一眼它里边有什么东西。
15:21
大家看这里边就有,是不是可以注册一个timer啊,然后注册timer的方法有两种。一种叫注册一个even的time timer,一个叫注册一个processing time timer啊,所以大家就知道了,Processing timer,那是不是相当于就是我在这里注册了之后,是不是就是你机器运行多长时间之后,它就直接触发了呀,对吧?啊,我们的处理时间啊,机器时间,那如果要是even time timer的话,它什么时候出发,它是不是就得等到我们的water mark水位线涨到我们定义好的那个时间的时候,它才能出发啊,哎,所以大家注意啊,Even time里边它的推进就是靠water mark的,大家一定要把这个概念要树立起来啊。好,另外大家继续看还有什么方法呢?
16:13
呃,大家会看到了,能注册是不是就能删啊,对吧?呃,当然了,就是说如果注册了之后,到时候它触发了这个定时器,它闹钟响了,响完了之后是不是就相当于没了呀,这个大家都都都有经验啊,就是呃,设了一个闹钟响完了之后就没了,对不对啊,那那就不用管了,或者你如果要是觉得这个。还这个闹钟还没响呢,那是不是可以把它删掉啊,对吧,还没响的时候我直接把它取消掉,Delete掉当然是可以的,同样事件时间和处理时间都可以按照这个方法去delete。啊,当然了,它还可以直接获取当前的processing type对吧,那相当于就是当前的这个处理时间,然后另外还可以获取什么当年的对当前的even time,当前的even time应该就是什么对water,所以大家看这其实就是给了六个方法,其实就是一种情况是even time,一种情况present time对不对,相当于就是三种方法什么三种什么方法呢?一个是获取当前的时间对吧,另外一个是注册一个定时器,另外一个是删除一个定时器,这就是时间服务里边的这个大家能够调用的东西啊,啊,当然我这里边就可以直接那个一个even time timer对吧,是不是直接给2000就没事了对吧?他要的是一个毫秒数的一个一个long类型的值啊,所以呃,这就是我们这个注册定时器的这个过程。
我来说两句