00:00
接下来我们来用一下JDBC的connect,那么就以my circle为例啊,那要用这个connect呢,我们首先。要写买车口,得导入一个MYSQ的连接依赖。来,我拷贝。在泡键里面添加这个买车口,连接依赖。导入好,我们先来看一下官网啊呃,点击一下这个JD bc connector。刚才我们提过flink官方有一个JDBC连接器,但我们现在要用什么STEM提供的连接器,为什么官方的只能保证至少一次,所以它呢,基于两段提交,实现了一个精准一次啊精准一次。那同样我们在配置文件,指定一些参数啊,有什么呢?你看JDBC开头啊,然后有一个语义指定为有一次,也可以是至少一次或者不指定啊,接下来是MYSQL的用户名密码啊,驱动名称。
01:01
啊,还有一些配置,连接池的配置,这几个是不是连接啊。还有URL地址。那下面有每个参数的介绍啊,那CDBC呢,它是支持读也支持写。但是读呢,我们还是建议用flink CDC啊,因为它是以P的方式啊,P的方式它并呃不会说很实时。它可以说是伪事实啊,那所以一般如果我们是要读取my circle的实时变更数据,还是建议使用听blo那种方式啊,它不是啊,它不是。那我们就来用一用它的那个sin写法吧,啊,Think写法。来吧,我们开始写吧。那比如说我从卡夫卡里面读,我再拷贝一个啊。My circle think。
02:02
DEMO。改个名字吧。啊,不要这个二呢。那这个不要了。前面还是一样啊,前面还是一样啊,初始化环境啊,然后从搞搞图。那我这边呢。就不要打印了,我返回。那接下来我们直接另用一个JDBC,呃,有S也有S啊。那注意导包啊,我们选择的是什么呢。下面第二个是最差的,然后呢是一个Java版本。好,那这边要指定一个泛型,泛型是什么呢?我们通过Java代码去操作,比如说操作MY。MYS狗是一张表啊,它有很多字段呢,那代码里怎么跟它体现一一对应?我们是不是通常会定义一个九?
03:08
对吧,啊,就定一个实体内呗,跟表对应的实体内啊,那举个例子,我这边拿一个吧,啊,我就不做什么很复杂的包管理了,我就直接在这创建啊,比如说来一个water sensor啊,Water sensor呢,比如说有这么几个字段啊,这个我们随便定义啊,比如说有一个string的啊,Ad啊,比如说有一个long类型的,比如说有一个。呃,Int类型的VC啊,然后是不是还有一个构造器啊对吧。我们套入一个构造器啊,首先要来一个空参的啊,再来一个所有参数的。还要get set?
04:04
再来一个土军。差不多了吧,啊,那这个简单的我们就定义好了。其他关了。那这里放行,我们就可以指定我写入一个water center啊实体内,那里面的我们要传什么呢。一个上下文对吧,Contest。那这就完了吗?还没完啊,哎,我们还要继续点。呃,它可以指定一个。就是我如以什么方式去写进去,我可以指定只写入某些字段。对吧,也就是说我们是不是要写一个insert into这么一个SQL语法对吧,那里面的CTRL加P看一下。还有一个circle from function啊,我们就new给它就行了,Circle from function啊,我们实现它就可以了,这边返回一个字符串啊,也就是返回一个要执行插入的circle就可以了,那你这边可以写什么呢?
05:09
啊,正常来讲我们是这么写,Insert into my circle的一个库名。里面的一张表啊,那库名我们前面已经指定过了,你看我这边是用了一个test库啊。那给大家看一下,我这里应该是有一张表叫sensor,跟这个实体内是对应的。呃,他的唯一。啊,这个密码。密码我改过啊。有一个test库啊,里面有一张生成表,它的字段也是什么idts VC啊,那现在只有两行数据。好,那接下来就知道怎么写了吧,就写my circle的语circle语法嘛,Insert into,呃,Center啊,你可以去指定字段,比如说idts VC啊,你可以只插入一个,也可以插入两个随便写啊,然后呢,比如说。
06:10
那问题就来这边怎么写?你按照我们正常规,是不是把这个东西,呃解析一下。对吧?啊,比如说你把它点get idd传进来啊,然后再逗号,然后再把获取TS,然后再传进来,但这样比较麻烦,对吧,那在录服我们这边好像又写不了啊,但是我们可以借助stream的一个工具类啊。哎,我们怎么写呢。啊。点format。那第一个呢,就传入我们这个。把它换个行啊,然后呢,逗号那这边怎么写呢?呃,百分号S。代表什么字符串啊,那么字符串一定要注意,要用单引号把它引起来,或者双引号,但是双引号你就得加一个转义符了,所以单引号方便一点啊,然后百分号D代表的是数值类型的。
07:14
啊,把它到D代表数字类型啊,这个是是dream啊,然后这个是弄这个是int对吧,那我就这么写啊,然后呢,你后面传参哎,第一个就对应。第一个变量,第二个就对应第二个,第三个呢就对应第三个,就这么来写就对了啊,那么就是water center.get ID逗号water ss.get ts water sensor.get VC啊。那这就完事了呗。对吧,那这样它就会返回一句一一行,呃,一个circle啊一个circle,那在接下来呢,点think。里边要传一个data string是什么呢?数据来源,也就是说我是从哪个流里边要去做think think出去啊,也就think的来源啊,来源啊,那我们就把卡夫卡DS传进来。
08:12
那这边呢,我看一下啊,类型还不行,为什么呢?因为它要求是什么啊,Stream类型啊stream类型。那我们就把它转成一个string就行了呗。啊,不是把它转成一个water s不就OK了吗?啊。那我们来改一下吧,呃。这样一个啊。逗号。然后呢,Return,利用一个water center对吧,呃,Spread。
09:06
零啊,这不串没问题啊,SP的一但它是一个弄类型啊,转一下弄点pass弄。再来一个。我直接拷贝。最后一个是int对吧,呃,Pass int。啊,不是论文啊,那还不如自己写,Indier than pass in。那这个类型改一下water。三次。好,那就对应起来了嘛。好,这样就可以了。那我们看一下啊,这个是卡不卡一,然后S1。看一下啊啊卡不卡一这个集群S1这个topic啊,然后这个地址没问题好OK。
10:06
执行。肯定报错,为什么呢,没有指定一个什么杠杆空啊杠杆空。这是因为我们在idea运行才需要啊,如果我们部署提交的话就不用了。来点RA编辑配置啊,然后呢,在这里呢,杠杠空啊把它指定上来。好运行。现在就不报错了对吧?呃,那么刚才是哪一个topic s1对吧啊一。啊,当前就是S1,那我们直接就卡不卡生产数据,然后观察一下my circle口就可以了啊好,那我们来呃,来个S4逗号四四回车,那如果买circle能看到S4这条数据,那说明。
11:02
通的啊,刷新你看进来了吧啊。再玩一条吧,S5。刷新哎,你看又进来了吧,啊,那么大家别忘了啊。我们的这个string差的这个jdbc connect,还有一些参数。啊往上翻啊什么呢,啊,最最重要的我觉得是这一点啊。语义级别你可以指定为精准一次啊,还有一些性能效率上的一些连接池的配置啊,也可以去改。这就是他提供我们的一个方便的API。
我来说两句