00:00
我们这里专门要去另外介绍的是一个新的内容,就是所谓的函数类rich function。诶,这里所谓的函数类是什么含义呢?如果我们还记得的话,应该还能想起来,之前我们在测试卡夫卡的时候会发现。卡夫卡的连接器给我们传进来的这个fli卡夫卡consumer,它的本质上其实是一个parallel方式。关于parallel,我们已经知道了,这是一个并行的,可以并行产生数据的一个S方式,一个原任务的实现。那这里面的又代表什么呢?我们发现reach表示的是继承了abstract rich方式,这里的reach方式就是我们所说的负函数类。那到底什么是函数类呢?啊,那本质上来讲,它其实也是data threepi给我们提供的一类函数类啊,那可以认为也是一种接口,但是事实上啊,负函数都是以抽象类的形式出现的。
01:08
所有的函数类,比如说function functionman function reduce function等等啊,他们都有对应的函数类的版本。哎,那所以这里面的关键就在于这种负函数类,它是rich,它比较富有,那它跟普通的函数类版本到底有什么区别呢?它到底附到哪里去了呢?当然就是比常规的函数类可以提供更丰富的功能了。它最大的区别就在于可以获取到一个运行时环境的上下文,在这个上下文里边当然就能处理更多的信息,能够干更多的事情,另外呢,他还拥有一些生命周期方法诶,所以他能做的事情就会多更多啊。那这里边涉及到了一个生命周期的概念啊,这个我们其实并不陌生,在Java里边最典型的当然就是说每一个对象它其实都是有生命周期的,因为我们知道GM是有GC机制的呢,啊,在C语言里边本身是没有GC的啊,那就是相当于所有的内存分配和回收,哎,我们做这个lock free都是需要去手动管理的。
02:22
那这是有很大的一个问题,如果说分配了之后一直不回收的话,那很容易造成内存泄露啊。所以Java g vm主要是对于这个问题进行了一个优化,我们程序员就可以把更多的精力放在业务逻辑的处理上了,而不要去处理这个对象的回收,那对象在GVM里边就会有自己的一个生命周期,有创建和消亡的过程。而对于flink而言呢,在rich方式里边,它同样有一个生命周期的概念,那最典型的生命生命周期方法当然就是open和close了。
03:01
所谓的open打开。当然就是在当前每一个算子任务创建的时候会去调用的一个初始化的方法,也就是说对于我们当前这个算子任务而言,如果用一个reach方式,用一个复函数类去进行了声明的话,我们就可以在open方法里边,在当前这个算算子任务实例被创建的时候。去做一些特别的事情。这一个方法会在我们内部必须要实现的那个接口方法被调用之前调用,因为它是初始化方法嘛,后边我们像这个map,比方说filter,或者说我们在这个reduce function里边必须要实现的reduce方法,它在什么时候调用呢?其实是每来一条数据的时候才会去调用,因为我们说这里边传入的就有当前已有的状态,违约状态和每一条数据嘛,诶,那或者前面我们提到的map map方式里边这个更加明显,传入的参数就是当前的每一条数。
04:09
所以这个map方法是每来一条数据才会调用一次,一开始创建出任务的时候,我们在等待数据到来的时候,它是根本不会去执行的。而open生命周期呢?只要当前的任务创建出来,它就会被调用,所以它是一个初始画风。一般在这里边干什么事儿呢?那就是像文件IO啊,或者说像数据库连接的创建,或者配置文件读取等等,非常适合在这个初始化的open生命周期里边进行完成。另外还有就是close方法,Close方法我们知道对应的它是关闭嘛,这是生命周期当中最后一个调用的方法,有点类似于解构啊,那一般就是做一些关闭和回收清理的工作了,那如果说在open生命周期里边创建的数据库的连接,那我们现在就应该关闭数据库的连接,做一些清理,有一些不必要的内存空间可以就直接释放掉了。
05:07
这里需要注意的就是生命周期方法open close对于每一个并行的子任务来讲,只调用一次,那当然如果是并行的任务的话。每一个子任务的实力都会对应的调用自己的一个open和close,它是针对当前的并行任务实例对象而言的。那里边的具体的那个抽象实现出来的抽象方法,Map filter reduce,它是对应着我们处理数据的工作方法,他们是针对每一个数据到来之后都会去出发一次。接下来我们可以在代码当中去用一个例子来看一看。Reach function的用法。这里我们可以把这个命名成transform,同样还是部分transform。
06:05
整体的流程其实还是一样。我们把main方法写出来,Throws exception里边的过程的话,我们还是借鉴之前的。定义的这个过程,创建执行环境以及输入当前的数据。接下来我们就可以去做一些针对stream做一些转换操作了,哎,我们可以直接就用richman方式,我们做一个例子吧。这里边直接定义一个点map操作,然后里边那就实现一个my reach map,当然我们这里边也可以用匿名类的方式去实现,当然了不能使用拉姆达表达式,因为很显然在瑞士方式里边就不仅仅只有一个单一的抽象方法了啊,那还有生命周期方法,如果用到了的话,他显然就不能使用拉表达式去实现了。那这个问题呢,其实针对之前的s function同样是适用的,在s function里面我们知道它需要实现wrong和CAN2个抽象方法,那很显然我们直接使用拉表达式就不能去单独把它进行描述了啊,那所以我们只有在单一抽象方法接口里边实现它的时候才能够使用拉表达式啊。
07:21
那这里面我们关键就在于实现一个自定义的。自定义的函数类。那这里我们就是public static。Plus。My rich。注意,这里不是implement,而是。一个map方式。我们点到源码里面可以看到,这是因为rich map function它是一个抽象类,而并不是一个接口。为什么是抽象类呢?因为它继承自abstract rich function抽象的函数类啊,那这本身也是一个抽象类了,它实现了rich function接口。
08:04
而rich map function实现这样一个函数类接口,继承字抽象函数类的同时也实现了map function接口,所以当然了,当前我们里边也需要有一个抽象的map方法,需要我们单独去做实现啊。那接下来我们就可以看到,呃,当前还是应该有两个对应的泛型参数啊,In和out,这个跟那方式还是一致的,所以我们这里边可以基于当前的event,然后把它转换,我们举事例的话就简单一点,直接把它转换成一个inte,一个整数吧。里边必须要实现的方法有一个map,那除此之外呢。我们会发现还可以去实现当前继承自abstract rich function里边的一些生命周期方法,比如说open和close,当然了,还有一些是跟运行时环境有关的方法,那基于这些,比方说我们get runtime contact,就可以获取到当前的运行环境,然后去得到一些其他的信息,做一些操作,那也可以去set当前的运行环境。
09:12
那我们这里边先把open生命周期和生命周期先定义出来。然后接下来我们就知道了open生命周期里边,那就是当前任务实力再去创建的时候,首先要调用的一个方法,这里边首先有一个super.open啊,调用它副类里边的open方法啊,这里边我们可以有也可以没有啊,其实并没有特别的操作,我们这里边可以做一个直接做一个打印输出。打印一条信息啊,因为我们可以知道。当前是open。生命周期。生命周期被调用。啊,那我们这里边还可以加上一些其他的信息,比如说我们可以在get runtime context里边拿到一些比较特别的东西,比如说我们可以定义状态,诶,这也是负函数类的一个非常重要的用法,就是在获取到当前运行上下文之后,可以自定义状态。
10:19
进行状态编程,所以这个功能其实非常强大,有了这个功能,理论上一个map function,诶我们这里边本质上是要实现一个map function,但是如果说我们是一个rich map function的话。他也可以去自定义状态。之前我们说一个慢方式,本质上它是一对一的转换,不涉及到之前所有数据的某些信息,我们没有状态。只有在做聚合的时候,Reduce的时候才会有状态,而现在呢,自己可以自定义状态了,那也就代表甚至一个map能够实现聚合的功能。这也就是为什么flink被称为有状态的流处理啊,它的使用对于状态的使用是非常灵活,可以实现各种各样功能的。
11:06
啊,那当然了,除了这个state之外,我们还可以获取当前的任务名称啊,另外还可以获取任务名称with当前子任务啊,Sub tasks,另外呢,还可以还可以获取一些比较特殊的状态类型啊,比方说aggregating聚合状态,关于具体的状态我们会放到后面章节做介绍,这里边还有一个啊,这个可以看到,可以获取到当前子任务的索引号index,诶,这个比较有趣,我们就直接把这个拿到获取当前的索引号,诶,那所以当前我们可以知道这是第几号任务。我们直接在这里做一个信息的输出。它启动了啊,那同样我们可以把这一句。输出到close这里,因为我们这里没有特别的操作啊呃,那我们这里是直接用一个打印输出来表示这个方法被调用,现在是close方法。
12:07
生命周期被调用,那现在是?结束。那中间的转换呢,我们需要把每一个value转换成一个啊,这个很简单,我们可以直接用一个数字做一个返回,也可以比如说拿当前value,它的URL里边我们提取一下它的长度。一下它的长度进行反馈啊,那这样的话就可以得到一个整数类型了。接下来我们可以把得到的结果直接做一个打印输出。DV execute执行起来。接下来可以运行一下,看看当前的效果怎么样。我们可以看到非常明显,Open生命周期只被调用一次close,同样close也只被调用一次,只是在开始和结束的时候有这么一次调用,而中间的map方法呢?啊,那很明显了,每来一个数据都会调用一次,现在我们测试的有三条数据,它输出的是63,也就是当前URL的字符串长度。
13:17
这里为什么只有一个零号任务呢?因为我们当前并行度是一嘛,全局并行度是一,当然当前执行这个map任务的时候。所当前的子任务就只有一个了,那如果说我们想要看的更加的清楚一点的话,我们可以对它做一个并行度的设置,比方说设成二的话,接下来我们再来重新运行一下。当前我们就可以看的非常明确。并行度设置成二之后。Open生命周期和close生命周期就会调用两次了,因为每一个并行的子任务,零号和一号都会有一个。
14:04
当前对象实例任务的对象实例被创建和被销毁的过程,所以open和close都会被调用一次,而对应的数据呢,这里边处理的过程它只被最终因为print它的并行度是二,所以这里边打印输出的时候还是以一个线程一个并行子任务去做打印的,这里边并没有看出他们的不同。在具体map处理的时候,可能是分配到了不同的病行子人物去进行处理。这就是关于负函数的用法。
我来说两句