00:00
我们再来给大家讲一讲怎么样往red里边去写入数据啊,那red呢,大家看到官方并没有给我们提供向red里边写入数据的think方式,没有提供连接器,那怎么办呢?诶好在第三方的这个平台啊,我们这个把her这一个项目里边给我们提供了一个flink rabbits的连接器啊,所以这个时候我们可以直接把它拿来用,但是呢啊,这个第三方引入就就有这样的不好的一个一个特点就是大家看到它的版本升级更新没有那么,没有那么快,或者说支持没有那么好,就是大家看巴赫这里边呢,没有scla2.12的连接器支版本,只有2.11的连接器支持版本啊,但是呢,呃,就是经过测试之后发现它这个还是能用的啊,就是呃,本身这个2.12.222.11 skyla底层这个变化引入的新特性,这里边没有用到啊,所以这里边还是通用,还是可以用的啊,大家就。
01:00
可以直接把这个flink,呃,Red的connector这个连接器引入进来,它自己的版本呢,是1.0版本,大家这个可以在美上面去下到这个依赖啊,然后我们先把它引入到po文件里边来。还是放在当前的dependency下边。然后接下来大家知道我们就是还是新建一个,呃,在在这个think test下边啊,新建一个单立对象object,当前我们叫做red think test。好,先把它创建出来。然后接下来这个没方法里边啊,其实我们也想到了,这个跟前面基本上都差不多对吧?哎,我直接照着抄就就完事儿了啊呃,这个前面这一部分我们都讲过之后,对大家来讲是一个基本操作,非常简单,所以我们就不一遍一遍再去重复敲了,我这里边就直接把它引入过来,然后呢,这里边注意还是就是有有这个细节大家还是注意一下啊啊当然你运行如果报错的话,大家看那个报错信息应该也能知道它是什么原因啊,这个影视转换要引入,然后现在我们还是就就直接用这个当前的这个文件数据吧,之前卡夫卡已经测过了,我们就不再去重复测试了啊,那把这个读进来之后,大家知道最后还是不要忘记有一个execute对吧,当前是red this think test,然后接下来同样你要往里边写入的话,还是这个data stream,那就得做一个at s里边我们就想到了,哎,那是不是它的连接器就会给。
02:40
给我们提供一个类似于之前卡夫卡大家看到那个弗Li卡夫卡producer那样的一个东西,对吧?呃,这里边给大家多看一眼啊,就是这个卡夫卡producer啊,这个里边它本身是个什么东西呢?大家往上看的话,会会发现它实现了一个to face commit think方式啊,就是所谓的两阶段提交think function式啊,至于这个两阶段提交到底是个什么东西啊,这个我们到后面再讲,这涉及到就是状态一致性的保持了啊,就保证它的这个,呃,容错这这个性能啊,所以我们不管别的,但至少知道它是一个think方式对吧?认识think方式大家只要知道这个就可以了,所以接下来我们其实也是要看当前的这个red连接器里边到底是什么,哪个类给我们实现了think function这样的一个一个接口呢,啊,那所以接下来我们看一眼就是red。
03:40
Think,哎,大家注意啊,就是red think这个我把它引入一下,在这个连接器里边。我们当前的这个类,大家看它就实现了一个rich think function对吧?啊,所以接下来我要要整的就是它了啊,然后接下来我们看到它里边要传两个参数,这两个参数有点奇怪,一个叫做呃,Flink je conflict base啊,它是一个就就这这么一个类型,对吧?啊,但是我们从这个字面上理解也会发现它其实是什么呢?就是一个je连接的一个配置嘛啊,所以这个看起来也比较简单啊。另外还有一个red map,这个map又是个什么东西呢?它比较就是看起来奇特一点,但其实就是说你既然要操作red嘛,往red里面写数嘛,那你得告诉我,你到底要用什么命令往red里面写数,对吧,你到底要把数据怎么样写进去,然后你要写什么数据,你得告诉我,然后我就,诶可以调用这条命令,直接把它往里写了,所以其实这个map就是要定义我们当前的写入的数据是什么。
04:51
啊,然后还有一个就是我想用的写入的命令是什么,就用它来定义的啊,所以接下来我们要实现这么两个东西啊,啊,那这里边本身的类型大家知道,呃,我本身应该是一个sensor reading对吧?然后呢,里边要传两个参数,一个是一个配置项啊,我这个叫做com,我先把它写出来,然后另外一个大家发现那个这个康复可能还稍微的会简单一点,但那个map的话,呃,那肯定是我们得去重新自己new一个这样的类了,对吧,自己去创建一个这样的一个map类,然后我们new它的一个实例,所以这里边我去new一个my red map,诶把它先写到这儿,然后接下来我们就是分别实现了,那这里面的comfort,或者或者你想要去直接去在这儿去拗也可以,但是呢,呃,它稍微有点不方便,因为我们可以直接钓,但是它钓的过程呢,呃,给大家看一看啊,我们。
05:51
定义一个link,呃,前面大家看到这个我们要的是一个jeice con base对吧?那这个je conig base呢,本身大家看到它是一个抽象类,那在这个连接器里边有没有它的一个实现呢?诶,当然是有的,我们这里边想要用到的就是就是一个我先定义这个com啊,把它定义出来就是一个叫做flink jeices,哎,大家看下面有啊,叫做po conflict对吧,连接池的配置这么一个东西,然后这个东西如果大家点进去看的话,会发现它下边有这个构造方法对吧?哎,这个非常悲剧,这个构造方法又是一个private啊,不是protect了,变成一个private私有了,私有化了,那当然我们更不能掉了,对吧?哎,那那怎么样去用呢?诶,不要着急,下面大家看到有一个builder,又是类似的这种用法对不对,哎,你有这个。
06:51
Builder的话,那就相当于在builder里边,我们可以去调用它的公有方法,Public方法去创建一个自己类的实例啊,所以这个构造构造方法私有化,然后再用一个builder去创建实例啊,这就相当于是我们有一个有一个构建的工厂一样,对吧,有一个builder,然后我们看到这里边的这个builder下边它有一个方法跟前面也非常的类似啊,就叫做build啊,如果大家熟悉之后会发现很多这个包啊,很多这个连接工具啊,或者什么样的这个底层实现都是类似的这种方式啊,就是我要做一个给它构造方法做一个私有化,然后呢,呃,定义它下边的一个这个,呃内部类啊,或者说我们叫一个嵌套的子类,这个叫,呃,不是继承关系的一个子类啊,就是嵌套在里边的这个内部类一个build,然后调它的build build方法可以返回一个真正的我们要创建的这个类的实例,对吧?啊,这里边你就把它这些。
07:51
默认的参数就全传进来了,那我们这里边最关键的要什么呢?大家知道不就是host和port嘛,对吧?那至于其他的那些东西,Timeout什么,你都可以用默认的,我们这里边也没有password对吧?哎,直接去连就完事了,所以接下来其实就是要不能直接传,要用它的这个builder,调它的builder里边的build的方法,对吧?哎,那在这个builder基础上,我还可以给它set一些东西,所以我做这个配置到底是在哪配的呢?就在这儿配的,比方说host,我要这个还是本机啊,起一个logo host,然后set port port,大家知道默认的这个的端口我们提起来之后,6379啊,然后接下来调一下它的build方法,这样就把它创建好了。好,这可能稍微有一点奇怪,但是呃,大家应该也能理解是怎么做的,对吧,然后另外还有一个我们要定义一个,就是我们所谓的那个map了,对吧,定义。
08:51
一个呃,Red this map啊,就是这里边我们看到的这个第二个参数red map,那那这里边这个red map我们就单独在这定义了啊,就直接要定义class了,因为本身这一个弗link里边的这个呃,Jeice poor con啊连接其实给我们已经实现了这个类,所以我们直接把它这个调它的build,点点build去new它的这个实例就可以了,而这里边呢,Flink本身并不知道,我就是我们的连接器啊,本身并不知道你要调什么命令,对吧,你这个命令得自己去定义啊,所以说这个类也得我们自己去实现啊,所以这里边我们要做的是啊,就是定义这个my red map,然后去extend extend啊,当前的这个red这里边没有啊,我们写出来red map。
09:49
大家看哎,就是这个东西对吧,然后里边也要有类型,当前是sensor reading,好啊,就是我们接下来要做这件事情对吧?啊,那这里边你看到它必须要实现,诶这里边我们本身的这个这个类啊,把它定义到外边去对吧,因为当前这是在一个一个object里边啊,我们直接把它定义到外边来,然后当前我们想要去,呃,这个my red map想要去实现的就是,哎,不不是不是MY啊。
10:22
Red mapper对吧,就是叫做red mapper,你把这个引入的话,这里边就不报错了啊,这样我们把它放到外边,主要是看的清楚一点,你如果要想把它这个放在里边,其实也是一样的,对吧,你就是按照这个顺序上面这里边定义这个,呃,Comfort comfort,下面来定义我们的这个map,这样也是OK的,就是看大家习惯啊,我的习惯一般情况是还是把这个放在下边,这样这样多一点,那接下来我们看一下必须要实现什么方法呢?Override,对吧,必须要把这几个方法实现,这里边一个叫做get command description command,大家知道是命令的意思嘛,啊,所以这里就是我真的你看要返回一个red command description,这就是真的要去定义这个当前的命令了,定保存数据,写入red的命令,那这里面这个命令该怎么去定义呢?我直接。
11:23
接就用这个red,呃,Command description直接用这个,然后你你会看到啊,在这个类里边,它本身就有,呃,就是你你看到就是这这里边它的这个实现方法啊,它本身就可以传一个red command,然后呢,传一个additional key,传一个这样的参数过来,这个additional key是什么东西呢?哎,这就是假如说我我们之前大家知道在这个red里边,你保存这个数据,保存我们的所有的这个,呃,就是数据,或者说其他的一些数据结构的时候,一般情况下都是一个K,一个value k value,对对吧,但是呢,啊有例外,比方说你像这个哈希表,或者说像这个s set的时候,那它的这个组织结构是不是本身有一个表名,对吧?你像这个哈希表本身,你外边应该有一个表名,这里面我们可能有一个内。
12:23
然后呢,里边又是一对一对的k value对吧?哎,所以说这里边就有一个问题,就是你本身这个,呃,在这个里面保存的时候,你要保存的是key value嘛,但是你还得指定到底是在哪张表里边保存呀,所以这里边的这个它的表名name其实就是这里的这个additional k对吧?啊,就附加的这个K,然后里边具体保存的那是这个K和V,这是我们的数据,而这个是我们预先定义好的那张表表的K啊,那所以接下来如果我们已经定义好,那我们就干脆就按这个哈希表这种方式往里往里塞吧,对吧?啊,那如果哈希表的话,大家知道是应该是h set对不对,H set,然后加上这个additional key,后面就是对应的那个key和value了啊,所以接下来我们要实现这么一个命令啊,就是h set啊,表名对吧,H set。
13:20
呃,然后我们这个加上表名,呃,然后后边是K和value对吧?啊,当然在这个release底层里边,这里边的这个表名其实是叫做K的,对吧?这是我们当前key里边的这张表是它的value啊,所以这个是叫做K,然后里边这个叫做field对吧?Field value啊,这个大家知道什么意思就好啊,所以接下来我这里边要返回,大家知道要返回一个什么呢?啊,其实是不是点啊,是要调用当前。要调用它的构造方法,里边传一个red command。
14:03
我直接把这个写出来,这个也要引入对吧?啊,那这个red command呢,这是一个枚举类型啊,所以里边你就看到air push对吧,Air push啊,这个set啊,这个PFFA这些都有对吧?所以我们当前当然用的就是一个h set了啊,那后面还应该h set嘛,得有一个additional key对吧?我们当前就是要把当前的这个温度保存进去,那比方说这个温度就叫做sensor temp,就叫做当前的一个呃,传感器的温度值,把它做这样的一个处理就可以了啊,这是我们当前能够想到的这样的一种处理方式啊,啊,然后这里边我们把这个。还是?呃,我们看这里边在报错啊,看一下这个引入的是不是有问题,看一眼。Command这个应该是没有问题的啊,看一下他报报什么错。
15:01
啊,Cannot receive,它的这个command description.apply是吧?啊,所以这里我们其实必须得怎么样呢?呃,你这里边如果你直接这么去调用的话,大家会想到你这相当于是skyla里边认为它有一个伴生对象,然后去直接用伴生对象创建了,对吧?但是我们知道它这个当前的这个类本身就是一个抓法类,对吧?当然没有半生对象,没有apply方法了,所以我们应该怎么样new一个这个对象,对吧?还是要把它拗出来用这种方式啊,所以现在我们的代码里边,其实大家发现这个Java风格也非常多,因为很多底层实现都是Java啊,所以就这样去定义一个命令,然后后边呢,呃,这里边我们就是大家注意这是get value from data,那这是指定什么呢?当然是从数据里边指定我们当前的value对吧,然后下面还有指定key get key对不对啊,最后我们要的都是一个string,写入到里边都是string啊,所以这个其实非常简单,我们要写入的其实就是。
16:02
就是当前的温度值嘛,对吧,呃。将呃温度值指定为value,然后另外还有一个就是K的话就是ID了,对吧?将ID指定为K啊,那所以这里边的K直接从从这个sensor reading里边点i.ID啊拿到它本身就是string,不用转换对吧?这个非常简单,诶,这不是sensor reading啊,Data对吧?data.id然后这里边我们直接从data里边拿当前的temperature,它本身是个double,所以还要再调一下to three方法啊,这样的话就搞定了,这样就没问题了,对吧?啊,那这里边我们调用这个把它写入进去,好,那接下来我们来测试一下,当然测试之前首先我得去起一个这个red啊,我这里直接就在本地啊,默认直接起一个,然后我们看一下这个,用这个客户端连接上去看一眼当前。
17:06
看一下哦,没东西啊,没东西,那我们直接来运行,看看能不能正常写入。好,我们看一下当前已经执行完毕,哎,那我们见证一下这个red里边有东西没有对吧?呃,看一下这个case啊,诶,果然多了一个sensor ta,那我们现在要看里边东西,那h get对吧?啊,那这里面h get的话,我们这里边你还得知道具体的哪一个啊,那你要看的话,比方说sensor sensor temp,你还得看这个比方说SENSOR1对吧,这样可以拿到它当前的值是30.9,那或者我们直接看所有的吧,H get all sensor camp对吧?哎,大家看这个所有的直接都放在里面了,然后我们看到我们写入数据的时候,这里边的数据其实是,哦,这个是out啊,其实是有这么多条的,但最后呢,真正写进这张表的只有四条数据,那这里边341的别的数据怎么怎么样了呢?诶,这里边因为我们指定了K这样的写入,那大家知道是不是相当于我们后边输入进来的数据就都已经被覆盖了,对吧?哎,都是。
18:13
是在之前的那个基础上一个一个覆盖了,所以我们最后保留的就是最后一条三一的数据啊,这是比较典型的,就是相当于我们写入的时候啊,如果有重复数据,相当于会直接覆盖啊,这是写入到的场景。
我来说两句