00:00
嗯,好的,那么我们现在呢,这个even块讲完了,块讲完了,Transform块讲完了,S块也讲完了。那么对于我们的整个的框架来说呢,我们已经了解到了它的所有的零件啊,所有的零件,但是你要注意一个问题,就是我们的工作流,它到最后是怎么样串起来的呢?这个实际的上呢,就涉及到这个can的一个核心代码,也就是说它最后是要把这些东西全部串成一个完整的工作流的,那么我们现在呢,来看一下can这一块是怎么实现的。那么首先呢,我们还是去看下源码。在这里面呢,有一个module叫,也就是它的核心代码。我们还是以flink为例,然后呢,点开这个flink。看到一个Java,然后这里面呢,有一个带绿箭头的,也就是它里面包含慢方法啊,实际上我们那个调用的时候呢,就是通过这个脚本直接把这个我们的配置文件里面的参数。
01:02
传递给了他啊,传递给了我们这个卖方法完了之后,这个里面逻辑写的很简单,也就是我们的这个can。他直接点乱,然后把这个参数参数。传进来,然后完了选择flink引擎啊,那么整个程序开始运行了,我们现在呢,就来看一下这个run里面的这个逻辑。把它点进去。那么可以看到呢,这里面实际上它有一些检查啊,到最后呢,还有一个就是它是不是一个测试的。但是我们呢,一般来说不是那么大多数时候呢,走的都是这个else啊,因为检查你看这个检查这个con是否合法,完了之后他说log in for啊con OK,那么结果呢,它我们大多数时候呢,是走的这个else这个分支,那么这个地方呢,就踹了一下。嗯,关键的我们的这个一个零件组合成工作流的逻辑,就封装在这里面。我们点进去看一眼。
02:03
啊,我们可以看到啊,就是在这里面呢,有三个集合。来装的刚好是我们的这个S,然后transform,还有think。然后呢,我们可以看到这里有一个prepare,一个静态方法,然后它调用了什么呢?它创建了环境对象,然后把source transform和S这三个集合也传了进去,啊,我们先来看一眼这个prepare是干什么的?点进去看一下。我们可以看到哈,这里面实际上相当于什么呢。这里面相当于两层的这个循环嵌套啊,一个外面是一个号,一循环后面就直接掉到这个啊集合里面的一个for each啊,那么我们可以看到这个地方呢,是一个可变长的一个参数,那么就plugs plugins,这个plugins呢,就包含了所有的这个source插件,然后转换插件和think插件,它会轮番调用所有的这个插件里面的方法。
03:07
啊,刚才我们说到了这个prepare方法是干什么呢?这个prepare方法主要就是去检查啊,我的这个配置合不合法,完了之后完成一些属性的初始化的操作啊,就是这个prepare的一个准备。呃,那么这个呢,一看就是它现在已经为我们初始化好了所有的插件,但是呢,这里面还没有正式开始啊,拼装我们的工作流,我们继续回去看一眼。那么我们现在看到的呢,还是这个entry point这个方法啊,刚才prepare这个地方我们已经看完了,那么剩下的这个逻辑呢,就很简单了啊,我们又看到一个execution execution这个对象。那么excu这个对象呢,就直接就start完了之后,把这个source啊,Source,呃,Source source,然后完了之后transforms,还有S这些东西传了进来,也说这个三个集合呢,哎,就进了这个大的方法了。那我们看一下干什么了?我们还是点进来。
04:03
点进来之后呢,你会发现它是一个接口,然后下面呢,我们看它有几个实现。啊,我们有Bach的一个实现,然后flink Bach,然后flink STEM的一个实现,那还有一个结构化流和Spark的这个批处理的实现,啊关键的我们这里只是以这个flink STEM为例。我看一下他这个start方法里面怎么写的。啊,那么我们现在呢,实际上可以看到啊,这里面关键最核心的是有三个for循环,一个是对SS的for循环,也就是对所有的SS插件进行一个for循环。然后另外一个呢。就是对我们的transform这个插件的一个for循环,还有一个呢,就是对我们S插件的for循环。那么这三个for循环呢,实际上就是我们整个工作流进行一个串联的逻辑。啊,我们可以看一下这个里面具体是怎样去操作的。呃,首先呢,我们在这个第一个for循环里面呢,对所有的ST插件进行了一个便利。啊,然后每次呢,这个sa插件呢,它会get data,调用它里面的这个,这实际上是它的接口啊,里面要求必须实现的一个获取数据源的啊一个这个方法,然后呢,他拿到这个数据流对象之后呢,把它放到一个集合里面,然后呢,接着会去调用一个就是execution对象提供的方法叫result。
05:21
啊,他就把这个数据流注册到我们的表环境里面,作为一个结果表,也就是像这个表环境里面注册一张表。啊,那么接下来呢,这个for循环执行完了,我们举这个例子,比如说我这里呢,哎,SS插件呢。已经注册了啊,搞了这个实例化了三个这个。然后后面呢,这个有一个这个操作。注意哈,有这个操作。叫data stream,这个input等于一个集合里面的第一个。也就是说这个时候呢,我就让这个input是什么呢?是这个。一盒里面的。
06:00
第一个数据流。于是呢,我后面开始第二个for循环,这第二个for循环呢,处理的就是我的转换插件。转换插件呢,我们可以看到。呃,这里呢,首先。有一步操作是我们的excution去做的,也就是它有一个from soft table,只要这个from table呢,它会从这个啊转换插件呢,根据这里面的配置,也就是你设定的那个name。根据你设定的name去获取,在表环境里面去获取数据流,那么后面呢,关键是这一步它有一个判空操作,如果说发现呢啊,你给的这个表名不对,或者说你这个表名压根就没有设啊,总而言之是拿不到这个表名所相对的数据流啊,那么STEM呢,它就会用什么代替呢?用input代替,也就是说我后面。假如说哈是第一个,第一个我的transform插件。哎,你刚好呢,去没有去指定一个表明,那他去哪里拿数据呢?你说是这这上面三个这个source plug啊三上面三个source这个插件,他去哪个哪数据呢,他就会去这个。
07:10
我们说的第一个。哎,就是这个集合里面的啊,Input data data塔GET0第一个这个source插件的一个输出,哎,作为我这个插件的输入。对吧,那么后面呢,假如说我第二次循环呢,诶又有一个这个source plug的插件,那么我这个地方呢,我指定表明了。那么我去哪里拿数据呢?那我当然是去这个。呃,有这个表明的是不是因为我这个时候就不是空了,我就确实是有数据,那我就去指定了这个table的这个name对应的这个。地方拿数据。那么后面呢,我们继续看啊,我们这个input这个进行一个替换之后呢,啊,接着就调用了transform插件的process stream的方法,然后把这个process stream的这个方法。
08:00
返回的数据流又赋值给了我们的input变量,那么这个时候啊,我们想想哈,我们现在有第三个插件。第三个插件呢,同样没有指定表明,同样没有指定表明,那么。我现在去拿数据的话,是以谁的输出作为输入呢?这谁的输入作为输入呢,实际上呢,哎,是以我。之前上一个。数据的插件的这个输出作为输入,因为input上你会发现,就是我在遍历第二个,遍历到第二个transform插件的时候,实际上呢,我的input已经被这个transform插件那返回来的流所替换掉了啊,所以说实际上是这样一个过程,也就是说啊,我们的每一个插件,每一个插件它都会默认以上一个插件的输出作为自己的输入啊,当然这是在配置文件上的顺序,我们说就是在配置文件上的上一个插件的输出作为自己的输入。
09:04
啊,那么这就是他的一个基本处理逻辑,那么后面呢,我们我们会发现就是啊,Execution对象呢,它其实还做了两个操作啊,一个就是就是把这个transform的这个呃,输出啊,一样是注册到这个。表环境里面去注册一张表,另外一个呢,就是调用它这里面的一个啊,注册函数的方法去注册一个udf。那么到最后呢,我们还有一个thinkin的一个插件啊,那么think插件呢,我们假定呢,这里又搞了两个think插件。啊,这是我们的两个四插件,然后我们现在呢,这个如果说呃,我第一个插件,我们可以看它的处理逻辑一样呢,他先去找这个呃,Name。去这个from table,去这个呃,Table里面拿数据,如果说这个数据没有拿呢,怎么办呢,他一样还是以。
10:00
最近一次的这个,呃,插件的输出作为自己的输入,那么我们想它如果说没有进行一个啊,没有指定这个啊,S的名称我们刚才说了,哎,这个上面是第一个是吧,这是第二个,这第三个我们以谁作为输入呢?啊,其实是以最后一个,你们看,因为这个1INPUT它每次都会被替换掉,那么实际上是以第三个啊。这个插件的输出作为自己的输入,那么我假如说又有一个这个呃,Think的一个插件,那么它这次呢,指定表明了,那么我呢,要指定的表明呢,就是这个第一个啊,插件的输出啊,作为自己的一个啊输入指定的就是比如说它是T方T1吧,然后我刚好这个说T方name也是T1,那么这个时候呢,哎,我就直接是拉过来的。就可以通过这个指令,指令表明的方式,这样达成我们的工作流啊,也就是说实际上在这个工作流的环境过程中呢,组建这个工作流过程中呢,实际上s name和name。呃,是控制整个啊流就谁和谁对接的一个非常重要的参数啊,这个地方一定要注意。
11:03
啊,那么这个就是我们这个一个工作流拼接的过程啊,那么接下来呢,我们可以看到就是。工作流一旦拼接完了啊,这个loger他就会去这个,呃,记录一下这个flink的执行计划完了之后啊,直接就拿到这个flink的这个环境对象,调用这个Q的方法来触发整个任务的执行啊,这样的话呢,就是我们的这个任务就直接提交到这个啊flink集群上了,在整个过程中呢,你会发现啊,我们使用。这个水开发的一个好处就是我们在整个过程中只编辑了配置文件,没有去。编写什么代码,然后去打大包啊,所以说这就属于是低代码开发的一个环境,而且呢,像这样的话是非常好维护的。最后呢,我们有个非常重要的点,就是啊,要对circle这个插件做一个特殊的说明。啊,我们可以看到这个,回去看一眼这个事后插件。
12:00
Circle的插件是在它的这个transform里面。我们可以看一下这个link里面的circle。我们可以看到这个flink的这个SCO插件呢,实际上它是啊,从这个表环境里面。直接去执行的这个思后查询语句。那么我们想到这个SQL查询里句里面是不是可以写from呀?那么如果有from的话,实际上呃,我们依托这个表环境来执行这个数据流的问题,嗯,我虽然说指定了一个,可能是指定了一个STEM,但是呢,我可以在这个SQL语句里面不从这个data stream,就是这个source name对应的那个呃,Data stream里面来来取数据,我可以通过SQL语句里面的from啊,来指定我从哪个表环境里面去取数据。呃,所以说对于SQL插件来说呢,它还有点特殊,就是它的name呢,不是那么重要。
13:01
不是那么重要,因为如果说我们基于circleq的话,就可以突破一些啊,这个就是超越一些这个框架的限制啊,所以说呢啊,实际上我如果我们是circleq插件的话,你说这是一个SQL插件,然后我上面呢,有好几个SS插件。啊,实际上我通过from join这些语句,我可以同时呢去多个表里面,哎,去拿数据啊,不一定是非得是一个s name啊,那么这是S插件它一个特殊的地方,希望大家注意一下。
我来说两句