00:00
接下来我们来介绍一下怎么样将数据输出写入到中。我们都知道是一个基于内存的数据存储,那既然是基于内存嘛,最主要的特点当然就是运行速度快了,所以在实际项目当中往往是必不可少的。我们往往可以把它用来作为。快速访问的数据库,或者是用作缓存,另外也可以作为消息代理,那当中呢,提供了特别丰富的数据类型,比如说字符串,哈希表,列表,集合排序集合位图等等等等,那其实我们知道它里边既然是基于内存本质的存储,都可以认为是key value这样的一个存储结构啊,那所以呢,我们写入到red的数据呢,一般也是比较有特点的,就是往往不会把大量的类似于日志数据。灌入到当中,而只是一个key,一个value,就相当于更新对应的K的值就可以了。
01:02
那对于flink而言呢,它并没有直接给我们提供官方的red连接器,不过在bar here这个项目里边为我们提供了辅助的连接器啊,那所以接下来我们可以引入BA here项目当中为我们提的flink connector。那后面跟着的是当前skyla的版本,而这里边连接器的版本呢,目前只更新到1.0,这个版本呢,可能略显落后,不过一般情况下我们的测试其实没有涉及到,比方说。的版本,或者说当前这个连接器版本对它的要求其实并不高,我们只要使用里边的核心组件就可以了啊,那正常情况下呢,我们直接引入是没有问题的,所以接下来在泡文件当中首先引入。RA的相关依赖。把它引入之后,那接下来要做测试呢,其实原理跟之前也基本上是一致的,也是直接think传入一个把here给我们提供的连接器当中实现的think方式就可以了,所以接下来我们看一看这部分代码怎么样去进行测试。
02:16
接下来我们创建一个新的测试类,叫做。Think to。整体的测试流程还是一样的。首先我们先exception,然后创建一个流的执行环境。叫做烟薇。同样不是正确性,先把并行度全局设为一,接下来是读取数据源,诶,这里我们会发现,因为写入到red的数据呢,往往都是一个key一个value。那所以当前如果我们还用之前的测试数据。当前的这样几条特殊数据,那最后呢,可能往往就是诶,Mary对应的一个访问,Bob对应的一个访问,Alice对应一个访问,后面的数据会覆盖掉前面的数据,就假如说我们以user作为key的话啊,那很显然是这样的一个过程,所以后面的这些内容其实就都不重要了,那我们也不需要用。
03:17
自定义各种时间,各种访问URL这些数据,去观察他们的一些特别的行为,我们直接使用随机生成数据的click可以了啊。所以之前我们。自定义出来的数据源,测试数据源现在终于派上用场了,那这里就直接因为点source new一个click。放在这儿,我们得到的就是基本的stream,然后接下来呢,我们就需要把相关的内容做一个简单的处理,写入到red当中,当然了,我们不做任何处理也行,因为这已经是event类型了嘛,也不用转换了,所以直接根据event里边的信息把它做一些选择,写入到当中就可以了。
04:06
这里面的关键在于。我们要调用一个。ADD think方法里边要传一个think function啊,那很明显这个是连接器帮我们实现的,那这个对应的think叫什么名呢?我们可以看到,它本身就叫做rain。哎,我们可以先把它创建出来,然后点进去,发现当前的think果然就继承自rich think方式,要的就是它,而且可以直接调它的public构造方法啊,那这个就非常的直接,非常的明确。而里边呢?需要传入两个参数,一个是flink jeice con base,一看这个名称就知道jeice con,那很显然就是要创建到redis集群的连接嘛,啊,这里边要做一些配置项,另外呢,还有一个red map,这个red map是什么东西呢?简单来讲,这就是需要去指定我们当前用什么样的命令去处理数据,将数据写入到当中。
05:18
另外就是这个命令里边我们要提取的数据key value到底是什么,在这里边都应该明确的定义。所以关键我们要实现这两个东西。呃,那这里的话很明显con是比较简单的,我们可以先创建一个。到的连接。其实就是里斯。连接配置。这里首先需要去new一个flink,诶,这里我们会看到。需要的是jealice con base,哎,那这里我们可以直接调用的呢,可以直接调用jeice conig调用它的build方法。
06:02
我们只要把对应的这个引入就可以了,那这个可以可以作为这里的参数传入吗?点进去之后我们发现没有问题,它就继承自link。所以这种方式是没有问题的,那点进去继续观察,又会发现这里面它的构造方法是私有的。很显然,这里又有设计模式出现了,Builder设计模式,我们要调用的是它的builder点。Build方法啊,那里边呢,当然可以去set各种各样的参数,我们这里最关键的当然就是host内容啊,啊,所以关键的是host和port port的话我们可以用默认的6379啊,那如果是当前使用的哈102作为red集群的服务器的话,我们需要把这个host改成杜甫102。接下来我们。当前的配置做一个具体的定义,那builder接下来我们做一个set。
07:03
传入HADOOP102。其他的不用再配了,都用默认就行。接下来一个build得到的是一个。那下边呢,我们就需要把configig做一个传入,另外还要去传入一个red,这个red map相对来讲就会复杂一点,那我们会发现这个red map呢,它本身是一个interface。是一个接口,哎,那所以当前的这个接口怎么样去实现呢?呃,我们可以直接自己用一个类来实现这个接口。自定义类实现。Red map接口。那么这里就public static class。我们叫做MY。Implement RA。
08:00
里边需要有泛型,泛型当然就是我们当前所处理的数据类型了。现在是。所以直接把event传入。那上面呢,我们直接就一个my red就可以了。整个流程上面还要Env.Q执行起来。关键就在于实现它。里边有必须要去实现的抽象方法,这个接口里面的抽象方法主要有哪些呢?我们看到上面有一个get command。所以这个方法主要就是要返回一个当前red操作命令的描述,哎,那所以这里边其实就是要拗一个。Red command description,那这里边它传入什么信息呢?啊,这边可以直接传入一个red command,一个命令,一个操作啊,那当然了,也可以后面再追加一个string类型的字段,这就是表示我们当前做的这个操作,有可能还要传入一个额外的K,额外的参数。
09:08
那当前我们想要做的是什么操作?很显然就是想要把当前每一个用户的访问事件这个数据写入到里面保存起来,而且我们是针对每一个用户去写入。相当于。同一个用户的数据就要不停的更新啊,那我们当然是基于U的去更新了,所以后面我们会看到这里边有key和value的对应定义啊,我们这里边要的就是。Get key from当前的K到底是什么?我们直接把它定义成。User就可以了,那后面呢,还需要定义当前的到底是什么,我们这里面默认的返回都应该是一个string,这样才方便写入到red。那这里边我们可以直接就把URL提取出来做返回就行了,所以最后我们保存的其实就是当前用户最后一次,最近一次访问的页面地址。
10:04
那这里面就是data.url。把这些都定义好。那当前的这个命令又应该怎么写入呢?哎,那很很明显啊,如果说我们当前直接每一个用户都是一个P,那么我们就可以直接把对应的这个p put进去就可以了。但是我们肯定还是希望把所有用户的访问信息都保存在一张表里边,所以接下来我们要操作的其实是一个哈希表,是一个哈希啊,那这里面我们的要写入哈希表的话,显然我们就应该是red command点。H set啊,就是h set这样一个操作,就是像里边一张哈希表去写入的命令,那另外呢,当然还需要一个额外的key,就是当前哈希表的名称,哪张表去写入。我们的key和value都有了表名应该追加在后边作为第二个参数。这里的表名我们就叫做evens,或者叫做CS吧。
11:08
用户的点击访问事件表。这样就定义好,所以整体来讲还是非常简单的,只要按照接口的定义,把所有的内容都实现传入就可以了。那接下来呢,我们当然就可以去运行一下去进行测试了,那首先呢,当然是需要在哈杜102里边去启动一个的服务了,我们首先先进入到。的目录下边去,然后。运行radi。So。对应的,我们要指定当前的red配置文件,先把它运行起来,接下来有了red服务器,我们就可以运行代码进行写入测试了。
12:00
我们可以先等待一下,等待当前数据的写入。啊,那运行起来之后,因为我们当前是一个click s,很明显当前是不会直接停止的,因为我们是无限循环嘛,诶那当前我们就得看一看里边到底有数据了吗。接下来我们可以到这边哪新重新去创建一个。Redis的命令行控制台啊,那这样的话我们就可以看到相关的信息了,接下来我们首先是要进入到RA的C下面去。然后接下来我们就可以查询。当前,诶,果然多了一个clicks这样的key,这就是当前我们要写入用户访问事件的那张哈希表。那接下来如果说我们想要查看里边所有的数据的话,那当然可以直接执行h get。
13:03
啊,接下来我们想要的就是对应的K,那当然就是了。我们可以看到,当前爱丽最后一次访问是ID为十的这个商品的页面,访问的是他,那么最后一次访问的呢,是商品ID为100的这个商品的页面。Carry访问的是,Carb访问的是啊,当然了,如果说我们不停的去获取当前最近一次访问的话,我们会发现这个数据是在不停改变,不停更新的,如果我们实时的把这个数据做一个输出去进行监控的话,就可以看到当前每一个用户实时的最近一次访问的变化。那这样的一种处理呢?在很多实时监控的场景下,往往会有非常的应用。这就是把数据写入到red的测试过程。
我来说两句