00:00
呃,那么接下来呢,我们就应该去写这个弗Li格CDC的一个代码啊,弗LIC的代码,那其实在这个当中啊,它提供了两种吧,一种是这个S的方式来见表,对吧,类似这样子的,那另外一个呢,是我们的。这个啊,API的方式,API的方式直接拿买点。啊,直接这样调用就好了,然后呢,传主机端口号,哎,你要监控的库用户名密码对吧,那肯定是这些东西啊,反虚的化的一个类,注意这是官方提供的。啊,这是官方提供的,然后呢,调用build,它就得到了一个s function,可以用inv去添加一个s function,可以做一个打印,对吧,我们做这个事情好,那接下来呢,我们来写一下这个代码啊,我把这些东西呢就可以关掉,没有用了,对吧?邮件啊之然后呢,这个收起来,然后我们学的是CDC,所以呢,我们再新建一个模块啊,那我点击新建一个模块。
01:09
那么这个时候呢,注意选main就好了,不要选那个spring对吧,我们用ma,下一步好,然后这个地方呢,是JA末啊,Li啊,CDC啊,是他的一个代码保存过来,把这个呢项目名称改一下finish,接下来呢,我把依赖拿过来啊,这个依赖呢,刚才都已经做过介绍了,对吧?做过介绍了,现在让整个拿过来好,最后这个呢,是党依赖的一个。打依赖的一个插件,对吧?把依赖打进包里的一个插件,主要是在于咱们的弗Li CDC,它并不是官方提供的,包括这个什么阶层这些东西,那其实在我们fli你安装的那个程序当中,它是没有的,包括这个对吧?那两种方式可以解决这个问题,第一种就是说等会如果说我们要去启动的时候,假如说OBD Mo到这个flink想完了,对吧,那我们可以做什么事呢?在立底下放这些东西,放我们刚才那几个夹包,然后去集,还有另外一种方式就是我们在打包的时候啊。
02:19
我们在打包的时候,将我们自己所用的第三方依赖打到包里边,是不是也可以啊,两种方式,第一是你机器上环境内有,第二是你可以把这个依赖打到你包里边去正常的运行,这个没问题吧,所以呢,我们选用这种方式啊,我就不一个一个去下载这个加包了,另然你还要下载加包扔到这个底下比较麻烦,对吧,我们就打一个全量包啊,这样做测试比较简单一点啊,行,那我把这个依赖呢拿过来啊,放到我们CDC的这个po文件里边啊,Countr v粘过来。那我们选的是自动导入对吧?啊,那你如果要不放心的话,其实我们可以在这边做一个什么刷新啊,做一个刷新,让他呢去导一下这个依赖。
03:18
好,这边已经导完了,导完之后呢,我们可以看一下依赖,诶我们要的依赖呢,都已经在里边了,那这个就没有问题,我们可以接下来写代码了,这个代码呢也比较简单啊,弗CC来啊,在里边我们写一个叫com点的硅谷点,我就直接叫弗林CDC了啊,就叫这个名字,首先我们想一下要做的事情,对吧,好慢方法写好。那我们第一步应该是获取执行环境对吧?这是第一步,第二步通过记好link CDC构建source方式啊构建source方式好,然后呢,呃,我们直接这样吧,并读取数据啊,并读取数据好,那第三个打印,把数据做一个打印,第四个就是启动任务吧,这是我们正常的一些步骤啊好,那接下来第一个获取这行环境,那就是stemcution environment.get excution environment,得到一个inv对象对吧?然后呢,我们为了测试方便,我们直接把它设置并行度为一啊,设置并的为一就好了,接下来我们通过CDC构建S方程读取出去,那这边。
04:47
告诉我们了,我们应该写my sourceauce去调用build的方法,对吧,那在这个当中呢,我们就直接叫my so sourcece.build的啊,但是这个build呢,它其实是一个泛型方法,看见没。
05:02
对吧?所以我们在前面要加上这个泛型,也就是我们读取数据,你想把它封装成什么类型,我们用思辨类型就好了,接下来最后呢,会调用这个build来构建这个so function,但是在这个中间我们要添加参数。添加参数,首先我们看到是主机名,我们读取的是哈杜102,好,那接下来呢,是主机名端口号吧,PORT3306啊其3306默认端口号赔不起对吧?然后接下来呢,是user name,我是root啊,你写你自己的买soq的用户名,然后呢,他的密码我是六个零啊,你就写你自己的就好了,那这个是连接我们买搜的参数,呃,那么接下来我们要导的database意思我们要导哪个库啊,大家看一下啊,这个当中呢,它是一个可变形参,也就是说我们。
06:01
分一个CC可以同时读多个库啊,因为它是three点点点对不对,可不应该可以同时读多个库,那我们就只开了一个库的B帽,所以我们就读这一个啊,我们叫J末杠。210325个link,我们读的是这个库对吧?好,那其实这个当中呢,我们看文档当中啊,它没有提供对吧?看这个官方文档没有提供,还有什么呢?还有一个table list,也就是说这个东西注意啊,如果说我不加,如果说不指定,那大家应该能想明白,其实我们监控的是当前这个库底下所有的表的数据。啊,那然我们有时候这个库里边我不想所有的表,我只想某一张表,其他的表变不变化跟我没什么关系,那这个时候呢,你要加表明,注意这个地方表明啊,如果不指定,就是不添加该参数,我写一下不添加该参数,则消费库中指定数据库中所有表的数据,好,那这第一种,第二个还有如果指令,那当然是消费指定的表,指定方式为DB.table也就是说什么呢?必须要带上库明,比方说我们在这个当中要监控一张表,那就是点,假如说我要监控的这个就这张表吧,叫被。
07:44
Mark,这张表是因为这个字段相对说比较少一点,好做,叫base trademark对吧?那此时呢,我们要写叫base trademark,好,大家想一下,能不能想明白这个问题,为什么这样写不行对吧?我把它干掉就不行,来,有没有同学说一说。
08:05
为什么这种方式不行,他不允许。包括这个地方啊。对吧,库表啊,必须要酷点DB.table name,对啊,比方说这个地方我监控了A库和B库可以吧,那假如说A库和B库底下有一个什么呢?相同的表明都叫T,那你上面写的AB,下面写T,那下来不完了吗?我怎么知道要什么东西啊?啊,现在都可以同上了是吧?啊,很骚啊,同上这个回答就挺骚,俺也一样啊,俺也一样,这个更骚啊,张飞是吧,我那没文没文化啊,只能俺也一样,对啊,大家说的没错啊,大家说的都没错,就主要在于。
09:08
同理,加一啊好,主要是在于如果说当我们监控多个库,多个库底下有这个同一个表啊,不是同一个表啊,是表明相同的时候,就有问题了,对吧?所以在这个注释上面我们也能看到啊table is注释上面呢,也告诉我们要写table加table name这种方式,对吧?好,那这个呢,我们就改回来,哎,要加上我们的库鼻对吧,这个要加上啊好,那接下来继续。还有什么呢?我们来看一下这个地方呢,官方文档当中告诉我们应该要填一个反序列化的方式,对吧?啊,序列化的方,反序列化方式,那我们就用一个three de busy d civilization的一个scanal,从这个类其实我们也能看出来它里面是不是跟busy相关呀。对吧,哎,它名字就叫它啊,包括这个地方来看啊,它不是把所有的参数信息全部列在这了,而有的像DB的特有的信息呢,放在这儿要配置一个busy pro。
10:11
对吧,假如说我们有B100个配置信息,我也没必要把100个配置信息全写在这吧,对吧,那其他都默认的就行了,那你要想改B的配置信息,通过这个参数传进来就好了,KV形式policy,对吧?所以从这一种种的当中呢,我们都能看出来,它跟这个B有关系啊,有关系,好,那其实在官方文档当中给我们提供的就这么几个参数,对吧?好,这里边我们还有个参数要给大家去介绍一下,什么呢?这个叫start options start option,那这个里面呢,我们是一个枚枚举类型啊,它有这几种啊,那我们点进去看一下。有哪些呢?第一个叫initial初始化,那什么意思呢?来看啊,他说的是来看我们注释。
11:06
从后往前读对吧,他说在第一次启动的时候,First startup,第一次启动的时候,对于我们监控的库啊,表啊这些东西,对吧,对于我们monitor monitor对吧,监控的库和表干什么事呢?做一个初始化的快照。啊,做一个初始化的快照,能不能理解这个意思是什么?这个指的是什么意思?我们要给这个数据做一个全量的快照监控的一个库。对吧,实际上是这样啊,就是比方说我们要现在开启这个flink CDC监控任务,监控的或是base trademark这张表吗。
12:05
当前数据作为监测基准啊,是这样子的啊,就是说现在的我表里边是不是有12条数据啊,那如果我们用的是initial,用的是initial,那么他将会把这12条数据先查询出来,然后接下来从blog里边获取最新的数据,也就是说他要把历史数据读过来啊,他要把历史数据读过来啊,那我们看啊,是这个意思,那第一个做一个快照,初始化的快照,And continue to read the latest big。啊,也就是说现在呢,我们要读取全量数据,接下来切到B2的最新的位置,开始加载我们后续更新的这个数据,啊,后续更新的这个数据是这样的一种方式,对吧?但是注意啊,生长环境当中用初始化的时候一定要注意一下,现在我们用的不是2.0 2.0之前都没办法做这个事儿,什么意思呢?那你想啊,我们在做初始化,也就是说你要查这个时候,那有没有可能我在查这个数据,我把这张数据改了呢,中间发生了变化呢?
13:22
有可能吗?对吧,那就没办法做到数据的一致性了,对不对,就没有办法做到这个数据一致性了,所以在2.0之前它都是怎么做的呢?在我全量,也就是说刚才我们所看到初始化第一部分这个地方,它是不是属于全量阶段,先把表全量加载过来,然后切到冰量的最新位置,这个我们可以说它是什么增量阶段啊,也就是说初始化它分两个阶段,一个是全量,一个是增量,那这两个阶段呢,对于全量的时候,保证数据一致性在2.0之前是如何保证的呢?来。
14:05
非常简单粗暴,那你想啊,如果说让你去思考这个问题,你会用什么方式,大家自己想一想,我先不说源码当中是怎么做的。如果让你来做这个事情设计,哎,第一个是全量阶段,接下来是增量阶段,我要保证全量阶段数据一致性。对了,你看班上的大佬还是多啊,我都说了咱们班学的好啊,我就说就听说这个是对的。是不是你看大家都不仅会答问题,都会设计代码了啊,都有自己的想法就设计这个代码了啊,让我们做,我们很很显然也能想到这怎么做。加个锁吧,刚才我们不是说了,你读的过程当中可能有改,那有新增有变化,有删除对吧?啊对这个数呢,对这个数据做了修改了啊,我统一叫修改增删改我统一叫修改啊,做了写操作对吧?那很简单的方式,我读读完之前我不允许你改不就完了吗。
15:19
我能保证数据一定对的,等我读完之后你再改对吧?啊,读完之后再改啊,所以要注意一下,其实在2.0之前他都是这样玩的,但是2.0呢,它又用了另外的方式,可以不加锁来实现数据一致性啊,来实现数据一致性啊,但是这个感兴趣的话,大家可以等着下周的视频啊,那里边我又讲到,呃,还看了部分源码,通过讲解原理,然后呢,去看了部分源码去聊的这个事儿啊好,那这个你要知道一下,它分两个阶段,第一个呢先。做全量加锁啊,加锁当然可以选择不加锁,可以不指定对吧?B就是通过这配置信息,你可以不加锁,不加锁的话它会牺牲一定的什么。
16:07
准确性啊,牺牲一定的人性,但是效率会提高,因为你直接加个锁,假如说你这个数据量特别大,你可能这个锁得加几个小时,那加到后台,特别是DBA啊,你要对它不加锁,他是肯定不同意的,对吧,肯定不同意的,所以我们可以牺牲一点点准确性,哎,不加锁同时也提高效率的,对吧?啊,那DBA那边呢,也不会,也不会有什么意见啊好,这是我们第一个叫初始化,第二个叫ear list earliest是什么意思呢?就是说我们看一下啊来。首先叫never,哎,就刚才后面的这个,我们翻译过了,第一次启动的时候做初始化的,就不做初始化,它呢是不做初始化。对吧,好只仅仅什么从log开始的位置开始读。
17:00
啊,它呢是从头开始读不错初始化对吧,是这个意思,但是它呢有一个点啊,有一个点要注意一下,呃,我现在就用不了它,我现在用它呢,直接会报错。我现在用它直接会报错,那是因为什么呢?大家还记得我刚才在添加这个我们自己这个数据库b up的时候,我是怎么做的,我是不是先去建库,我先建库再建表吧,导入了建表语句,然后再开启的什么冰帽啊,然后我才开启的冰帽是这个意思。对吧,然后我才开启的这个log,那这个不行,它如果说你要想用N的话,你得先干什么呢?先修改这个地方。先修改咱们的这个地方,外木叫AC卖点康复,必须先修改,你先把库名写好,先修改,然后再去创建这个库。
18:08
然后再去商店库创建表才可以用初手用al这种方式,最早的它必须要读到你的建面语句啊,它必须要包含完整的log才可以啊,这是这个好,那接下来latest latest比较简单了啊,这个是无所谓的,那看一下第一句话也一样,跟上面一样,它不会做这个快道,不做快到直接的什么呢?读取blo end of the blo最后的位置,也就是说我只获取连接被启动之后,从这这一刻开始,你有新的数据我读过来啊,只读最新的嘛,跟我们在卡卡当中消费者一样,有earliest latest,对不对?没问题吧,那它有early,有latest,好,那就是这个点,那还可以指定offet,也就是说blo呢,它也自己的offet啊,我可以指定,假如说b block现在有啊零一直到到到到1万嘛,对吧?啊到1万条数据了,那这样来,我从我指定从9000开始读可以啊,指定奥赛就好了,可以指定一个位置读啊那然还有可以指定时间戳啊指定这个time STEM。
19:20
啊,指定时间多,那指定时间拖的话也比较简单,因为我们所有的并log,除了ET,它每一条数据有自己的时间,我们可以指定一下,我只要读取这个时间。啊,那这个时候呢,他读的从这个时间他将会注意,那有同学可能在想,我给的时间是1234,那我能保证b block当中一定有这个时间吗。不一定可能,我上一条是1230,下一条呢是1240,对吧,十年多连续的两条是这个样子,这是很有可能的呀,对吧,我中间间隔了有十毫秒产生一条数据,那没有1234我就不开始了吗?不是的,它指的是什么意思呢?叫忽略。
20:07
这个变化数据什么变化数据呢?Whose'time STEM is small that,就是说比我们指定的时间中小的这个干什么,我就不读了,就是说其实就大于等于我们这个时间拖了,从这个位置开始读吧,不是说必须等于,是大于等于啊,这个要知道一下,也就是说其实它读取数据的方式呢,还挺丰富的,但但是我们用的最多的,其实initial和这个latest用的这两个比较多一些啊好,那我们这个地方呢,有initial,我们等会看一下是不是会把我们这12条数据加载过来,对吧,那这个就搞定了啊,搞定好以后呢,CRL加V得到一个S方。呃,我们已经得到了嘛,啊,已经有了对吧?啊,这个source方式我们已经指定了啊,那下来呢,我们添加进来,就是因为点at source,把这个source方式扔进来CT,我们就得到了一个流,那家打印数据很简单了,对吧,复制它,接下来呢,点print啊,启动inv点啊一般来说呢,我们给一个这个叫名称,我就用它啊叫CDC,好,这个异常呢,我们做测试,抛一下就好了,对吧,抛给卖方法啊,那这个呢就搞定,行,这是我们做的一个编码啊,做的一个编码,那这个呢,我们直接做个测试这样。
我来说两句