00:00
我们已经了解了flink处理迟到数据的三重保证,那接下来呢,我们就可以在代码当中做一个实现了啊,那前面我们做过这样一个例子啊,就是统计每个页面URL对应的访问频次啊,那我们是按照十秒钟一个窗口去统计它的访问频次的,而且把输出的结果包装成了一个样例类,叫做URL view count,里边包含了当前的URL信息,它的访问的频次count值,另外还有窗口的信息,起始时间和结束时间,我们把这个例子可以做一个改进,我们增加对于迟到数据的处理。所以呢,我们可以把这一段处理的代码直接复制一下,新建一个object。当前我们就叫做process data example。啊,那当然了,这里边所有的内容都要放进来,然后对应的影视转换我们也要引入。接下来呢,呃,这里我们调用的这两个增量聚合函数和全窗口函数呢,也可以直接使用之前在例子里边已经实现过的。
01:06
那这里我们所要做的更改呢?其实是数据源,这里我们就不要是一个按照时间顺序排好序的升序流了,我们要定义一个乱序流啊,那当然了,这个最简单的方式还是像我们之前测watermark一样啊啊,直接使用一个NC,然后读取一个soet文本流,诶这种方式是最为直接最为简单的测试方法。所以我们在这儿直接把它定义出来,把event引入啊,另外后边我们还要指定water mark生成策略啊,那就是使用这里的乱去生成策略。Copy过来。有了这一步操作之后得到的STEM啊,接下来我们就可以按照URL去进行分组,然后开窗口,接下来进行聚合统计,那如果要完整的处理迟到数据的话,很显然前面我们这里设置了一个watermark的延迟时间,这只是第一重保证,接下来呢,我们还得有第二重和第三重保证,那这两步操作。
02:06
都是在窗口分配器和窗口函数之间基于window stream去调用的啊,相当于是其他window API可选API,那这里面首先我们可以指定。指定。窗口允许等待的时间啊,那这里我们就要调一个loud lateness里边要传入一个time,当然了,我们这里引入的还是stream API window time.time哎,我们这里可以直接给一个比方说一分钟LI1,然后最后我们还有一个兜底的方法,那就是。将迟到。数据。输出到侧输出六。这里调用的方法是side output late data,然后这里边我们要传什么参数呢?哎,我们会发现里边需要的是一个output tag,这又是个什么东西呢?看起来就像一个输出标签啊,那这个标签呢,本身其实就是一个测输出流的唯一标志,哎,那所以我们定义的时候怎么定义呢?哎,就可以直接在这里啊定义一个泛型,然后后边给一个对应的名称,相当于它的这个ID一样,只要放进去写进去就可以了啊,那所以我们这里边也可以就用这样的一个例子啊,直接用官方给我们举的这个例子来做一个实现。
03:26
在上面我们首先定义出这样的一个测输出流标签。测输出流的。输出标签。然后在下边就可以把对应的这个标签传进去了啊,那当然了,这里边我们本身这儿定义成了它是一个string类型的标签啊,那我们现在如果说迟到的数据都是event类型的数据,要放到测输出流里边去的话,那显然测输出流的标签它指定的也应该是event啊,就是你要放什么样的数据,当前的这个流是什么类型的,你就指定什么样的类型。
04:01
所以这样定义好了之后,接下来我们就可以进行测试了,那当然了,我们还应该多打印一些信息,因为当前本身这个主流里边想要输出的就是我们当前窗口的聚合结果,我们并没有办法看到到底是哪一条数据来了,也没有办法看到我们当前的测试物流里边的数据到底收集到了没有,哎,那所以这里边首先啊,我们可以把原始的这个stream直接做一个print打印。我们可以把这个就叫做input data,然后接下来呢,我们还可以将侧输出流到数据进行打印输出。那这个特殊里面的数据到底应该怎么样去获取呢?这其实还需要调另外一个方法啊,那这个方法呢,是基于我们已经聚合得到的那个data stream来进行调用,这里边的测输出流,我们可以认为它就相当于一个支流一样啊,就是当前我们有一个主流在进行转换计算。
05:00
我们通过当前的窗口聚合操作之后得到的数据就在这个主流里边,然后这个测试物流呢,诶,就是从里边差出了一条流,所以我们可以从这个主流里边调用一个方法去捕获到差出来的这个分支,那对应的这个方法呢,就叫做,诶,这个需要点到data streamam里边调用它的方法,那就是get。Side output就有这样一个方法,这就传入一个测输出流的标签,然后就可以得到当前主流差出来的那个分支,就是当前的测数主流,得到的当然也就是一个data stream。这就是我们具体的一个调用的形式,那当然了,在这里的话,我们如果想要得到这个东西,那应该是在这儿就把这个流要单独的定义一下了,我们把这个叫做结果流吧,Result结果流。然后当然后边我们首先可以把这个结果先做一个打印输出,这个叫做result。然后下边我们再将测输出流做一个捕获,Get side output,然后里边要传入output tap。
06:06
再把它做一个打印就可以了,这个是data。啊,这就是我们完整的测试过程。
我来说两句