00:00
现在Doris呢,也支持flink一个连接依赖啊,这是官方提供的,那通过它呢,我们就可以使用flink去读和写Doris里面的数据表。啊,那这个东西呢,呃,现在已经提交到那个美美中央仓库了,所以我们直接去引入一代就可以啊。那么除了读取跟插入修改删除也都是可以,都是OK的啊,那么它底层原理大家要知道,它是通过这个string road的方式去写入,那这个我们知道它是通过什么HTTP的方式是吧?啊,所以回头我们写代码肯定还是写的8030端口。那我们接下来了解一下它的一个版本金融关系,呃,那官方提供的是这么几个系列啊,首先最基本的要求Doris的版本要高于0.13啊,Java呢,最基础是JAVA8啊,是目前都是支持的2.12啊2.12。那么弗林格尔每一个系列从一点一一开始,1.11 1.12 1.13 1.14都有对应版本的依赖,那这个呢,呃,我也都在代码里面给大家写好了啊,那首先第一步我们跟Spark一样啊,先创建一个main工程啊,那这边我就不从头挑了,直接带大家看一遍啊,很简单,然后呢,导入flink的啊不,那个配置一下我们这个po文件。
01:28
好,我们来看一下。我重新点开啊。那前面是一些基础的配置对吧,什么指定为JDK8啊,Link版本,目前我演示的用的是一三系列啊,其他一一也好,一二也好,一四也好,一样的啊,你改掉就行了。呃,SC我用的是2.12啊,其他没啥,那前面几个依赖是flink基础依赖对吧?Flink Java flink是Jim啊,Flink client啊,这依次导入就行了,还有它的circle模块啊,它的plan啊也要导入啊,是blink版本的。
02:07
那后面其他依赖是无关紧要的一些像日志依赖对吧,啊还有一个。是my circle依赖啊。啊,其他的都呃,可有可无,对于Doris来讲,那么核心的就是。我这边写的最后一个依赖啊,那你group ID也好啊,呃,就gav GV按照我这个写就行,那么其他版本的我也都写好了,注释掉了,你选择你对应版本就OK了。那这个呢,呃,依在依赖下载好了之后就可以用啊,那么大家注意,如果要跑我这个案例的话,因为我这边scope作用域都写了一个provide provided啊,所以我们在运行的时候,如果你报错找不到类,那么你可以选择将这一行注释掉啊。注释掉provide不会把对应依赖打进包里,对吧?运行时也找不到依赖啊,这是第一种选择。
03:06
那我这边恢复啊。因为我们在正常开发flink的时候,一些。一群现有的依赖,我们肯定也不用去打包对吧?啊,那这个时候怎么办呢?点。ADD configuration。然后这里是不是对应的每个应用你都可以去选择一个什么啊,根据你idea的版本去找到这个配置啊,意思就是什么呢?在我运行的时候也包含指定为的依赖,这样呢,就不会报错找不到类啊。那你也可以直接在default里面有一个application啊,把它默认的就设置为勾上就可以了,好吧,啊,这个我就不多讲,那接下来我们看一下每一种呃方式的写法,第一种呢,是思考方式的写法啊,直接看代码。里面有一个circle DEMO啊,其实特别特别简单,那前面是flink使用flink circle使用的一个基本环境,这个应该大家都会写,对吧,我就简单的创建了一下,接下来是核心怎么写,来来我们看这里。
04:15
Create table这个语法都一样啊,那这边有什么字段的类型,字段类型字段类型啊,那Doris的字类型与flink类型的对应关系啊,那在文档里面我也列出来了啊,在最后一个小节大家可以去里面找,那目前的类型跟我do瑞S的表是一致的啊。好,接下来就核心的VS模块啊,这边主要是一个配置项了,对吧?首先第一个connect指定为Doris,还有fe的节点,我们说了它底层是通过stream load啊,所以我们写的一定是什么。HTTP端口8030啊还有呢,就是库名表用呃,Doris的用户名,Doris的密码对吧,这是最基础的配置项啊好,那这样的话我们就flink跟Doris就打通了啊,然后呢,我们可以尝试着来查询一下啊。
05:12
我们执行一个查询语句啊,看能不能将数据查出来,我们先回忆一下啊,现在我进入test DB这个库了,那我的表是TABLE1,这张表那我查一下先啊,Select新from table1。那我们看到是不是有这些数据对吧,我们看看flink程序能读出来吗。运行。那我们看到出结果了对吧?啊,那这在这里呢,我们就能看到Doris的表已经被成功被flink读取了啊,那这些数据对吧,我们再来试试用circle的方式去写入数据啊。
06:02
来,我打开这行代码,那我执行的是flink的insert into,那insert into我插入一条数据,呃,二二吴彦祖三三对吧,我只对三个字段插入啊,因为这个small in我们还得单独去转一下啊,你也可以去试啊,我这边为了省事就没啊。没有去插入这个C的这个字段啊。只插入这个这个这个啊,那回头这边没写的,那就是个闹啊,那我们试一下执行。现在还没有这一条对吧,ID为22的这里还没有啊。啊,ID为22。
07:01
好,呃,程序执行完毕啊。那么接下来我们来Doris里面去看一看莱特星。这个时候我们对比发现,是不是这把这条数据也插入进来了,对吧,ID22,然后这个字段没指定呢,是个闹啊吴彦祖啊,PV3没问题吧。这个就通过circle的方式来读写啊,这个还比较简单啊。那接下来我们看另外一个方式啊,我们用是来写。塔是呢,这边我。造了几条数据,我们来读写啊,来我们看一下,首先是这个Jason,我们先看读吧。啊。哨子就特别简单了啊。我创建一个配置对象,这个是Z,呃,Java u包下面的那个properties,那里面呢,放还是这几个核心配置项,呃,像fe的。
08:01
地址跟端口啊,Doris的账号跟密码,还有库名点表明啊,还是这基础配置项之后呢,我ADD source里面传的是那个连接器提供的一个类啊,叫Doris source function。那里面需要传两个东西啊,第一个呢,叫Doris的配置项里面呢,就把刚才的这个properties传进来就可以了,这是第一个参数,第二一个参数呢,就是指定一个序列化的STEM啊,那我们用simple的就可以了。呃,那这边我做了一个打印啊,那我们来看一下效果。还是查的TABLE1对吧。可以看到已经读出来了数据。对吧,它是以流的方式一条一条读出来的啊,你看这是一条,这是一条啊,这是一条。因为我们这边呃,指定的是一个什么list啊,所以一条数据就转成了一条list,看你是怎么进行反序列化的,对吧。
09:08
包括我们前面S口插入的这条数据都能读到,对吧。这是一个读,那我们再来看一个写。写的话,我这边写了两个类啊,一个是Jason格式的数据,我往里面写入,另外一个呢,可能是大家会用,也会用到叫肉德塔的方式去写啊,肉data有一些特定的写法啊,所以我们都啊做一个演示跟讲解,首先看一个杰森。T。这个呢?来,我们看一下。首先呢,我是from from element啊,造了一条这么一条数据啊。那把这条数据改成我们对应的就可以了。现在改成跟我们表对应的几个字段对吧,赛塔ID什么U端内。
10:03
啊PV啊这样的数据,那之后呢,我直接ADDS,那看一下这里怎么写啊,这边呢,还是用一个连接器提供的类叫dori sink,调用一个sink方法,那里面的传参它的构造器有多种方式,那呃一种是简单的写法啊。一种呢是传多个参数的,那首先第一个它是一个参数啊,这是读取选项,一般我们直接B保存默认就可以。这是第二一个参数。这是一些执行选项啊,主要有什么呢?我写入的时候,如果一条来输出一条,就往Doris插一条,那这样的话,如果。我们每秒钟数据量多一点啊,数据量多一点,频繁一点,那Doris这边连接可能就扛不住,那一般我们写入的优化是不是要长P,那所以他这边执行选项指的就是我去长P啊batch还有一个就是同事。
11:01
同事那怎批啊,比如说我三条一批,那批次时间我先设个零对吧,那同事呢,我设个三啊,这是基本的配置,还可以配置一个。我们通过STEM load的方式去写入的话,还有它的一个基本选项啊。啊,这边我也是另用了一个property对象,把它存进来。一个是指定格式啊为Jason,那这样的话,它会自动解析这个Jason数据。那还有一个参数,把它置为啊。这是第二个啊选项。也可以不呃,前两个参数呢,都可以不去写也可以,那第三一个是一定要的。啊,这是。Doris的基本配置像什么,它的前端节点对吧,那这边我对我们来讲就是啊哈杜一啊8030啊前端啊HTTP端口。
12:00
那这个呢,就是库名点表明,库明点表明对我们来讲就是test db.table1。那用户名呢,我这边是test,密码也是test啊,就这样子就可以了。那我们来执行一下。看这条ID为66的啊,能不能成功插入。现在是执行完毕的,我们到Doris里面去查啊,那我这边已经查了一次了啊,那我们来看一下我指定的那条数据,是不是刚好就这条ID为66CD的为六啊,然后用户名彭于晏对吧,PV6,那已经成功的进行了一个插入。哎,那这就OK了,那最后就啰嗦一句,就是什么,我们这个写法,这个sink方法里边啊,一共三个参数。啊,最后一个是必配的。要指定如何连接Doris啊,要处理哪张表,那前面两个呢,看你的需要,你不指定也行,像我下面的写法,我这个think里面是不是只指定了最后这么一个参数也可以啊,啊,这是它提供了不同的一个构造方式,行。
13:14
是Jason。那我们再来看一个road德。如果我以肉的方式去写入。那应该怎么写啊,那有一个东西是一定要的。就是指定这个roll data的一个,呃,逻辑类型,还有它的字段列表啊,这两个要记得去写,这个其实是属于roll data的一些规范啊,那前面呢,我就造了一条roll data格式的数据。那比如说我的IID33。那city code是一个small int,那我就用salt去写入对吧,我这边转了一下啊。呃,三三的,我改成八八来,这样比较明显对吧,之前没有八这条数据啊八啊。
14:01
嗯。那这些只是呃,生成一条road类型数据的写法而已啊,那其次呢,这个地方要去指定啊,我这个road每一个字段分别是什么类型顺序不能乱,对吧,第一个字段是一个int类型,第二个small int,第三个ver。第四个big in对吧,还有字段名称,那这两个定义好之后,你同样传入哪里呢?这个think方法里啊。那这边你看我船舱比之前就多了,除了这三个参数之外。对吧,前面我们里面只定义了这三个参数,那它还支持再传两个参数,一个是字段,一个是类型,啊传进来这样就OK了。好,那我们来执行,还是对table进行操作。听完毕了,七星完毕,我们进来看一下select星啊,那这个时候我们能发现ID为八八,是我们我们现改的一条数据对吧?呃,成功插入进来。
15:02
这种就是对肉类型的数据啊,它呃是怎么个写法。那这边呢,是一些案例演示。那最后呢,我们讲讲除了那四个基本配置,那还有其他的一些通用配置,像前面我们指定一个F1的节点地址对吧?呃,库屏表明Doris用户名,Doris密码,那还有一些什么东西。呃,像城市。还有一些超时时间,超时时间呃,分别是读的超时,还有查询超时,这边提供了多个丰富的选项啊,还有一个大小跟Spark一样啊,这个参数都差不多一样啊,这就不再重复去讲了,还有一些批次啊。内存限制。那这边呢,大家就去查阅就可以了。那最终的一个列的类型映射关系啊,那基本上都是一一对应的啊,都是一一对应的,那我们看看有没有特别要注意的地方,还是这么几个地方啊。
16:07
一个是date类型,一个是date time类型来。你看。他们呢,呃,在flink里面我们要转成string,这样格式才能统一啊。Flink你要用stream来对应,那还有呢,就像我们这个,呃,差large in very差这几个呢,它也通通用string来处理,这是flink啊。DECVR它用的是普通Dis啊,Time类型用的是double,呃,Log log还不支持,这link不支持嘛,对吧?啊,这是一个类型的映射关系。
我来说两句