00:00
现在我们已经了解了在flink当中函数类的用法,其实我们发现啊,对于flink的操作流程来讲,每一步具体的转换算子就是通过这样的一个函数类来定义它内部的转换操作的,而在这个函数类里边呢,最关键的其实就是我们所实现这个接口里边的那个唯一的抽象方法啊,那所以它可以用一个拉姆达表达式去直接进行表示。那这里边呢,自然会想到一个问题,就是这里的这个方法,比如说我们filter function里边的filter方法到底是什么时候被调用呢?哎,我们在整个处理流程里边看不到调用filter方法的这个过程啊,我们在外边的filter难道调的就是这里吗?注意不是啊,在外边我们定义的其实只是一个一个的算子,我们知道对于程序而言,本身的这个数据流图将会在客户端上打包进行转换,生成作业图,然后提交到集群上,由job manager生成执行图,所以这里边我们定义的就是每一步的计算。而这里边真正意义上调用filter方法,那是要等到数据来了之后,我们看它的参数是当前的数据嘛,所以其实是我们的数据流每来一个数据。
01:23
到达当前的filter算子的时候。那么就会调用到我们当前这个算子里边,定义好的这个filter方法里来,那当前的数据呢,就是我们这里的参数。执行当前的流程,哎,那得到的最终结果根据它到底是true还是false,决定当前算子的输出到底是什么?哎,这就是整个我们调用和处理的过程,那在这个过程当中,可能我们又会联想到另外一个问题啊,我们只定义了每一个算子里边每个数据来了之后调用的这个方法。那假如说我现在要针对这个算子统一执行一系列的操作,我不是针对每一个数据来了之后才执行的,而是在所有数据来之前,我当前这个算子,这个工位已经创建好的时候,我就想做一些初始化的操作,这个怎么办呢?
02:19
那我们看到udf在一般的函数类里边是做不到这些事情的。那在flink里边有没有这样的功能呢?能不能做到这件事呢?当然是可以的啊,这个时候呢,我们就要用上更加复杂的一种用户自定义的函数,哎,那对应的我们把它叫做负函数类。其实简单来讲啊,负函数类跟函数类一样,或者说它就是一个函数类的扩展版本,它是rich版本的函数类。为什么叫rich呢?Rich是富有的意思吧,它到底附在哪了呢?呃,简单来讲就是说它比一般的函数类会提供更多的功能,最大的不同就在于负函数类是可以获取运行环境的上下文,而且拥有一些生命周期方法啊。
03:09
那这个生命周期方法呢,主要就是两个,一个叫open,另外一个叫close,那open呢,我们知道这是这就是开始嘛,所以它是当前一个复函数类,也就是我们整个这个算子任务的初始化方法,它会开启一个算子的生命周期。比如说我们定义了一个map function,那map function呢,它是没有生命周期这个概念的啊,就是里边有一个抽象的map方法,每来一个数据就会调用这个方法,如果说我们想要定义一个负函数的话,那对应的就会有一个reach map方式。哎,那当然了,如果要是filter function的话,那对应的就会有一个reach filter function,几乎每一个函数类对应的都有它的这个rich版本,哎,那所以这个时候呢。就是在所有数据到来之前,在当前map任务这个算子创建的时候,就会调用它的open方法。
04:07
啊,所以我们就会发现这个open方法里边呢,就可以做一些初始化的工作,像文件IO流的创建啊,数据库的连接啊,配置文件的读取啊,这些一次性的初始化工作都适合在open方法里面去完成。啊,那与之对应的还有一个就是close方法,Close方法呢,是整个生命周期当中最后一个调用的方法,呃,类似于我们的解构了啊,之前我们这个算子任务在创建的时候调用open,那在当前这个算子任务结束要销毁的时候呢,当然就可以调用close,它可以用来做一些清理工作。那另外我们说这里还可以获取到运行环境的上下文,在运行时的上下文里边呢,其实可以获得到更多的信息,比如说当前正在执行的子任务的编号啊,啊,比如说当前运行时环境里边的一些状态啊,这一部分都是可以在负函数里边获取到的,所以这一部分呢,其实对于flink编程而言是非常重要的一个功能扩展,我们在后边会讲到所谓的状态编程,那这一部分在负函数类里边就可以得到非常丰富的应用。
05:18
接下来我们可以在代码里边也来测一测负函数的功能。同样,我们在当前的包下边去new一个SC的object,当前我们测试的是还是转换transform下边的复函数类,我们叫reach function test。没方法先写出来啊,那整体的测试流程呢,跟前面还是一样的,所以我们干脆还是把前面的获取、执行环境设置、并行度以及数据源的读取全部都copy进来上面同样我们还是引入一个下划线。接下来有了数据之后,我们就可以去定义一些转换计算了啊,那基于当前的STEM,我们可以去定义一个map,之前我们的map定义的都是简单的转换啊,或者说自定义一个map方式,那现在呢,我们希望。
06:10
定义。自定义一个rich map。方式。啊,那么我们想要去测试负函数类。的功能。啊,那所以接下来我们在这里可以去又一个my rich map,接下来就要去具体来实现它了,Class。My reach map注意,现在我们要去实现的是一个rich map方式。同样后边需要有input和OUTPUT2种数据类型的泛型,我们点进去会看到啊,Reach function函数类其实跟函数类稍微有点不一样,函数类我们看到它都是单一抽象方法的,接口是interface,而rich function呢,负函数类呢,它是一个抽象类。
07:04
主要就是因为什么呢?我们看到它是继承自abstract rich function,继承了抽象的复函数类这样一个抽象类啊,那所以当然它本身也是一个抽象类了,在这个抽象类里边,我们就会看到它有当前的运行时上下文环境可以去获取,另外呢,还有open和close对应的生命周期方法啊,那对于rich map function而言,它继承了抽象复函数类,除此之外还要去实现一个map方式。我们看到这里它其实都是Java代码,所以是function。那里边当然还是必须要实现一个map方法了,所以接下来我们还是先把当前的泛型先写进来。Event是输入,那输出的话,我们就简单一点吧,假如说直接输出一个长整型的数字吧。里边必须要实现一个map方法。
08:00
这里我们想要得到一个长整形的数据也非常简单,因为当前的事件里边有一个数据时间戳,就是长整型的,那我们干脆就把它的时间戳做一个输出就完了嘛,啊,这就是我们一个简单的测试啊,转换成一个时间戳,长整型的时间戳进行输出,这就是我们这个慢function的基本的转换逻辑,哎,那我们知道啊,如果只是定义这个的话,我们只要用map function就可以了,那现在rich map function里边显然我们就可以获取到更多的信息,比如说。我们可以来实现一个重写一个当前的open生命周期方法,这个方法是在当前的map算子创建的时候就会去直接调用的一个初始化方法,所以呢,它会在所有数据到来之前直接调用,而且只调用一次,诶,那这里我们可以在里边添加进去自己想要的一些信息啊,这里我们也没有实际的这个应用需求啊,所以我们就简单的做一个测试吧,比如说我们直接做一个打印吧,我们可以利用运行是上下文,前面我们看到啊,负函数类里边不是可以获取当前的运行上下文吗?我们可以利用运行是上下文获取到当前正在执行这一个任务的并行子任务的索引号啊,就到底是哪一个并行子任务正在执行呢?我们可以把这个信息打印输出。
09:26
所以这里面我们可以写一下索引号为。多少呢?呃,这里边我们需要从当前的runtime context里边去获取了啊,这里我们可以看到可以获取job ID啊,获取当前的作业ID,获取当前的task name,获取当前的任务名称,获取当前的状态啊,各种各样的方式,那现在我们要获取的这个信息呢,是get。Index of this sub tasks,当前子任务的索引号,然后接下来后面我们继续写当前编号为什么的任务现在是open,所以我们可以写一个开始。
10:06
这就是一句话,测试了这么多内容,有open就有close,我们同样还可以去重写一下close方法,Close方法呢跟open是非常类似的,它是只有在当前的算子任务结束的时候做一次性的调用啊,那对应的啊,我们也把这个做一个获取,然后打印输出。现在就是。当前任务结束。好,有了这个之后,接下来我们可以直接运行来看一看效果到底是什么样子的,上面我们还要加上一个print打印以及env要执行起来。好,接下来我们运行。首先这里我们有很多数据啊,那这里我们可以看到啊,所有数据到来之前,因为我们当前的并行度是一嘛,所以是按照顺序来执行的,所有数据到来之前,首先就会有一条说索引号为零的任务开始。
11:04
然后接下来每来一条数据,就会执行一次我们当前的map方法啊,那对应得到的数据呢,最后我们要做一个打印输出,所以每一个数据来了之后,它的时间戳依次做了打印,当所有数据都读取完毕之后,我们当前任务要结束的时候呢,调用了close方法,打印出了一句索引号为零的任务结束。哎,这就是我们整个这个过程,那当前只有一个索引号为零的任务,因为我们并行度是一嘛,我们还想继续测试的话,还可以把它换成并行度是二,再做一个测试。调一下并行度,看一看效果会有什么不一样。接下来在测试我们看到输出的话,因为我们知道啊,当前的这个外部任务,整个这个操作是会分发到不同的slot上,不同的并行子任务去执行的,那当然有一个并行子任务就会调用一个open方法啊,当前我们就会执行一下,我们可以看到索引号为零的任务开始,索引号为一的任务开始。
12:06
而后边的数据呢,每一个数据可能会分发到一和二不同的slot上面去执行,去打印输出,每一个数据都会来了之后调用一次map方法进行转换,最后啊,那全部处理完成之后呢,索引号为零的任务结束,索引号为一的任务结束,只调用一次close方法,注意是每个并行子任务都只调用一次open和close,而每个数据来了之后呢,都会调用一次map方法。这就是我们所说的复函数类的用法,所以在实际操作的时候,像这个复函数类应该怎么去使用呢?啊,那一个常见的应用场景就是我们要连接到一个外部数据库,想要去读取数据或者说写入数据的时候,那这个时候对于数据库的连接操作,如果我们放在这个map这个方法里边显然是不合理的,因为我们知道啊,你在这个map里边,它是每来一个数据就会调用一次,对于数据库的连接就会反复的打开关闭,反复的打开关闭这个代价就会比较高,很显然我们应该在初始化的时候一次性的建立连接,后面每来一个数据之后进行读写操作就可以了。
13:20
那当然了,最后在整个任务结束的时候,再调用close方法里边我们再去关闭到数据库的连接,释放资源啊,这样就完成了整个的流程。所以这是在实际应用当中比较常见的一种负函数类的用法。关于负函数类呢,另外一个常见的功能啊,常见的用处就是因为它可以获取运行上下文,前面我们也可以看到了,获取上下文之后,接下来可以get state,可以获取当前的状态,所以呢,我们就可以在里边非常灵活的自定义状态,然后用状态去处理各种各样灵活的功能啊,后面我们讲到状态编程那一章的时候,还会再反过来用到负函数类啊,这就是关于负函数类的用法。
我来说两句