00:00
掌握了窗口这部分内容之后,其实我们已经可以应对工作当中的绝大多数需求了,因为我们想啊,要做流处理的话,诶,那无非就是这几种计算吧,要么就是简单的基于每一个数据来一个就做一个转换,那这种转换呢,可能就分成了两类,一类就是我们所说的啊基本转换算子,Map filter flat map来一个转换,一个不涉及到中间状态的聚合,那另外一种呢,诶,那就我们就说的就是reduce了,那就是来了数据之后,我们可能中间要保存一个状态。不停的叠加,不停的规约,哎,那最后得到的就是唯一的结果,诶,那这都是每来一个数据就处理就输出,那如果说我们想要分成时间段一部分一部分的进行处理的话,那就去开窗口,诶,那所以前面的两部分内容啊,我们介绍了data streamam API里面的基本算子和窗口算子之后,那接下来我们其实已经掌握了弗Li编程的核心啊,但是我们知道啊,对于弗link而言,它其实给我们提供了分层级的API,那我们所说的这个data stream API呢,它是中间的这个核心层,也就是工作当中可能用的最多的一层。
01:14
那另外如果说我们工作当中遇到了一些非常特殊的需求,之前我们讲过的基本的转换,基本的聚合,还有这个开窗口之后的各种操作,如果都搞不定的话,这个时候怎么办呢?有些同学可能会想,这还有搞不定的需求吗?啊,比如说我们想啊,我现在。就是想要对时间进行精细化的控制,比如说我在我当前的一个算子里边啊,需要去指定。根据当前的时间,然后指定在当前时间之后的十秒钟。去做一个某个操作。哎,那这个操作怎么样去指定呢?怎么样去定义呢?啊,在我们之前的API里边好像没有提供对应的功能。
02:00
之前我们好像跟时间相关的就只有窗口操作了,但是我们的窗口操作那都是指定的呀,要不就是指定长度,要不那个session的话,那是要指定这个间隔时间嘛,指定这个绘画的间隔嘛,所以我们现在就得考虑能不能获取到更加底层更加丰富的信息呢?比如说掌握到这个时间信息,比如说可以去指定过一段时间之后发生什么事情,诶这就是我们接下来要介绍的底层API,我们要直接针对流数据去进行有状态的流处理。这就是我们接下来要讲的第七章处理函数,处理函数其实是一个翻译啊,处理函数本身是叫做process方式。它在调用的过程当中呢,其实跟我们前面讲过的那些转换算子都类似,也是基于一个data stream去调一个点process方法,然后里边传入一个process方式就可以了。那这里的process方法我们想到它就是处理的意思,诶,所以我们看到它就更加的一般化一点,它不是指定,比方说像map我们一看就知道是一对一的转换吧,像filter一看就知道是要做一个过滤嘛,Reduce一看就是要做一个规律聚合,那process呢,只是处理,并没有说怎么处理,所以呢,他就给我们提供了通用的接口,提供了最灵活的处理方式,那具体的逻辑你自己去定义吧,在这里边什么东西你都可以获取到啊,那比如说这里边就有当前的数据事件啊,当前的事件那当然是得拿到了啊,数据肯定有,然后还有状态,诶我们说现在是有状态的流处理吗?底层的状态在处理函数里边都可以获取到,另外还有时间相关的信息。
03:45
啊,这就是前面我们说的,能不能捕获到当前的watermark,然后基于它再做一个定时的操作呢?啊,在处理函数里边这些都能拿得到,所以我们就有了完全的控制权,这一层级呢,我们就会发现它太过于底层,所以说是比较抽象的啊,在实际应用的过程当中,如果可以不用它,可能我们就不会去用啊,因为确实是编程会相对麻烦一点,但是有一些需求可能是非它不可,我们就要控制的那么精细,就要这么多功能,所以说这就是我们做弗link流处理的大招啊,不轻易用,一旦要是派他上场的话,那基本上就是所有需求都能够搞定。
04:26
好,那接下来我们就在代码当中去看一看process function到底是怎么用的,接下来我们可以新建一个啊,现在是第七章,所以我们新建一个package。叫做CHAPTER07。然后接下来新建一个测试的object,当前我们测试的是process function。Test。没方法先写出来,哎,那首先这个流程还是一样的啊,Stream execution environment啊,那这里我们把这个下划线做一个引入,调用它的get execution environment方法得到这个叫做env,同样还是全局的并行度,我们先设置成一,方便后边测试输出,接下来呢,就是读取数据了,我们可以直接基于EV。
05:13
去调一个ADDS方法里边直接有一个我们自定义的click s。诶,那当然了,我们知道PI本身是声序排列的数据嘛,所以我们可以直接传一个igning time stamps里边指定当前的时间戳就可以了,这是我们基本的数据的准备啊,那么得到的这一个,我们把它叫做stream。基于当前的data stream,我们就可以直接调用一个叫process方法,我们看data stream本身就有这个方法,叫做process,里边需要传入的就是一个process方式。我们可以看一下这个process function又是个什么东西呢?哎,注意它不再是我们之前一般意义上函数类都是一个接口吗?它现在是一个抽象类。
06:01
为什么呢?因为它本身就继承自abstract rich function,它本身就是一个负函数类啊,所以我们现在就知道为什么说process function强大了啊,那它本身其实不是一般的我们所说的这一个转换算子对应的那个函数类,它本身就是一个负函数类,有生命周期,有运行上下文啊,可以获取到状态,可以获取到各种各样的运行式信息,所以复函数类有的功能它都有。另外,它还有一些独有的功能。接下来我们来做一个详细的说明啊,那首先我们看到process function它有两个泛型参数,一个是I,一个是O,这个很明显了,它做一个处理转换嘛,所以当然可以更改当前的数据的输出类型啊,那输入的数据类型是I,输出的数据类型是O,所以我们看到了map能做的这个转换,当然process是能搞定的。然后我们看它里边的内容,哎,这里边最关键的有一个抽象方法叫做process element,哎,那就是处理元素,很显然这跟我们之前所有函数类里边定义的那个抽象方法类似,都是每来一个数据就会调用这个方法,那他说的也很明显,就是处理当前的数据元素嘛,那后边的这个I。
07:18
Value,很显然这就是当前我们输入的数据,然后接下来呢,有一个ctx,它是一个上下文,最后还有一个collector类型的out啊,那这个就很明显了,我们输出数据的时候就还是调用out.collect这个跟Fla map是完全一样的,哎,那所以很明显,当前process function也可以很容易的实现Fla的功能,就是你想输出几次输出几次,只要调al点就可以了。那另外呢,我们这里因为还继承自负函数类,所以负函数里边我们不是可以获取运行上下文吗?啊,还有生命周期,那运行上下文里边本身又可以捕获当前底层的状态信息,哎,那所以process function也可以做状态编程啊,那我们前面提到啊,所有的聚合操作不都是保存了一个当前聚合的状态,然后依次叠加的吗?哎,它所以有当前聚合的状态定义出来之后,其实process function也可以实现类似于reduce这样的聚合功能。
08:23
之前我们所有做的操作在process function这里都可以实现,那这里还多了一个东西啊,我们看到还多了一个context上下文,这是真正意义上跟复函数类又不一样的一个地方,这里它还有一个单独的上下文,这个又是什么东西呢?我们看到。在他自己这里就做了一个单独的定义,这个context里边。可以获取当前的时间戳。而且还给我们提供了一个时间服务,定时服务timer service这样的一个方法。这个方法又是用来干什么的呢?诶,我们可以点进去看一下啊,这是一个接口,在这个接口里边,它可以告诉我们current processing处理时间,Current water当前的水位线,也就是事件时间啊,这两个方法其实我们非常的熟悉啊,因为之前我们讲过process window function啊,在里边就可以实现类似的功能啊,就可以调用到这个东西,那另外呢,下面我们看它还可以。
09:26
注册,注册什么东西呢?Processing time timer,注册even time timer,所以它还可以去注册一个定时器开门,我们说就是相当于设定了一个定点的时间啊,然后到时候去执行对应的操作,这就相当于一个闹钟一样,我们设置一个时间,然后把对应的处理逻辑到那个时间才去出发。哎,那当前的时间呢,可以以处理时间为准,也可以以事件时间为准,诶,这就是time service给我们提供了非常强大的用途啊,我们真正想对时间做精细控制,在这里都能实现了。
10:04
啊,那能设置就能删,后面还有delete processing time timer和delete time timer,所以我们看到啊,有了这样一个定时服务time service,有了它之后,那process function呢,就可以设置定时操作,那不就可以做窗口对应的处理了吗?你告诉我现在诶有一个窗口到十秒钟的时候要去触发计算,那我就设置一个十秒的定时器嘛,诶所以我们看process function就是最底层的东西,之前我们讲过的所有API用process方式理论上都可以实现。啊,那另外还这里还有一个output,诶,我们看到这outut又是干什么的呢?里边要传一个output type,所以我们看到process function还可以做测输出流的输出啊,那这也就包含了之前我们在处理指道数据的时候,窗口里边测输出流那种功能啊,而且用这种方式。它还可以实现非常一般化的分流操作,哎,就是我们现在有一条数据流,我就有一个想法,想要把它一分为二,分成两条流,能不能做到呢?诶之前我们讲的这个过程,那可能就从代码上单独去处理了,但这个效率就比较低,我们这里边更好的方式就是还在一条流里边直接。
11:20
调用一个process function的auto的方法,然后直接把它输出到一个特殊出流里面就可以了,就实现了分流操作,所以process function真的是功能非常非常强大。啊,这里还要多说一句,就是我们再往上看,看到还有一个抽象方法,叫做on time。这又是一个什么东西呢?啊,哎,看这个名字我们就看到了啊,就是on time,肯定是跟定时器有关嘛,On什么什么,这其实就是事件触发之后,接下来要执行的操作啊,就是大家如果要是对前端编程比较熟悉的话,可能知道啊,就在一个页面上啊,一个设置一个按钮,一个button,那它本身就会有一个on click事件,对吧?呃,我们可以调一个on click方法,然后我们。
12:07
就可以定义点击这一个按钮之后可以做什么样的操作了,那同样这里的on timer。指的就是当定时器触发的时候。现在我们到底要做什么操作?诶,之前我们不是可以注册定时器吗?Camera service里边这里可以注册定时器,注意这里只是注册,那注册了之后触发的时候干什么事儿呢?这个是要在process function的on timer方法里面去实现的。哎,那所以这个所有的功能就完整了,我们现在想要的东西就都实现了。啊,那在这里我们简单的上手来测一测,那这里我们就直接去new一个process function就可以了,哎,这里我们使用了匿名类的写法,那当然把这个process function要引入,这里边它有IO2个泛型,我们现在的输入数据当然是了。输出的数据类型,诶,我们可以简单的想定义一下,只是测试嘛,我们简单起见啊,干脆就直接提取当前的用户名,直接打印输出就完了,就一个string。
13:10
所以这个逻辑是非常简单的,然后接下来那就是具体的实现了,必须要实现一个process element方法,在这里面呢,每来一个数据都会调用这个方法,哎,那我们就可以考虑怎么样去处理呢?呃,比方说我们可以判断如果。当前的这个value。它的user,如果要是我们说它等于啊,我们可以调一个equals方法,如果它是Mary的数据的话,那么我们就直接输出out.collect当前的user。直接把Mary输出一下,这是一种情况,然后我们当然还可以继续判断了,L if,如果当前的user是另外一个人,如果是Bob的话,诶,那么我们做的操作稍微的多一点,我们可以输出他的user,而且既然是string类型嘛,我们还可以把他的URL也做一个输出啊,就类似于我们之前做的Fla map里面的操作啊,value.user和。
14:11
Value点。URL都可以输出啊,那time如果我们想打印转成string的话,也可以输出啊,啊,那所以这里边我们就可以看到,可以非常简单的实现之前的map filter flat map的效果啊,那对于其他的用户来讲啊,那不输出就相当于直接过滤掉了嘛。这就是关于一些基本操作啊,那当然我们说这个process element里边,我们其实还可以去get runtime context,因为它本身就是函数类嘛,哎,在这里边我们想拿什么信息也是可以拿得到的,比方说之前我们说的get in deth of this sub task啊,这个完全是可以做到的。那我们可以把它做一个打印完全可以啊,那另外呢,我们还可以直接打印一下当前的水位线。这个在GTX上下文里边可以获取到,我们可以获取当前的时间戳,另外还可以使用timer service这个服务获取current water。
15:07
啊,那这样的话我们就可以看到更多的信息了,那接下来如果要做测试的话,我们还应该最终有一个think操作,把它打印输出en v exit执行起来。接下来我们做一个运行。好,接下来我们这里的数据已经输出了,那这里输出的数据我们可以看到啊,那当Bob的数据来了之后,这里其实就有两条数据输出,后边它跟着当前的URL,接下来呢,我们要打印一下当前的sub和ID啊,那当然了,当前我们全局并行度是一嘛,所以肯定就是零了,然后注意这里还有一个watermark。Watermark,这里是一个很大的负数,这是什么东西呢?哎,这其实就是我们所说的长整型的最小值,为什么是这个样子呢?这就是因为我们当前是事件时间,事件时间的推进是要靠着数据的时间戳来推动的,那一开始还没有数据的时候,当前的事件时间到底是多少呢?其实之前我们在。
16:14
处理乱序数据的时候,这个调用的过程当中。我们可以看到啊,里边它到底是做了一个什么操作,哎,这里我们就看到当前max time step初始值给的是什么。这就是给了一个非常大的负数,然后加上了当前设置的延迟时间,再加上一毫秒,哎,那这里为什么要加一毫秒呢?因为当前设置的是最大的时间戳,初始的最大时间戳,那后边我们设置watermark的时候,这个是还要减掉的啊,所以我们看到最后生成的water mark其实就是我们当前。一开始设置的这个初始值啊,所以我们看跟它是完全一样啊,那一开始没有数据的时候是这样,那为什么报破数据已经来了之后还是这样呢?诶这就是我们说的当前我们是周期性的生成,如果是隔默认200毫秒生成一次的话,那很显然Bob第一条数据来的时候,我们在处理Bob这条数据这个流程的过程当中,Watermark是没有更新的。
17:16
只有在它处理完成了之后,下一个时间间隔200毫秒生成watermark的时候,才会把它做一个更新哦,所以我们看下一次啊,Mary第二条数据来的时候,哎,Mary做一个这个user输出,那这个时候的water就更新了哦,那当然了,这个更新的water mark其实并不是Mary本身的时间戳去更新的,因为他的时间戳还没来得及更新的啊,那应该是之前已经更新好了的当前的水位线。那后边同样都是来一个数据对应的啊,我们就有相应的输出,有水位线的打印,这就是我们对于process function的一个基本的了解。
我来说两句