00:00
我们已经学习了,哎,朝着不同的外部系统去写入啊,卡夫卡、ISS文件系统都已经写过了啊,那有一些数据库,大家看到本身在我们当前一点十的支持里边没有提供这个连接工具,那这种情况怎么办呢?比如说MYSQL啊,大家看到GDPC的这种连接没有直接提供连接器,那我们怎么办呢?难道就不能写了吗?那其实是可以的,这个是完全没有问题的,只不过需要怎么样呢?那就需要自定义一个think function对吧?啊,那所以接下来我们就以这个MYSQL为例,给大家讲一讲怎么样用JDBC的方式啊,自自定义一个think方式,用DBC的方式写入外部系统,那接下来首先我们还是要引入一个依赖啊,大家看到引入的是MySQL connect Java啊,有些同学说这不是连接器吗?对吧,你这都已经引入了,这还这还不能直接用吗?这是我们这个。
01:00
本身MYSQ的一个连接器对吧?就相当于这个客户端,它能够去连接外部系统,连连接到这个MYSQL上面去,去做操作的一个工具而已,并不是flink内部给我们内嵌进去的工具啊,所以我们相当于就把它当成一个客户客户端来用的,对吧?啊,这个还是该引入还得引入啊,要不然你怎么往这个呃MYSQL里边写数呢,所以接下来我们还是在po文件里边先把依赖引入,然后接下来我们用的这个MY用的是5.1啊,大家如果要是有不同的版本的话,都是一样的,然后接下来我们在这个think test的下边还是去新建一个object。接下来我们新建一个object,就叫做JD bc think test,我们是以MYSQL为例啊。同样没方法,里边还是把前面的数据读取和基本转换先放在这儿,先把它转换出来,然后该引入的隐式转换还是要引入。
02:10
最后大家不要忘记还有一个这个execute对吧?要执行起来JD bc think test啊,然后这里边的流程也还是一样,直接at at think不就完了吗?啊,还是一样对吧?不是envr写错了,Data stream去ADD think,然后里边我们要实现的就是一个自定义的think方式,对吧?啊,那这里面我就直接写吧,没什么好说的了,你们不可能到别的地方去找了my gd bc think方式,好把它定义好,接下来我们在下边做一个实现class my gd bc think方式,然后呃,我们这里边实现的就是一个reach think function对吧?哎,大家想一想,为什么这里边我要用rich think function呢?因为rich负函数,大家回忆一下有什么特点,它有运行上下文对吧?另外还有生命周期。哎,现在我既然是要跟外部系统数据库去做连接,所以这个你建立连接的。
03:10
这个操作就不要每来一条数据都去调一次嘛,对吧?所以这里边我们有生命周期,在open和close里边去建立连接,关闭连接就非常重要了啊,这个对于系统的性能是影响很大的,所以接下来我们实现一个reach think function啊,然后里边我们看到它里边必须要实现的一个重写的一个方法叫做invoook,那同样还是传进来的数据,就是当前的数据,那invo大家就知道了啊,那之前我们在解析源码的时候,大家知道就是上层的那个任务,最后调到底层,其实都是一个invocable.invoke嘛啊,所以这里边底层其实也是类似的东西啊,也是一个invoke,那这里边就是每来一条数据之后要去做的事情就都放在这里,那现在我们首先还得建立连接呢啊,那这个建立连接我们得用这个生命周期,所以是把这个open先定义出来,那建立连接的话,大家可能会发现啊,就是我可以就是如果你。
04:10
你在这儿去是声明这个变量,然后建立连接的话,后面就不能用了,对吧,这是这是类边类类里边不同的方法嘛,它的这个作用域不一样,所以我们需要在外边直接先定义啊连接对吧?以及啊另外大家想到为了我们后边这个执行更加有效率,你写入的时候,大家想每一条数据来了之后,你调用一次那个写入命令啊,那是不是写入那个命令基本上都大同小异啊,啊对吧,基本上都是insert into1张表里边,然后什么字段,Value是什么,哎,那关键就是每次来了之后,我们要写入的具体的数据不同而已,其实命令都一样,所以能不能我们预先就把它定义好呢?哎,当然是可以的,我们可以有一个啊,就是预编译的语句,对吧?所以这里边预编译语句和语句和这个连接都在上面,直接把它定义好啊,那因为后边。
05:10
我们要给它付出值嘛,所以这里边我们直接定义一个Y类型对吧,这是一个这个connection,我们直接用抓va.cq就可以,然后一开始大家知道复出值的时候下划线对吧,默认默认初值,同样我可以定义一个insert statement预编译语句啊,这个是一个prepare的statement对吧,直接还是抓va CQ下边的一开始还是下划线啊。另外大家会想到有些情况下我可能是insert,有些情况下我把它定义成就像那个re里边一样,我定义成一个key,一个value对吧?啊就类似于这样的,那假如说我同样的这个传感器,它的温度来了之后呢,我不是在后面追加一条,而是直接把它更新,如果这么定义的话,那应该还有更新操作,对吧?所以我也给更新操作定义一个预编语语句prepare的statement定义出来,好,先放在这里,然后呢,Open生命周期里边就可以去做。
06:10
不值了对吧,这里边就可以直接给值了,这里边我们直接用driver manager.get connection对吧,直接拿到这里边我们直接给一个这个URL啊,JDBC啊,然后这里边我们是MYCQ里边啊,我还是本机直接起啊,就local host的3306,大家可以改成自己的,呃,集群里边啊,你自己的那个默认的访问地址,然后这里边我默认访问的这个数据库,我直接连这个text这个数据库,然后后边我跟上,因为我这里边有密码啊,所以我跟上当前的用户名和密码直接写死在这儿了,当然从安全性角度来讲,这不适合这么写,对吧,你从这个配置文件去写可能好一点,我们这里边只是给大家做示例啊,然后接下来那个预编译语句也可以直接在这里定义出来了,对吧?呃,这里边我们就是直接用connection,已经有connection了嘛,定义它的prepare statement里边我们写一条CQ写进来啊,那这里边这这个CQL。
07:10
字怎么写呢?那就看我们定义的那个表到底叫什么名了嘛啊,比方说我们那张表跟之前的那个呃之之前我们定义的那个red里边定义的差不多,我们也叫这个sensor temp对吧?啊,可以定义这么一张表,或者直接就叫sensor也可以啊,我们直接insert into这个sensor temp啊,然后接下来那就是要有什么样的数据传传入进去了对吧?当前我们要ID以及当前的temperature这个温度值啊,那后边我们要values它具体的值,哎,注意现在预边语句里边我还不知道这个值,不知道不知道怎么办呢?诶问号做占位符对吧,写在这儿就完事了啊,所以这个其实还是比较简单的一种定义啊,然后后边我们这个update同样也把它写出来,Collection,呃,Prepare statement,然后里边我们做一个update update senor。
08:10
啊,然后我们set当前的temp等于什么,对吧?你如果要更新的话,就把当前的那个温度值改了就完事了,然后这里边你还需要有一个查询条件对吧?哎,你说wherer当前的ID等于什么?同样还是一个占位符放在这儿啊,那接下来我们在这个invoke里边,其实就是来了一条数,然后执行这个操作对吧,然后我我们执行呃,这个。预编译的语句就完事了,然后把对应的数据填进去,执行预编译语句就完了啊这里边我们直接上来呢啊,大家可能会想到我是不是先要做一个查询,判断一下它里边有没有呢?啊,其实不需要,我可以直接执行更新操作对吧?啊,因为大家知道更新操作,其实这里边VRID其实就是一个查询嘛啊那我如果要能查到的话就把它更新,查不到的话我再去插入不就完了吗?哎,所以我们先先执行更新操作,查到就更新,哎,这里边我们直接update statement,先把那个该写的数写进去对吧?我们这里边首先更新的时候第一个temp,这是一个double类型,所以我这里边来一个set double啊,那这里边的位置大家看有这个parameter index对吧,位置是当前的第一个,然后后边要把当前value的。
09:38
写进去对吧?啊,然后后边还有一个第二个占位符,当前是ID,对吧?Set string,然后第二个位置把value.id放进去,然后接下来就可以执行了,Update statement execute执行一下好啊,那接下来如果更新没有查到数据,那么就查入对吧?哎,所以这里边相当于有一个这个if else的关系,那刚才我就直接已经执行了,那到底他有没有查到数据,有没有更新成功呢?诶可以用一个用一个这个字段来判断,对吧,我们直接用update statement,它有一个方法叫做get update count,就是你当前到底把它更新了几条啊,因为我们知道你这个查询是有可能更新多条的嘛,假如说一条都没有,那是不是就相当于没有查到啊,对吧?所以这这样的话,我们就做这个插入,那in。
10:38
First还是去set一下,哎,这里大家注意set的时候,它的第一个变成string ID了,对吧?所以我们把刚才的那个刚好改一下啊,顺序刚好改一下value.id然后下边是set double2value.temperature把它这么写,然后insert去做一个执行,所以大家看到其实整体来讲的话就这么简单对吧?就是来一条数据,呃,你首先先建立连接,然后来一条数据,这里边你就把它拆出来,然后该怎么写入怎么写入,但这里边主要的问题在哪里?大家想这还不简单吗?为什么这个官网官方还不给我们做这么一个简单实现呢?啊,其实主要问题就在于你如果想要把它做真正的集成在弗link里边的,实现弗link连接是先是要考虑我们发生故障之后,呃,就是容错性,对吧,发生故障之后恢复,然后还能保证我们的那个状态是一致的,对吧,你不要这个,呃,多写入也不。
11:38
啊,丢掉数据,那这个就比较麻烦了,这个要考虑的事情就会比较多,所以在这里呢,你自己可以去做任何的实现,但是呢,就没有那些保证对吧?但是至少功能我们是可以做到的啊,这就是大家掌握这种自定义的方法之后,剩下的就比较简单了,你什么数据库都可以去建立连接写对吧?那只不过保证不了那个一致性而已啊好,呃,那后边还应该既然有open就应该有close对吧?呃,最后做一些这个收尾的工作,那这收尾工作当然就是把所有的这个啊预语编译语句关掉,然后把这个连接关掉就完事了,对吧?这就是常见的一个利用这个rich function生命周期做的一些操作哦,这是一种就是建立数据库连接,然后做一些收尾,对吧?后面我们还会看到还有另外一种使用rich function的。
12:30
场场合,那就是用它生命周期,用它运行时上下文里边获取到的状态做状态编程,这个我们后面再讲啊好,那接下来我们来做一个测试吧,啊,那同样这个在测之前我们还是要先去把呃,那个MYSQL的话,我这边本来就是起着的啊,然后我先去连一下MYSQL-u root对吧,我因为有密码嘛,所以得先登录一下,然后当前我们看一下database啊,然后有这个test,呃,Test对吧,我们现在是就是要用这个test啊呃,然后我们看一下当前的这个tables啊,Tables里边有一些我做过测试的一些表,但是没有我们现在的那张啊,Sensor temp对吧?啊,所以接下来我们可以啊,不行,你不能直接运行,对吧?因为这是MYSQL嘛,这个表的结构你必须是预定义好的,所以接下来其实是还必须先做一个创建表的操作啊,Create table,我们那个叫三。
13:30
Temp对吧,是叫s temp吧,看一眼确认一下啊,S temp好,然后后边我们要定义它的这个,呃,两个字段,一个是ID叉,我们比方说20吧啊,这个我们not now啊,然后后边再定义一下temp temp这个是一个double脑到。好把它创建出来啊,当然我们现在看这个tables应该就有了,对吧,这里边儿刚创建出来肯定啥数都没有,那接下来我们就运行一下看看。
14:06
呃,我们JDBC啊。哦,这里边我们一开始给它创建,创建成了一个这个SC的class对吧,所以这里边我们要把它改成一个单立对象object对吧,要不然没法运行,你得实例化它,好我们运行一下啊,大家看到这里边我们报错了这个这个主要是因为刚才没有太仔细,这里边temp这个写错了对吧?啊,那写错了,那那没办法,我们重新重新来一遍吧,啊,直接把这个drop掉啊,啊这个sensor temp好,然后我们接下来同样还是把那个create把它创建出来对吧,然后我们ID这个小心点啊,下一句把这个temp是M啊,好,然后把它创建出来,现在再看一眼。手tables,哎,又有了对吧?啊,这个大家仔细的话也可以这个describe一下,看看里边内容应该正常,现在就没什么问题了啊,重新执行一下。
15:08
大家看这边已经运行完毕,好,那接下来我们就来看一下当前这里边的数据到底是什么样子啊,我们select share from sensor camp,好,大家看这个跟之前我们做这个,呃,就是red里边写入的时候一样,对吧?哎,都是这里边直接就是呃用的是最后一次输入的那个数据,覆盖掉了之前的值啊,所以这个其实是完全没有问题的,这在实际应用的当中,其实有一个就像red和MYMYSQL这样的数据库啊,你定义了K之后去复写它的这个,这个呃,值有一个,呃,很好的一个一个应用是什么呢?呃,就是大家之前还记得,就是我们有那个呃,Source test的时候,我们有一个自定义source对吧?哎,我可以直接去创建这样的一个source给大家看一看这个效果,这其实在实际应用的时候还挺常见的啊,就比方说我们这里边把这个定义出来啊,把这个。
16:09
Source引入这里边我们直接定义一个这个stream,后面我们就直接用这个stream了啊,然后接下来这个会有一个什么效果呢?啊,就是不停的去生成这个数据对吧?啊当然了,这里边其实这里啊就不需要去去用这个data stream了,因为本身这里边就是一个一个sensor reading的数据,对吧,已经卖不成这个样例类了啊啊所以然后接下来大家会看到我们这里边源源不断的产生数据,然后那边这个我们这个表里边的数据呢,就会这边啊就会源源不断的更新,对吧?啊所以到时候我们看到的效果就相当于是什么,你如果有一个呃,这里边有有一个大屏,然后你画一个页面,把当前我们的这个温度值直接显示动态显示在那里的话,哎,你就会看到这就是一个实时监控的一个一个应用,对吧?哎,当前它的那个数据实时的在变化,实时在变化,我们其实就是相当于后台在起一个应。
17:09
用不停的读当前这一个MYSQL或者red里边当前这个这个值就完了,对吧,然后你就能看到它里边在不停的更新这个值,好,我们现在来看一眼吧,然后大家看现在已经写进来了,对吧?啊,但是因为这个顺序的话,因为之前我已经有六七十,所以说它这个是放在上面,然后大家不停的刷新的话,就会看到这个值在不停的变化,对不对啊,这这个就是在基于之前的那个基础上不停的做这个更新变化,你如果企鹅应用不停的读取这个值的话,把它显示出来啊,这就是一个动态监控,监控的实时展示啊,这个应用还是比较常见的啊。
我来说两句