00:00
刚才我们实现了这个保存到数据写入的这种情况,然后接下来我们再写下面一种情况,就是网页S里边写数据啊,那这个当然也是可以的,首先我们先看一眼这个ES的依赖,需要引入的依赖,那同样官方给我们实现了这样一个ES的连接器,那我们就不客气的把它直接引入就可以了,对吧?啊,这个还是比较简单的。好。然后接下来我们在这个s test下边继续去new一个object,啊,这个东西我们叫ES think test。呃,然后这个过程其实是一样的啊,大家会想到直接main函数里边啊,我这个就直接从red里边把前面一样的代码抄过来就可以了,对不对啊,因为我们已经很熟悉前面做了哪些事情啊,首先创建环境,然后并行都设成一,然后从文件里边读取数据啊,有了这个source,然后做转换,把它map成一个s reading的数据类型,然后接下来是不是就可以做think了啊?
01:08
在这里啊。这个大家不要忘记,把这里稍微改一下。然后think这一步的话,呃,那其实这个肯定要有所不同,对不对?Data stream,呃,这里我就应该要ADD。不能不能在前面加加那个B了,对吧,ADD的一个think,那大家会想到这里面又得new一个什么什么东西了,对吧,要new一个这个新function才能把它加入进来,那这里面到底要做什么操作,我们后面再看。然后同样不要忘记后边对执行啊,这个我们叫ES think test啊,那如果做这个ES的写入,我们需要哪些东西呢?呃,同样大家会看到啊,在这个我们文档里边,大家能够看到这里边传的东西稍微有点奇怪,传的是一个什么呢?一个ES think builder的build啊方法,所以大家知道按这种方式的话,那就是这个build是不是要返回一个think function啊,对吧,这里要返回返回一个think function,所以前面我们要实现的是一个ES think builder。
02:26
那么这个ES think builder又是又是怎么实现的呢?哎,所以大家看到它是一个就是ES。Elastic search sink.build然后创建出来的这样一个builder有点绕,有点奇怪,对吧?但我们的理解难道不是直接那个new,一个ES think就应该可以了吗?它是要先用这个think去build一下,然后在这个里边定,呃,Builder,定义好这个builder,在这个里边定义好我们的操作,呃,这个对那边发送了请求,然后在这里边build.build。
03:02
实现我们的这个think方式啊,是这样的一个过程,那首先大家看到前面其实还是要做一些。做一些配置的对吧?呃,我们这里边首先定义它需要传的一些配置啊htpp hosts,因为大家知道我们这里这个ES的写入都是要发送HTP请求的,对不对啊,所以我们要给它对应的那个ES集群,要POS要先创建出来,这里我们去new一个a list。它的类型是HTP。创建出来,创建出来,然后htp host,接下来可以把它这个对应的host加入进去,对不对啊,这里面每一次加的时候呢,因为它的类型是htp host,所以每一次都要用一个htp host,对吧?啊,它这个就是要求这么做,所以说只能这么别扭,呃,那当然了,这个端口是9200,一般是。
04:06
好,然后接下来我们就可以去创建一个要的那个ES think builder,对吧,创建一个ES think的builder。呃,这里我们定义ES think。Builder,你有一个elastic search。Think,那大家看把这个东西引入啊,这里边我们先不要这些东西,因为我们主要要的是它的什么,要它它builder,对builder这里边有类型,那么还是sensor reading对不对?我们整个流里边的数据类型都是ssor reading,所以到这里它输入的数据也还是一个sensor reading的类型,然后我们做包装,然后发送请求去写入ES就可以了,对吧?啊,这是这个过程啊,那这里边大家看他要传的参数,这就有讲究了,他要传的参数是什么呢?是要有一个是不是htp host的list子啊,啊,这就是为什么前面我们要这么别扭的创建这么一个东西,是不是就是这里面需要啊,对吧?Htp host构成的一个list,然后后边要传一个ES think方式,我我们本来以为是直接把这个ES think方式就直接直接写进去就好了,对吧?啊,这里边它是。
05:40
还做了这样一个操作,好,那这里我们htp hosts先传进来,然后后边是不是要去new一个,对我这里边就直接在这里写好了,写成一个匿名函数的,呃,就是匿名类的一个方式就可以了,对吧,就不要去单独再建立我们自己的那个,呃,定义好的那个函数了,比方说我们你有一个。
06:08
诶,这个不是ES think啊,我们是要一个ES think function,对,好,诶大家看他直接把这个我们要复写的东西都已经写出来了,我们要实现的要复写的是一个process这样的一个方法,对吧?那大家其实已经知道了,这个方法是不是就是数据来了,你到底怎么处理对不对,然后怎么样把我们要的那个数据包装好之后,发送请求到ES集群上面,然后把它写入,那大家看这里边除了element这是什么呀。Element就是当前传进来那个数据对吧,然后接下来是不是还有上下文啊ctx另外还有一个。Index,诶,那大家会知道indexor用来干什么的。是不是就到时候去发送我们的那个操作请求啊,对吧?大家看这是一个request indexor,好,接下来我们就可以去写入我们要做的操作了,呃,这里边比方说我们如果要是想看清楚它这个就是操作的过程的话,我们可以在这里面打一行数据,对吧?比方说我们要。
07:21
保存一个数据,我们一开始saving data element,对吧。然后接下来,呃,大家知道这个如果要插入这个ES里边的话,我们是要写到那个S里面去,对不对,那么它的source是不是必须得是一个,呃,一个map类型或者是一个g object呀,啊,所以这里边我们要包装成一个。包装成一个map或者Jason object。
08:00
呃,这里边我们就我就把它叫成Jason,那其实是把它包装成一个哈希map的一个一个类型啊,所以我直接去new一个哈希map这里边的类型,那这个哈希map就是它的那个k value的类型,对吧,我们这里就都给它string类型好了,方便我们做操作。先就出来,接下来是不是jason.put不是put all啊,一个一个put前面的K给什么呢?哎,比方说我们要把那个ID传进去,那比方说这个就是senor ID对吧。要传的数据是什么?是不是就是element.id啊?因为这个数据是不是都已经来了,我们现在是要包装成这个,最后能写到ES里面的那个数据结构对吧?啊,所以这是这个啊,然后Jason点啊,当然了,后边肯定是不是还有temperature啊,Temperature这个就是element.temperature。
09:16
哦,大家知道这个本身它是一个double类型,那这里是不是还要涂string啊,对吧,然后同样Jason,我们把最后还有一个时间戳,那这个叫ts time stamp element.time step。同样它是一个长整型,也要to string,对吧,这样就把它包装好了,要有的这个S数据是不是就都有了,好,那接下来就是接下来是不是就该把这个包装好的,就是要创建那个request了,对吧?创建request,然后把它发出去就可以了,创建。Index request。
10:01
准备发送数据啊,所以这里边我们可以定义一个index request。啊,他是不是得要用,我们这里边肯定就是得得这个有有支持的这个东西对吧。把这个requests引入,然后它里边有一个index request,然后接下来它后边是不是就可以定义当前的操作的index是什么,Type是什么,Sources是什么,都可以往后追加了,对吧?啊,所以这里我直接跟在后边吧,Index我们就叫三四号了,呃,这个呃这里边啊,就是如果要是ES6以前的话,就还必须得指定typeb es7之后就已经废弃了,对吧?啊,这里面我们这个版本是ES6之前的,所以是要指定这个type的啊,这个type我就叫,呃,我就叫那个reading data吧。
11:02
然后我要指定一个S对吧?诶这里的S应该是什么呀?这里的S是不是就是我们前面包装好的。这次对吧,对,所以这里边我直接把这个包装好的东西放进去就完事了,创建出来,然后接下来干什么,是不是就是发送HTV请求啊,对吧?呃,就是利用,利用什么来发送呢?我们在这个传入的参数里边是不是有一个indexor啊,它是一个request index,所以是不是用它就可以发送我们的index request啊,所以利用。Inex,诶是叫index吗?Index。发送。请求写入数据啊,所以我们这里边其实就是。
12:07
Indexor直接可以点它有一个ADD的方法对吧,这个ADD里边要传进来的是不是就是一个index request呀?啊,所以我们可以把这个index request传进来,这样的话到时候利用这个index就可以把它发送出去了啊这这个就是呃给我们官方实现的这个呃连接工具啊,它的这个连接器里边think方式实现的这个过程啊,稍微的有点绕,但是其实思路应该还是比较明确的,就是先把数据准备好对吧,然后创建这个HP request去去把它发送出去,好,那当然了,最后写完之后我们还可以再打印一条数据来告诉我们说当前这个写完了对吧。Data save啊,这就是我们整个的这个过程啊,那当然到下边这个ADD think的时候,是不是我们要ADD的是ES think builder下边的对要调它的build方法啊,诶大家可以看到这个build方法返回的是不是就是一个ES think这样的一个类型啊,那这个类型是不是它是继承了,就是实现了一个ES think base,然后这个东西它是不是就实现了一个rich think方啊,所以是在这儿呢?啊,它的这个继承关系是这样的。
13:33
稍微的有一点点绕,那大家就是按照这个流程把它实现一下,知道怎么做就好了,好,那我们还是来测试一下吧,接下来我们还是起一个ES啊,需要。呃,我们一般大家可以直接把这个提起来就可以了,呃,提起来之后,其实我们直接是不是可以在浏览器里边直接去看当前的那个状态啊。
14:11
大家习惯是用什么去看这个状态?直接用浏览器是吧,或者是K吧,对吧?啊好,那么我们logo host9200,然后我们可以先看一眼当前cat一下当前所有的那个所有的索引对不对。好,大家看这里边有一个index是test,跟我们的不一样,对不对啊,所以这个其实没什么关系,只要这里边骑起来是正常的,我们等一下来测试就可以了,好,那这里把这一个代码跑一下,大家看一下效果,大家看这边代码已经运行完毕了,呃,按照这边报出的信息来看的话,是每一个六条数据对不对,每一条数据应该都已经写进去了啊,但是到底写进去没有,这个我们还是得到这儿来去看一眼啊,啊,我们先看一眼,还是看一眼这个索引吧,诶果然多了一个,多了一个三四对不对?好,那接下来我们要看一看这个三四里边的内容啊,那是得去三四。
15:21
Search对吧?呃,我们可以加一个pretty。诶,大家可以看到是不是确实TOTAL6条都已经写进来了呀,而且大家可以看到就是我们当前都在这个SS下边按,就是按照我们定义的这个顺序,对吧?S ID temperature ts,把我们的这一个ID和温度值还有时间戳全部都写进来了啊,那大家可以去测试一下啊,这个跟不同的是red,我们是把那个ID作为K,然后写那个哈希map了,当然就可以覆盖对吧,我们这里边就是一视同仁,直接来一条,就直接往里面塞一条,当然就是所有的数据都写进来了啊,这是ES这一部分的一个实现。
我来说两句