00:00
前面我们已经实现了一个有借流的,我out,整个这个处理过程呢,其实还是把所有的数据先放在一个文本文件里面啊,我们相当于就是逐行的读取这个文本文件里边的数据,当成流逝的数据,源源不断的一条一条来来一条就处理一个统计结果进行输出,但是我们会想到啊,在实际的生产环境里边,数据不应该是一个有界的形式。没有办法把数据都放在一个文件里边统一读进来,那真正的生产数据应该是源源不断的产生,源源不断的到来,那是真正意义上的流数据啊,那就是我们所说的无界流了。诶,这就需要我们不能简单的读取文件,而是要保持一个监听事件的状态啊,简单来讲,我们可以用这种方式来模拟,就是直接去监听发送数据的那台主机上的某一个端口,然后这台主机呢,可能就是通过这个端口来发送数据,我们只要监听它每来一条,这里就立刻做出统计判断,得到对应的结果输出就可以了。
01:07
所以这个过程我们就会发现啊,不应该是像之前这样运行完毕之后,最后会结束退出,我们整个这个流处理程序应该是永远不会退出的,就像一个挂起的程序一样,不停的保持监听的状态,每当有数据到来的时候,我们就立刻统计对应的结果。那具体的处理过程呢?其实跟前面有介流的处理基本上是完全一样的,我们只要把这个代码稍微做一点更改,只要把数据源的读取从文本文件换成一个流式的数据源,从某一个主机的端口上去读取数据,这样的话,我们所处理的就是一个真正意义上的无界流了啊,或者说这是我们真正意义上流处理程序的样子。所以接下来呢,我们还是在当前的包下边新建一个scla的object,一个单利对象啊,那么当前既然是一个真正意义上的流处理程序,那我们就不用区分了,直接就叫做stream worldout就可以了。
02:06
那整个的处理流程,我们知道在main函数里面整体还是一样的,我们可以直接把之前的内容做一个复制,全部先复制过来。哎,当然了,Stream execution environment啊,执行环境要引入,为了方便同时引入后边所需要的转换,我们在这里直接把它改成下划线,然后接下来呢,我们所要改的其实就是第二步读取数据的方式,我们不再是读取文本文件数据了,而是读取一个真正的流,诶,那我们所说的啊,读取某一个数据发送主机的某个端口,那这种方式呢,这就是非常经典的网络传输socket文本流的方式,所以我们当前读取的其实就是一个socket文本流数据。有了这个基本概念之后,诶,那后面我们只要把这个read text file改成读取一个文本流不就可以了吗?诶,那所以我们就想到只要调用这个方法,Socket text stream,因为我们要读取的就是socket文本流嘛,哎,所以我们当前直接调用socket text stream方法里边要传入的参数,当然也不是一个简单的文件路径了,这里边我们可以看到它所需要的参数是,诶,最简单的啊,我们看到后面两个参数,它是可以有默认值的,默认情况下,如果我们这些不改的话,其实只要两个核心参数就够了。
03:28
一个host name,一个port端口,哎,所以其实就是要指定连接监听哪个主机的哪个端口啊,这里边我们随便来指定一下吧,比如说像我们之前如果要是做过大数据相关的一些集群搭建的话,可能我们呃有一堆这个用于哈杜op相关集群的测试啊,我们之前就已经有这样的一组机器,比方说我这里命名的方式就是哈杜普100,哈杜101哈杜102,比方说我这里就用一个哈杜OP102吧。那我们就给一个主机名叫做哈杜102。
04:03
后面给一个端口号,我们随便给一个,只要是平常不去占用的端口就可以,比方说我随便给一个7777。这样就可以把当前这个主机开放的7777这个端口发送过来的数据接收到,然后保存成我们这里面data stream,接下来进行转换好,那后面的流程都是完全一样的,首先我们Fla map按照空格做一个分词打散,然后呢把它map成转换成一个二元组,接下来啊,那就是我们按照当前的word进行分组,然后再对后边的数量做一个sum统计,最后打印输出,不要忘记最后还有一个执行任务的过程,诶那这个就看的很明显啊,就是前面我们其实只是定义了每一步转换计算的操作。那事实上呢,我们其实这个程序执行只是先把这个监听程序跑起来了,放在这里了,接下来呢,真正意义上的数据是要在这个哈杜102,它通过它的7777端口进行socket文本流的传输,在那边输入一条数据,我们这边就统计一条。
05:09
所以这个代码其实非常的简单。那现在既然这个代码已经写完了,我们就可以做一个简单的测试了啊,那这个测试呢,需要注意的是不能在这里直接去进行运行启动啊,因为我们会想到当前我们是要从哈杜102的777端口去接收数据,那现在呢,诶,我们哈杜普102上并没有对应的这样一个网络服务啊,没有办法发出数据,那根本这个端口连都连不上,那当然这边启动就会失败,所以首先我们应该先到韩度102这台机器上。啊,那我现在就到当前我们已经创建好的这一组哈杜普集群上面,啊,在哈杜102这台机器上直接去创建一个对应的发送骚的文本流数据的这样一个服务,诶,那用什么方式去发送呢?诶这个其实非常简单,我们知道在Linux环境下边有一个自带的小工具叫做net cat。
06:05
那对应的命令呢,其实非常简单,就是一个NC命令,只要敲这个命令就可以创建一个socket文本流的服务端啊,那接下来我们就可以简单的输入数据去发送数据了,使用也非常简单,我们就在这里直接敲NC,然后后面可以加一个参数,比如说我这里跟上杠LK啊,这里L表示的就是要监听哪个端口,K的话就是K,就是要一直保持我们当前的连接状态啊,那后面呢,跟着的就是当前监听的端口号。比如说我们这里是7777,只要这样启动就可以了,这个起来之后,接下来我们就可以在这里运行当前的程序。我们直接做一个运行。运行起来之后我们会发现啊,跟之前我们做的有界流的测试读取文本文件不一样,当前呢,并不会输出任何的结果,其实这个很好理解,因为我们当前flink这里边,我们只是跑了一个程序在这儿监听,现在根本还没有数据啊,我们的数据是一定要在哈杜102这台机器上,在这里去输入,那边才会有对应的输出,哎,所以接下来呢,我们可以从这里直接去敲一段文字啊,比方说我们还是敲一个hello word。
07:17
去做一个当前文字的发送,然后我们再回到当前的测试页面来,现在的话就看到对应的有输出的统计数据了,我们输入了一行hello word,那么现在统计出来的就是HELLO1和WORD1啊,那当然了,后面如果说我们继续输出的话,那当然这里可以再来哈flink,那么接下来这里就会输出FLINK1和HELLO2,诶那这里为什么前面是先输出哈,后面就是先输出flink呢?为什么这个不按顺序来呢?诶,那其实我们知道当前哈和flink,它其实本身这里是两个词,打散之后,后边我们做统计,那就又是并行去执行了,所以当然他们的先后也是不以我们输入的顺序为标准的啊,有可能是打乱的,有可能是乱序,那这里前面呢,同样还有对应的一个数字小标。
08:12
我们就知道了,这也是当前并行计算的一个子任务的编号,哎,可以认为就是在并行执行任务的时候,哪一个线程输出了当前的这个统计结果啊,那这里边我们再来解决另外一个问题,就是当前这个数字到底应该是从几到几呢?啊,其实如果我们后边测试更多数据的话会发现啊,这里我的电脑在输出的时候,它其实就是只会输出一到16的数字。这是为什么呢?哎,这主要就是我们现在不是一个多线程做了一个并行执行吗?这就是跟我们当前并行执行的线程数有关的,那这个线程数我们默认到底这个并行的程度是多少呢?我当前没有做任何的设置,那在集成开发环境里边,我们在本地运行的话,默认的并行的程度就是以我当前的CPU的核心数量来做为基准的。
09:07
啊,那我当前这个CPU是16核,那所以呢,我们当前这个并行的程度默认就是16,也就是说如果说我这里不做特别的指定的话,输入足够多的数据测试,我们就会发现,诶,前面可能就会从一到16,所有的编号都会占据。哎,这这里其实也涉及到了一个概念,就是当前并行计算的这种程度,多线程到底占用几个线程,在弗link当中是有一个专门的概念的,就叫做。并行度。表示当前一个任务需要有几个并行的子任务来共同完成,那之后呢,我们还会再去展开讲解相关的概念啊,那这段程序呢,如果在实际应用当中我们会发现啊,一般我们不会把这个host name,主机名和端口号直接写死在这里,如果是在生产实际当中的话啊,对应的这些参数都应该是。
10:05
读取配置文件,或者说我们在执行当前任务的时候给一个指定参数,诶,那我们这里边可以简单的介绍一下读取配置文件,我们就不说了啊,那就是写在另外一个文件里边去做配置读取,那如果说我们想要从运行当前程序的参数里边读取,又怎么做呢?这个也简单。那其实我们知道啊,对于Java或者是scla的代码而言,我们在运行的时候都可以在wrong,诶,这个下面有一个编辑当前的configurations当前的配置啊,那我们可以直接给它加一些运行时的参数,所以我们完全就可以把当前的主机名和端口号作为运行参数配置在外啊,那所以在代码里边我们又应该怎么读取呢?呃,这个也比较简单,对于flink而言呢,还专门给我们提供了一个工具,就叫做。Parameter two。诶,我们看到就是所谓的参数工具,这是flink API Java u下边给我们提供的这个东西,我们可以直接把它引入,然后接下来,诶,那就可以直接from us,从当前的参数里边去读取信息,那对应的参数到底是从哪传进来的呢?其实很简单,就是我们当前main函数本身自带的X参数嘛,我们所传的所有参数都会放在这个string类型的数组里边啊,所以接下来呢,我们就直接from us。
11:33
然后从这里边读取就可以了,这是一个parameter two,我们先把它放在这里。然后接下来呢,当然就是要从这一个工具里边去提取我们想要的具体的主机名和端口号了,也非常的简单啊,那我们这里边就直接parameter to.get。如果我们想要get某一个比方说啊,我们指定的主机名就叫做host nameme,或者简单写一点啊,我们直接就叫host get到的host,我们就把它这个字符串啊,就叫做host name,哎,那当然了,后边对应的我们还可以有一个port,这个port应该是一个int类型,所以呢,我们就parameter two直接可以去get in,这样也是可以。
12:14
那对应的这个字段的K呢,我们就叫做port就可以了,那后边我们再要去指定主机名和端口号的时候,那就应该要把它做一个更改,不要直接写死,而是。把前面从参数里边提取出来的host name和port写在这里就可以。当然了,现在如果说我们想要重新执行的话,那就应该要在这里做一个额外的配置了,我们知道呃,对应的这个参数都是杠杠作为K的标志的啊,那么接下来就是杠杠host,我们指定的是哈杜102,然后杠杠。Port指定的是7777,我们可以把它应用一下,然后接下来重新运行。好,那接下来同样我们还是连接到哈杜1027777端口,接下来呢,我们重新输入一下当前的这些hello word重新输一遍,我们看现在又输出了WORD1和哈一啊,当然之前我们输出的是HELLO1WORD12,因为这个并行执行先后顺序是没准的啊,这个是比较随机的啊,同样我们后面可以输入hello flink和hello。
13:21
Scla,哎,那接下来我们就可以看到输出了,FLINK1HELLO2和LA1HELLO3。我们看到输入一个数据就得到一个数据,这就是所谓的真正意义上流出。
我来说两句