00:00
接下来我们要介绍的呢,是在大数据处理领域用的并不多,但是在常规的业务后台使用非常频繁的一类数据库,那就是关系型数据库。其中的典型代表当然就是MYQ了,当然还有post Oracle等等。我们知道关系型数据库它的特点是结构化的表,这样的数据结构设计,那里边呢,可以进行非常方便的CQ查询。绝大多数企业里边业务数据存储的方式都是使用这种数据库来做保存的,那所以尽管大数据处理直接跟MYSQL交互的场景不多,但是我们也需要知道怎么样能够把数据写入到MYSQL里面。在以前版本的flink里边是不提供直接向MYSQL里写入数据的连接器的。而目前01:13新版本的flink已经提供了直接连接到MYSQL的连接器,当然了,呃,不仅仅是连接到MYSQ,只要是建立GC连接的关系数据库都可以使用这个连接器进行连接。所以接下来我们想要引入的就是官方为我们提供的flink connector gd bc。
01:21
后面跟着的是scla的版本,下边对应的是flink的版本,我们发现官方提供的连接器都是这样的一种形式。而下边因为我们发现这里边只是一个普通的GDBC连接器嘛,那下边如果是具体要写入MYQ的话,还应该要引入一个MySQL Java的连接器,也就是得提供它的客户端,我们得建立到MYSQL的连接,向MYSQL去发送命令,写入数据。接下来我们先把对应的依赖引入到文件当中。引入依赖之后,接下来我们就可以创建一个新的测试。
02:02
Java class就叫做。Think to my。同样,整体的测试流程还是类似的exception,接下来先创建当前的流执行环境。Get。得到的叫做ena。同样把当前全局的并行度设为,接下来我们是要读取数据,我们就用之前E条数进来。接下来就应该是stream要at think里边当然是要传入一个think function。那这里。连接器为我们提供的think function又是什么样子呢?其实我们已经能够想到了,之前写入到ES的时候,官方连接器给我们提供的think function就叫做e search function,那写入到的时候把here给我提供的think也叫做。
03:06
Think function啊,那当前既然是JDBC连接嘛,那对应的连接器提供的think function就应该我们自然想到了是JDBC。我们先看一下这个类到底是什么,诶,这个类好像稍微的有一点问题,这个类它并不是一个think function,它没有继承rich think function式或者对应的实现什么接口,而是在里边定义了一个静态的think方法,然后它的返回结果是一个think方式。啊,当然了,这样也是一样的,所以我们现在呢,就是直接g bc think,你有它的一个对象,而是要调用它的think方法。里边需要传入的参数呢,简单来看的话,主要就是这样的三个参数,首先Q。我们想要往JDBC连接的数据库里边去插入数据,那当前你的插入数据的这条CQ到底是什么?我们必须要定义出来。
04:10
然后接下来呢,需要有一个gdbc builder,这个builder的含义指的就是。插入数据的CQ当中,这条语句当中有可能会有一些占位符,有可能会有一些空位,哎,那当前我们显然是在处理数据的过程当中,每来一条数据需要提取相应的字段填入到CQ当中,构建出完整的cqu语句,那所以这里边呢,我们需要把这个statement的语句做一个构造,做一个构建,那在这里面关键其实就是往CQ里边填充对应字段了。接下来呢,还有一个gd bc options,很显然这就是连接相关的配置了,最关键的我们需要把GDBC连接的URL要定义出来,另外可能还需要当前driver的名称啊,当前我们连接MYSQL的时候,我们用的用户名、密码这些都有可能需要在这里做一个对应的配置,所以接下来我们就在代码里边具体实现一下。
05:16
那这里的实现其实也很简单,首先我们想到到底要做一个什么样的数据的插入呢?呃,那我们就还是把当前的user URL单独的提取出来,做一个插入就好了啊。那对于当前的定义呢,我们可以把当前的user作为唯一的key,那也可以就像在ES里插入一样,把每一条数都加在后边写入就可以了。那这个定义呢,关键在于MYQ里边的表我们到底是怎么定义的?啊,那跟我们这里flink代码的逻辑其实就没什么关系了,所以我们这里边要执行的其实就是一句。Insert,所以这里面我们要做的就是insert into。
06:04
首先这里的表名,我们表名还是叫做CS吧。然后接下来啊,那就是插入这张表,这张表里是哪些字段呢?我们当前要的是。User和URL。那另外它的值是什么呢?Values。现在我们还不知道,因为当前构造这个CQ的时候,其实只是相当于只是一个预编译好的语句,预写好的语句,那具体这里面的值应该在具体流处理的过程当中,捕捉到每一个数据的时候,我们才能填这里的值,所以这里应该是占位符。两个问号。这是第一个参数,一条CQ。第二个参数呢,前面看到了是一个。GDB,那这个DC本身又是个什么东西呢?我们发现它是个interface。
07:04
本身是个接口,那这个接口里边我们看到并没有任何的抽象方法呀,啊,那它是继承自。By consumer with exception。这样一个。对应的接口,那这里边的关键就在于要实现一个所谓的accept方法。这里的方法其实。意就是说我们到底要根据当前的数据,传进来的数据要做什么样的操作。我们要做的当然就是填入到。之前的这条insert CQ里面了。所以接下来我们要做的,既然它是单一抽象方法的,只有这样一个抽象方法,哎,它是单一抽象方法。下面这一个unchecked,这本身已经有具体的实现了,这是一个静态方法,那所以针对这样的单一抽象方法接口,我们可以直接用一个拉姆达表达式来实现它具体的处理流程啊,那所以当前的拉姆达表达式其实实现的就是这里的这个accept的方法,那这里的accept方法呢,有两个参数,我们这里面拿到的参数其实第一个。
08:19
就是当前的。就是当前的这一个。CQ,而对应的后边第二个参数呢,就是每一条数据传过来之后的数据的值,所以对于当前这一个定义,我们可以把它定义成。和event。把它定义成这两个,接下来就是处理的流程了,其实也非常简单,我们可以直接拿statement去做一个set string,因为user和UURL都是string嘛,直接set string,然后里边的参数当然就是对应的位置和相应的值,我们首先填第一个位置,第一个位置应该是user,所以我们选取的user做一个填入。
09:05
啊,那后边还应该有statement.set street第二个位置我们提取的是点URL填入。这就是当前的第二个参数。那最后呢,还需要创建一个JDBC连接的配置项,那就直接new一个JD bc connection options。那这里我们会发现gdc options又有一个问题,它的抽象方法不能直接调用啊,所以问题又来了,里边呢,我们是需要用它的build,又是builder设计模式,调用builder.build里边传入各种参数,所以接下来我们的做法是。你一个他的builder,然后下边去。定义一些,比如说我们with URL,这个是最重要的,我们这里面是JDBC连接。后边啊,连接到MYSQL。
10:02
然后接下来啊,我们可以直接比方说就在当前机器上去启动一个啊MYSQL去进行测试,那这里面其实就是local host默认端口3306。我们可以连接一个数据库,Test。连接到test数据库去进行数据的写入访问,那后面呢,我们还可以定义with,可以加上with driver内啊,比如说我们当前的这个driver,那选择com.mycq.JDBC。变driver。也针对不同的MYSQL版本,我们选择的driver可能也不同,我这里边是MYSQL55点几的版本,所以直接就是用MYSQL.gbc了,Username和password的话。我们可以在这里做一个直接声明,直接连接到我们相对应的数据库里面去啊,比方说这里我们的username和password都是。
11:11
Root。啊,这种直接写死在代码里面的方式当然不够安全,我们这里只是做一个测试了,最后再做一个build,就把我们的第三个参数这样一个g bc options定义出来。所有的都定义好了之后,当前的数据就可以直接写入了。最后。Env.execute执行起来。这就是完整的代码逻辑啊,那接下来呢,我们就可以运行测试,当然如果要测试的话,我们首先应该要先启动一个MYSQL的服务啊,那当前本机上已经启动了MYSQL的服务。我们可以直接在这个Windows电脑里边。去访问当前的MYSQL。输入用户名和密码,进入到当前的MYSQL数据库里边来,那当然了,当前的数据库我们要看一下,确实有一个test数据库,那么我们就直接连上它,先看一下里边有没有内容,目前没有任何的内容啊,那对于ES或者red而言,这个不重要,我们直接去创建一个新的索引或者一个新的key就可以了,而对于MYSQL的话。
12:30
这个不行,我们必须首先把对应的表要先创建出来。我们想要的表本身就叫做clicks,里面最关键的字段呢列呢,也就是user和URL2个字段,它们的类型都是string啊,那当然在MYSQL里面就是char了,所以这里边我们直接可以创建这张表,Create table。CS。那接下来我们首先有一个user字段。当前。挖。
13:00
比如说我们只要20个。字符就足够了,加一个脑存档。那另外还有一个字段,URL同样也是这个,可能会长一点,我们给一个100NOT,就是我们的两个字段。那接下来。当然就会有一个clicks这样的一张表了,接下来我们就可以运行代码向它写入数据了。我们运行一下,看一看结果怎么样。当前的数据已经全部写入。那接下来我们就可以。来做一个查询,Select from clicks。我们可以看到当前所有的数据都已经写入进来了,当前我们的写入形式呢,是跟之前在ES里边的保存方式类似,所有的数据来了之后都会追加在后边,而如果说我们在前边把user定义成了当前的primary key的话,那显然当前的写入就会变成了一个更新插入,变成了一个覆盖的方式,就只会保存当前每一个用户最后一次访问的页面U。
14:18
啊,所以这种方式的话,就跟之前写入red的保存形式比较类似了,这就是将数据写入MYSQL的过程。
我来说两句