00:00
呃,那么这样的话呢,我们的这个source插件这一块呢,就是讲完了,接下来呢,我们看这个transform插件。那么transform插件顾名思义呢,就是对我们里面的数据做一个转换操作,嗯。Transform块里面呢,也可以声明多个这个转换插件,这个地方呢,我们需要注意一个问题,就是社区呢,刚才看文档我们也发现了。就是社区呢,列了一个很长的表,它对这个转换插件呢,做了很多规划,但是截至这个2.1.0的版本呢,我们现在可以用的插件呢,只有两个,一个是split,另外一个呢就是circle。啊,那么transform块里面呢,可以说明多个转换插件,也所有的转换插件呢,它都可以使用一些公共的选项,比如说是source name和name。那么这个地方呢,你会发现比前面的这个内呢多一个。啊,比前面的name呢,多一个name啊,什么意思呢?我们设想一个这样的场景啊。就是我编写了一个配置文件,刚才我们说呢,Source块里面它其实可以声明多个S,那么我们这里呢,还声明了一个source,然后我们后面呢,还声明了一个source,这样的话呢,我整个这个配置文件里面是对应的两个S的。
01:14
那么我后面有一个声一个什么呢?我声明一个transform的一个插件,那么请问我这个transform插件是使用前面两个表,前面两个这个source里面的哪一个里面的数据呢?啊,实际上我们刚才说了,就是前面的这个。Source插件呢,它可以指定我输出的一个啊表明啊,也就是result name,那么假如说这个插件的这个result name吗?是T1,然后另外一个的这个插件呢,它的result name呢是T2。那么我们就可以在这个转换插件这一块呢,也就是这个source插件的下游,哎,去设定一个s t name,也就是我从哪个表里面去拿数据啊,如果说我设T1的话,那么我就可以去搭建一个这样的理由,如果呢,我设T2。
02:02
是吧,我就可以反过来。使用七二这个表里面的数据达成一个这样的理由,那么这呢就是一个内的作用。然后接下来的话呢,因为这个呃,刚好这个transform这个插件呢,只有两个,一个是插件和一个S插件。那么这样的话,我们就可以去源码上,就是深的,就是深入的把这两个呃场景好好看一下。另外一个就是在这个C塔诺中呢,我们一个转换插件呢,待会儿呢,会着重讲述这四个方法,这四个方法呢,对于一个转换插件来说是最重要的。啊,那么我们现在直接去源码里面看一下这个。啊的插件的实现。嗯,在这个transform里面,我们可以看到有一个transformlink split这个地方点开,然后在s RC Java底下,你可以看到有一个split。
03:01
呃,我们可以看到呢,首先就是这个split呢,它继承了一个什么,它继承了一个flink STEM的这个。啊,不是,它不是继承的,是实现实现了两个接口,一个是流处理的,一个是批处理的。嗯,然后后面接下来呢,是它的一堆属性。也就是这个地方。我们可以看到哈,首先是这个一个插件里面,它会首先是会有一个这个配置的这个对象。然后另外呢,就是。它会有一个。对自己的一个这个选项,它做了一个常量的处理啊,完了之后它还有一个默认的分隔符。然后最后呢,是有一个这个叫nu,就是数字的意思,也就是说最后要分成几个字段,然后另外一个呢,File里面就是我们说的这个啊,你的字段名啊,它一分来之后呢,它放在一个集合里面,还有一个就是row type in for。啊,也就是这个肉这个里面,它各个字段的类型是什么的,是什么样的,比如说是啊第一个第一个字段是int型的,然后第二个字段是字符串是吧,是这样的。
04:10
那么我们可以看到哈,往下拉。这里呢,有一个prepare的方法,这个方法呢,实际上对于一个插件来说呢,啊,是用来解析它的日志的,我们可以看到就是我们刚才看那些引用。在这个地方呢,它都有一个赋值的操作,实际上这个方法是在这个插件被初始化的时候就会调用的一个东西。可以看到这个files呢,它被这个从config里面拿出来,也就得到了我们刚才说的在那个插件里面设置的啊字段名称完了之后,Name呢,就是这个five.size啊,它会去这个,因为这个是地方是得到了一个什么呢,得到了一个集合啊,这个地方呢,它就直接把它的这个大小调出来。包括各种的操作,最后呢,全部挂到这个属性上。然后接下来呢,我们关键是往上看一步。
05:04
这里有两个方法,一个是process batch,呃,对它进行批处理的,另外一个就是process stream,要对它进行流处理的,那么这个插件呢,很有意思。我看有一次在哪。就是它是一个转换插件。但是呢,你会发现我们的这个数据。他是怎么样进来,然后就立刻return了,怎么样出去。啊,他没有对这个我们的数据进行一个操作,也就他名调这个分割的一个插件,但他没有对这个数据进行分割。那他干了个什么事情呢?你要注意啊,在这个位置。下面这个function这个地方。它像这个表环境,呃,它像执行环境哈,就是你比如这个是get stream environment啊,还有get BAT environment,它对这个表环境里面呢。注册了一个名为split的。UDF用户自定义函数,我们可以看到它这里拗了一个啊sc split这个对象完了之后呢,直接把我们的spli就是分隔符,然后包括这个肉的一个类型信息,还有一个这个。
06:15
字段的数量。直接传了进去。呃,也就是说其实split这个插件,它其实不会对数据进行操作。它起到的作用就是向这个表环境里面啊,去注册一个udf,而且呢,这个函数的名称呢,是写死的啊,这个udf的名称是写死的,也就是说你在整个这个就是在我们目前这个2.1.0的版本里面呢。啊,你整个这个配置文件里面只能使用一次这个split插件,而且经过我的实验呢,是发现。啊,那很自然的哈,这个就是就最后一个,就是最越靠后的一个啊的插件,它才是有效的,前面的这个插件呢,都是无效的啊,这是它目前的一个实验方式,我相信这个可能在。
07:03
下个版本或者后面几个版本,这个地方肯定会做一个更改,至少这地方呢,啊要做成一个。呃,动态传参的一个功能是吧。那么接下来呢?实际上我们还要讨论一个问题。啊,就是我们发现这个地方,实际上它是起到了一个注册udf的作用啊,可是呢,我们发现玻璃的插件呢,它其实能不能处理数据呢,就是能不能对数据进行一个立刻的操作呢,它也是可以的。啊,因为它继承的它实现的那两个接口呢,里面有一个process和process stream的这个呃,方法要实现。那么如此一来呢,实际上如果说你自己去实现一个插件的话,那么一个transform插件,它既可以,它同时既可以作用于数据,又可以去注册一个函数,那么这个地方实际上呢,呃,它没有起到职责单一的作用,它违反了有点违反这个。
08:00
这个职责单一的原则。那么这样的话,就是需要你自己去,你比如说你要做二次开发的时候,你需要自己去对自己做一个约束,也就是说啊,我把这些插件严格的分开啊,比如说有这个注册这个函数的这个,呃,转换插件呢,它就只注册函数完了之后呢,呃,去处理数据的这个插件呢,就专门的专职的去处理数据,那么当是在目前这个版本呢,实际上啊是function,就是注册这个函数。和这个处理数据呢,是在同一个插件里面去做的事情,那这个地方大家需要注意一下。那么最后就是对spli的插件做一个总结,就是我们如果说是要用到这个SPSP的插件的话,那么很简单了,按照它这个源码的实现思,我们就能知道它实际上应该是啊,先去注册一个名为SP split的啊这个函数,因为我们在那里面的传参的时候是传了一个spli,然后一个啊,传了一个spirit,传了一个这个字段的名称。
09:03
啊,完了之后他去注册完这个啊SP函数之后呢,我们什么时候才能用到它呢?实际上就是在我们的后面的SQ插件里面啊,去写一个SQL语句的时候,我们可以通过这个SP的函数啊,对我们的里面的数据进行一个分割,同时呢,啊,因为这个SP是写死的,那么全局只能有一个,所以说你你只能有一个具体的的实现啊,当然是最靠近你这个SQL插件的最后一个。那么我们现在呢,就顺便去看一眼SQ插件的实现。啊,这边呢,就是它的这个circleq插件的实现,我们可以看到现在呢,我们呃有一个process stream方法,这个呢就不是直接return了啊,它呢很简单,是先从这个疑对象里面拿到这个stream table,这个even啊完了之后呢,在这个里面呢,直接去执行一个SQL的查询,那也就是我们那个在配置文件里面写明的circlel,然后之后呢,通过这个工具类再把这个table转成啊data stream row返回给外面的下一个插件啊所以说呢,呃,还是我们之前说到的就是这个肉呢,它是整个小唐豆工作流的核心数据类型。
10:12
啊,但是我们再往后看一下。你发现这里面呢,它就没有那个注册函数的方法了,为什么呢?因为那个注册函数的方法它是重写的啊,它不是那个接口里面要求必须实现的一个功能。嗯,所以说呢,在这一块呢,实际上啊,他还是值得值得单一上,嗯,不是那么的完美。啊,所以说呢,这个插件呢,它可写可不写,但是如果说呢,你真的想注册函数的话啊,就是同时注册函数和同时处理数据啊,它实际上是保留了这个能力的,这是他目前的一个。啊,地方一个点,一个要注意的点。
我来说两句