00:00
了解了怎么样写入数据到red,接下来呢,我们再来了解一下怎么样写入数据到另外一个常见的大数据组件,那就是elastic search,经常我们把它简称为ES啊,那我们知道ES它是一个分布式的搜索和分析引擎,适用于所有类型的数据啊,那它的一个特点就是有非常简洁的rest风格API啊,那我们一般情况就是只要发一个HTTP请求就可以去查找或者说写入对应的数据了啊,那在大数据领域确实是应用非常广泛啊,所以flink呢,本身为ES也是专门提供了官方的think连接器的,注意只能做think不能做S啊,也就是说只能写入数据到ES中,不能从ES里边读取数据啊,那flink01:13呢是支持当前最新版本的ES的。接下来呢,我们就来测试一下,那第一步首先需要引入相关的依赖,那就是flink官方提供的ES连接器,叫做flink connector elastic search,后面跟着的六是表示当前ES的版本啊,那我这里边如果用到的ES是六的版本的话,就是e search6,如果是七的版本的话,那这里就是ES search7啊,然后面下划线同样跟着的是SC的版本,我们当前是2.12,然后接下来本身当前引入依赖的这个组件的版本,连接器版本就是flink的版本01:13啊,这是官方提供的,都是这种风格,我们直接把它做一个复制,然后copy到po文件里边来。
01:40
接下来我们就可以在ma依赖里边看到flink跟ES6之间的连接器,所以接下来我们又可以创建一个新的测试文件来看一看怎么样去做连接了。Think to ES test。没方法啊,那整体的测试流程应该都是非常的相近啊,我们可以直接把这个做一个copy。
02:05
先复制过来。同样上边我们需要,呃,关于red的这些东西我们就可以直接删掉了啊,然后这里把对应的SC这个包下边所有的东西都要引入,然后接下来我们自然能想到啊。如果说想要把当前的数据写入到ES当中的话,那这里I think当然就是要传入一个。ES的连接器给我们提供的think function,哎,那就不叫red think了,叫什么名字呢?哎,非常简单,就叫做elastic search s。诶,我们看到就是这个东西,然后本身我们当前要写入的数据就是event类型啊,那当然了,直接放在这里就可以了,然后我们可以点进去看一眼,这个东西到底是什么,我们看到啊,首先哦,它继承自elastic search think base,然后本身这个think base呢,它又实现了reach think function,那当然了,这就是我们当前需要的东西嘛,Think function就是它,然后当前如果要直接创建它的对象的话,需要传入什么样的参数呢?诶,我们看到。
03:14
很悲剧,当前它又是private私有化构造方法,那如果说我们想要创建它的话,显然它应用的又是构建者模式啊,所以构造者模式里边我们需要去使用它的builder,然后做各种各样的配置,然后调用一个build方法去创建一个elastic search think的对象,哎,所以我们整个的这个过程啊,前面我们都已经非常熟悉了建造者模式啊,现在我们要做的事情也是这样,那就不是直接去new了,而是应该elastic search think我们来它引入,然后创建一个它的builder,当然了,如果我们要创建builder的时候啊,因为当前这个builder是一个静态的类啊,啊,所以我们当前肯定是后面不能加泛型的啊,这里直接点builder啊,那对应的这个builder我们看到它本身也有泛型,所以我们当前呢,还应该是在这里去指定当前的类型是event。
04:12
然后接下来我们还需要看一下builder,它到底应该怎么样去创建好,那它的构造方法里边我们看到啊,主要要传入的又是两个参数,一个是htv host的一个列表list啊,这个很好理解,因为我们知道当前的ES的配置项啊,主要就是当前ES集群的主机名嘛,哎,那当前其实就是一组HTB服务器啊,只要把这个配置好就可以了。然后另外还有一个就是elastic search think function,诶,我们看到这个名字,难道说这才是我们想要的那个s function吗?诶,不是的,它只是叫这个名字而已,我们点进去会发现它并没有实现s function接口,它本身就是一个单独定义好的接口,里边有open close2个方法,另外还有一个叫做process的方法。
05:02
这个我们看名字也就能知道了,Open和close应该对应的生命周期,而唯一的这个抽象方法呢,Process很明显就是在处理当前的数据,所以我们核心的逻辑其实就是在这里,Process里边每来一个数据,这里边还可以获取到当前的运行上下文,另外还有一个request index。这就是让我们从我们想要处理的数据里边提取出想要写入的信息,然后包装成一个request,一个HTTP请求,然后使用这里的index去做一个发送就可以了。所以我们的核心处理流程就应该是在search think function的process方法里面去实现的。这就是我们所要做的事情啊,啊,所以现在既然已经明确是这个样子了,那我们后边就把它对应的参数都写出来吧。它的两个参数,一个是http hosts,我们可以直接把这个变量名先命名在这里,另外还有一个就是ES方式,我们直接就叫做ES方吧,然后接下来在上边我们可以对应的做一个定义定义。
06:15
ES集群的主机列表,诶这其实就相当于是我们的连接配置项了,它只要定义出集群的列表,那接下来我们就知道发送HTP请求就可以做操作了,这里我们还需要去注意一下,就是在这个elastic search SK代码里边,它所需要的这一个htv host的list,这个list到底是Java里边的列表还是skyla的列表呢?啊,其实我们发现啊,这个是Java代码,所以本身这个list需要创建的是一个Java的list,所以我们这里可以直接去用一个Java的a list啊,我们引入的是Java u下边的list,然后里边的泛型当然就是htp host,只要把它引入,然后创建出来的这个,诶我们需要注意啊,跟下边对应,我们叫的名字叫做http hosts啊,所以把它命名成这个名字,接下来呢,当然就是去做添加了ADD一个。
07:15
你有一个http host,这里面我们要做的其实就是一个,呃,我们还是单节点吧,就一台主机就可以了,还是哈102。Post name啊,那对应的还有一个端口号,默认9200。只要把这个配置好就可以,然后接下来另外还需要去定义一个这里边我们所需要的ES的。Think function。所以接下来我们这里边就需要去new一个elastic search think function啊,注意它不是elastic search think是elastic search。Think function。这里的泛型跟我们外边的数据类型当然还是一样,Event里边我们看到啊,必须实现的一个抽象方法就叫做process,那这里边我们就可以定义具体的处理逻辑了。首先我们应该能够拿到当前的数据啊,我们看到就是这个T,它是event类型,我们接下来还需要去想好,就是要往ES里边写什么样的信息,所以呢,我们应该先从当前的event里边去抽取想要的数据做一个包装。
08:29
最后呢,把我们包装好的数据使用当前的index放在一个HTP请求里边发送出去啊,那我们知道对于ES而言啊,它发送数据的这种形式呢,你最好是把它包装成KV形式的这样的Jason对象,或者是一个map啊,所以我们这里边最简单的方式当然就是使用一个哈希map了啊,所以这里边我们可以直接用一个,同样这还是Java代码。使用Java里边的哈希map对应的泛型呢,K呢,我们就都定义成string吧,简单一点。
09:04
干脆就跟red那边测试的效果一样,就是一个user。然后一个URL。User是KURL是value,还是这种测试方式啊啊,那只不过区别呢,我们就不要去重复的把它覆盖了,我们只要不定义P的话,其实写入之后啊,每一条数据都是单独存在的啊,那所以这里边我们也可以更改一下前面的这一个测试数据源啊,我们不要直接S把click s添加进来,因为这里的话相当于我们的数据就太多了,我们可以直接,呃,比方说啊,我们把这个写入到文件里边的这几条数据直接拿过来。Copy这个。把STEM做一个更改好,接下来我们有限的几条数据全部依次啊,按照user和URL这样的建筑对包装好,写入到ES里面去,好,那这里边我们先把这个创建出来,这个就叫做当前的data。
10:02
或者叫做当前的data source啊,然后接下来那这个data我们就需要去做一个put操作了啊,直接建直对要做一个传递,那么我们想要的是t.user作为键,t.URL作为值。数据已经准备好了啊,接下来呢,就是。包装要发送的HTTP请求。诶,所以这里边我们想要包装的是一个什么东西呢,其实就是。ES所要能接收的一个request啊,那这里面我们调用的是。Search client客户端给我们提供的request类,它里边的对应的方法,我们调用它里边的index request的方法。这样就创建出了对应的一个index request这样一个请求,然后这个请求里边我们当然可以去做一些相应的配置了啊,在这里边我们比方说啊,定义一下当前的index,这是最关键的信息,我们还是啊,这个就相当于我们的表名一样了,所以还是叫做click。
11:11
然后接下来啊,那另外一个非常重要的就是我们得定义当前的SS的话直接把前面的data传进来就可以了,如果我们当前是ES6的话,另外还有一个比较重要的东西,那就是需要去定义type,就叫做点击事件吧,Event好,定义好了之后我们可以。给他一个名字,就叫做index request。最后一步,那就是发送。请求,我们调用的是已经传进来的参数request index,它的一个方法ADD,只要把这个index request添加进来,我们就可以发出这个请求,向ES里边去写入数据了。哎,所以其实就是这样的一个过程。这样我们就完成了整个的处理流程,构建出了elastic search think function啊,那么它作为builder的一个构造方法,里面的参数传进来就可以创建出一个builder的对象啊,当然这里边我们创建它的对象需要去new一个builder的对象,然后注意后边我们要的其实并不是builder,我们要的是elastic search s它的对象,那所以我们建造者模式的话,我们应该是builder掉它的点build方法,这样的话就可以实现我们对应的think方式。
12:32
这就是整个完整的一个处理流程。好,我们把这一部分代码已经写完之后,那接下来就可以去测试一下了,如果要测试的话,那我们还是应该先到哈杜102那边去把ES服务器先提起来啊,那所以我们还是先进入到。ES啊,我们看到我当前安装的ES版本是6.8.5,所以我们在代码里边使用的也是ES6啊,那这里的话我们要启动ES服务的话,我们注意ES启动它要求是不能以root用户的身份去启动,所以这里我先要去切换一下用户,比方说切换到艾特硅谷用户,然后接下来就可以直接去启动当前的yes server了啊呃,那我们把它提起来之后,接下来就可以去。
13:18
启动代码,然后进行写入。我们先稍微等待一下,我们看到现在ES服务器已经提起来了started啊,所以接下来呢,我们可以过来直接运行代码,看一看效果怎么样。当然了,接下来我们做测试的话,这里应该是,呃,我们可以打开一个新的终端啊,然后在这里直接去发送HTP请求就可以了啊,或者呢,我们使用一些连接工具啊,K巴纳之类的工具,也可以去查看当前ES里边的数据,我们看到现在。Flink代码已经运行完毕,所有的数据应该都已经写入成功了啊,所以接下来呢,我们可以到卡102这边来再做一个查看,我们可以直接使用一个非常简单的客命令,然后去发送HTTP请求,去获取ES里边存放的数据啊,比方说啊,可以首先去查看一下。
14:13
当前已经有的所有索引啊,那local host9200,接下来我们去cat一下当前的Indies。我们可以看到,现在已经有一个index叫做click。然后接下来我们当然就可以查看clicks里边的所有的数据了,还是直接发送一个课请求,我们当前要查看的是。LOCALHOST9200下边的clicks。Search。加上一个pretty做一个格式化的显示啊,那所以接下来我们可以看到啊,当前所有数据其实就都已经写入到了ES里边,我们的total所有的数据是七条,因为我们当前并不是按照p value那样去覆盖写入的,所以我们看到啊,BOB1条数据,爱ICE1条数据,下面Mary同样有对应的数据啊,那Mary的好几次点击事件,它都是不同的数据,都以SS的形式出现,都会写入进来啊,那当前的indexs是clicks type是events啊,那所以这就是我们写入到ES里边的一个测试的情况。
我来说两句