00:00
接下来我们再来介绍一下怎样将数据输出写入到elastic search当中。Elastic search有时候我们简称它叫ES,我们都知道它是一个分布式的开源搜索分析引擎,那它的最大的特点其实就是它的API是rest风格的,我们每次向ES集群服务器去发送一条请求命令的时候,其实就是提交了一个HTTP请求,所以它使用起来会非常简,非常的方便。而且呢,它的查询速度非常的快,支持各种各样的复杂查询,所以在大数据的应用领域也是非常的常见,那接下来我们就来看一看flink怎么样把数据写入到ES当中啊。这个E呢,其实支持是非趁的,Flink官方提供了相关的连接器,那么当前flink1.3已经支持当前最新版本的elastic search啊。所以如果我们是。
01:00
只用EL search7的话,那么我们引入的就是flink connector elastic search7后面是SC的版本,下边是flink的版本。来了,如果我们的ES版本比较低,是ES6的话,那这里只要把七改成六也就可以了啊,我们首先把相关的连接器依赖引入。放到文件下去。呃,这里因为我安装的ES本身是ES6,所以呢,我们把这个elastic search6做一个具体的引入,然后接下来就可以写相关的测试代码了,那整体来讲测试的流程跟之前还是非常的类似的,我们可以创建一个新的测试Java类。Think to ES。整个的处理流程跟向RA里面写入显然是非常类似的。所以我们干脆。把当前的数据直接放在这里。
02:01
那么这里呢,我们可以调整一下输出的数据结构,之前写入到reddi里边是一个P,一个v k value的形式,那写入到ES呢,很显然我们可以把当前数据里边的所有字段都保存在,包括时间戳,那这个的话就不要定义成k value的形式了,而且我们也不要直接用click s了,因为我们要把数据所有的都列举下来,那就不会去覆盖更新,应该把所有的数据都要保存,那那我们还是使用之前比较少的测试数据。拿来做一个输出吧。把当前的数据源换成from elements。最终得到的string去写入ES中。肯定当前就是还是要ADD think,然后传入一个s function了啊,这里面之前的think function叫做red think,那我们自然想到了ES的think function是不是就叫ES think呢?诶,所以我们直接写elastic search think,诶我们可以看到啊,当前的elastic search think,果然他继承自elastic search think base。
03:13
果然,它就是一个rich think function,所以这就是我们想要找的think function填在这儿是对的。但是呢,当前的elastic search think又有一个问题,它的构造方法是私有的。那这个问题就又来了,很显然接下来又有builder,这又是builder设计模式,所以我们要调用的是builder.build方法啊,那所以为了简单起见,干脆我们就不要去拗一个elastic search s了,我们要拗的其实是它的一个。Builder,然后后面要调用它的点的方法。所以这里面我们要看当前的这个builder,它的构造方法里边需要传入什么样的参数呢?这里面我们可以看到要传入的又是两个参数,一个是。
04:02
Http host类型的list。一个主机名的列表啊,那当然了,当前ES集群主要就是我们需要把所有的host name要传进来嘛,那另外还有一个就是当前的elastic search s方式,诶,那这个看起来这是不是又是一个think方式呢?并不是,它只是单独定义出来的一个函数,一个方式而已,主要用来具体实现我们写入数据到ES当中的具体逻辑。啊,所以接下来我们其实这个HP host比较简单,关键就是实现相关的elastic search方式。首先我们先把HP定义出来,上面就是创建一个接我要。定义hosts的列表,所以当前我们可以一个a list。那里边的数据类型应该是http host。
05:03
这里我们直接引入的就是阿帕奇HTP下的HTP。把它先创建出一个列表,然后接下来当然就是需要在里边去添加内容了,Http hosts ADD,一个HTP,你有一个HT,那里边的数呢,最简单的当然就是入。传入一个host name,再传入一个port就可以了啊,那后面我们还可以呃,相关的传一个STEM之类的东西啊,这里面最简单的就是直接传入。HADOOP102。后面是默认的端口,当然就是9200,把它添加进去,当前这个list我们就定义好了,所以第一个参数就是当前的HTPS。后边的第二个参数比较麻烦一点,我们要定义的是一个elastic search function,所以接下来我们直接在这里做一个定义吧。
06:03
定义。Elastic search。Think function。我们可以直接一个elastic search function里边相应的泛型肯定就是当前要处理的事件类型啊,Event。里边我们会看到search think function本身是一个接口,而且我们会发现它里边其实默认就有生命周期的open close方法。最关键的是要实现一个process方法啊,那这个看起来其实就是要处理当前的一个数据嘛,所以是process啊,那这里面直接去。实现里面最关键的方法process就可以了,那生命周期的话,如果说我们想要单独去做一些初始化或者清理的工作的话,也可以在open或者close里面去进行处理,我们这里的只要实现process就可以实现相关的功能,所以这里面呢,首先我们应该看到它的参数有三个,一个叫做。
07:11
Event类型的element,那显然就是处理每一个元素嘛,这就是当前的数据。然后有一个运行时上下文,它可以获取到很多更多的信息。最后还有一个request indexor。所以我们知道想要连接ES集群去发送请求,去执行命令写入数据,那用什么去发送请求呢?就用这里的request index,这个indexor只要在里边调用它的爱的方法,就可以传入一个request index request,我们把这个request构建好之后,传入进去就可以发送给ES集群,执行我们想要的写入操作。啊,那接下来我们关键就是怎么样去定义要写入的数据了,那这里面我们写入的数据呢,可以直接把它定义成一个Jason类型,也可以把它定义成一个哈希麦,诶,只要是这样的一个对应的数据类型,都可以实现相关的写入。
08:11
所以这里面最简单的方式当然就是直接一个哈希ma。如果是ma的话,那就涉及到了K和value,所以干脆我们还是定义一下当前user是K,然后我们想要传入的是URL啊,或者传入这个时间戳也是可以的,因为我们知道当前写入到ES里边,如果说我们是不停的追加写入的话,那其实不会覆盖之前的数据,所有的每一条输入都会对应有一个输出的结果。所以当前我们干脆就定义成。String string。类型的哈西曼啊,那。把它创建出来之后。这是一张map。当前我们应该在map里边去put相应的数据element.user和element点。
09:01
URL。把它构建好了之后,接下来那就是构建一个index request了,要发送请求了,那所以当前我们构建一个index。Request。构建这个的过程,我们需要去调用连接器里边当前的search client这样一个客户端里边提供的request类,它的一个静态方法index request。这样的话就可以直接返回一个index request。所以接下来在这里边呢,我们当然就可以继续去定义当前的一些特性了啊,我们这里可以定义最关键的当然是index了,类似于表名,我们直接就叫做。对于ES6的话,那还应该要定义type es7的话就不用定义type了,我们这里面就把type叫做type吧。然后接下来还有SS就是这里的关键,我们要入一个。
10:06
当前的哈希map map类型的数据把它传入进去。作为当前的。那得到的结果其实就是一个request。最后我们只要直接调用当前indexor的ADD的方法把。定义好的这个request传进去,就可以实现DS集群发送请求,然后插入数据了。那有了这个search think之后。我们看到这里定义出来之后,把它叫做elastic search方式,那么后边我们在定义builder的时后,第二个参数当然就是直接传入就可以。这就是我们整个处理的过程。那接下来我们就可以运行代码来做一下测试了,当然了,首先我们需要在杜甫102上先去启动一个ES集群啊,那我们这里面的话非常简单,就是直接。
11:07
找到对应。ES的安装目录啊,我这里边是6.8.5,所以我们引入的也是ES6的相关依赖,然后直接去启动E。就可以了。然后接下来呢,只要这边启动了之后,我们就可以运行代码,然后去观察最终的结果输出了。好,我们看到这里面已经正常启动了,接下来我们就可以运行代码。看一看能否将这里的八条数据全部写入?我们看到这里已经执行结束,我们就可以在啊,我们可以在浏览器里边直接去看,也可以用相关的ES的访问工具,比方说K8去进行访问啊,最简单的方式其实就是在命令行里边直接使用命令去提交HTTP请求了啊,所以这里面我们可以直接。
12:03
首先看一下LOCALHOST9200。那么看一下当前所有的索引是什么样子。Ines。我们可以看到现在已经多了一个叫做CS的索引,很显然我们当前它的数据量是八条,很明显就是刚刚我们插入数据的这样一个新的索引,那接下来我们可以查找一下。对应索引里边的具体信息啊,同样可以刻。LOCALHOST9200。我们查找的话,那就直接先写上当前的所有名称clicks,后边去做一个search。我们让看的清楚一点,可以加上。那这里可以就看到完整的八条数据的输出,我们这里面并没有进行覆盖,而是针对每一个用户的每一次访问,把他的用户名和URL都做了一次保存,这就是把数据写入到ES的具体的过程。
我来说两句