00:00
我们已经了解了处理函数process function,我们知道它是flink底层的API,那这一部分内容掌握了之后呢,结合之前介绍的API,我们就可以处理日常的需求了,基本上所有的处理转换我们都能搞定,不过呢,这一部分还是局限在一条流的转换计算,那假如出现了很多条流的转换计算,又应该怎么应对呢?啊,所以接下来呢,我们要介绍的就是另外一部分内容。第八章多流转换啊,那如果简单划分一下的话,多流转换其实可以分成两大类,那就是到底是把一条流拆成多条流,还是把多条流合成一条流,这就是所谓的分流和河流操作。首先我们来介绍一下分流操作,分流操作啊,最简单的方式其实我们知道就是前面我们不是已经得到一条data stream吗?那接下来我就干脆基于这个data stream调一次API啊,我就调用比方说map flat map filter各种调用,然后呢,这一部分操作定义完了之后,再去基于之前的data stream调用一系列的data API进行转换,诶,那就相当于我们又定义了一段流处理嘛。
01:17
得到的结果当然就是另外一条流了,哎,所以我们基于同一条流可以定义不同的操作处理流程,那这样的话就相当于把一条流分成了多条流,就岔开了,所得到的数据流呢,每一条流都是完全平等的。当然了,如果说我们做了map flat map这样的转换的话,那看起来这个并不是一个简单的分流,而是定义了不同的转换操作,那如果说我们只是想把原始的数据流分开的话,那可以定义一些筛选条件啊,所以一个最简单的方式就是直接用一个filter。Filter算子,哎,那我们根据某一个条件,比方说我们现在的数据流里边啊,根据user信息做一个筛选,如果是Mary这个用户点击的,我们就把它放在data stream1里边,如果是Bob点击的,我们放在DATA2里边,其他用户点击的我们放在data stream3里边,啊那接下来呢,我们可以在代码里边做一个这样的简单实现。
02:19
现在是新的一张,所以我们还是去新建一个package,这个叫做CHAPTER08。我们现在要测试的是分流操作。Object split strip test。那方法先写出来,基本的流程当然还是一样了,Stream execution environment,先把它get获取到当前,这个我们还是叫做烟V,上面记得要改成下划线。同样我们还是为了方便测试全局的并行度,先设成一,接下来读取数据源at source,我们还是直接用自己定义好的click source不停的生成测试数据就可以了,那得到这个我们先叫做stream吧。接下来呢,就是把它。
03:07
按照。不同用户的点击事件。进行分流操作。做一个筛选啊,那所以这里面最简单的方式就是不停的基于当前的stream去调用filter方法,那这里边的filter条件,那就是对应的用户嘛,所以这里面比方说user。假如说user就是marry的话,好,那那么比方说我们把它叫做,这叫做marry stream。或者叫做一同样道理。给入不同的筛选条件。比方说这里是Bob啊,那这里就可以换一个叫做bobre,那如果是其他人呢?剩下的情况我们就直接叫做else stream啊,那当然了,它的筛选条件就应该是user,既不等于marry,也不等于Bo。
04:04
那这个参数如果要出现两次的话,我们知道下划线就不能直接去使用了,得完整的写一个拉姆达表达式。data.user。不等于marry。并且得点user不等于ball。这就是我们完整的处理逻辑啊,所以接下来的话,我们就可以把这分开的每一条流去做一个打印输出,看一看里边的信息到底是什么样,这个是Mary这条流的信息,那Bob stream也可以做一个打印。主数是Bob,另外还有elsere。这里输出是else。最后enve执行起来啊,接下来我们就可以运行一下,看一看得到的效果到底是怎么样的。我们每秒钟会生成一条数据啊,那我们看是else,然后有Mary的数据,Alice丝,我们看到这个爱ice丝对应的用户生成的就是在else这条流里边啊,啊,那Mar呢,对应的就是Mar这条流里的,Bob的数据,当然就是Bob这条流里了,Carry这个用户啊,那同样也是在else这条流,这就实现了一个分流的操作,把我们的一条流一分为三。
05:20
这是最简单的一个实现,直接使用filter进行分流。
我来说两句