00:00
到目前为止呢,我们已经了解了窗口的基本用法啊,那我们知道了怎么样去定义一个窗口分配器,然后后边需要跟上一个窗口函数去指定当前窗口的处理计算流程。那这两步操作完成之后呢,我们就可以将数据按照窗口进行划分,得到这些数据的统计结果了。不过在之前我们测试的过程当中呢,尽管我们使用了事件时间,但是因为当前我们使用的click s,它就是按照当前的系统时间,而且是并行度为一,它就是一个一个按照顺序去生成的,所以后边我们就直接指定了assign asending time steps,按照升序数据去提取时间戳,然后指定水位线就可以了。哎,所以我们这里好像没有看到水位线对后边的影响,那我们自然想到了,在实际应用场景里边,往往数据都是乱序的呀,那如果要是乱序发生的话,我们现在窗口可以进行正常的处理吗?所以接下来呢,就可以结合水位线和窗口来做一个综合的应用,来测试一下水位线到底是怎么样去控制事件时间进展的。那当水位线推移的时候,窗。
01:13
口又会发生什么样的变化啊,所以接下来呢,我们就可以直接在之前water mark test这里边去做一个测试啊,那当然了,这里边我们可能需要去更改一些代码,因为我们会想到啊,当前要做流式处理数据的测试的话,我们直接在这里读取现成的一堆数据,From elements读进来可能是不够合理的,那我们呢,想要模拟真正的流式应用场景,一个数据一个数据的输入,然后看当前数据输入之后窗口会不会被触发,那这样的话我们能够更加直观的看到这个效果,所以呢,我们就把当前的数据源改成流式的数据源啊,那最简单的测试当然是文本流了。我们还是使用它。哈杜102。
02:00
7777端口号,然后接下来我们在进行处理的时候,就会有另外一个问题啊呃,我们看之前我们在这是直接后边就sign time stamp and watermarks了,那现在呢。我们就不能直接这么去做,因为我们从socket文本流里边读取出来的是一个string类型的data stream,我们得把它转换成event类型,这样的话后面才能从里边提取时间戳嘛,诶,那所以这里边我们需要首先做一步map转换,其实之前我们也提到过啊,这个map转换就是要实现对应的这个方法,每一个data来了之后,这里我们首先需要去从data。里去提取所有的字段,我们把它叫做F啊,那对应的呢,当然就是从data塔里边啊,调一个SPSP方法按照逗号作为分割符去做一个分割,切分出每一个字段放在FS这样一个集合类型里面,然后接下来呢,当然就是用这个集合类型包装出一个event了,那么这里的字段第一个字段F0。
03:06
这就是user啊,那我们做一个tri,去掉空格,后边F1第二个字段就是URL,同样也是string类型,直接做一个tri就可以。最后还有一个time step fs2,它是一个长整型,所以我们要tri之后再做一个to long操作,这样的话就包装好,得到了我们想要的event实践类型。接下来呢,当然就可以按照水位线对应的生成策略去提取时间戳生成水平线了,我们现在主要想测的其实是乱序流的这种场景,所以我们主要针对这种场景进行计算啊,那对于乱序流呢,我们这里的最关键的一点当然就是调用for bounded out of order needs这个方法里边要传入一个延迟的时间啊,那当然这个时间就是我们测试嘛,就是随便给多少都可以啊,我们就给一个整秒数吧。呃,如果我们想测毫秒的话也是可以的,因为本身数据里边我们带着的那个单位后边其实是毫秒啊,那所以这里边我们给一个二也可以啊,或者我们调大一点给一个五也是可以的。
04:10
然后接下来我们就要去做一些窗口处理操作了,哎,那首先。要想做窗口操作,先给个KBY啊,那KBY这里我们以什么作为建呢?那跟我们具体要实现的需求有关了啊,之前们如果要是统计每一个页面的PV的话。那么我们就得以URL作为当前分组的键,那如果我们要统计每一个用户他的访问频次的话,像一开始我们做的那个操作一样啊,那这里就以user作为分组的K就可以了啊,那比方说现在我们又想统计一下每一个用户访问的频次,那我们就以user为K就可以了,所以这里传入的就是下划线点user,这就根据需求来就可以啊,然后接下来我们就定义一个窗口吧,Window。
05:00
最简单的定义一个滚动窗口。Tumbling even time Windows里边点of传一个长度,我们还是十秒钟吧,Time,先把这个time引入。然后接下来啊,那就是SECOND10。定义了十秒钟的滚动窗口,接下来我们就可以直接aggregate去定义增量聚合函数,或者我们可以直接process定义一个全窗口函数,都是可以的啊,然后我们这里就简单一点吧,我们不用考虑效率什么的啊,主要是测试这个结果,我们直接给一个全窗口函数,收集所有数据,然后输出结果,那就是每一个用户在当前窗口范围内,他点击访问的次数是多少啊,相当于一个用户的活跃度统计啊,所以这里边我们直接new一个。全窗口函数,比方说我们这个叫water mark。Window result吧,啊,其实就是想做一个测试而已,最后可以直接print打印输出啊,那当然了,下面我们需要有enve要执行起来啊,那这里我们最关键的当然就是实现自定义的。
06:10
全窗口函数。Process window function式啊,那所以class watermark window extend process。Window function。里边有四个参数,我们还记得啊,Input,那当然是了。Output,我们现在输出的结果是什么呢?这个还是简单一点,我们输出一句话吧,就是输出这个一个string类型的提示信息,然后接下来呢,当前的K的类型是user,那当然就是string最后窗口的类型。Time window。里边必须要实现的抽象方法,当然就是这个process。那接下来呢,我们就可以从参数里边去提取我们想要的信息了,提取信息那首先应该是哪个用户啊,那用户就是当前的key嘛,所以我们可以直接把这个做一个更改啊,改成user就完了,然后接下来呢,我们还想需要当前的窗口的信息,窗口信息这个我们也熟啊,直接从contact里面去提取。
07:11
Start。Contact window.get start啊,当然了,如果我们想要end的话,那就contact window.get再获取出来。除了window的信息之外,另外我们要统计当前用户到底活跃度是多少,那就是他访问了多少次嘛,所以其实就是有多少个数,就是访问了多少次,哎,那我们直接。输出element.size就可以了,所以这里的count值也不用再转换成word count那种形式啊,一个数据一个一再去做叠加了,我们直接输出element.size就可以。这个代码上会非常的简单,只不过就是说我们需要把所有数据收集齐了再去统计。那最后我们可能还需要。了解一下当前的事件时间的信息啊,因为我们知道窗口的信息啊,它是什么时候关闭,这这些信息我们都是知道的啊,那关键呢,我们现在还想了解一下诶,它跟水位线有什么关系呢?所以这个时候我们多加一条信息。
08:13
增加水位线清洗。所以这里边我们把它叫做就是当前的current water mark。这个怎么去获取呢?诶我们当前不是有上下文吗?在上下文里边。我们还记得就可以获取current,诶,这样的话就直接可以拿到了,所以接下来我们可以做一个信息的输出,out.collect我们直接打印,一句话还是做一个字符串插值吧,哎,那就当前某一个窗口,那窗口谁呢?Dollar start。一个波浪线,然后Dollar and。这个窗口。然后我们再加上当前的。用户是谁?那就是Dollar user对应他的活跃度为啊,那后面其实就是我们统计出来的当前的抗值。
09:09
另外,我们还可以加上一个信息,那就是当前的水位线。水位线现在位于位于哪里呢?当然就是我们已经提取出来的current water,哎,这就是我们整个想要测试的东西,先把它写好。然后接下来我们就可以进行测试了,那当然这个测试之前我们还是先要到卡杜普102那边去,先把NC先起起来。7777,好,先起起来,等一下我们就用它来发送数据了。所以接下来我们就可以。运行当前的代码。跑起来。起来之后,在正常情况下啊,这里肯定是没有任何输出的,因为我们当前没有数据嘛,接下来我们要做的其实就是一条一条的输入对应的数据。比如说诶,这里我们输入一条。Mary的访问数据,那我们知道这里肯定没有任何输出了,然后接下来我们再来一个Bob的访问数据。
10:10
当然还是没有任何输出,这个肯定是能想到的,我们一条一条输的话会发现啊。当前每输入一条之后,当前的事件时间就会往前推进,就会有所进展,那我们现在定义了十秒钟一个滚动窗口,那什么时候第一个窗口应该要关闭了呢?诶,那显然是要等到水位线到达十秒的时候才会关闭当前的窗口。所以在这里干脆我们就直接让这个数据走的更快一点,时间走的更快一点,直接来一个十秒钟的数据,哎,我们把这个改成。一问,这就是十秒钟的数据嘛,啊,爱丽丝相当于十秒的时候又访问了一下这个商品的页面,好,那我们看一看,现在呢,还是没有输出的结果。哎,这是为什么呢?其实我们仔细分析的话就会发现啊,来了一个十秒钟的爱丽丝,访问数据的时候,当前的水位线到底是多少呢?
11:05
这只是当前最大的时间戳,那我们现在是乱序流的处理,所以它还有一个五秒钟的延迟时间。所以呢,我们当前其实水位线只进展到了五五秒的位置啊,当然了,如果我们较真的话,前面我们提到当前这个乱序流处理里边水位线的生成,它是基于当前最大的时间戳,减掉延迟时间还要减掉一毫秒啊,所以理论上来讲,我们当前的时间其实是。4999。其实是这个水位线当前是在4999这个位置,那它当然没有关闭我们想要关的这个零到十秒的窗口了,那什么时候才能够关闭窗口呢?诶,当然我们就想到了水位线要涨到十,那现在的这个延迟时间是五嘛,那是不是最大的时间戳要到15啊,诶我们应该能够想到是这样的一个情况,所以我们可以考虑一下直接在这给一条数据。
12:04
来一个。15000。我们来看一眼。我们看到现在窗口就输出了,而且我们看到啊,尽管这里边我们输入的数据只是爱丽丝的数据,这个没影响Mary和Bob,他同样时间也进展到十秒钟了,关闭了十秒钟的窗口,所以我们这里的输出是每一个用户都会有对应的输出,而且这里我们看到用户爱丽丝的活跃度它也是一,为什么也是一呢?啊,因为我们知道当前收集的数据啊,零到十秒的数据,它这儿只有三这一条。十和15秒这两条数据,它已经属于十到20秒下一个窗口了啊,所以我们当前当然就不会有任何的输出了。所以这就是水位线推进之后触发窗口计算结果输出的这个过程。当然了,这里我们看到啊,本身我们是要处理乱序流,所以我们还得看一看能不能正常处理一个乱序数据呢,比方说我们现在当前的窗口已经关闭了,那假如现在啊,我们再来一个三秒钟的数据,我们看看。
13:08
它能够继续统计进去吗?很显然,这就没有任何影响了,这里面不会有任何的输出,那假如我们这里再去输入一个八秒钟的数据。同样这里边没有任何的显示啊,说明已经到达窗口结束时间,窗口已经输出结果关闭之后,接下来再来的迟到数据,那就。相当于被丢掉了,不会做任何的处理了啊,那那这里我们会发现,这不就是数据丢了有问题了吗?诶,这个问题主要在于我们给的延迟时间不够嘛,当前的乱序程度,你如果在15秒之后又来了三,又来了八的话,那这个乱序程度都已经达到12秒了,所以我们给五秒的延迟,当然就有些数据是要丢掉的。所以接下来呢,我们可以继续测,接下来的测试我们可以继续往后走。比方说哎,我们继续推进,要关第二个窗口,要关第二个窗口的话,我们知道应该这个时间要达到20秒啊,但是直接给20,显然这里是没有任何输出的,因为watermark是有五秒钟的延迟的,诶那接下来呢,我们就可以继续给数据了,那。
14:14
比方说。我们这里给一个在五秒钟延迟范围内的一个迟到数据。18秒的数据。然后这里当然没有任何输出啦,水位线没变嘛,然后接下来我们继续推进水位线。让十到20秒的这个窗口进行关闭,那我们知道应该要时间戳到达25秒,这个时候减五秒,那水位线就应该到20啊,那当然了,真正意义上的水位线还应该再减一啊,就是相当于是19999啊,那所以我们看一下是不是这样。25。诶,我们看到现在又输出了一条消息。它输出的是十到20秒的这个窗口,用户爱丽丝的活跃度为三,为什么是三呢?因为它包含了第一条十秒的数据,15秒的数据,以及后边18秒的数据,尽管18秒是一个十道数据,但是因为包含在我们这个乱序范围内啊,它其实是在窗口关闭之前就到了,所以这个时候没有问题,它可以正常收进去,正常的统计在里边。
15:19
啊,那当前的水位线当然就是19999了,那前面十秒钟窗口关闭的时候,水位线是9999啊,就是9.999秒啊,所以通过这个例子的话,我们就已经了解了水位线和窗口之间的关系,而且我们看到了处理乱序数据,处理迟到数据到底是怎么样去做的。那当然这里还有另外一个问题,就是如果我们这里的延迟时间设的比较小的话,因为我们想当前这个处理要快一点啊,如果这个东西设的很大,像我们现在设五秒的话,我们这儿去做输出的时候,很明显就慢了很多嘛,25秒的数据都来了,我们才输出了十到20秒窗口的计算结果,这个延迟很大,所以实际应用的时候,往往这个时间我们会设的比较小,那这个设的比较小就会带来结果不正确呀,啊,那超出了延迟时间的迟到的数据,我们说默认就会被丢掉,那怎么处理这些数据呢?
16:15
其实在弗林里边还给我们提供了其他的处理迟到数据的方法,这个我们稍后再去给大家进行详细的讲解。
我来说两句