00:00
好,那接下来我们进入第二章内容的学习,也就是说关于弗link CDC它的编程内容,呃,刚才也说到了,在这个当中呢,我们通过data STEM以及link s这两种方式给大家去做介绍,呃,那么第一块呢,是我们的data stream的方式,那首先呢,我们导入依赖,那我们把依赖拿过来之后呢,给大家去看一下这个当中我们所做的一些事情。我们主要引入了哪些依赖,那这边呢,我已经创建好了一个项目啊,那我就直接把我们的依赖拿进来。好的,让他自己导一下我们的依赖,这个呢需要去下载一下,因为这是最新版本的2.0。呃,那接下来呢,我们要去编写代码,那接下来我们就不不用看这个内容了,我们可以看一下这块,其实在使用弗Li CDC的时候是很简单的,那他提供了一个叫MYS。直接调用build的方法,最后调用build,在中间添加上我们一系列的参数就好了啊,比方说这个参数有连的。
01:09
我们的主机my circle端口号,以及我们要访问哪个库啊?如果说只写了库名,那么这个表示监控当前这个库底下所有的表。当然在这个当中还有一个属性叫table list,可以监控单表或者多多张表,自己有选择的,那么同时提供username password以及提供一个fine序列化器,但这个呢,是官方所提供的,呃,那么它不利于我们后续的。加工啊,所以呢,最后呢,我们需要将这个数据呢。自己进行反学的话啊。好,那之后呢,就应用到我们的环境,这些环境当中做一个打印就好了。啊,那其实比较简单,我们可以来看一下这个内容。
02:00
行,我们检查一下我们的依赖,我们的依赖呢,已经导入完了,对吧?呃,那接下来呢,我们去创建一个类,那我们从文档当中,我把这个类名拿一下,就叫Li CDC。好,那我们创建一个类,按照他刚才的要求,我们来写一下,首先呢一个慢方法。好,第一步我们应该是获取flink的。执行环境是不是,哎这个呢,我们把注释加一下。呃,那这边呢,我们就直接stream execution environment get。我们的环境R到位,得到一个烟位,那这此时呢,我们做一个测试,所以呢,我们直接设置变异度为一啊,不需要那么高的一个变异度。第二步呢,就是通过。
03:00
Flink CDC构建source。构建我们的source方式。啊好,刚才在文档当中我们也看到了这个地方呢,我们就直接调用叫my circle。S。啊叫my source啊来看它是属于CDC包下的一个连接器my source.build啊,但是这个build呢,它是一个泛行方法,我们需要给一下它最终返回值类型,那官方提供的string类型的反循发器,所以我们写一个string好了,那接下来呢,最后调用build,那这样其实可以帮助我们构建一个。读取mylo的一个S方式,但是这个中间呢,明显缺很多的参数,那我们逐个添加一下,首先我们要有主机名啊,我的还我的买S呢是装在哈度102这台虚拟机上的端口号,因为我们用的是。
04:00
MY3306,然后接下来呢,我们应该加上username,我们的用户名以及密码啊好,那在我们当中看到这个地,这个地方可以实现它的一个。Data,也就是说我们要监听的数据库。比方说这次呢,我们等会创建一个库,叫CDC test。啊,等会呢,我们再去创建一个库叫CC test,好,那接下来还有一个叫table list,在这个当中,如果这个地方我们也可以写来看一下,首先第一个位置这两个属性呢,明显的是监控哪个库哪个表的,呃,那在这个当中,CTRLB我们可以看到它是一个可平行参,也就是说我们可以同时监控多个数据库。啊,我们可以同时监控多个数据库,是这个意思,好,那接下来呢,在这个当中一样的。
05:03
它呢也是一个可变形参,也是一个可变形参,所以呢,也就是说在一个库里啊,如果说我们不添加该参数,那么跟官方档当中提供的一样,说的是监控当前这个库底下所有的表,假如说这些表当中我们自己有所选择,我们也可以在table list当中给它添加上啊呃,但是添加的时候注意了,一定要加上,比方说CDC这个库体下test。有一个表叫A表,Table。A啊,Table a这张表,那么一定要注意的是什么呢?在写法上一定要像我这样去写。就是一定要带上库名,这个也比较好理解,因为刚才我们看到像database当中可以填写多个库,比方说我有A和B2个库。那么这两个库底下呢,同时都有一个表叫test表,那这个是完全允许的,假如说我们上面监控了AB库,下面直接写test,那么他就不知道你到底要监控哪个表了,所以在写表的时候一定要加上。
06:16
我们的库名一定要加上库名,OK,这个一定要注意一下,当然如果说不加这个内容,那么将是读取当前库当中所有的数据啊。呃,那接下来呢。还有我们刚才看到那是一个反序列化器,官方提供了一个我们可以用这个string。第一。一个disorization的一个STEM。但是我们等会会看到它里边数据呢,很不友好啊,封装的,呃,那这当中呢,我们还有一个参数需要给大家去介绍什么呢?叫start up options。Setup options,那这个当中呢,它要我们去传输一个setup options,那我们来看一下,其实这个类啊,它是一个正常的一个类型,我们在这个当中呢,主要的内容呢。
07:09
我们可以看到它有这么四个,我们下载一下源码给大家看一下,呃,当中呢有初始化。有earliest latest。Specific ofset还有。啊,这么五种。呃,那我们可以简单给首先给大家去介绍一下啊,等待源码的一个下载,第一在我们初始化当中,初始化所代表的意思啊,表示它会先读取我们。MY搜当中这张表已经存在的这些数据,利用查询的方式把我们现有的数据全部读过来,接下来切到blog最新的位置,开始读取我们后来新增及变化的数据这样的一个方式。
08:01
那如果说是。那也就表示它将会从lo最开始的位置从这个表开始创建。读取。读取这张表的一个数据啊,也就是从blo最最早位置,但是这种方式要注意一下,大家在做测试的时候,需要我们在。我们在。往某一个库、某一张表当中插入数据。之前。也就是说这个库在被创建之前就先开启该库的log,也就是说比方说我们要对A库要做early list的一个方式的一个监控。那它有前提条件,必须我们A这个库在创建之前,我们就对A库已经开启了lo。啊,只有这种方式才可以,要不然它会失败,好,那下面一个呢,叫latest,这个是比较好理解的,它会直接切到我们lo最新的位置开始读取出去。
09:07
呃,那之后呢,下面两个是指定offet或者指定时间戳来消费我们的帽啊。好,这边呢,已经开启了,我们呃,就下载这个注释已经下载完了,我们可以看一下第一个。他说的是做一个初始化的快照,对于我们所监控的这个数据库表,在第一次启动的时候,哎,做一次快照,也就是我刚才所聊的,先将我们现有的数据全部查询出来啊,那之后呢,And continue,继续读取最新的lo。啊,读取我们现在新加的一些数据啊,或者修改的数据,好,那看。Ear ear list呢?他说不会去做,刚才在第一次启动的时候,并不会去对我们监控的库表做一个快照。
10:00
啊,而是直接从我们的log开始的位置开始读取。啊,但是后面有个注意点什么呢,在我们。有个注意点对吧,在我们使用这种方式的时候,那必须要求我们数据库。他这个建库建表操作,诶,必须所有的内容都在我们的。频道当中保存才可以啊,呃,那么第三个是latest latest这种方式呢,大家好理解一点,那我们看一下啊,首先它也不会去做快照,那仅仅是从我们。End of the blog,也就是说从blog最新的位置开始读取,叫latest,也就是说我只读取开启这个程序以后所发生变化或者新增的数据。呃,那下面两个比较好理解,一个是通过指定offset,指定位置指定。Blo的位置开始读取,那这个是指定时间戳,因为在我们的blog当中,它会有自己的位置信息以及时间戳的一个信息,是这样的方式,OK,呃,那也就是说这个内容我们做的一个初始化,它先会读取一次全表,好,这样的方式我们就得到了我们的source风式。
11:22
呃,OK,那这个呢,我们取名叫so方式。这个我们就不需要了,对吧。啊,那接下来呢,我们就直接将这个s function做一个应用啊,因为点ADDS啊s function呢传入,那就得到了我们一个流。Data STEM source啊,那之后呢,我们对这个流呢,做一个简单的打印就好了,对吧?当然在生产软件当中,此时可以对数据做任何的加工处理,也可以写到其他的数据库啊,或者说消息中间件当中,好,那这块呢,我们直接做一个数据打印。
12:00
因为如果我们能把数据打印出来,那么做加工肯定就可以了啊,最后一定不要忘了启动我们的程序。那我们就直接叫启动任务,因为点。呃,那一般来说呢,我们可以在这个当中加一个我们的。名字叫这个CCQ这个异常呢,我们可以暂且抛一下,因为我们是做一个简单的测试,对吧?好,那我们到这块呢,Data stream方式,它这个编码就完成了。
我来说两句