00:00
在实际的应用场景里边,真正的流逝数据其实往往应该是无界的啊,就是数据是源源不断的到来的,我们并没有办法把它直接收集到一个文件里边去做统一的读取。那在这种场景下,我们又应该怎么做呢?啊,这种时候我们就需要保持一个监听的状态,我们在不停的捕获新到的事件,新来的数据,来一条数据就处理一次,执行一次我们的处理流程。那为了模拟这样的一个场景,我们现在就不要去从文件里边读取数据,而是去监听一个主机的某个端口,然后通过这个端口去发送数据。每来一个数据之后,我们去看一看怎么样在里边去做转换计算,得到我们想要的输出结果。我们可以用一个Linux自带的小工具net cat来进行一个模拟啊,那这个工具其实非常的简单啊,非常的好用,我们在代码里边呢,也只需要配置两个参数,就主机名和端口号啊,把这两个配置好之后,就可以去捕获一个socket文本流,读取出这样一个socket文本流来,去进行转换计算了啊,接下来我们在代码里面做一个具体的实现,还是去用一个Java class,目前我们是真正的流式处理,所以就叫stream。
01:29
那么具体的处理流程其实跟前面还是非常的类似。那这个过程第一步创建流式执行环境。Stream execution environment execution environment,把它叫做env,然后接下来这个操作就是但是读取文件,而是读取一个文本流,Socket文本流啊,通过socket去发送数据,那这里边我们可以env去调用一个,我们看到内置有一个方法就叫做socket string。
02:08
啊,那这里边我们需要配置两个参数,简单的啊,就传两个参数,一个是hostname,一个string类型的host nameme,一个int类型的port,端口号,主机名端口号啊所以其实使用也非常简单,比方说现在我们就从我们可以起一个虚拟机哈,102,然后在这个里边通过net cat这样一个工具去发送数据,那么这里边。发送数据的过程啊,Net k使用的时候也非常简单,就直接NC啊,Net k的一个简写,然后杠LK表示保持当前的连接,持续去做监听,那比方说这里边我们端口号就指定成7777啊,一般这个端口号不会跟其他的应用去冲突,只要用一个没有被占用的端口号,然后这样启动就可以在这里去发送数据了。我们现在在代码里面要做的呢,其实就是监听当前的这一个啊,发送数据的socket的文本流的服务器啊,那所以这里面我们要给的就是哈杜1021777把它读取出来,那这里边我们得到的一个data stream。
03:19
也可以把它叫做line stream啊,我们知道data stream source本身也是继承自data stream的嘛,直接叫landed stream是完全没有问题的,那在之后的操作我们就想到了,跟之前的有界流的处理完全一样啊,所以这里边我们可以直接把它copy过来。直接copy过来,但这里面我们把这个需要做一个更改,Line three,然后下面还是先做一个Fla map,把它转换成二元组类型,诶,Returns指定返回的类型,原元组里边第一个元素是string,第二个元素是长整型,然后呢在KBY按照第一个元素word去做一个分组,后边再对第二个元素这个长整形的一去进行一个求和计算,最后得到的结果我们可以打印输出。
04:17
最后不要忘记的,需要有一个启动执行的过程,上面为了让它正常运行啊,需要throw exception把异常抛出,那这样的话整个过程就处理完毕了。接下来我们可以运行一下,看一看跟之前的有界流有什么区别。我们直接运行。当然这个前提是你必须要把这个neca要启动,因为这里如果要是没有对应的连接的话。我们在这儿直接想要去连接到这样一个端口,直接就会报错了啊,就会报这个连接异常啊,我们可以看到启动之后这里没有任何的输出,看起来好像卡在这里了。
05:00
这是正常的吗?啊,当然是没有问题的,因为我们现在并不是读取文件,我们读取这一个哈比102的771端口,读取S文本流数据,现在还没有任何数据输入呢。哎,所以如果我们想看到结果的话,那需要在这里做一个数据的输入,比方说hello word输入一条数据,这是一行数据,我们看一下,现在这里就输出了。HELLO1和WORD1。这看起来真的就是按照我们的顺序来输入了,因为我们现在的输入中间是隔了一段时间的,哎,那后面的数据是没有办法直接去并行插到前面去的啊,那所以现在我们就看到了哈1WORD1。然后接下来我们再输入一条hellolink。我们就可以看到接下来输出了FLINK1和哈二啊,这个就是在之前哈一的基础上直接叠加,然后变成了二,而且它前面的这一个数字五都是不变的,说明我们是把它分分配到同一个并行的分区子任务上去做执行的,然后这里边大家可以看到就前面是HELLO1 word1,后边呢,哈变放到后边了。
06:21
因为我们知道当前的这两个数据还是并行的啊,所以他俩的顺序其实谁前谁后是没有关系的啊,这样的话就非常好理解,可以知道他最后执行的结果了。同样,然后如果我们再来一个hello Java的话。得到的结果跟我们之前是完全一样。就有介流里边我们最终输出的结果,不不也就是这样吗?Hello 123,然后wordlink Java各自有一个输出啊,只不过我们可以看到这个具体流处理的过程,就是输入一条数据,这里就输出,不一定是输出一条,我们这里边做的拆分分词,那就是会有一一组对应的输出,在之前结果的基础上去做更新,聚合统计。
07:08
这就是我们对于无界的流失数据进行处理的结果啊,当然了,这里大家也可以看到,如果我们这里不再输入数据的话,这里就不会有结果输出,不会再更新,但是这个程序并没有运行结束,这里边我们还是在一直运行的一个状态,如果我们这里手动去把它停止的话,它才会停止,所以看到这就是事件触发的一个程序的状态啊,相当于我们这里是启动了一个监听的事件。只要有数据来这里就会进行响应,它一直是一种挂起的状态,如果我们想要停止的话,必须要手动去对它进行撤销,对他进行终止,这样的话才能退出程序。啊,这是我们基本的一个测试,当然了,这里我们是把这个主机名和端口号直接写死在这里了,在实际项目应用当中我们会知道啊,一般不会以这个hard code的形式直接写死,那往往应用的方式就是要不从配置文件里边去读取,或者呢,我们也可以把它直接设置成当前程序执行时候的配置参数啊,就是我们以命令行参数的形式来进行这个主机名和端口号相关的一个配置啊,那这种方式我们可以看一看,又应该去怎么去做呢?
08:28
我们这里可以。从参数中。提取。普鸡鸣和。听口号。里边就用到了一个。工具叫做per two,这是flink给我们单独提供的用来呃,从当前这个main方法的这个AX里边去提取参数的一个工具啊,那这里边它有一个方法就叫做from a。
09:01
我们把当前的参数X进来,那得到的这个东西本身就是一个ET to,我们就把它写在这里就可以了啊,那从这里边提取怎么去提呢?啊,首先我们希望得到一个string类型啊。How's the name。那就直接用permeter to给大家看,这里边其实就是非常简单啊,当前传进来的as直接都解析成了一个一个的键值,对,所以类似于这里边就是保存了一个哈希map一样啊,就是类似于一个映射啊,所以我们这里可以直接get,比方说我们当前参数就把它叫做host吧,啊知道在这个Java法运行的时候,我们前面可以加这个框告啊,然后加一个这个参数名称给对应的值,相当于就是一个key个value,所以我们把这个参数host nameme的这个名字啊,参数名叫做host。我们给的值当然就应该是度比102了啊,然后另外还应该有一个啊integer类型啊port。
10:05
同样to,我们可以看到它可以直接get in,这的话返回的就直接是一个int类型,就不需要再去做具体的处理了。要再去转换了啊,那这样的话我们就可以得到了hostname和port,以这个来做一个替代了hostname。和port,那在运行的过程当中,这个又应该怎么样去做处理呢?啊,这个可能会稍微的麻烦一点,在当前的这个代码里边,我们执行的时候需要去配置一下。配置项啊,就是当前的这个configuration需要去设置一下,我们看到这里有program arguments在这儿。一定,比方说host。102。On god。一起七七啊,我们把它apply一下,然后OK。
11:04
这样去运行的话,那就和刚才的状态完全一样了,我们是从外部参数里边把对应的主机名和端口号进行了传入啊,大家看现在已经启动起来了,那接下来我们可以还是啊,在这里可以收输入。我们可以看到里边就输出了HELLO1WORD1啊,那如果这里再给一个hello flink的话。当然就是1HELLO2啊,同样有对应的输出,就是关于无介流进行流式处理用。Link的data stream API进行处理计算的完整的流程。
我来说两句