00:02
那么我们使用flink来跟护理集成的话,也可以借助咱们flink的CDC。来完成对户底的实时变更写入啊,那么一般方案就两种,这是数据库对吧,比如说是my circle吧,好理解啊,那我们可以直接用flink CDC。之后呢,读取进来啊,就创建一张CDC的表啊,然后呢,Insert到蝴底表这种呢,就链路特别短,特别简单,而且我们知道flink CC啊呃,它也支持读取同步历史数据对吧?呃,历史数据然后呢在实时的变更也能后续的捕捕捉到,那他呢,如果是二系列的CDC啊,它也是实现了一个无锁算法啊,即使你同步的是历史数据,它也不会去锁我们的业务表业务库了啊啊所以不用担心啊,它也是无所算法,并且呢,建议用2.2.1现在比较新的版本啊,修复了一些bug啊好,那这种方案好吗?啊,看情况了啊,如果说你们企业当中啊,对这个库比较敏感啊,或者说不想对业务库造成压力。
01:17
啊,那么建议呢,你还是可以过一遍卡夫卡啊。因为你这样,比如说你这张表有假设你有1000张吧,啊,你有1000张表,那你用CDC去同步的话,呃,看你用什么方案吧,你可以写APIAPI可以整库同步,也就是说你你整个库啊的表全部同步过来啊,一个昼夜搞定,呃也可以多个库同时监听啊,同时同步啊,这个时候你只需要一个job啊,那对MYSQL的业务库的压力也是也还好。那么如果是第二种,呃。你可以先过卡夫卡,再到。CD。也不是CDC了,这个时候你就是用那个,比如说用up那个连接器去消费这些变更数据,呃,然后这些变更数据再插入到湖底,也就是过了一道卡夫卡,那么这个的这种方案呢,相对来讲还好,因为你不会直接对我们的数据库造成一个太大的压力啊,太大的压力。
02:22
这边就是用一个同步作业过来就行了呗,啊,然后这边在消费再写过去,而且呢,如果你有异构的数据源,比如说你不仅仅你们企业当中,肯定不是说呃,规模稍微还可以的,不可能说业务数据库很单一,你可能你们用到了myle,可能用到的Oracle,甚至还有DB two的,甚至还有mango的,也就是说你们的业务库特别多,有很多的异购数据库,那这个时候呢。你还不如说呃,每个库呢,看用什么工具诶,都统一采集到卡卡当中之后呢,再统一用flink从卡布卡当中去消费,再写入护地啊,那这样的话会更合适一点,对吧?那下面我也简单写两句啊啊第一种方式呢,用CC connect直接对DB bno导入啊,就是它不依赖于消息队列卡不卡,缺点呢就是会对业库造成压力,特别是这里,比如说有张表A,你这边重复创建了两次互底表,比如说护底里面有表有有有有啊T1T2吧,这两张表的数据都依赖于表A,如果你用的是CDC的circle语法建立1CREATE table这种方式啊,那你是不是就得两份同时监听表A的blog啊,那这样子肯定就造成压力的嘛啊所以你看情况啊,呃,如果上游的数据无法保证顺序,需要指定什么预合并字段啊,这个不要忘了。
03:51
啊,他会根据record key帮我们去做一个啊。好,行了,废话少说了,那咱们快速来演示一下,我这边就演示一个。
04:01
这个啊。啊不啊,上面这个电路简单啊,我就不去做了,我我比如说我从这同步到卡普卡啊,然后呢,我flink该怎么来消费这种数据怎么写啊,咱们用纯circle来实现,那首先咱们准备一下数据呗,呃,那咱这边需my circle需要开启bno,我这边是有my circle的啊。这个你们都会配,我就不讲了啊,浪费时间呃,在ETC慢一点,CNF在这边是不是要开启一个blog啊,指定一个serve ID啊,然后呢,Lo b等于MYSQLB啊,那格式呢,要指我指定为肉啊那。过滤出居帽这个库啊,这个配置下你看情况吧啊,最好开启GTD,避免主避免主从切换的时候那个丢数的问题,行了,这个就你们自己去开,然后重启一下买circleq就行了,接下来我直接准备数据啊,那来来这么一张表可以啊,我这边用直接用这个navvi cat。
05:07
来做就行了。啊,稍等一下好了,那是哈杜ER1对吧,啊,有一个我是指定的机贸,那就在机贸里面吧。新建一个查询好,然后呢,咱们呃,要test吗?啊不要了吧,我就集墨了啊,你们单独建库就单独建库吧,那比如说我就用ST3啊。进了。好了。刷新。啊,我这表有点多。
06:02
是我没看到吗?还是怎么了?啊,这我们刷新啊,是ST3有了吧,啊有这么多个字段,好,那接下来我们要在这边去创建一张读取blog的表,用flink CDC来实现,但是我们要做这个的话,需要有flink CDC的依赖啊,那么在这个地方。呃。再给到大家的资料啊,连接器架包一些常用的我都给你下好了啊,这是适配咱们版本的,包括了卡夫卡的,包括了have了,包括了买SQLCC的,那现在咱们是不是需要卡夫卡也需要买SQLCDC啊,那就上传呗。O BT module flink1.13.6啊,CD live我直接丢到live了啊,将这两个包。然后呢,不要忘了,我们说这种集群模式,它静态加载依赖了,我们要先重启啊,完了这个之前好没事啊。
07:10
那我把这个session先掉。依赖变更了嘛,要不然他读取不到。好了。Kill掉之后重新启动我们的雅session。搂一眼吧,你看呃,这个K耐卡夫卡K耐马克CDC啊,用2.2.1啊修复了一些bug。OK,之后呢,还是一样的连接一下咱们的SQL客户端,那这个时候就可以来用了啊,那首先你看我们建议一张blink的表,它是MYSQLCDC啊,好写上你的MYSQL主机名,MYSQL端口账号跟密码啊,还有库名,还有表名,那这边我得稍作修改啊。
08:05
因为我的库不是这个啊。都删了吧,我的酷是居帽啊。那账号密码你们自己看着办啊,你不要跟我一样好回车,So tables应该只有这张了啊,别闹了,接下来我们要建一张卡夫卡的表,那卡夫卡的表你的字段跟类型要能对应的上,我这边都是对应上的啊,而且有个地方要注意,咱们必须指定组件啊。因为咱们要用什么呢?@卡夫卡,@卡夫卡就必须有组件,我们知道卡夫卡是不是KV存储啊,它怎么来存储这个变更数据呢?其实也简单,它就像组件,比如说有一条数据叫EA。对吧,后面它变更成了1B,它是怎么处理的呢?呃。
09:00
它类似这种回撤数据啊,啊,我们可以这样,它先是一个insert加I,然后呢,加I的话,它会将这个组件一当成key value呢,将数据本身写进来啊就这样子。那我们去查卡夫卡这张表,就能查到EA这个数据了,那么后面你这个ID为一,组件为一的这条数据变更了,由A改成了B了,他会做什么呢?呃,如果是一个回撤的操作的话,比如说有个减U它会干嘛呢?是不是先先把EA回撤掉啊,它就会来一条啊。Now。对吧,然后再插入一条新的EB,那就是一,然后1B,那么这个时候你看他们的key是不是都是同一个,那这个时候我们再去查的时候,我们就可以取最新,那就是最新变更后的结果啊,它就就回撤,撤回的话就是复值为no啊,插入的话就是写进去value啊,用它的key是主键,所以必须有主件啊,这咱们简单回忆一下阿四卡不卡的原理啊行。
10:09
之后呢,Topic呢,咱们就按照这个来吧,让他自己写,没有应该会自动创建啊,接下来是足keepper的配置啊是它诶然后卡不卡的地址跟端口,呃,我是这个,那格式呢,我就都用Jason好吧,那这个。就直接拿过来,So tables,那回头就是查询这张表,插入这张卡卡表啊,那就完成这个事。嗯。那直接拷贝吧。因为我也不想敲这个名字了啊呃。哦哦哦哦,我这里忘了写组建了啊,ID也忘了写组件了啊,那我直接写在后面就好了,看我怎么写,我保持一致吧,那就写在后面吧。
11:05
就是你买circle CDC的表也是必须有主件的啊,它这个无锁算法也是要依赖主件啊哦,稍作修改也是ID吧,好,那我就drop table s some log。重新建一下就好了。好好。那接下来执行一个插入操作啊,查询CDC表插入卡不卡。那。我们的数据那这样吧,我就手动演示了啊,ID1那张三是过啊清华。
12:03
啊,它的简称昵称,哈哈,年龄18。Class number123电话456EMAIL这样子啊,IP啊,随便写吧,啊给一条数据。啊,我还没还没提交啊,我现在查selectt新啊,这个表明也有点长啊。卡夫卡的这张表啊,给大家玩一下啊。啊,不喜欢这个啊,那我们还是用table的这种格式来啊。好,再查一下设置为table,然后我再查,现在呢,肯定是空的对不对,为啥呢?因为没没有变更嘛,啊没有变更数据来我勾上好了,观察这里。要不要再缩小一点?
13:02
我们就看有没有数据就行了呗。啊,这这个是测一下flink CDC啊,写入卡不卡这么一个事儿啊,大家注意咱们用的是阿卡不卡连接器啊。哦,完了,为什么说完了呢?因为我,哎呀,不,不对啊。啊,我我犯了一个错误,我突然想起来了,我应该是改成居帽这个库才对啊,啊,那个组件我也得拷过来。我重新建表直接粘的文档了吗?啊,这就保持不一致,就麻烦麻烦在这啊,应该这样子才对啊。那那个插入作业咱们也快赶赶紧给他停掉吧,那就没有意义了,就。
14:02
你看他一直在重启报什么错,你看吧,他肯定是说找不到啊,啊,你看不能找到匹配的表,对吧,这个作业是什么。好,停掉了啊,重新执行一个insert,造孽了。浪费时间。好,现在软了,那我就可以来查了。好,查这张卡不卡表,看看能不能实时获取到变更数据。哎,你看这数据不就出来了吗。对吧,好,那现在比如说我做了一个修改,哎,年龄我改成20岁,好提交,那么再看一下能不能实时捕捉到。诶是不是可以啊,而且大家看看我们先有一个什么减U,减U的意思就是要更新啊,要撤回这条数据,然后加U,就是将更新后的结果写进来,你看它是先将18这条减U,然后再将新的20加U啊。
15:17
那如果你用table的那种风格来看的话,就是一开始就一条18,后面诶反而动一下就变成了一条20对吧,结果直接更新掉的那种效果。好了,这个就测到这啊。那么接下来我们要写入户表了。对吧,写入互利表,那这个时候我们要重新建立一张原表啊,然后呢,映射还是之前的这一些卡卡配置信息,那指定从earliest从头消费啊。格式为Jason,好,之后呢?为什么要重新建呢?因为之前用的是UPS。啊,如果是普通的卡夫卡连接器,它是没法接收这种changelo的。
16:02
就CDC的这种格式,那你你用普通卡夫卡连接器是写不进去的啊,它会直接报错啊,啊,所以我们先用@卡夫卡接收啊,让他数据进到这个topic当中,按照刚才的格式,接下来我们建立一个普通的卡夫卡连接器,为什么是普通卡夫卡连接器啊,因为我希望把你所有的变更捕捉到,写入护底,你这个加加I减U加U,我都希望拿到,我希望把这些变更的东西写入到户底去,而不是说只把最新的结果写过去啊,所以我这要理解为什么这么用啊,这么用呃,那在之后呢,忽底的目标表咱们就正常去建就行了啊,那个tmp啊迪link吧。啊,这么长的名字,呃,那写入方式为insert或者upsett都可以啊,那我的预合并字段是用的学校字段啊,无所谓啊,Part by score啊,可以。好,那接下来我们快速的进行一个操作啊,建立一张新的卡不卡原表,映射同一个topic,同一个topic,但不是upset了。
17:14
好,接下来建立一张忽底表啊,就参数很简单,你打通这个电路是没什么挑战的,说实话你用的circleq的话,就是写写circleq,复制粘贴就可以了,甚至你可以写一个circlel脚本,在外面直接杠F执行这个circleq啊好了,接下来就插对吧?啊,我们怎么插呢?我们从这一张新的卡夫卡连接器的表插入到呼地表。那这个电路就都通了,对吧,你看应该有两个insert任务啊。看一下这。回退。哎,这张表有了吧,啊,就等待数据写入了。
18:00
啊,你看现在现在有一个分区啊,我是用学校做分区啊,你看有个点。那个你你可以继续怎么玩呢,你比如说我再插一条数据啊二李四。啊,然后他是北大的。14啊年龄22,班级222号码3331麦AAAIP333好确定。那我们看一下。啊,也不用看,我们就看分区有没有生成嘛,啊,我们刚才没有开启流毒对不对啊,我们要你看现在数据肯定进来,为什么?因为北大这个学校分区也生成了,对吧?如果要看的话,你可以开启流读去看,也就是说我们一开始建表的时候,呃,直接把这个在这里指定一个流毒的参数啊,那这样看起来就更舒服了呗。
19:01
好吧。那后面这个是介绍一个data fer data faker呢,是用来给我们生成测试数据的,这个测试数据我们主要是要呃插入到MYSQL,但是我刚才的方式就是什么就手动添加嘛啊都可以啊,那这个我们后面这个批量导入的时候会用啊啊一会儿再讲啊。那你看最后你要实时查看数据的入库情况啊,那你就开启一个什么流毒流毒。
我来说两句