00:00
前面我们了解了在flink当中怎么样把数据输出到一些常见的大数据组件,比如说分布式的文件系统,卡夫卡,Red ES等等,那接下来呢,我们再来了解一下怎么样将flink当中的结果数据输出到关系型数据库里面啊,那关系型数据库呢,我们都非常的熟悉啊,它的特点就是数据的组织形式。是一张表,每一条数据在表里边其实就是一行啊,那当然了,里边有很多个字段啊,我们可以把它看成一种特殊的多元组的类型,那这种数据组织形式呢,它是结构化的,所以我们知道可以使用一个经典的结构化查询语言,也就是CQ进行方便的查询啊,所以在很多企业里边我们会发现啊,关型数据库可能并不太适合用作大数据场景的扩展,但是呢,我们的后台业务数据一般都是用这种关系数据库来进行存储的。那这一类关型数据库里边最经典的代表当然就是MYSQ了啊,那另外还有这种更加企业级别的,像Oracle post grasq等等,他们都是属于关型数据库的,我们在建立到数据库的连接的时候呢,其实建立的就是一个JDBC连接啊,所以这一部分我们是以MYSQL为例,其实我们介绍的就是怎么样。
01:21
输出到可以建立GDBC连接的一个外部数据库里面,传统数据库里面,好,那首先我们肯定要引入相关的依赖啊,在早些时候flink是没有官方提供GDC连接器的啊,那现在的01:13版本呢,已经直接提供了一个连接器,就叫做flink connect gd bc啊,当然了还是这种命名规则,后边要跟上当前SC的版本2.12,然后下面是当前flink的版本01:13点零。那除了引入一个GDBC的连接器之外,我们知道现在是要写入MYCQ啊,所以我们对应的必须要有具体的数据库,对应的驱动类啊,那所以我们还要引入真正MySQL Java这个连接器,所以我们引入两个依赖,一个是flink connector JD bc,另外一个是myql connect Java,这里面我用的连接器的版本是5.1.47啊,当然了大家可以针对自己。
02:17
安装的MYQ的版本来选择对应的版本就可以了。所以接下来我们把这一部分。依赖引入到泡沫文件里边来。直接做一个复制。我们看一下,检查一下VI这里啊,我们看到已经多出了flink connector jdbc和myq connector Java,接下来我们就可以在代码里边去做一个测试了,同样还是。创建一个SSC的object,现在我们应该叫做think to my test。没方法先写出来啊,那整体的流程我们其实会发现啊,跟之前还是非常类似的,所以前面啊,创建流式执行环境和读取数据源,我们这里还是直接借用ES当时已经实现过的这一部分代码,前面还是把对应的这一部分直接改成下划线,引入相应的影视转换。
03:12
然后接下来呢,我们就把stream里边的数据直接写入到MYSQL里面就可以了,那要想写入到MYSQL里边,按之前我们的这个想法,肯定就是直接ADD think,然后去实现一个think function了,只不过现在官方有连接器,那我们这个think function就应该是官方给我们提供的了,这个名字叫什么呢?呃,跟之前也非常类似,当前是JDBC的连接器,这个名字就叫做JD bc think我们看到就是它。可以点进去看一眼,我们可以看到这个class啊。它本身其实并没有去实现think方式接口哦,那这个东西到底应该怎么用呢?那我们就应该去找里边到底哪个方法返回了think方式,我们看就是这个啊,就是调一个think方法,所以我们最终其实是调用了gd bc think这个类里边的静态方法think,然后最终返回了一个我们想要的think方式,我们看一下这个东西,这就是我们真正意义上想实现的这个接口,里边有一个最关键的in work方法,这是我们。
04:18
具体去指定的操作流程啊,所以现在的话,我们这个调用就不应该是去有一个对象了,而是直接在这里调think静态方法里边我们看到。前面这里有两种传参方式。我们直接看上面最简单的这种就可以,里边有三个参数,第一个是一个string类型的CQ啊,很显然这就是我们所要执行的,比如说写入数据到数据库里边的具体的那一句CQ啊,那我们当前如果是写入的话,当然就是直接insert into啊,我们定义一张MYSQL里面的表,然后into某张表,然后把什么样的字段写进去,比如说现在我们写的还是用户网页上的一些浏览行为,点击行为。
05:04
哎,那么我们还是把这张表可以就直接叫做clicks,然后在里边我们保存什么数据呢?呃,可以类似于我们之前在里边的保存方式啊,我们还是一个K一个value这样去保存就可以了,就一个user。然后一个URL,这里我们定义也可以比较灵活啊,就是如果我们把这个user当成key的话,Primary key啊,那么接下来相当于我们就是一个覆盖的效果,如果不把它当做P的话,那就类似于之前我们往ES里边写的效果了啊,那就相当于每条数据都会保存进去啊,这是我们基本的一个想法。然后后边第二个参数,我们看到是一个gd bc statement builder啊,这个主要是用来干什么呢?那就是构建我们当前的一个预编译的语句啊,因为我们知道啊,前面假如说这个CQ里边我们想要写入数据到MYSQL里的话,我们这里边预定义的这个insert语句是不可能直接包含我们当前要写入的所有数据的。因为数据是真正执行开始之后,来一个数据去执行一次,所以我们这里边一开始是还没有数据,应该只有一个占位符,呃,我们知道在CQ里边可以用这个问号去表示占位符,哎,那对应这个数据怎么样去填写呢?哎,那很显然就需要用一个预编译语句的builder,把我们当前的这一个数据相应的字段填写到上面我们定义好的CQ里面,好,前面可以直接预编译起来,然后在这里呢,Builder里边我们就把这个语句完整的填好,然后进行执行。
06:33
啊,那最后还有一个参数是gd bc connection options,这很显然就是我们建立GDBC连接的一些配置选项,我们在这里边可能需要指定GDBC连接的URL,可能要指定当前driver的名称啊,可能要指定我们当前连接数据库时候的用户名,密码等等等等啊,所以这就是我们常规的这个连接的配置现,所以整体来看这个还是比较简单的,就是我们能够想到的这些东西都要在对应的参数位置去做一个实现。
07:03
所以这里边我们把三个参数就分别做一个写入吧。第一个。这里面我们知道是insert into clicks,然后接下来我们还得去指定当前的想要写入的字段名称,比如我们就是user。和URL。后面呢,我们还可以继续指定当前的values。现在我们并没有对应的值,所以直接用一个问号占位符来表示就可以了,这就是我们写入的语句。我们这里啊定义。写入。MYQ的语句。然后接下来后边我们所要实现的是一个。Jdbc statement builder哎,那这个其实非常简单,我们直接去new一个JDBC。Statement builder就可以了,这里边有泛型啊,那当然就是当前的数据类型了,Event里边必须要去实现的一个方法叫做accept啊,那accept顾名思义嘛,那就是说我们当前把数据这个数据叫做U,当前的数据来了之后,怎么样把它填入到对应的预编译好的语句里边去呢?啊,就是这个t prepared statement里边去。
08:21
这就是我们所要定义的这一个逻辑了,所以接下来我们做的操作也非常的简单,那就是这里的T需要去进行set。我们想要set的两个值,一个是user,一个是URL,那都是string类型,所以我们可以直接set string啊,那里边我们所要填写的参数啊,我们看到第一个参数是当前的索引位置,也就是说我们当前不是有两个占位符吗?那所要填写进去的当前是第几个呢?哎,那这个int就是位置第一个。然后后边就是我们所要填入的值,值的话,当然就从U里边去提取了u.u。所以非常简单啊,后边第二个也是一样,t.set three,接下来是第二个位置填入的是u.URL那这样的话,我们就把这个预编译语句按照我们当前来的数据全部都填写完成,可以去执行了,那最后还有一个参数,我们所要去创建一个当前的GDBC连接的配置项啊,那所以我们就直接用一个。
09:21
JDBC现在很显然是connection options,然后点进去我们看一下需要哪些参数呢?哦,这里边这个connection options啊,我们看到它的构造方法又是protected,所以很显然我们现在又用到了一个,哎,又有builder,又是一个建造者模式,所以我们真正所要创建的其实是一个它的builder对象,然后再调用builder对象里边我们可以做各种各样的配置,With URL with drivername with username啊,把这些全部配置好了之后,最后调它的一个build方法,就可以返回gd bc options的一个对象,所以接下来的我们的做的这个操作啊,那就是直接在这里。
10:05
去创建一个builder,那这里builder是没有泛型的啊,所以后边我们就可以直接调用它的构造方法,我们看一眼哦,这里边构造方法参数是空的啊呃,没有任何的参数,所以接下来我们就直接在后边调用它的位。去进行各种各样的配置就可以了,首先是URL,哎,那我们知道这里的URL是JDBC,然后MYCQ。接下来我们去定义,比方说是local host 3306默认端口号啊,我们这里因为Windows电脑里边本身就安装了icq嘛,我们直接用local host就可了,如果说我们是远程机器的话,配成102也都可以啊,所以接下来啊,那后面我们还可以指定当前连接到的某一个数据库啊,比方说我们连接到一个测试的数据库吧。Database名字就是test,我们先把这个指定好,然后接下来后边我们还需要有一些基本配置,比如说。
11:01
当前我们得指定MYCQ的这个BC连接的那个driver啊,所以我们当前就是com.mycq.JDBC.driver这是我们的驱动类啊,那啊当然了,如果是MYSQ比较高的版本的话,我我现在是CQ5嘛,如果是更高版本的话,可能这里还要加上这个点CG.gbc.driver啊,大家根据自己所对应的MYQ版本来进行调整就可以了。好,然后接下来我们还可以指定,因为我当前有这个username,有password啊,所以这里边我们可以直接指定username,比方说我这里是root。然后。Password。是。123456,诶,我这里是直接写死了,大家知道在生产环境里边,这种在代码里边写死用户名密码的方式是不可取的啊,这个安全性不太好,所以我们这里只是举个例子,做一个测试,最后调用build的方法把它创建出来就可以了啊,这就是我们完整的写入的这个过程,三个参数按照要求一一填入就完成了。
12:03
啊,那当然,最后不要忘记烟。还要执行起来execute,这就是我们整个代码的实现。接下来呢,我们就可以运行它来做一个测试了,当然现在如果我们要运行的话,还必须启动MYCQ,并且我们得创建出对应的数据库和表才可以啊,这跟其他的大数据组件是不一样啊,别的可能我们不需要特别的去创建啊,所以现在我的本地机器上已经MYSQL啊,Server是已经提起来了,那接下来呢啊,我们连接上之后啊,进去看一眼,首先看一下当前的。Database。数据库啊呃,当前我们已经有这个test了,那我们use test。然后接下来我们show一下当前的tables,当前什么都没有,所以我们应该首先要去做一个表的创建。Click。那里边主要有两个字段,一个是user当前的用户名,它是string类型,所以是V,我们给一个20吧。
13:07
Not now。然后另外还有一个URL,同样也是弯叉,我们给一个100。脑通道。好,创建了这张表之后,我们再收tables,可以看到有了这张表,接下来我们就可以运行代码,像MYSQL里边去写入数据了,所以接下来我们直接运行跑起来。这里我们看到控制台也没有任何的输出,因为我们这里边没有做控制台打印print嘛,只是直接I think啊,直接写入到外部的MYCQ里面去了。接下来我们看一看,到底写入了没有呢?哎,那当然我们就去select。直接select芯儿。From clicks。这里我们看到所有的数据全部依次写进来了啊,那我们所有的一条一条数据按照顺据依次放到了clicks这张表里,那这里呢,我们看到每一个user啊,它的所有点击数据并没有按照user去进行覆盖,那是因为我们这里边并没有定义K嘛啊,所以这里边的实现的效果其实是跟之前ES里边是类似的,那如果说我们把user定义成primary key的话,那后边就可以实现一个覆盖的效果了,呃,那如果说我们想要实现类似于之前里边的覆盖效果,仅仅在这里把user。
14:28
定义成primary key还不够啊啊,因为我们知道如果说想要去做一个按照key的更新操作的话,那当前的这个语句就不应该是insert into了啊,只有在当前K没有出现过的时候,我们才应该去insert,那如果说已经出现过的话,那我们应该是用update啊,那我们应该要构建另外的一个语句,然后来进行不同的逻辑定义,所以这种方式可能会更加的复杂一点。这就是关于在flink当中将数据写入到MYSQL的完整过程。
我来说两句