00:00
接下来总结一下flink如何去处理迟到数据,之前我们说过啊,因为分布式的流处理啊,它所有的这个数据呢,首先是有网络延迟啊,然后分布式传输,然后分布式的处理之后。很有可能就会出现乱序。就是在事件时间语义下先发生,先产生出来的那些数据,我们可能是到后边才能够接收到,才去处理啊,那这个时候怎么办呢?啊,一种方式之前我们已经提到了,那就是water mark嘛,Water mark有一个延迟时间,但是water mark的延迟时间我们说它是直接把表调慢了,这个操作有点太重,因为你一旦要是把这个整个表调慢的话,那我们所有的跟时间相关的操作,那就全部滞后。那那你说是低延迟高吞吐的这个流处理,那就没效果了呀,我们用这个SP streaming VP可能也能做到几秒钟延迟这样的一个状态,你号称的这个毫秒级别的延迟,这就完全做不到嘛,所以在flink真正处理事件的时候,往往这个因为乱序导致的当前数据的这个偏差啊,我们在设置watermark延迟的时候,一般设置的会比较小。
01:17
一般呃,可能是在这个毫秒级别啊,最多可能也就是一两秒在这个级别,那如果说很多数据它的乱序程度都超过这个又怎么办呢。哎,那没关系,我们一般情况不就是在窗口里边要收集这些数据吗?大家知道如果说超过了auto的延迟时间,它就有可能收不到这个窗口里边了,那怎么办?我让窗口多待一会儿,不要把它关掉嘛。可以allowud lateness,这是第二招啊,那如果要是类比的话,就这就相当于之前我们那个是把表直接调慢了,哎,你让他这个就延迟发车了,那现在呢,现在严格意义上来讲,他其实是发车了,就比方说我们八点到九点吧,九点要发车,Waterac调慢,比方说调慢五秒钟,那么现在其实就是到了司机他已经调慢的表到了九点的时候,那我们要触发一次计算,那这个时候其实车就已经在往前走了。
02:14
已经在往前走,但是他走的比较慢,哎,什么意思呢,就是他还允许一段时间内,比方说一分钟内你还可以赶得上。他的车门还开着呢,开着车门慢慢悠悠在往前走啊,所以接下来的这个状态就是什么呢?在他允许一分钟迟到数据的这个时间范围内,只要还是属于八点到九点事件时间的这个数据来了之后,还能赶上这趟车。那就是来一个迟到数据,就会又触发一次窗口的计算,而且这个触发的计算呢,是在之前窗口统计的结果基础上去叠加的,去累加的,不是单独算的啊。啊,就是相当于直接更新之前窗口的结果了。哎,这样的话,这个就非常好,这不就相当于是快速的先得到一个近似正确的结果,然后后边再。
03:09
一段时间内在不停的更新这个结果,最终得到一个完全正确的结果吗?啊,当然了,现在一分钟的延迟可能还无法做到完全正确,因为我们不知道他最迟的那个数据到底能迟多少,那有可能就就迟的没边了,我们不能一直等下去啊,这个对系统损耗肯定很大,窗口都不关,这资源释放不掉啊,那怎么办呢?侧输出流测输出流的时候窗口是关了,但是没关系啊,窗口处理的结果我们不都输出了吗?你写到一张表里。然后我们在后边。拿到这个测输出流里边的迟到数据之后,那大家想我直接判断一下它的时间戳,是不是就知道它应该属于哪个窗口啊?那我们到之前处理结果的那张表里边找到那个结果,跟他一合并,是不是就又是一个最终正确的结果啊?
04:04
所以大家想一想,这是什么?这不就是?之前我们说的拉姆达架构吗?拉姆达架构的想法不就是用一个快速层啊,就是一个流处理器,先快速的得到一个正确近似正确的结果,然后呢,哎,在另外一层是一个批处理器,然后再它是一直等着的啊,等所有数据都到齐了,计算出一个最终准确的结果,更新最终的结果,得到一个最终正确的结果。那所以现在大家看,通过这样的三重保证。Flink用一套系统,一套API,就这么简单的几个调用,就直接搞定了拉姆达架构两套系统做到的事情。哎,所以这就是flink非常厉害的一点,它整个的这个架构就适合做这种事情,所以它稍微有一点巧妙的设计,就能够解决我们的乱序数据处理的问题。这是理论上的一个描述了啊,接下来我们还是在代码里边具体的测一测,所以接下来我们在代码里边还是新建一个测试类。
05:09
呃,这个我们主要是测这个let data对吧,十道数据let data啊S。啊,那前面的这一部分我们主要是测这个迟到数据嘛,啊,那前面这一部分我们就不用考虑那么多了啊,就直接把这个对应的那些东西都copy过来就完事了嘛,其实主要就是创建一个滚动窗口啊,比方说我们就十秒钟十秒钟滚动窗口,然后再给他设一个延迟时间啊,Automark的延迟时间,然后再设一个窗口的延迟时间,然后再放到测出入流,哎,这样不就完了吗?所以其实整体来讲还是非常简单的啊,那比方说我们就用之前那个URL view count,我们就直接用这个例子吧。而且这些对应的这些类,我们都已经是public静态的这个类啊,都实现了,所以这个我们就直接调这个就完事。只加一点东西啊,因为这个代码很快,我们主要是做测试。
06:04
这里边为了做测试,这个就不要再直接ADDS这个click s了,因为它是自动一秒钟一个去去谈的,我们回头如果再去看的话,可能得找半天,有可能这个数据太大,我们就看不到了,所以我们最好是自己手动来控制它的这个变化啊,所以我把这个删掉啊。这个注掉吧,这个助掉了,那自然我们就想到接下来得怎么做呢。Dream。因为。那就。Socket type,对,当然你从实际应用的角度来讲,你用卡夫卡更好,对吧,那我们这里用socket主要是NC命令更容易起嘛,啊,直接一个命令起来发测试数据就可以了,这里是哈。102。七七啊,端口无所谓,大家用哪个都行,大要注意啊,这个读进来的话,这变成一个string了,不是event,那这怎么办呢?诶,那我们还得去map一下咯。
07:02
我还是去new一个map function吧。最后要得到的是一个。里边要实现的是一个map方法,呃,这个过程呢,那就首先还是要拆分了,呃,其实我们也可以直接把之前的那个汇过来啊。还是回顾一下吧,我们就直接用value点啊。根据空格去做分割,大家还记得click长什么样?哎,这个是逗号分割的这个字段,所这里是号对吧?逗号分割,然后再trim去除空格,然后第一个是user,第二个是URL,最后一个是行整型的time Sam啊,所以接下来既然都拆开了,我们直接包好,包好一个event就完事了嘛,返回就完事了,那首先就是FIELD0。副格去掉fields 1K格去掉啊,最后是一个长整形,所以我们on.value of fields。
08:02
二哎,这样的话,把本本流读进来的一串字符串啊,转换成了我们想要的一个event。这样的话,我们就可以一条一条数据去测了,啊,起一个NC1条一条去测了,那我们先就这样先测一下吧,就比方说。我们把这个叫呃,Result吧。因为要区分开,等下我们可能还要基于它去获取测数处理区分开啊,这个是result。我们还记得这个三重保证里边第一重是water的延迟,我们之前这个是直接click,它是升序的,所以这是zero,哎,我们现在要测的话,还是给它加一个吧,我们加一个second。SECOND2,好,给个两秒钟吧,方便测一点,只是举例,大家知道这两秒钟也很大了啊,然后接下来。
09:00
下边的第二步,那就是在window和最后的这个处理之间,我们要加一个。Loudness方说,这里边我们给一个一分钟的延迟。允许一分钟。造数据,呃,然后另外呢,我还允许。Set output data里边还需要有一个T,我们定义一个。猪出标签。好,那这里边就new一个output tag tag,然后这里的泛型,它有泛型,泛型那就是你的测输出流里边是什么数据,这里就是什么类型,对吧?那对于窗口来讲,它只能把我们原始的那个数据类型输出出来,那所以这里边就只能是event了啊。然后这里给一个标签的ID啊,那这个就我们就叫late吧,在这里大家要注意啊,对于这个测输出流的定义。同学可能想到,诶,那是不是我我这里边直接直接这么定义就完了呢,你这样到时候运行不行的啊,我把这个标签传进来说运运行会出问题的,因为目前它的这一个测出流标签对于这个flink流的提取的过程当中,它的泛型会被擦除掉。
10:18
啊,所以这里边也会涉及到这个泛型擦除的问题,那怎么办呢?啊,我们用一个。就是我们以这个呃,内部匿名类的这种方式啊,把这个特殊物流标签定义出来,那这样的话就可以方便flink去获取。内部的那个type information啊,对应的那个类型信息啊,啊,这个大家大概知道怎么用就行了啊,这样的话你需要把这个event全出来,对吧,因为你是内部匿名类嘛,这个不能是空的,好啊,那这样的话就定义好了。另外接下来还应该有一个获取特殊出流的一个操作啊,大家看基于。处理完成之后的结果,Get set output啊,那这里面当然就是直接传一个标签不就完了吗?Print。
11:08
这个是。好,这就是我们整个的程序架构,该输出该打印的地方都有了,对吧?三重保证,一个是watermark的延迟两秒,然后窗口,诶,每个窗口十秒钟的滚动窗口啊,又可以允许一分钟的迟到数据,然后还有这个测出出流,最终没来的数据啊,放到特殊出流里面去,这个代码我们就改完了。
我来说两句