00:00
刚才我们已经实现了一段简单的代码,看到了这个table API和fli CQ这个程序的结构到底是怎么去写的,那现在呢,我们再给大家来总结一下,刚才那种写法其实不是特别的一般化啊,不是特别的常见,为什么?因为前面我们是先把这个数据读进来,是作为流读进来的,然后读进来之后呢,Map成样粒类,然后又做了一个这个table的一个转换,呃,最后我们输出又转换成流,所以相当于我们这个过程当中是流处理API datat streamam API和table API的一个混搭,呃,当然就是大家看flink就是这么灵活啊,你要想把它这个混搭起来,前前前后后做转换没问题,喜欢这种方式,觉得这种方式最简单的话,其实是可以的啊,呃,这么写也是可以的,那flink里边有没有更一般化的方式,就就是对于我们这个table操作而言,我能不能一开始不依赖流,直接就读进来。
01:00
就是一个table呢。啊是可以的啊,大家看一下一般更加一般化的这个写法程序的代码的结构啊,是前面先创建表的执行环境啊,当然这个创建表的执行环境肯定是得基于那个有处理执行环境的,对吧,这个不能少啊啊因为这里面参数得有那个env嘛,啊这个先创建出来,然后接下来呢,接下来就可以直接创建表了。啊,创建表,而且大家会发现啊,这里边呢,我是可以创建一个用于读取数据的表,这就类似于我们的那个S任务了,对吧?这就相当于S嘛,所以大家看它的写法是table even env,调一个点connect方法,这个connect这是什么呢?连接的意思嘛,所以这里边要写的就是哎,相关的这个,呃,我们那个连接器给我们提供的table连接的方式,对吧?那比方说卡夫卡可能就有一个连接器,我们可以直接这个这么1CONNECTCT连接到卡夫卡,然后怎么样呢?把对应的那个数据可以定义好啊,在这个连接器里边,我们把这个东西都定义好,呃,这个到底那个数据结构STEM长什么样对吧?呃,格式化的那个类型到底是什么样的,格式化工具是什么样的,都定义好之后,诶,读取出来字段都定义好,然后创建一张表。
02:19
啊,所以接下来大家看这个创建表的过程呢,用的是create temporary table这个方法里边传一个字段,这当然就是当前表的名称啊,所以我这里边就直接这个读取外部系统里边的数据,连接外部系统读取数据,创建一张表。这里大家要注意啊,这里边并没有返回值,所以说这里其实并不是构建了一个Java的那个table类,对吧,并没有实现这个table接口实现这个类啊,呃,这里边是在我们的这个执行环境里边注册了一张表。啊,就相当于是我们那个,呃,把那个当时在那个环境里边调用create temporary一样,对吧,这里边是调用了这个create temporary table,然后把这个表注册进来。
03:09
哎,这是已经就相当于这是SS嘛,读了数据,有了这张表,另外我们还可以怎么样呢?还可以直接注册一张输出的表,哎,大家看这个方式跟我们这个,呃,读取数据啊,S这一端,因为大家知道输出相当于是SK嘛,哎这里面其实好像跟那个S是一模一样的定义。大家看这个,因为是定义表嘛,这里面又是一个connect连接外部系统,然后呢,后面create temporary table output table。所以在这个过程大家看到啊,我可以相当于在这里边定义表表的时候啊,不区分输入还是输出,只是相当于连接一个外部系统,然后指定哎,对应的这个数据结构,数据类型scheme对吧,架构是什么样的,字段是什么样的,然后定义表名,在环境里边注册表就可以了。
04:03
那我们具体的这个程序流程跟流处理一样的话,那应该还是S,呃,这个map转换对吧?呃,然后再做这个think输出,呃,就是map转换,一般就是transform嘛,对吧,Source transform think,那这个过程我只是注册了两张表,具体的转换到底是在哪里做的呢?诶大家注意啊,是在这里就是。就是我如果要把一个创建好的这个读取数据的这个输入表里边的数,想要调用这个table API,想要把它这个转换成table类型,怎么转换呢?这边有一个方法叫en.from。啊,所以大家看前面我们那个,呃,用的时候,在这个代码里边给大家写的时候啊。在这个代码里边,我们这种状态是本来已经有了table对吧,因为这里边这个从流直接转换过来,就是一个Java的这个,呃,这个实现table接口的一个一个实例嘛,对吧,这样的一个类类的实例,那么接下来我们在调用的时候就直接可以调这个table API了。
05:09
然后你如果要写CQ的时候呢,是要去注册一张表的,而如果我们这里边用一般化的这个直接读取数据啊,这个是直接在我们环境里边注册了表,所以可以怎么样呢?可以就直接写CQ了,这是这个CQ调用的方式。那如果说你想要去调用table API的话,那得用一个table,因为Env.from用这个方法啊,就是这就相当于是什么把我们环境里边那张表from拿出来,然后得到一个table类型,对吧?啊,这就相当于是这个这这就是那个本来Java里边的这个table类型和这个表环境里边我们注册好的那个表,它们互相之间是有对应关系的,但是你如果要做这个不同的API调用转换的时候呢,你需要多做一步操作对吧?就是你看到底是有哪个,呃,没有的话,你得做一步转换,这里边我们是用了一个from,把它这个数数据拿出来,然后本来这个不就是我们的那个输入表吗?当前的这个connect连接到的那个外部系统应该是有数据的嘛,所以这里边直接from之后得到的这个表里边,也就相当于我可以直接获取到它的这个数据的输入。
06:22
那后边slept对吧,Filter,或者说你做各种各样的转换操作,就可以得到一个结果表。诶,那这是我们相当于这个SS任务啊,读数去,然后去这个做transfer,那最后写入怎么办呢。哎,大家注意啊,最后的写入操作是一个result,得到这张表,调用这个table API里边有一个方法叫做insert into,对吧?啊,有这样的一个方法,这个方法就相当于可以把我们这个表的内容直接写入到当前注册好的这个output table里边。
07:00
啊,这就是整个这个相当于s transform SK的一个完整的流程。啊,这是大概的一个程序的基本结构啊,大家先看一看这个,呃,整体的过程是什么样的啊,然后我们看一下这个。就是具体的过程,具体过程呢,在一开始的时候,当然还是需要先去创建这个表环境了啊,就我们说的啊,除了这个source transform thinkk之外,第一步或者说第零步,你前提是先得把环境创建出来,而这里边我们这个创建表执行环境的时候,是需要基于flink的流处理执行环境的啊,就是呃,就是对于之前那个老版本的话,这就涉及到这个老版本还是新版本对吧,就是后面我们给大家可以分开来讲啊,到底怎么样去创建它啊,那这里边就是对于这个新版本来讲,大家知道这个特别是blink版本里边是批流统一嘛,所有的这个table API或者flink c创建的这个程序,最后都是要转换成为data stream。
08:05
的程序对吧,所以我们这里边都是基于这个流处理执行环境去创建的。那老版本呢,有些时候就还是基于那个批处理执行环境,对吧,看你要创建这个这个table,这操作到底是流处理还是批处理了,后面我们具体问题具体分析,这里边的这个这个table environment啊,我们这里边创建的这个叫stream table environment对吧?它的底层呢,就是table environment,那么table environment是flink里边它要去集成table API和fli CQ的一个核心概念啊,所以后边我们定义的所有对于表的操作啊,啊就是这些大家看不不管是呃你基于那个table API要对这个table去做一些操作,还是说我是呃不去转换成table类啊,呃,我直接要写CQ,直接注册表去select from这个表对吧,做各种各样的转换操作,这个所有的这些操作都基于的是当前表的环境,表的执行环境就是这个table environment。
09:08
那他主要可以干什么事儿呢?这里边我们说一下他负责的这个事情啊,有这个注册catalog对吧?然后呢,在catalog里边注册一张表,这些都可以去做,然后执行CQL查询对吧?我们写的那个内容直接可以在这里边去查询,另外还可以注册udf对吧?后面我们会讲到就是你可以自己去定义各种各样的udf,去做功能的扩展啊,那另外还可以做就是像我们前面那个data stream,如果要是转换成表的话啊,那也是他们之间的互相转换啊,也是由这个table environment来负责的啊,这就是这个table environment的一个功能,那我们接下来就看看在代码里边到底怎么样来实现这样的一个环境的一个创建。啊,那我们另外创建一个。Object。
10:01
这个我们就类似于之前的那个data stream API的测试啊,我们这个就相当于是table API的一个。Table API,呃。Test。好,然后接下来我们在这个里边本身,呃,默认的这个前面的这个就是流处理的执行环境,当然是应该先要创建出来的,对吧?啊,所以这里边这个还是把这个前面两句先抄进来啊,我们把这个抄进来。然后同样前面这个流逝的执行环境下划线引入对吧,方便后面做这个影视影视处理,影视转换的这个调用啊处理,然后接下来大家注意啊,正常情况下啊,呃,我们首先这是第零步啊,创建执行环第一步吧。创建创建表、处理环表、执行环境。然后诶,这里边大家知道最简单的方式,前面我们也已经知道了啊,这不就是stream execution environment。呃,不是啊。
11:05
这个我们是基于stream table environment对吧?然后直接调它的create方法,然后把前面的env传传进来,这不就完了吗?呃,这里面大家注意一下,这个就不要加这个了啊呃,这个这个我们这里定义的是table environment啊。Table environment,那前面这个并行度你想给也可以,不给也行,对吧,我们这里边给这个并行度的话,不影响这个最后结果正确性,保持顺序顺序这个输出。呃,然后这里边我们要想给大家分析的是,现在不是说明有这个啊,不同的这个版本嘛,对吧?啊就是你有这个老版本,还有这个blink版本,另外呢,现在号称是批流统一,那就你这里边既然是表执行环境,那应该还有这个批处理的环境和流处理的环境,对吧?那他们这个到底做这个不同代码应用的时候,怎么样去配置这个环境呢?
12:01
啊,这里边就有不同的讲究了,那首先这里边给大家配置一个。在下边来写啊。呃。这个1.1吧,呃,我们这里边首先给大家写一个这个创建,呃,我们这个就直接写吧,这个是老版本的。老版本planner的流逝。处理环境。啊,或者说我们就是要做的是这个流式查询对吧,我们就直接写流式查询好了啊,那其实这里边我们默认创建的这种方式,当前用的就是老版本的流失查询对吧?啊,就是流失的这个处理方式,那如果具体展开这个定义的话,应该怎么定义呢?啊,大家其实前面看到create这个方法,我可以传多个参数对吧,后面还可以传一个settings。所以如果要是有一个settings的话,这里边就可以指定到底用什么样的版本了,比方说我先把这个定义出来settings啊,它本身就应该是一个,呃,大家看这个里边要传的这个东西是一个叫做这个environment settings这样一个东西,对吧?Flink API,呃,Flink table API下边的这个environment settings,所以可以把这个东西先引入。
13:19
好,我们定义一下当前environment settings怎么样创建呢?哎,注意这里边本身这个settings啊,它的这个构造方法我们看一眼。Environment setting settings是一个private方法,哎,这这种调用方式就又来了,对吧,所以你不能直接啊,你有一个这个啊,这个environment settings,然后然后那个直接传参就完事了,那这里边得怎么样呢?它的调用方式是用这里的这个。啊,大家看这里边也有builder对吧?呃,这里边儿有builder,但是呢,这里边builder的这个,呃,就是方式,它应该用什么方式去获取当前的builder呢?你你不能直接去调这个builder,而是要用一个叫做。
14:05
New instance new instance这样去返回一个当前的builder,对吧,去创建这样的一个builder啊,那所以这里边我们的这个常见的调用形式啊,是new instance,然后就有了一个builder,大家知道肯定最后就是这个builder.build嘛,把它创建出来,那在在这中间对于这个builder就可以配置各种各样的东西了。哎,可以配置什么呢?可以大家看我可以用老版本的planner,另外还可以定义是批处理模式还是流处理模式,哎,那当前就是流处理模式对吧?啊,那当前大家看这个用老版本。Planner用老版本,然后现在是流处理模式,诶这就是我们整个这个过程啊,Build build。对齐。然后那自然有同学就想到这个,那对于这个呃,老版本而言啊,如果要做这个批处理这个定义的话,那是不是就直接我我把这儿改成个批处理模式就行了呢?哎,这里大家注意一下,不是因为老版本并不是底层批流统一对吧,它不能基于这个就是我们的这个呃流处理执行环境来来做定义啊,所以说这里边啊,老版本的批处理给大家写写出来啊,老版本。
15:29
呃,这个批处理环境那得怎么定义呢?我得先获取一个之前的那个batch env,因为我定义的这个env本身是一个流处理的env,对吧?哎,那这个我得基于这个Bach env来做,它本身是什么?大家还记得那个execution environment吗?就是不叫不加。这个stream的这个这个类型,大家还记得这个对吧?那所以我们把这个引入啊,Scale execution environment,那我们调用的时候execution environment.get这个环境,然后接下来我们定义的这个batch table environment。
16:13
那同样它应该是什么呢?它的类型就叫做batch table,下面这个啊,Environment。那我们定义的时候,同样跟前面那个流逝的差不多,也是用batch table environment调它的这个create方法传进来的就是一个batch env,用这种方式来做这个老版本的批处理。好,那我们继续定义啊,既然有老版本就应该有新版本对吧,那我们这里边blink版本。Blink版本的。流式查询。我们来看一看blink版本怎么定义呢?那同样这里边首先那就同样批流统一嘛,都用当前这个流式处理,基于流式处理的这个环境创建出来的这个table v了,对吧?啊,我们都用这个了,那接下来呃,我们这里边用的这个,诶大家注意一下啊,刚才我们只是定义了一个settings,还没创建那个,呃,当时的那个老版本的那个查询环境呢,对吧,这个少了一步啊,所以这里边我们如果要去定义这个呃,Table env的话,或者说我们这里边加一个这个前缀啊,因为要跟前面这个table env区别啊,我们这边可以叫这个老版本的话,我们就叫O的。
17:30
呃,流处理对吧,Stream。Table env。啊,那那这个大家知道肯定是就是得到的是一个这个stream table environment了啊,所以这里边我们的调用方式是什么呢?Stream table environment.create传的是env,以及这里的settings对吧,直接这么定义就可以。然后后边如果说我们又要定义这个blink版本的这个定义的话,那我们再定义一个settings对吧?啊,比方说我们这里叫blink的streaming流式处理就叫BS吧,Bs settings。
18:04
那这个我就不写类型了啊,它同样是一个environment settings对吧?呃,那new instance后边我们要去定义的,那就是。现在是use blink planner,那后边是in streaming mode,对吧,然后接下来点build。那后边我们定义在这个环境的时候,Bs table environment。它就应该怎么定义呢?同样的方法,Stream table environment create,把这个env和bs settings传进来就完事了。啊,那与之对应还有一个啊1.4啊这个blink。版本。版本的这个批示查询。P是。查询批示处批处理对吧,同样我们可以定一个批处理啊,Blink batch BB吧,BB settings来这里边我们定义方式大家看到就是完全一样啊。
19:05
Environment setting new,一个instance,然后后边是这个use blink planner,然后后边就会in batch mode,对吧,最后再做一个这个build,这里大家需要注意的一下是,就是最后我们定义的这个BB table environment啊,不能用之前那个stream table environment去创建啊,因为现在你是一个批处理的一个表环境嘛,对吧,所以这里边是直接调用底层的这个table environment,前面我们说过in y。对吧,呃,之前我们说过,就是对于这个呃流式的表环境而言,它的底层啊,大家看这个create的时候啊,就是这个stream table environment,它这个treat啊,其实是实现了一个这个table environment的,对吧,它的底层其实就是这个Java接口啊,这只是我们上层又包了一层流处理而已。
20:03
啊,那所以这里边我们批处理的话,直接用这个底层的table environment,然后点create把这个写进去啊,那这里边同样写的方式也是env,还有当前的这个BB settings对吧,把这个写进去就完事了啊那这里边大家看到这这里边就是还有一个报错对吧?呃,这里边我们看诶我们看这个。哦,这里边我们传的注意啊,这个cable environment的这个环境就不能传我们那个流式处理的,呃,那个那个执行环境了,对吧,这里边就只传一个settings就可以了,所以要把前面这个参数删掉,这样的话就不报不报错了。啊,这就是我们这个对于不同处理环境的一个定义啊,啊,那我们后边给大家做调用的话,那这一部分其实就呃就就不用这么展开去说了,对吧,大家知道怎么去执行,怎么样去做就可以了,那这里边需要给大家说的是,如果我要用到这个blink的这个版本的话,啊,那你必须得在po文件里边得做一个添加,对吧,之前我们已经有了一个这个。
21:10
A flink table。Planner这个版本,那如果我需要有对应的这个blink版本的话,大家知道就在后边planner后边加一个杠blink就可以了,把这个包也引入后边执行就不会报错了啊,因为这里边你就是本身我们这个API都是都是支持的嘛,你不引包的话,本身这里边编译不报错,但如果执行的话,那肯定就有问题了。这是给大家把这个环境先说一下。
我来说两句