00:00
接下来我们给大家演示一下弗Li框架和ES框架的一个集成。首先什么是Li框架呢?它跟咱们之前演示的那个Spark框架其实差不多,他们都是大数据的数据分析引擎,只不过呢,我们的flink更倾向于实时数据处理。在5G时代到来的今天,我们数据的充斥着我们生活的方方面面,那么对实时数据的采集、存储、分析的需求呢,就尤为迫切,所以啊,我们这个框架呢,其实非常强大,那我们就需要将ES跟这个框架做集成,因为本身ES呢就是一个准时,时近实时的一个数据存储啊,分析框架对不对相集成效果会更好,那我们这里演示的呢,其实还是福利框架做处理之后将结果存储到我们的ES当中,哎,咱们讲下这个事情,首先第一个还是框架集成,我们先创建命运项来。按照我们的要求呢,我们这里呢,就跟之前一样就可以了,来点击六。Module,然后呢,选择下一步我们这个地方呢,来选择那然后给它写上一个叫ES,我们叫link。
01:01
好完之后把这个去掉。那么这里呢,我们选择来我们叫com.at硅谷,点ES就可以了,就的去掉,然后呢,点击完成。题完成之后,我们同样要增加依赖价包,把依赖的价包呢给它准备好来,那么下面呢,我们这边已经事先准备好了,来拷贝。拷贝之后。过来。啊,那你会发现呢,里面也有杰森呢,也有弗link的一些相关的连接器,对吧,它都有专门的,咱们这个版本呢,可能稍微差一些,但是没有关系,它是可以通用的啊好,那我们刷新一下,保证我们的依赖关系没有问题,咱们接下来呢,给大家实现一下这个功能啊,首先把类名呢咱们拷贝。拷贝之后呢,我们放到这边,我们点一下点。然后在这里呢,我们点击new创建。然后呢,我们。com艾特硅谷。At硅谷第S2A,我们拿过来,然后呢给它说明一个类,不明类之后呢,在里面我们添加main方法就可以了,嗯,好,我们首先先构建我们flink的环境啊,或者叫环境对象吧。
02:07
好,那么这里面会有一个我们叫做stream啊,我们的execution,我们的environment,这里呢,我们选择有两个啊,咱们有两个,咱们选择这个。然后点我们这里呢,叫做create啊,或者叫get都可以啊来。获取之后呢,我们这里直接给它来返回一个变量,嗯,可以了,咱们简称的叫env。啊,写完之后呢,那么我们接下来呢,我们往下来往下来,下面呢,我们要准备呢,给这个环境啊,咱们要去给他监听一下咱们的网络数据流叫socket。然后写上我们叫做local host,然后写上四个九。点完之后点一下VR回撤,那你现在拿到的就是一个three box,一个数据源呢,嗯,这个我就改个名,我们就叫source就可以了啊来。你拿到数据源之后,那么这里呢,我们需要干嘛呢?进行我们的操作,去完成我们的数据的采集,以及数据的输出,所以我们这里呢,其实是有一个什么呢?Think的概念,大家可以看到来。
03:07
我们这边呢,我们写上点,它有一个叫I think。这think呢,你可以简单的理解为就是数据的目的地啊,啊,数据和输出,咱们叫数据的输出,对吧?哎,咱们叫输出,而我们这个呢,你可以简单的理解为就是数据的输入啊,咱们叫做shops,咱们叫数据的输入,好了,一个输入的输出,那这个输出得有个S呀,这呢,其实我们的ES有个专门的builder来进行构建,所以我们写上啊,因为我们这个集成嘛,那么要用一些集成的一些类,嗯,咱们叫做什么呢?叫使用我们的ES的builder。Builder来构建输出,构建输出好了,那这个输出我们用什么呢?咱们叫做ES think builder,这样我们这里要需要去写上一下啊,来这里呢,我们去来写上一个它我们叫做new。咱们叫search啊as search呢它里面,诶把这个去掉,我们写上叫做S,那它有个think think呢有个builder,把这个builder给它拿过来啊来。
04:08
好了,那么你拿过来以后啊,那接下来我们在里面要添加参数了啊,那这个参数呢,我们来看都有什么,来里面呢,就会有一个叫HTTP啊,有一个集合,说白了就是你连接哪一个服务器啊,对吧,那我这里准备一下。咱们叫list,然后呢间括号,你们叫的HTTP,咱们叫host,好,我们写上去叫host吧,改S啊这个集合嘛,New。好了,改完之后把我们的这个呢,给它来导一下,嗯,倒一下,然后把这个呢,咱们也倒一下。再倒一下啊。啊,写完之后,你把这个house呢放进去就可以了,这是第一个,二个呢,就是说你这个数据是怎么处理的,它怎么就能够往我们的ES里面放,你有一个builder吗?这个build就是用来构建输出,那你怎么输出的呢?来,咱们接着往下,下面的这个地方呢,它需要传一个什么呢?叫做function函数类,那函数类或者函数对象,那这里呢,咱们直接给它拗了啊来。
05:08
咱们直接new,我们就叫做什么呢?Electric search,然后呢,这里面有个function,哎,直接写就行了,因为呢,我们从网络当中获取的数据呢,都是一行一行的字符串,这个泛型呢,我们给它来一个string就可以啊好点完以后,那么我们这个方式你点一下,点它其实是个interface。那么接口怎么可能直接构建对象呢?不可以,但是你可以采用匿名内部类,或者叫匿名实现类,对不对?那好,这里我们来,诶,给他来重写它的方法,那么你重写它的方法的话,那么其中啊,里面有什么open啊,什么close呀,包括什么process呀,把这些方法我们可以重写了,你重写之后,那其实就可以完成它的操作,对吗?哎,就这意思,除了这个以外,我们还可以有别的啊,比方说来,咱们再来,嗯。看一下它的方法当中还有一些别的方法啊,咱们这就不管它了啊,所以不管它以后呢,我们继续呢,我们往下啊来往下这叫process,它来处理我们的请求,那么首先啊,我们在这个里面呢,我们就写上,大家看它里面都拿到了什么东西呢?第一个我们的这个S就是你获取的数据,这个呢是我们的request啊index。
06:16
这看着名字,这个不就是我们所谓的什么我们请求的索引吗?对不对,所以把这个拷贝拷贝拿过来,我们点啊点咱们叫ADD,你会发现什么东西。这个不就是我们的索引请求吗?我们还记得我们之前给大家讲Java API的时候,是不是就有这个请求,对不对,哎,就有啊,那有的话这不就简单了吗,对不对,所以呢,我们这里来啊,咱们来。都要给它构建一个啊,咱们叫做request,哎,我们request点,那它有现成的嘛,我们这里就叫index request。构建一个我们的索引请求,你们构建索引请求以后呢,我这里为了简单方便啊,所以我们点为A2回。这以后把这个拷贝,然后呢,放进去,放进去以后这里来我们改一下,这里呢,我们点一下啊,咱们叫index,给它起个名啊,给它起一个名啊,咱们就叫做什么呢,叫做嗯,Flink咱们的index吧,起这么个名。
07:13
这个就比较直观了啊,然后呢,接下来我们干嘛呢,给它来一个咱们叫做点我们的他咱们叫做S。啊,数据嘛啊,我们的source,这source呢,我就想啊,那就拿一个什么呢,我们的这个map吧,因为这个source可以传一个map,它可以传map的话,我就准备一个map不就完了吗?诶我们写上一个map啊,咱们写上一个map,咱们叫做string string。咱们叫杰森map,等于new哈希map,好,写完以后你把这个杰森map放进去。你放进去以后,那这样的话,它会把这个map呢,在底层呢,自动生成接身字符串,你就不用管它了,诶我们就这么写,点完之后,那你这个接身map里面写啥呢?太简单了,那咱们来咱们点一下打完以后,我就随便给这个数据,把你从网络当中获取的数据S给它存到这个这个属性当中不就可以了吗?对不对,或者呢,来咱们再拷贝,拷贝以后呢,点我们叫ID。
08:11
然后写上我们什么呢?我们叫做嗯,咱们叫做9001吧,嗯,好了。改完了之后,那你的请求也有了,你什么都有了,那么你执行不就完成了,对不对,哎,所以啊,这个操作就算是结束了,那么结束了以后,咱们得到的这个builder啊,那来吧,咱们叫做什么呢?咱们叫做builder啊嗯,那这个呢,我看看啊,咱们拷贝一下,拷贝拷贝以后呢,我这是不是应该写上一个。看看,把这个去掉,我们写它,咱们加上一个string,我们就叫做ES build,我觉得就可以啊,等于它好了,写完以后这个string呢,咱们拿过来,而这个builder我们需要在后面这个位置呢,给它点一下好点,我们叫做build,你有了一个builder的build的数据啊,我们后面去执行它不就OK了吗?对不对,对于我们来执行。
09:03
咱们叫执行我们的操作,这个时候呢,Env,咱们叫recu,然后呢,给他一个作业的名称,我们就叫link,咱们的ES就可以了。写完了我们的执行之后呢,我们前面还需要把我们的这个host给它加上啊,起码呢,你得知道我们连谁吧,对不对,那所以呢,我们at,嗯,At之后呢,我们new一个啊HTTP咱们叫host,这个咱们之前不都写过嘛,对吧,然后写上我们的local host本机,然后呢,9200,然后呢,访问的是我们HTB协议好了。咱们这边的NC呢,其实已经启动了,但是呢,有一个细节什么呢?我们默认情况下,它其实是支持批处理的,那么就意味着如果你只是简单的想去演示一下,那么你这样的话,他可能是看不出效果来的,为什么?因为他有批处理的概念,所以啊,咱们这个builder啊,咱们需要在执行之前一个简单的操作就是来一条我们就处理一条,来一条处理一条。
10:04
对啊,它有这么一个操作啊,咱们说一下这就比较特出了啊,这是为了演示啊,实际上肯定不是这样的,对吧,因为你实际上数据量多嘛,它很快就能够输出了,但我们这就一条嘛,对吧?好,那我现在呢来开始,我们这边已经启动之后,我们现在运行。运行之后看效果啊来,嗯,稍微等一下。等一下完成以后呢,大家可以看到我们在当前的操作当中,它已经开始执行了,那现在呢,我们就写上一个我们的AA好回车,回车以后大家可以看到它下面呢,就开始往里面增加数据了,没有问题吧,那么行,我们试一试,咱们就来9009001,然后呢,我们点击send查询,大家可以看到我们的数据是不是已经出来了,哎,咱们就简单点演示就可以了,那如果你的数据量输的多的话,你这个地方就可以做循环处理了嘛,对不对,这个我觉得没有问题啊,好了,我们辅另一个框架的集成就到这里了。
我来说两句