00:00
先来实现一下,在代码里面做一下。这个叫另外一个啊op n,那大家想既然是前面不分组,那是不是我们相当于要就直接要做window or了,然后收集的话,那可能我们用一个这个process or window function全窗过函数来收集所有数据嘛,Or window function。这也是一个example,是试用all window function实现的一个,好,那前面的这个部分还是一样,我们先把这个copy过来。读取数据。最后execute执行起来。然后接下来我们就是看中间直接把它开窗啊,就是基于当前的这个stream啊。我们现在的想法是。直接。开窗收集。所有数据。
01:01
排序后面我们要去做对应的排序的话,我们可以按照之前我们的想法,最后统计的时候只要有这个URL就够了啊,就当前的这个数据来了之后啊,我本来这里边我不是有时间戳吗?我根据时间戳把它分配到对应的窗口里边,然后只要保存一个当前的URL userr什么的那个信息并不重要,呃,直接把它删了,然后我就只要URL,然后所有的URL来一个就加一,来一个就加一啊这样的把它统计出来,统计出一个结果,最后我们把它做一个排序就完事了啊,所以接下来我们的操作就是。先map一下,我们这里其实可以直接做一个动态表达式就完事了。二这样的话就得到了一个。String类型的stream,然后接下来我们做一个WINDOW2。顺便我们把这个就是data stream直接开窗啊,Window这个做法也简单的说一下啊,那这里边我们要的是一个slide in。
02:04
Event time Windows of i'seconds10以及I'm秒数五秒先把窗口创建出来,然后接下来可以用aggregate这种方式,前面我去自定义一个自己想要的aggregate function式,比方说我管这个叫,我们当前不是要统计每一个URL的对应的个数吗?但是我们现在要把UR个数放到一个哈希map里边去啊,那所以现在我们就是new一个URL。哈希,Map。AJ。我们可以这样去把它创建出来的,然后后边我们再定义一个,呃,自定义的这样的一个处理函数啊,比方说我们就叫。URL or window result。这是我们想到的这个基本的处理的方法啊,我们先把完整的过程先定义出来。
03:02
Print。然后接下来就是具体实现了,我们先实现一个实现自定义啊,增量聚合函数。Static bus抄一下吧,啊,有点长。一个aggregate function。刚好我们又复习一遍,哎,那这里面我们的输入类型已经map成URL了,所以当前输入类型是string。然后是acccc,这个比较特殊一点,我们现在要的是一个。啊,是一个哈希map对不对啊,这里我们是要把这个哈希map作为一个状态保存下来啊,因为后边我要把这个状态全部完整的拿出来,然后交给后边的这个全窗口函数去做这个处理提取,把这个所有的只要拿出来,所以这里边我可以给这个泛型啊,泛型是K。URL以及一个长整形的count值h map啊,这个引入。
04:05
最后还有一个类型是输出的类型,那当前输出的类型其实大家也知道了,我是不是应该把这个哈希map也要做一个对应的这个输出啊,当然这个输出我们可以直接在get result这一步啊,就把它直接转换,我们就直接把它转换成一个a release好了,那转换之后可能就会更加的方便一点啊,比方说我这儿把它转换成一个a list。在这里边就稍微的麻烦一点,这里可能会涉及到我需要把对应的这一个a release变成一个二元组,然后string了。如果要是这样做了一个定义的话。那里边我们接下来创建一个accumulator的时候,那就是应该要去new一个map了。把哈希map用出来对吧,然后每来一个数据的时候,我们现在要干什么事儿呢?就得更新这个哈希map了嘛,啊,那所以我们当前其实是得判断一下啊,如果当前我的accumulator CA map里边。
05:08
如果已经contains key当前的value的话,那不就是我们的那个URL吗?如果已经有它的话,汉堡我这里那就直接把它这个加一进去就完事了嘛,所以我就直接pumulator,填。Put崩前的value。然后另外还有之前的那个数加一啊,那我们干脆把这个还是做一个提取吧,我们说之前的count的数量是accumulator里边啊。Value get出来,然后现在把这个加一。重新给他,然后我们这里边returnccumulator,那这个是如果有的话,那没有的话呢,没有现在不是来了一个数据吗?来了一个数据,那我们就直接把accumulator。Value。
06:01
一个一不时给一个一啊,或者你这里边一开始判断的时候,如果有的话get没有的话,Count等于零也是一样的,对吧,那就下面这就变成统一的一步了,这样我们把它return过去,然后接下来那我们就是在这个get result这一步拿到最后现在啊这个哈,Map里边已经有所有的数据了,哎,那接下来我们是不是应该要去一个list。Ist。呃,这个类型是TOP2STRING。好,我们把这个就叫做result吧,这叫做result后,就是要把它直接返回去,接下来我们就干脆直接遍历一下当前所有的T。K,要从accumulator啊,那当然这个要从set里面拿了。对于每一个K,我们当前这个列表里边都可以去艾一个二元组了。这个二元组那就选of,首先目前的P放进来,对应的value呢?值呢?那当然是在accumulator里边去at对应的值其实就是这样一个便利,把它放到当前的这个list里边来而已,既然都已经把它转成a list了,那我直接把它做了排序,然后再返回不就完了吗?那到最后的process or window function里边,其实就直接包装一个对应的窗口信息,把它打印输出就完了吗?这里边我们就直接把这个result做一个吧。
07:30
Release有效方法嘛,所以我们这里边只要传一个or,然后这个compare方法,这个实现就完了,他也知道现在我们是要做一个降序排列,降序排列的话,呃,那就应该后减前,所以是这个O2R f1,也就是当前的这两个值,我们是得选这个完整型这个值嘛,当然这里边return的时候是一个要返回一个int啊,所以这里边我们取它的那个int value。当然了,就是这里边理论上这么做并不安全啊,就是万一你要是那个值很大的话,直接截取value,大家知道是截取它的这个低位嘛,啊,这样有可能会导致这个数据错误啊啊我们这里边只是简单写了啊,一般不可能大到那种程度,减去OE的F1,然后我们t value啊,这样的话就可以做一个从。
08:19
到校排序,然后我们把这个result做一个返回。是我们当前的这个增量均衡函数,然后这个全窗口函数呢,哎,那这个又得自己去写一下了啊。直线自定义全窗口函数。包装。信息输出结果。Public static class。Process window function process all window function都是一个抽象类,所以我们这里是extend process也是all window function。
09:01
这里边肯定就不像那个window function,它还有K的信息啊,如果是process window function的话,这里后面会有一个K的信息,那现在呢,就少了K的信息了,现在是in out,还有W,所以这里大家知道,就是我们想要输入的型就是in和out,还有window啊,Window当然就是time window了,那这里的in input是什么呢?啊,这个稍微的特殊一点,我们现在的input是这个这里的输出,哎,是这里的这个list。然后当前的输出我们就直接给一个string就好了,然后time window。好,把这个写好之后大家看,理论上来讲,上边这里肯定这个类型匹配我们的这个,呃,所有的语法是正确的,对吧,就不会再爆红了,那下边这里必须要实现的是一个process方法,Process方法里边也比较简单啊,主要就是提取那些信息,然后呃,把那个对应的东西拿出来不就完事了吗?这里边给大家一个提取信息做处理的这种方式,比方说我们这里用这个string来表达的话,我们可以用一个string builder。
10:05
或者是buffer,这个无所谓啊,我们定义一个result,那么这里面你有一个stream builder。然后接下来为了可视化显示的方便啊,我们可以直接先来一行这个分界线。然后接下来,现在这个已经排好序了,我只要是取list前两。包装信息输出。这样就可以了,所以这里面其实非常简单,就是int之前的I等于零,这就是两次嘛,也就做两次I小于二,当然这里边你如果要是N的话,你可以把这个二改成N啊,那然后接下来我们这里边这个爱加加。里边要做的操作也非常简单,本身我们这里边不是二元组嘛,所以这里边我定义一个二元组,呃,首先我应该把这个看,这里边还是个类型的,我应该先把它拿到elements are.next。
11:04
我先把这个东西先拿到,因为它里边本来是一个list,然后呢,它又把它包在了一个特过类型里边,这个就是层层嵌套啊,就是非常的麻烦啊,我们先把它拿到目前的这个,其实就是我们最后的这个list。那么现在我们就从list里边去取了,用list去当前的I了。第二个对吧,第零个第一个那就是前两名嘛,啊,那这个我们就把它叫做当前的current的吧。然后接下来那就是平的去添加了我们当前的这个result啊,里边去添加对应的信息,比方说几名啊number。哪个点number几后边加上number几呢?那当然是I加一了。然后后边我们再加个空格就好了啊。如果要是不停的end的话,那其实我们没必要一直调用这个方法,我们这里可以给一个这个string in,我们把这个number copy过来。
12:08
然后后边继续加,接下来是URL。Current temple.F0至URL后面再加上空格,好,下面继续加当前的访问量。分量的话是我们后边的第二个元素。F1,当然,本来这个是一个长整型啊,无所谓,其实主要就是这些,呃,然后在这之前大家其实会发现我们每次调用一次,应该还有一个窗口信息,那窗口信息的话,那我们其实应该在上面就写出来点end窗口我们直接写结束就好了,对吧,写一个就够了啊风口结束时间。因为结束时间才跟这个触发非常的关键嘛,才才非常的重要啊,呃,那么我们这里写有一个time stamp。然后里边传入的就是当前窗口的。
13:04
窗口怎么拿window的这个长整形的时间戳?这样啊,我们在这儿应该有一个N对吧?呃,换行一下,然后这里也换行一下。这就是一个相当于一个可视化的打印了啊,视化应用可以看一下在实际项目当中应该怎么样去写这样的东西啊,那那当然了,这个写完了之后访问量这个写完了之后,这应该有一个杠N对不对。然后把这个infer直接判到这就完事了。每来一个数据。一名输出,第二名输出,这样就完事啊,那这个完了之后,我们再来分割一行,一个窗口里边把它框起来就可以了啊,这就是完整的这个流程,最后还有一步,我们是这个process方式,输出是out.collect要把这个result做一个输出,但是它是个string builder,所以我们要调它to string方法,哎,这样的话就可以了。这就是我们定义的一个完整的处理流程啊,大家看稍微的有一点复杂,但是如果把每一步都拆解出来的话,应该也还是可以看到对应的每一步具体的操作的啊,这样我们就看到了,哎,输出当前的这个排名。
14:13
我们这个写错了,不对啊哦,这里我们不应该是user,我们是针对URL去做访问,当然大家想,如果我们现在是用这个来实现的话,这也有意义,这算什么呢?这算统计每个十秒钟之内的访问量最大的用户TOP2啊,就是访问次数最多的两个用户啊,这个也是有意义的,那干脆我们这个就不改了,这是统计了一个前两名的这个访问用户。这是用process all window方式做了一个实现。
我来说两句