00:00
接下来我们要写入的,要给大家做测试的是ES啊,大家其实知道在实际应用场景里边,呃,往ES里边去写入数据其实也挺常见的,因为我们统计的数据统计出来之后,其实最终都是为了其他的应用要进一步做查询啊,那ES其实对于这种复杂查询,模糊查询就是一个非常好用的工具,对吧?啊,所以这里边我们可以考虑怎么样直接写到ES里边去,ES的话也非常的简单,大家看到它是FNK官方给我们提供了连接器,对吧?呃,这里边我引入的这个版本就是flink connector ES search 6es是六对吧?啊,这对应的这个ES版本是六啊,然后后面2.12这是。啊,Skyla的版本对吧,Flink依赖的skyla的版本,下边的本身连接器的版本,因为flink官方提供嘛,跟flink的版本号是完全一致的啊这里还是一点十点一,我接下来把这个po文件啊,里边的这个依赖重新引入。
01:02
好,把这个ES对应的这个依赖先放进来,这里边我们还是刷新一下,应该可以看到有这个新的依赖出现,接下来我们就在这个代码下面,Think下边啊,去新建一个测试的class。当前这个叫s test3。ES。那其实前面的这个过程都是一样的,类似的对吧,我直接就把这个red前面这一部分直接copy过来吧,还是啊,创建执行环境,从文件里边读取,然后接下来转换成s reading类型放在这。呃,最后这里边再来一个Env.execute执行起来对吧?啊,然后中间我们这里边也不做各种各样复杂的那个转换了,就相当于只是转换成了一个po类型啊,类似于抓到病的这个类型,然后接下来要做的就是。
02:05
Data stream是不是直接at think就可以了,实现想要的那个think方式啊,你有一个,那之前我们那个ready里边是有一个red think,那大家会想到那ES这里面是不是有有一个ES think呢?哎,我们试一下啊,诶,当然这里面有,大家看到是不是会有一个elastic search think,诶,是有一个elas,呃,Tic search think base对吧?哎,那我们看一下这个base到底是个什么东西。大家看这是一个,这是一个抽象类,然后它继承rich think function确实是它对吧,那确实是它,但是这里边它是一个抽象类,那我们之前我们一般都不会直接去用这个什么词,什么什么什么base对吧,那我们自然要看它有具体的实现吗?诶有有没有就是继承自他的这样一个东西呢。
03:01
诶,所以这里边大家看到下边我们应该能看到的,有一个所谓的这个,呃,这这里大家可以看到这儿,这儿是看不到的啊,这里边大家看到有一个这个think function,但这是一个interface,好像也跟它不是一个,不是一回事,对吧,它并不是继承了这个thinkface啊,它只是一个civil liable,一个function啊,那所以这里边我们想真正想要的东西是什么呢?就叫做我把这个删掉啊。就叫做as think think。大家看就是这个东西。这个东西elastic search think,它就继承了elastic search think base对吧,然后它就实现了这样一个think function啊,所以这就是我们想要做的这个事情啊好,那接下来我们就看一看它要它要怎么样去拗出来呢?哎,那这里边大家看到了构造方法又来了。私有化的构造方法对吧?那既然是私有化的构造方法,那是不是下面就一定会啊,有一个build,然后有一个build啊,这个build是一个public方法,是不是就会返回一个elastic search think啊,所以接下来我们又是这样一个过程,对吧?它这里边又了一个这么这么个东西啊,所以在这个build里边我可以做各种各样的这个对应的设置啊,所以接下来这个过程是不是跟我们前面那个差不多啊,对吧?啊builder啊,先把这个定义出来,那builder这里面大家看还有泛型,这个泛型应该是什么。
04:31
最后我们think方式是不是也是应该是sensor reading啊,所以这里边是不是也应该sensor reading啊,对吧,包装好的这个po类型啊,里边builder就是本身这个这个builder里边是可以传参的啊,我现在是不是根本就不需要呃,对应的这些。哎,这这里边呢,首先大家会看到,如果说你后边直接调这个build的话,那中间就应该是在这个builder里边要去做一些set操作,对吧?呃,这里边我是可以设置了之后,然后去做这个set的,但是大家会发现本身这个builder是不是它没有那个空参的构造方法呀。
05:11
这里面是不是必须得传参,哎,所以这里边你必须还不能直接就是就build就完了啊,那这里边就是你看到必须得传两个啥呢,又是两个参数。一个叫htb host。那这个大家也知道,ES发送请求的时候,是不是发送的就是。就是HTP请求啊,对吧,就直接向我们的ES这个服务器啊集群里边发一个这个HTP请求就可以了,所以它这里边最关键的连接配置的参数是不是就是HTP服务器,ES服务器的那些主机名啊,只要把那些主机名端口号给到我们这里,创建一个HP host的一个列表,诶这里边就相当于我们有了连接配置了。然后接下来就是直接发那个HP请求就完事了嘛,然后后边还有一个对照我们之前那个的话,除了连接配置,是不是接下来你得有发送的命令啊,诶,那现在大家就想到了,那这里边你发的就是HTP请求啊,那有什么命令呢。
06:14
那也总得你得包装好数据,然后告诉我要按按照什么方式发送,对吧,所以这个是在后边这个elastic search s function这个里边实现的。那这个东西是啥?这就是刚才我们看到的这个,对吧,大家还记得刚才我们看到这些接口吗?它并不是一个think function啊,尽管名字叫呃,Search search think function,它其实本质是不是就是一个function啊,对吧,就是一个,然后你看它里边这个接口要实现什么呢?实现一个process方法,Process方法大家看里边它就有。这这是不是当前来的每一个数据啊,要处理的每一个数据来了之后,在这,然后后边还有ctx上下文对吧,最后还有一个request indexer,哎,这这个indexer,那是不是就是我们最后不是要发那个HTV请求吗?是不是就用这个indexer就可以发送啊,所以接下来其实就是这些啊,就每一条数据来了之后,然后你怎么样包装,怎么样用这个indexer把它发送出去,发给这个ES就完事了。
07:20
所以接下来关键是要实现这个process方法,好,呃,那这里边我们这个流程已经弄清楚了,接下来我们还是在上边先定义ES的连接配置啊,那这个是需要去实现一个htp hosts对吧?啊,这里边是要去new一个a list啊,A list先把它创建出来,那它里边的那个数据类型大家也看到了,是htp host对吧。玩一下啊啊,然后这个我直接就叫做htp host就可以了,然后接下来里边是不是就直接我现在就就一个单节点集群啊,那是不是直接去ADD一个就完事了,你有一个htp host啊,里边就是host nameme local host啊ES端口9200对吧,这就是我们当前的这个。
08:14
第一个参数htp hosts,然后后面还需要有第二个参数,第二个参数是一个ES think function,对吧,这个稍微有点麻烦啊,所以我这里边你有一个,你有一个啊my ES think方式,Think方式好,我先把它先放在这儿啊,先创建出来,然后接下来我们就在下边具体是实现了,你也可以匿匿名类那种实现直接在这写对吧,但那个稍微会太长了啊,麻烦一点,我们分开写吧,所以这是自定义,呃,实现自定义。的,呃,ES写入操作的。
09:03
这个叫啊,这个我就不详细写了啊,叫全称叫elastic search think方式对吧?我们在后边给大家实现出来就完事了,Public static class class,呃,My ES think function,哎,这里边我要去实现的接口是elastic search think function对吧?就这个东西啊,然后接下来里边本身它还需要有一个泛型,这个泛型是不是就是当前我的数据类型啊,对吧,Sensor reading啊。Sensor reading好把这个先写出来,这个一写的话,上边这个,诶大家看默认上边它是掉了一个什么呀,呃,就是就是后这这里边本身是一个new了这样一个function对吧?然后我们这里边其实是利用这个ES in function先创建了一个builder对吧,然后最后再调这个builder的build的方法,创建出一个elastic search s,这才是我们真正要的那个think function。
10:06
稍微有点绕啊,但是大家如果要是把这个数据数据结构这个梳理清楚的话,好像也没有什么太大问题啊,那这里边它真正那个think function要写入数据的时候,这个操作其实就是在这个ES think function里边的process方法是不是在这里面实现的呀?啊,所以接下来就是。呃,写入操作啊,怎么实现?那首先我们是不是要拿到想要写入的数据啊,写入数据是不是都在这个element里面啊,那这个就是看我们怎么去定义了啊,我们定义写入的数据大家知道在ES里边这个叫做data source对吧?Source啊呃,那么这里面我需要的那个数据类型呢?必须要包装成一个哈map或者是一个Jason object,必须有一个key value这样的一个一个数据类型,对吧?大家知道在那个ES里面写入的时候,Source是不是也是类似于一个key一个value那样的写那那样的写法啊啊,所以这里边大家可以自己去定义啊,你也可以把这个数据就完整的一下全写写进去了,对吧,就是当前一一个key,然后sensor reading to string直接写进去,但同样跟red里边写入的这个形式一样,你那么写的话就像日志一样了,看起来不舒服对吧?所以我是不是可以把它里边的字段一个一个摘出来,K86K86把它定义出。
11:28
出来啊,所以接下来我们写入的时候定义这样的一个哈希map。呃,那这里边我要这个哈希map呢,就是一个当然都是string了啊,都定义成string,到时候后面我们写入的时候方便一点啊。好,先把这个定义出来,我直接就叫做这个,呃,不要叫哈map了,我叫做data source吧,这是我们要写入的这个数据源,然后接下来data source就是往里边去put是不是就完事了,我们这里面不是有三个字段吗?对吧?ID从element里边去拿,Get当前的ID拿出来。
12:08
然后同样后边可以去put啊,现在还有这个,比方说我有那个时间戳对吧,还有那个温度值,比方说我现在先写那个温度值,温度值大家看我这个不需要跟那个。本身我我我对应的那个Java并那个类型里边的那个字段一样,对吧,这是我写入ES里面的那个K嘛,所以这个无所谓啊,比方说我就叫temp,那就element.get temperature,然后再to string就完事了啊,最后我还可以再put那个时间戳。呃,对应的那个我叫TS吧,element.get time stamp to string。我直接把它包装好,一条一条数据往里边写入啊,大家想我当前的这个写入的话。那是不是相当于就是每来一条数据都会写入啊,它会像之前ES里边去做更新吗?
13:06
这会更新吗?之前我们ES是按照K,就是当前的那个ID是不是来了之后直接就会覆盖直接更新啊,那当前我这样包装这个data source的话,会更新吗?这不会对吧,这就是一条数据对不对,那它并没有指定这个ID是K对吧?所以这里边就无所谓啊,我们这里边就是把所有数据都直接灌进去了,好,这是定义好的那个呃,S啊,那接下来我们就可以去定义,呃,就是创要写入的时候是不是要用那个,就是要要用一个那个index request,对吧,要创建一个request啊,创建。呃,请求请求。呃,作为。向ES发起的,呃,就命令对吧,写入命令发的这个请求就是我们的写入命令,所以接下来我其实要用这个啊,呃,ES给我们提供的这个requests这个啊,ES连客户端给我们提供的这个requests下边可以去创建一个。
14:17
大家看这个index request对吧,这是最基本的这样一个请求,这里边它可以去做一些配置,这里边我可以要大家可以看到啊,我是不是可以去,首先我我不是应该定义那个index啊,当然这个请求你得指定,首先我们那个连接的那个ES的服务器啊,对应的那个主机名,端口号我是知道了,那后面这个index是不是还不知道啊,最关键的是index啊,所以这里边我随便给一个比方说我叫啊sensor对吧。然后后面哎,大家看到这个ES6是不是还有这个type的指定啊,必须得有type对吧,ES7的话这个就启用了啊,那这里边我们还必须得指定,比方说我这个就叫呃,Reading data吧,这无所谓啊,大家随便给一个什么都可以,然后最后我再来一个source,指定当前的这个数据源。
15:08
这就是我们当前的这个请求啊,我把这个直接叫做一个index request,对吧,那最后一步是不是就是。用。这个index发送请求。这个indexor是不是上面我们参数里面就有了,就在这儿对吧,就用它。Indexor点啊,大家看直接ADD把它添加进来,是不是接下来就就可以直接发送出去,把这个index request直接发送出去了啊,所以这个其实非常简单啊,就这样啊三步对吧,首先包装好我们想要写入的数据,然后创建请求,然后最终发送,这就是这个ES啊执行的这个过程,好,那接下来我们来还是来测试一下吧,整个完整的流程我们都已经写好了啊呃,那如果要测试的话,接下来还是需要去。
16:03
这要去起一个ES了啊。呃,我我们直接找到对对应的这个目录,然后我们要起一个这个ES6啊,那如果要启动的时候,直接我用这个elastic search直接起起来就完事了,对吧?呃,那这边如果起起来的话,我这边就可以运行了啊呃,我我这边是没有用那个后台执行,所以说我可以另外再再开启一个,开启一个页面去发送对应的这个HTP请求啊,大家可能习惯是用一些其他的连接工具,用K巴纳对吧,去访连接这个ES去访问当前的数据,那我这里边其实也不用那么麻烦,大家知道HTP请求嘛,我是不是直接可以,就是你直接用浏览器是不是都可以访问啊,对吧?那更简单的就是我们在这个命令行里边,用这个Q命令是不是也可以直接去实现发送HP请求的方式,对吧?所以这里边我可以直接刻去看一下啊,当前的这个首先看一下local host9200对吧。
17:07
呃,首先我看一下,呃,我我是不是首先想看这个sensor这个呃,Topic下边有没有东西对吧,那我首先得看有没有sensor这个topic啊,我看一下这个cat一下cat一下当前的这个inexes对吧,In inex。好,我把这个做一个啊,现在没有任何的对应的这个topic对吧?哎,所以现在是什么东西都没有啊,所以接下来我们这个应该起起起来了,对吧?Started啊起起来了啊好,那接下来我把这个代码运行一下。运行代码,然后接下来看看能不能把当前的这个数据直接灌到对应的对应的那个index下边去。好,这边执行起来,我们就看一下这边的效果啊。
18:06
好,大家看到现在这个已经写完这个数据了,那接接下来我们看一下当前有没有,诶,现在果然多了一个这个叫做3S的index来,那现在我们可以看一下里边这个数据到底是什么呀,对吧?来,那还是发一个这个科命令啊,Local host9200啊,这个大家知道,就直接写这个对应的index对吧?Sensor啊,然后我可以直接做一个这个search search对吧?啊,那大家如果要是看的清楚一点的话,也可以用这个pretty做一做一个这个显示,大家看现在是不是就是所有的数据都可以查询到啊,啊但是如果要是说你希望去看得更清楚一点的话。也可以对,我们在浏览器里边直接去访问,是不是也一样啊,这里边大家看到这个浏览器里边直接访问啊,你看到总共这里边是不是total hits7对吧,总共有这么七条数据写入进去了,每一条是不是都有啊啊你看这个341的这个37.1341的35.8346对吧?呃,三四七三四十所有的数据都都写进去了,这就是我们呃往这个ES里边写入数据的一个过程。
我来说两句