00:00
接下来我们了解一下另外一个导入方式,Routine load,或者我们也称之为例行导入。那么最主要呢,我们就比如说从数据源,像卡夫卡源源不断的将数据进行导入,那么它是不会停止的。也就类似于我们一个实时的程序,它会不间断的一直在运行,除非呢我们手动将它停止,那这种就适合去从卡夫卡去。实时消费数据。这个是一个root node,那么当前景致是什么?从卡夫卡导入啊,而且呢是。支持无认证的或者SSL方式认证的卡不卡机群,如果你是cover那些可能要不一定。那格式呢,支持CSV,还有Jason。还有一个版本要求卡夫卡呢,是要求大于等于0.10,那么如果是0.80.9,那目前呢,还不直接跑的话是不支持的,那我们可以修改一个兼容性的配置是be的配置是这个version。
01:08
指定它才能兼容,也就是说兼容是可以兼容,我们要多设置一下啊。那它的基本原理特别简单啊,我们看一下这张图,还是客户端提交一个命令给谁啊,给fe,那fe呢,在切分任务。那分分配任务给谁执行,给be执行这边呢,是不需要broke参与啊。嗯。那它怎么实现不间断呢?其实什么不断的产生新的task,完成数据不间断的导入它的底层还是什么是dream no,只不过他帮我们实现了不间断的定期的去生成新的task,去拉数据啊,消费数据,仅此而已啊。那接下来就是对语法的一个介绍了,同学们,那这个语法看起来很简单,但是我们需要给大家讲一下啊。
02:02
那首先看第一个。叫create routine load,这是固定的,然后呢,给它起一个名字,类似于前面用的一个标签对吧,然后指定是导入到Doris的哪张表,所以这个是什么Doris的表明。接下来是一个合并类型,合并类型主要有三种,就是append delete merge append就是数据是直接追加到这张dori表。Delete呢?就类似于按呃取反导入嘛,前面也讲过一个取反导入对不对,就是说我原先导入这张表有123T是123,现在我想把一删除,我们说有一种方式,就是我导入一条数据是一,然后呢,类型我标记为delete,那这条数据导入进来之后,这个一就被抵消掉了。当然数据还在,只不过我们去查询的时候,这个一这个数据就查不到了,对不对,那合并呢,是更复杂的,我可以定义其他的啊。
03:07
对吧,那我们默认不指定,那肯定都是什么。按照一个append来处理啊。那接下来是这个load properties加载属性加载配置,那这边就很多了,第一个。参数有很多,我们看几个重要,第一个是分隔符,列的分隔符,也就是说卡夫卡里的数据你指不定是什么样,对不对,那分隔符我们就可以手动去指定,如果不指定,注意是杠T,也就是一个制表位,这个是大家要注意,特别是你自己去测试的时候,如果你都不指定这些,你结果说支持CSV,你用逗号去,卡夫卡的数据用逗号分隔,但会发现报错,或者说查不到数据,那是因为默认它是按照杠T来的啊,这个是要大家要注意的。第二一个是列的映射关系。这个呢,我们如果不指定啊。
04:00
呃,它是会自动去匹配,但建议指定为什么卡夫卡的每条消息。比如说我用逗号分隔了每条消息是不是可以不一样啊对吧,那很如果你不指定映射列,很容易就被脏数据给给怎么样。就就就就搞乱了,数据就乱了,所以这个列的映射关系还是建议指定,那列呢,我们分为什么呢?映射列还有衍生列,映射列呢。就是说原数据中每个列对应我们目标表的哪些列,也就卡夫卡跟Doris的列关系怎么来对应,怎么来映射啊?那还有一些取巧的用法,比如说Doris的表有三列是K1 K2 V1,卡夫卡里面按照分隔符切分,呃,四列,我只想取其中的三列怎么办?这个时候因为卡夫卡有列明吗?没有。他只能说按照分隔符切分成四列,如果比如说有四列,有1234列。你想把第三列不取,那是很难做到的,那这边我们有办法是什么?给要跳过的列取一个不存在的列名。
05:09
那这样的话,相当于说就实现一个跳过的效果了啊。那比如说我们就可以这么写cars映射嘛啊,然后就是K2K1,然后叉叉叉随便写一个一个不存在的列,那这样相当于说就跳过了呗。然后V1啊这样子。好,衍生链呢。衍生列是指啊,我们在上面用了一个表达式,比如说你用了一个加法,你看这不就表达式啊。那同样这个表达式,你甚至可以写什么case when,甚至还可以cast转换类型,对吧?这个这个表达式就很灵活了啊。那还有一个where。Where,就是一个过滤条件啊。过滤链呢,可以是衍映射力或者是衍生链,这种就是经过映射转换之后的过滤啊,还有可以指定分区。
06:00
啊,指定分区这个没什么好讲的。呃,还有一个删除条件,这个就是合并类型为删除时。呃,合并类型为默句时。的一个用法啊。好,其他的就大家一起看一看就行了啊。那还有一个什么source sequence用于唯一模型啊?嗯。那还有一个原数据的过滤跟VR区别在哪?VR是经过转换映射之后的列,那这个是未经映射未经转换的数据,就就是最最原始的卡普卡数据怎么过滤啊,威尔是经过映射之后怎么过滤啊,不一样啊。好,这是加载属性,那一般的加载属性我们就指定一个列分隔符,还有字段映射关系就够了,其他根据需要去设定,接下来是作业属性,作业属性是一些通用参数啊,那有这么几个,一个是并发。
07:01
对吧,我当前作业希望起几个并发去执行,默认呢是三。但这个并不是说你指定多少他就多少,他还是受很多东西的影响,它是经过这么计算的。一个是啥?取最小值是什么分区数,比如说我分区卡不卡就一个分区,你并发指定为三。它其实真正执行是不是也只有一个并发,因为就一个分区嘛,你用三个并发去跑没意义啊,那还有呢,这个就是指定的这个参数啊,默认是三,还有呢,还得考虑什么be的节点数量,因为be是真正干活的,假设我只有一个be,虽然卡不卡分区是三个参数,也指定三个并发,但是只有一个人干活,你还指望它能并发达到三吗?不可能,是不是,这个也好理解,那还有呢,是一个叫系统的限制,这个是fe的参数,默认呢是无。就默认限制最大的,并发最大,这是天花板啊天花板。那这几个参数去取一个最小值。
08:00
那么大家就知道一下啊。那还有几个呢?是关于控制一个执行时间,还有处理量的,或者说就是跟吞吐相关的啊。第一个呢是什么。最大批次间隔默认是十单位是秒对吧,默认一批次十秒。那么。还有一个行数,还有一个字节数批次的行数默认呢是多少呢?20万。默认20万,那么还有一个通过大小的大小默认字节呢,是100兆到一个G,然后它默认是100兆,那这个你可以根据你的需要去调整啊。就这三个参数。还有一个允许最大的错误的行数。默认是零,就一条都不能错啊,不能出错,那这个你可以根据需要去调。还有严格模式,还有时区,还有格式,像CSV啊。默认是CSV。
09:03
还有一个涨停啊。Jason,数组数组数组对象将进行,相当于说炸开嘛,展平嘛,默认是false啊,大家知道,有这个功能就行。那可以指定Jason的根节点对吧,比如说你嵌套的杰森啊,外层是你不需要的,你需要的是里层的一个,那你可以指定根节点是中间这里就可以了。好,那还有一个就是什么数据源的配置,这个主要是用来指定卡夫卡的参数,像卡夫卡地址端口,卡夫卡的topic,卡夫卡的分区等等,那就是在这里去指定的,好。那这个呢,就是我们的一个routine node参数说明啊,先了解用法你才能去用。
我来说两句