00:00
来我们再来讲一讲怎么样啊,把把这个数据写入到ES里边,因为对于这个我们平常做统计输入输出的这种场景来看的话,ES也是一个场见的场景啊,大家知道就是你得到统计结果,往往我们要做一些查询啊,对吧,往往我们要做一些类似于报表的显示,类似于这个呃,复杂条件的查询的一个展示,所以说呃,ES呢,也是我们在工作当中经常会用到的一个输出的一个数据库,那flink支不支持往ES里边写呢?当然支持啊,直接就支持这个在flink官方的包里边,本来就给我们提供了到ES的这个连接器,所以说我们可以直接引入阿帕奇flink下边的ES连接器,这里边我用的是ES6的连接器,对吧?后面skyla版本2.12,然后当前连接器的版本跟flink一模一样,一点十点一,先把这个依赖引入,放在po文件下边。把当前依赖引入进来之后,接下来我们还是在这个think test的下边再新建一个啊。
01:09
Object,然后当前这个就是ES think test。同样还是命方法先定义出来,然后接下来我们前面还是大同小异对吧,所以说先抄过来再说,别管那么多了,直接诶这里边。当时这个没有没有处理好啊,这个鼠标稍微划了一下,我们把这个还是copy过来,前边读入数据,然后做这个map转换,转换成类。好,把这个上面引数转换,该做的引入引入,然后接下来最后还是不要忘记这个execute啊,当前这个是ES think test,然后同样了,呃,这里边其实都一样,就是data stream,我们直接去ADD think,那我们去找当前的连接器里边是否已经给我们提供了一个ES的一个一个think方式,对吧?啊,那之前我们这个red think这里边当时直接引入了一个red think,那我们这里边就想了,这里边有没有一个直接的elas think呢?啊,当然我们这里边要写全称啊,Elastic elastic search啊,这这个稍微拼写有点麻烦,对吧?Elastic search,然后think我们看一眼,诶,果然有这么一个东西啊,所以接下来我们其实就是要用这么个东西啊elastic s大家看一眼,它本身实现的是什么呢?哎,它继承了一个elastic search s base。
02:42
哎,大家看到这个base呢,最终又实现了一个rich think function对吧?哎,所以这就是我们要找的这些东西,先把它引入,然后接下来我们就继续看了,哎,那这个它的这个构造方法是什么呢?诶,一看又是一个private对吧,又是构造方法私有化啊,那所以这里边就呃,又麻烦了,呃,大家就知道又得找对应的实现了,果果不其然啊,它的实现又是一个builder对不对,里边又有一个builder,然后builder里边呢,又是调用,大家看这里边有一个build的方法,这个build的方法会真正的去new一个elastic search think,然后给我们返回啊,所以接下来我们又是一个builder,点点build的这样的一个过程,那这个builder啊,这里边这个builder在创建的时候,它本身到底是要什么参数呢?这里边要两个参数,一个类似于我们前面的那个啊,一个配置,配置主要就是要什么呢?啊,要你的那个host host name和。
03:42
嘛,那你这里边也是啊,ES大家知道是有有这个集群的啊,那这里边传的就是一个list,大家看是http host的list,因为ES都是发送HTP请求嘛,啊,所以这里边我们传的是这样一个东西,然后呢,后面还有一个elastic search think function,诶有同学可能想到,诶,这里边怎么又有一个think function呢?这难道是我们真正要的那个think function吗?哎,不是的啊,这是这是当前builder的一个参数,尽管里边大家知道可能他要去定义我们具体做那个think的一些操作,对吧,发送那个HTP请求的一些操作,但是呢,你点进去之后会发现它并不是我们要的think方式,Think function,它就是一个普通的function而已。
04:25
啊,所以这个其实并不是我们真正要的东西啊,啊,那呃,所以我们外面还是传上面这个elastic s,呃,Elastic search s这里我们直接点builder对吧,点builder。啊,然后这里我们可以把这个,呃,就是当前的这个泛型啊类型传进来啊,当前的这个sensor reading,然后里边的参数我直接把它这个换行写吧,这样大家可能看的清楚一点,对吧?呃,然后里边的参数我是要两个,一个是当前的htp host的list,对吧,我把它定义成http hosts,然后另外还有一个elastic search think function对吧,这是自己定义的一个呃,Think function式,所以这里边我定义一个叫my ES think方式,我把它定义出来,好,那那最后当然了,这里边得调这个build build.build对吧,你得把这个做一个,做一个最后的这个实现啊,所以说这里的这个过程其实是跟之前那个呃,大家定义那个con base的时候其实是类似的啊,也是用了这样的一个方法啊,那接下来我们就要把对应的这个HP host和这个ES think方式给大家定义出来。
05:42
改了啊啊,那这里边定义http host对吧?啊,所以我定一下啊htv host大家知道,这里面我就要一个这个刚才大家是不是已经看到了这里面的这个list,注意啊,这是一个Java的list对吧?啊Java的list,所以我们得用这个Java list的实现,你不能用这个SC的list啊,所以这里边我得用一个a list啊a list。
06:14
诶,直接用这个Java u下边的a release,然后里边的数据类型就得是http host对吧?啊,就得直接去去用这个东西啊,然后我们接下来,呃,把这个因为因为大家看到这里边你build要传的这个里边的数据类型也是htv host嘛,对吧,它本身就是在当前你看到阿帕奇HTV下面的啊,所以我们这里边直接引入这个,大家把这个包要引对啊,然后接下来,呃,里边一开始我们就先不要传任何的了,就先创建一个空的列表,然后往里边去追加对吧?ADD这里边再去new htv host,呃,里边当然就是local host啊,默认的ES端口大家知道9200对吧,我这里边就是一个单节点啊,那那大家如果要是多个的话,你再去挨个追加就完事了,这个非常简单,然后接下来我们再定义,呃定呃自定义啊。
07:14
自定义写入ES的哦,这个ES think方式对吧?这个我就简写了啊,大家知道是那个elastic search think方式,好,这里边我们定义一个my ES think方式啊,然后大家知道它本身也是一个函数类对吧?只不过它是一个function,而不是一个think function式,所以我们要去实现这个elastic啊,当然这里边不是这个elastic s think了啊,而是think function加上这么一个东西啊,大家看它本身一个是一个interface,我们现在要把它做一个实现啊,然后接下来大家看我这里面定义这个变量,就用了这个呃,匿名类的这种实现方式对吧?或者说你去定义一个,就比方说我定义一个class,然后去实现这个接口也是一样的啊,这里我用了这个匿名类的定义方式啊,其实没什么区别,因为你这里定义匿名类就可以直接直接把这个参数写到这儿了嘛,啊啊,这边我是把它提出来了啊,然后接下来。
08:14
在这里边本身的类型是sensor reading对吧?啊,然后里边大家看一下必须要实现重写一个什么方法呢?一个process方法处理对吧?那这里边处理什么东西呢?呃,这里边传入的参数t sensor reading,那当然就是当前数据了,来了一个数据到底要做什么操作对吧?所以这就是每来一个数据都会调用到这里的process方法,Process方法那后面还有什么呢?还有runtime contact,还有上下文啊,另外还有什么呢?Request index indexer啊,大家知道这个indexor是不是最后我们要发送HTP请求的时候就得用它来发送啊啊,所以接下来我们就是定义数据,然后用这个index发送就完事了啊,所以接下来我们就是包装数据了,我们呃。
09:02
包装一个啊,就是对于这个ES写入的时候呢,必须要用一个呃,Jason object或者是一个map来作为我们输入的这个data source对吧?啊,包装一个map作为data source,所以这里定义一下data sourceau data sourcece,诶,我们去你有一个,这里边又得去你有一个哈map了,对吧?这里边要的还是Java里边的数据类型啊,那这里边我们keep value都用string就可以,String先把它定义出来,然后接下来data source直接去ADD啊,直接put对吧,因为是map嘛,Put,然后当前比方说我们想写什么数据呢?啊,就把三个数都写进去吧,比方说当前要这个ID啊,那就是当前的这个t.ID对吧,然后data source,哎,我们还可以打断顺序,比方说下一个我们想要那个。
10:02
的温度值对它比较关心,哎,我们定义这个temperature t.temperature啊,这里面当然我们要的是这个string,对吧,所以做一个to string转换,然后data source put,最后还有一个时间戳,我们定义一个T st.time对吧?呃,这里边同样做一个to string转换,这就是我们想要的这个数据包装好,然后接下来好,那就是要创建一个index request了,对吧,我们创诶这个啊,创建index request用于发送HTTP请求对吧?啊,这这其实就是我们定义的那个请求嘛,呃,所以把这个数据要全塞进去,然后我们还得定义index呢,对吧,你到底是往哪个index里边。
11:02
且呢啊,所以接下来我们这里边定义一个index request request啊,Request这里边我要调用的,大家注意一下啊,要用到这个ES client给我们提供的这个是在连接器里边也引入了它的那个client,要用到它里边的request类里边的index request方法对吧,这样去创建一个index request,然后呢,诶,这里边就可以定义它的一些参数了,大家看到index对吧,比方说我们当前就叫sensor reading对吧?啊,这个算了啊,我们就叫sensor吧,因为后边大家看到还有对应的这个type对不对,因为现在ES6嘛,E7之后就就已经废弃这个type了,ES6你还得指定当年的这个type啊,那比方说我们这个叫reading data啊,随便给一个,然后后边把source,关键是要把这个source指定,这里边我们把data source传入。好,已经创建好了,最后就是用indexor发送请求,哎,那怎么样用indexor发送请求呢?这里边indexor非常简单,一个ADD方法对吧,只要你把这个request添加进来,ADD进来,它就给我们发送出去了啊,所以这里面直接这么一传就完事了,哎,这就是一个完整的处理过程。
12:24
啊啊,可能稍微有点麻烦,但是大家如果要是这个思路里的清晰的话,还是很简单的,对吧?这里边我要定义的这个as search function,其实就是一个处理流程,处理过程,然后呢,每来一条数据都会调到这里的方法,对吧?啊,那之前那个ES就是red呢,它是每来一条数据都会调到我们这里边定义的这个就是就是用这个命令要获取到一个写入的方法,对不对?哎,就是调用到这里来,用这个方法来写入,然后它底层会帮我们把这个命令和你这里当前的这个数据提取出来,包装成一个一个最终写入的方式,这里边尽管我们没有直接操作,但是它底层会帮我们做这些事,对吧?啊,这个大家看,呃,这个连接方式跟我们这里边实现的好像就不太一样啊,但是大家如果知道原理的话,其实都是大同小异差不多的,好,那接下来我们就来测试一下,看看这个效果怎么样啊,那同样这里边我还是要去。
13:25
起一个E了啊。呃,我们到呃这个下边来ES,这里边我要用这个S啊,6.8.5啊,然后直接把它提起来,然后等一下,我们其实是要,呃,其实就是直接读取当前这个文件里边的数据,然后来一条写一条,直接把它写进去就完事了,对吧?这里边的核心其实就是调用我们这里边的process方法,每一条数据来了之后都会调用这里的方法啊,所以是逐条写入,然后我们最终这个实现的elastic search think,这是一个think function式,它里面配置我们前面有这个定义好的htp host对吧,连接当前的这个集群,然后呢,呃,下面有我们定义好的处理过程,把它build出来,这就是一个完整的过程啊。好,我们看看提起来了没有。
14:17
好,这边应该是提起来了啊,直接把它运行一下。好,大家看,现在已经运行完了啊,这边没有任何输出,因为我们都输出到那个ES里了嘛,所以接下来我们来看一看ES中到底是不是已经有东西了,所以接下来我们看一眼,诶,大家知道这个直接可以用客命令去访问,因为都是HTP请求嘛,对吧,这个其实是没有任何问题的啊,所以接下来我们比方说我先去logo host9200啊,然后我去cat一下,对吧,Cat一下当前的这个indices,好,这个在运行之前我们没有看啊,那大家知道就是现在肯定就写进去了,Sens写进去了,对吧?然后接下来我们看一下它里边的内容怎么看呢?因为之前我是已经重新起的这个ES啊,之前都是清空的,所以这个肯定是新写进来的数据,然后我看一看里边的数据啊,Local host9200,然后接下来我们直接看sensor下面的东西,对吧,直接用这个sensor search,然后为了看的清楚一点,我们用这个pretty显示啊。
15:25
大家看接下来这其实就已经给我们全部列出来了,对吧?当前三四里边有几条数据呢?有八条数据啊,就是按照顺序,我们这里大家看三四十一对吧?啊三啊,这里还有341的数据,因为这里边这个本身这个列列出来的这个顺序并不是按照我们插入的顺序来列的,对吧?这个并并不重要啊,然后这里边三四六三四十,呃347对吧?然后我们呃341的好多条数据,每一条都写进去了啊,为什么?因为我们这里边写入的时候没有指定K的那种写入对吧?这里面就是原封不动是什么数就包装好就写进去,所以说我们的八条数据在这里面都出现了啊,这个ID temperature还有TS都是他们各自本身的那个样子,这就是在这个当前我们这个ES里边的一个写入,好那好,那当然了,有同学如果要是习惯,就是觉得在这个命令行里面写的。
16:25
不熟悉的话,大家直接在这个浏览器里边也是一样的,对吧,因为你这个直接访问的话,直接在浏览器里边,你可以直接这么清晰的看到,对吧,当前这个total hits是八,然后所有的数据在这里边一目了然。大家可以下来之后再测试一下。
我来说两句