00:00
我们已经了解了怎么样将flink处理的结果数据写入到卡夫卡当中,其实在平常的实际应用当中呢,最常用的就是将数据写入到卡夫卡以及写入到HDFS分布式的文件系统当中啊,那所以掌握了这两样,我们基本上已经能够应对工作当中的绝大多数需求了。当然了在实际应用的过程当中,可能还会涉及到连接其他的外部系统,比如说诶,我们常见的啊,在项目当中经常会用到的一个组件就是red,我们知道red是一个开源的内存式的数据存储,我们可以认为它是一个内存级的数据库,它提供的数据结构也非常的丰富啊,像string啊,哈希啊,列表集合位图等等啊,这些它都是支持的,而且因为它是内存式的嘛,运行速度非常快,所以我们在实际应用过程当中往往会把它用作一个分布式的缓存,有时候也可以作为消息代理。
01:00
啊,那在很多项目当中都有他的身影。那对于这样一个工具呢,Flink并没有直接提供官方的连接器,不过呢,我们记得还有另外一个项目叫做阿帕奇啊,它给我们提供了助攻,有对应的连接工具可以让我们使用flink red connector啊,但是它的版本呢,稍微会比较落后一点,我们可以看到啊,目前能引入的最新的版本就是1.0版本的连接器,而且后面我们看到它也有一个对应scalela的版本,它只支持到2.11。不过呢,我们当前啊,SC2.11这个版本跟2.12的区别,并不会影响到我们的测试结果啊,所以啊,我们这里边就直接引入这个依赖是完全没有问题的,那如果在实际的生产项目当中,当然我们还是要用匹配的版本去进行运行啊,所以接下来我们先把这个依赖dependency先copy到po文件下边。
02:01
我们把它复制过来。好,引入之后,接下来我们可以看一下dependency里边已经多了一个BA here下边的flink connector,那接下来我们就可以去写代码进行一个测试了,所以同样还是在当前的包下边创建一个。Ske object,当前我们叫做think to。Test。没方法先写出来啊,那前面的流程的话,其实跟卡夫卡也是类似的啊,我们先把这个执行环境和全局的并行度先设置好。下划线引入,然后接下来我们要去读取数据,这里的数据源呢,我们可以稍微简单一点,直接。ADD source,你有一个click source,用我们自定义的测试数据源就可以了啊,那这里我们可以把得到的数据流叫做stream,然后接下来呢?啊,那其实这里边我们就不用做更多的转换了,直接把它向里边去写入就可以了,要调用的方法当然就是。
03:07
ADD think,然后里边是由把here给我们提供的连接器实现的一个think function,那这个think function叫什么名字呢?它就叫做read think啊,我们可以看到read think。后边跟着的是当前要写入的数据类型是event啊,那我们可能会想到,诶,那这个是不是涉及到序列化的问题,我们需要对它的这个数据结构进行一个解析呢?诶,没关系,因为我们知道里主要的存储方式就是key value键值对嘛,那所以这里边我们在写入的时候可以直接从event里边去提取字段,然后指定当前的键和值是什么,写入到release当中就可以了啊,所以我们这个就不需要再去把它做to string这样的一个转换了啊。然后我们可以关注一下ready think本身它的构造方法里边应该传入什么东西,我们可以看到它的构造方法里边要传入的两个参数,一个它要传入一个flink jeice con base,很显然这是我们要连接到red集群的这样一个配置项啊,那这个con base我们点进去的话会发现这是一个抽象类啊,那有没有对应的具体实现呢?如果说我们直接点到源码里面的话就会发现啊,下边有一个flink jellice to conig,这是一个jell里斯连接池的一个配置类啊,那我们看到它其实就实现了对应的这个flink jell这个base啊,那这里边我们干脆啊在上边先把这个配置项先写出来吧,我们。
04:43
创建一个D连接的配置项。所以这里面我们其实要创建的这个config,它是一个flink jaice po config啊,它实现了这个conig base,然后我们点进去看一看,这个类到底它又怎么样去创建呢?啊,那我们看到它的构造方法是私有的啊,那这个就比较麻烦了,很显然它又用到了设计模式,我们看一看下面又有什么东西呢?哦,它是有一个builder,这又是一个建造者模式,所以呢,我们是需要去使用它的builder类,调用builder类里边啊,我们看到可以去set各种各样的属性啊,Set host set max total max idol啊呃,所有的这些全部设好了之后。
05:31
最后我们调用一个build的方法,把当前的类的对象创建出来,所以接下来的这个过程基本上都是一致的啊,我们要先去调用它里边的builder这个静态类,然后set对应的各种各样的参数啊,那比方说这里边我们最重要的当然就是host了啊,这里边集群,比方说还是放在102上啊,我们这里就不搭集群了,只要有一个单节点起起来就可以了,做一个测试还是哈杜102啊,那不配端口,那默认就是6379,然后接下来呢,来一个build,把它创建出来就可以了。
06:10
当前的配置项已经创建好了,那接下来就把它填到下边s function的第一个参数位置,然后我们看到它后边还有第二个参数,第二个参数叫red map。哎,这看起来像一个像一个映射关系的指定,这到底是个什么东西呢?呃,其实点进去的话,我们就会发现啊,这是一个接口,它本身里边要实现的有这么几个方法,一个是get command description,因为我们知道啊,像red这样一个建值数据库里边去写入信息的话,那其实是要执行red的底层命令的,所以这里边呢,我们得指定到底是以什么样的命令去执行写入啊啊,那当前我们可以设想一下。我们到底要保存什么数据,要把它保存成什么样子啊,那比如说现在我们这个既然都是用户的点击一的这个事件嘛,那我们最后保存最核心的数据,其实不就是一个user,然后一个他点击的URL的这个信息吗?啊比如说我们现在就只想保存这两个信息。
07:17
那么就可以以user为K,以URL为value,构建一个建值对去进行保存,那当然了,当前我们的用户有很多,所以我们最后保存的呢,其实这样一个键值对的表格,哎,那这这就是所谓的哈希表啊,啊在。里边就有这样的一个对应的数据结构,就叫做哈希,那么对于哈希表的数据写入的命令,那当然就是h set,哎,所以我们应该要把这个h set命令啊,这样一个命令描述要写在map里边,利用get command的description这个方法去进行指定,最后要返回一个当前命令的描述。
08:02
然后后面呢,我们看到还有get key from data get value from data,这不就是指定我们当前写入数据时候的key和value吗?哎,我们要提取,提取的K是当前的user value是当前的URL啊,所以了解了这些的话,接下来我们就是要实现这个map接口啊。所以这里边我们。实现。Red map接口自定义一个类来实现吧,Class,比方说我们就叫做MY。Map。啊,那当然了,本身这个类啊,我们引入,引入之后它应该是要有泛型的,既然要往里面写入嘛,当然就是本身数据类型是什么样子,这里的泛型就是什么样,我们当前写入的类型是event。这个样例类类型啊,还方便我们后边去提取它里边的属性字段,所以根本不需要把它to string转换啊啊,那接下来我们看一下必须要实现的有三个方法,首先是当前的命令,那在我们这里边肯定就是要new一个这个red command description了。
09:16
Red description,然后我们看里边要传入什么样的参数去构建它呢?哎,整体来看的话,其实就是给一个red命令,到底用什么命令去写,然后接下来还有一个附加的K,诶,这个对于我们h set来讲是非常重要的,因为我们知道啊,如果要写这个哈希表的话,首先是h set,然后我们还得指定当前的表的名称。Table的名称,然后后边才是K和value啊,所以接下来我们这个命令里边应该直接指定h set以及当前的表名,那至于每一次调用的时候,我们跟当前的数据有关嘛,提取出来的T和value不同,然后再补充在后边就可以了,哎,所以在当前的命令里边,最关键的是指定前面的命令,写入命令以及附加K就是我们当前的表面啊,所以这里的话传入的信息那就是一个。
10:17
Radi command。啊,这里我们可以直接把这个类引入,然后接下来我们会发现啊,它其实有各种各样的选择,本身这个red它就是一个枚举类型啊,所以接下来我们要选的当然就是h set。然后第二个参数表明,我们就直接定义成。Clicks就可以了。这就是我们所谓的写入命令的描述,然后get key的话,哎,这个也非常简单,那就直接从当前的T里边去提取user出来就可以了,那接下来的value就从T里边去提取URL就可以了。这就是我们整个处理的一个过程,那最后的话,这里我们就是new,一个当前自定义的red。
11:06
这就完成了整个写入过程。最后补充EV。Executive执行起来,这就是完整的代码实现,所以整体来讲还是比较简单的,就是引入对应的连接器,然后按照它的构造方法要求的参数我们依次填入就可以了。好,那接下来我们就可以去运行来做一个测试了,当然了,在测试之前我们应该要先到哈杜102上去把服务要提起来啊,那当前呢,我们就是一个单节点,所以我们直接到哈度标零二上。找到当前的目录啊,然后我们直接将red server运行起来。跟上red.com配置文件好,接下来之后,接下来我们就可以运行代码二,把数据click source里随机生成的数据写入到red当中。
12:01
当然这个运行起来之后,因为我们在控制台没有任何的输出,所以可能我们会看不清楚当前的运行的状态啊,不知道到底是怎么回事儿,如果大家希望看到更清楚的话,我们也可以把这个stream做一个打印,就每来一条就在这里打印一条啊,然后接下来如果说我们想看到数据怎么办呢?那当然就得到red里边去找数据了,哎,那这个时候我们可以新建一个tab。然后我们在这里可以直接打开red的客户端啊,我们看一下当前的状态到底是什么样子啊,比方说我们首先要获取哈希表里面的信息的话,那我们应该是h get,当前的K是clicks,比方说我们知道有Mary这个用户,我们看一下哦,Mary这个用户对应的值,他当前的访问的URL是ID为三的商品的详情页啊,那当然了,我们也可以查看其他用户当前保存的数据啊,那Bob这个用户他访问的URL是favorite啊,是当前的收藏家。
13:04
那如果说我们想看到当前所有的数据的话,那就应该用h get on click。我们可以看到当前Mary Alice Bob ky4个用户,所有的用户都有他们访问的URL的信息,那如果说我们频繁的去做这样的一个查看的话,会发现啊,每个用户他当前保存的这个value是在不停变化,这就是因为们当前是一个键一个值嘛,而且数据在不停的生成,我们当前不停的有新的数据进来,那么新的数据就会覆盖之前的数据,所以我们这里边显示的其实就是每个用户当前最近一次访问到的URL页面。那我们自然就想到,如果连接到其他的一些外部系统的话,我们可以直接给一个监控屏幕,诶,直接在这里显示出来当前每一个用户啊,Mary访问的URL是什么,Bob访问的URL是什么?哎,那这样一一对应,一目了然,就可以看到当前最新的实时状态啊,所以在大数据的这个监控领域,其实应用也是非常的广泛的。
14:15
这就是关于写入数据到red的用法。
我来说两句