00:00
那么除了用CDC呢,咱们还有一种需求啊,那么就是我要大量的历史数据,我要离线的批量导入,那如果你不用CDC的那个历史数据同步也可以,那你用ber insert就行,核心就是用咱们的b insert。对吧,那这个东西呢,呃。它有什么好处呢?第一个它可以省去我们阿芙罗的序列化啊,也就是说点log文件是不是阿芙罗序列化,另外呢,我们是不是有log之后,是不是还要去灭掘。对不对,不管你是Co还是MR,都得有一个合并或者压缩的过程,对吧,那么那离线批量导入,那之所以叫离线嘛,对吧?啊就是说一步到位,我直接就是初始化号啊,那后续它也不会再有一个去重的操作,这是要注意了,那数据的我运行需要自己来保证的啊你。那另外呢,一些注意事项,咱们这个BK insert需要在P的模式下执行效率更高,你用流的方式也可以执行,但是流可能会比较吃资源嘛,或者说性能不好啊,Bench模式它因为它会按照我们的分区路径啊,它会做一个排序啊,排序的话那效率就更高嘛,相当于就一个批处理了啊,避免呢我们文件这边频繁切换啊,那P的执行方式就是我们只需要设定一个参数就行,只呃运行模式改为batch,默认是streaming,也就表示流啊,那么如果要用batch呢,它就要求咱们check po要关闭啊,因为它不支持check po。
01:38
那另外呢,就是关于它的并发限制,其实就是right task right task就是它的并发啊。嗯。另外就是说,比如说你并发指定为次,最终一定只会生成四个PA回的文件吗?啊,或者说生成四个文件嘛,也不一定,不要忘了,咱们附底会自动管理文件的大小,默认呢,是不是超过120兆的pack文件会会生会让你往新的PA写啊啊所以你并发为四的话,可能最终生成的啊文件不只是四个对吧?因为它还有一个滚动的大小限制啊,这就是我们说的一些事儿,呃,那看一下具体的一些参数啊啊首先咱们怎么用呢?
02:22
我们写的operation啊,写的方式默认的是upset,我们从来没指定过,基本上对不对啊,那现在呢,我们可以把它指定为报insert啊,这是第一个,那第二一个呢,这是它的并发啊,Right task大家很熟了啊,这个大家知道了,另外呢,就是一个shuffle啊。Shuffle input,它就是说先按照分区字段做一个shuffle,这样的话同一个分区的数据就先到一起去了,这样可以减少我们小文件的数量啊,但是带来的就是你不同分区之间数据量不均匀,不一致,那这个时候反而会可能有数据倾斜的一个问题啊,所以这个你就自己看着来吧,这个要不要做啊,那第二个是排序啊,它可以先做一个排序,它是按照分区字段排序再写入啊。
03:11
也是为了减少那个小文件的问题。这个是一一版本才有的啊,另外呢,就是排序的时候可用的一个内存,那内存它用的是flink内存模型中的托管内存啊,也就是说咱们弗Li内存模型啊,不是有堆内跟堆外吗?啊堆外里面不是分为task堆外啊,还有框架,框架的堆外还有什么呢?网络缓存是吧?还有一个托管的内存呢,托管内存正常来讲是给RODB用的,那如果是P的模式下,咱们这些salt呀,哈希呀都是也也是会用到这块manage的。内存啊,也就托管内存,那这个默认,呃,这个时候他默认护底给我们写的是128啊,那我们也可以去改好,那这些是一些参数的说明,那接下来就是准备数据了啊,那我们快速的建一张表吧,我在test库下面建一个ST4啊,这我已经拷贝过了啊,现在是test啊创建一下。
04:18
好,刷新STO4有了,那么接下来呢,这边我之前是用data faker去做的,那现在大家也不一定装了,那我就手动先插几条数据吧啊。好吧。嗯。我插个两三条吧,嗯。30,好吧,随便咱们快速的演示一下吧。
05:03
那就两条吧,好吧,那现在假设这个是历史很多的全量数据啊,那这个时候咱们要去做一个历史导入啊,那我们进入一个circleq客户端,创建这个MYSQL的源,大家注意我这边不用用CDC了,我用的是CDBC啊CDBC。嗯,我看一下啊,这个应该没问题,也指定组件了对吧,字段都是对应的,好看一下so tables啊,这还是之前几张对吧,那就见一下STO4。啊,你注意改一下你的账号密码啊,还有数据库的路径啊地址,那ST4创建好之后,咱们再建一张互地表啊护地表我还是放到忽迪link吧。啊,这个是ST4要写入的Mo,诶,大家注意这个关键在哪?Right option为Bo insert啊,那预合并字段为score啊,这无所谓,行,那我们创建互地表。
06:06
最关键就是这行我标个红啊。这一行就是最关键的,接下来就直接执行一个插入就行了。来insert,那那现在我插入的话,那肯定是一个流的模式,接下来我给大家前面不讲了吗?我们说用P的是不是更好啊啊我先指定为batch。啊,就这么用就行了,BAT完了之后,我们执行这个insert语句啊,你看会不会报错啊,他报什么错呢。他是哦哦,这这个不是啊,这个是因为我少了一个什么JBBC的连接依赖,哎呀。那我。那我就放呗。还得重启是不是,哎呀,那没事,我在资料里面啊,这个flink连接器架包放了两个东西啊,又多放两个,一个是JDBC连接器啊,2.12GALA1.13.6,还有呢,就是MYQ连接依赖我的my circleq是5.7的啊,那对应的连接器,这是MySQL Java连接器,把这两个放进去就行了啊。
07:22
嗯。CD,呃,OPT Mo啊,不对,Model flink-1.1 3.6,对吧?CD live,然后呢,将这俩包拿过来,还得再重洗一下啊,这是最烦的事啊。好了,那就重启呗,而且我们也没做持久化啊,诶行,那没事,嗯,没有执行中的作业啊。Here掉。
08:03
好了,K完之后还是一样的,先启动一个雅安session,重启嘛,那个包放进去我不懒得再看了啊。然后呢,连接搜索客户端啊,接下来就是重新创建这两张表呗,啊没事无所谓啊。On。STU4创建一下这是JDBC的表,那接下来这个是蝴蝶表啊。好了,那接下来我们刚才在干嘛呢?是不是一个batch对不对,呃,我们指定为batch。好,再来一个东西,我指定为table流啊。显示模式指定为table。好了,那接下来就差最后一步了,插入啊,我们看一下会不会报错啊,不会就直接跑。
09:06
那这个就是我们一个全量。诶你看呃,他说check个po是不支持的啊,所以呢,还是得得设啊。设成零,看试试行不行,因为我们默认配置文件已经开启了。那这个就比较麻烦了,由于我们在配置文件写死了啊,就是说默认就是开启这个火现在关不掉了啊,所以我们其实可以怎么办呢啊,就平时用的时候啊,就这个point不要就这个特不要指定啊。就往下走,就是这个,呃,我家在哪呢。啊,就这个地方,这个参数就不要在这写了,也就是说你需要开checkpoint的时候再在这里去set就好了啊,这样反而就是默认是关闭的啊,需要的时候再开行吧,那现在呢,我也不去用Bach了啊,其实效果啊就是一个效率问题,那我还是改成streaming之后呢,咱们去insert就好了啊,不要搞那么复杂。
10:19
我也我也懒得再重启了啊。那现在就是一个流式的插入了啊。但我们说了,如果可以的话,尽量还是用be啊,尽量用be啊刷一下。这个应该很快就执行完了。我们看一下它的节点有什么变化啊。还是塞,呃。还是right?现在哦,早就结束了,你看finish finish。
11:04
好了,那既然如此,我们快速的查一下吧,啊,Select from。From哪张表啊?这个表明啊。我们是TABLE6的风格嘛啊,无所谓了,缩小啊,那那么你看数据是过来了,那么如果你数据量特别大的时候,就适合用这样的一个离线,但至于你是走P还是走流都是可以的,离线归离线,但你是P还是流都行啊,那如果是P的话,我们说了会更推荐一点啊。那这种呢,是一个历史存量数据的一个导入方案。
我来说两句