00:00
所以接下来我们可以在代码里边去做一个对应的实现,我们还是直接新建一个SC的object,我们就叫做top n,现在是key的process function。Example。我们使用的是K的process方式去进行实现,那前面的流程当然还是完全一样啊,啊,我们可以直接借鉴之前URL view count example里面的这种实现方式啊,把这个完整的代码可以直接copy过来。对应的下划线也要引入。这就相当于是之前我们所说的第一步,结合使用增量聚合函数和全窗口函数统计每个URL的访问频次。这是第一步操作,当然了,我们就不需要做这个print了。我们可以把它叫做。URL。Count stream,这是我们得到的一个新的流,那接下来呢,就要针对这个流里边的数据,按照窗口信息。
01:05
按照窗口信息进行分组提取,然后我们要做一个排序输出。最终得到里面的套N啊,那接下来我们是基于URL comes stream,然后去做一个KBY,当前KBY的是下划线点啊window and或者是window start,因为后边我们注册定时器是基于window and嘛,所以这里边我们直接指定window and就好了。接下来做一个process里边要实现的是一个自定义的process方式啊,那当然了,我们这个主要就是实现套需求,我干脆啊在下边实现一个自定义的类吧,而且我们还可以传入一个参数,传入一个指定当前的top nn到底是几,比如说我们现在这个N就是二啊,那我就传一个top n2。哦,那这一步做完了之后呢,得到的当然就是一个result stream啊,那最后我们可以直接把它打印输出。
02:07
Print。这就是整体的处理流程,接下来最关键的就是要实现自定义。Key的process方式。Plus top,哎,这里边我们有一个参数啊,这个我们还是直接把它叫做N就可以了。传进来一个参数啊,那接下来extend key process。Function。当前K的process方式里边会有三个类型参数KIO,那所以首先是当前K的类型。以window and作为分组的键,那么我们知道这是一个长整形的时间戳,那就是long。输入数据类型,URL will count。那这个类我们是需要从其他文件里边引入的啊啊,然后输出的类型呢,我们最后还是包装成就像之前一样啊,包装成字符串直接打印输出吧,所以接下来还是一个string类型的输出。
03:10
里边必须要实现的是一个process element方法。啊,那在这个之前呢,我们说还需要有一个open生命周期。得去声明一个list state,一个列表状态,诶,那所以这里边我们先把这个open先写出来,这个列表状态我们是希望让flink帮我们管理起来的,诶那这样一个列表状态到底应该怎么去声明,怎么去创建呢?呃,这个方法呢,比较特殊,我们是这样要使用运行时上下文啊,这就是我们说的啊,必须在负函数类里边才能够做这种状态编程的操作。为什么呢?必须在运行时上下文里面去调用对应的get。State相关的一些方法,那我们现在呢,不是get state,不是一般的state啊,是一个list state,所以调用的是运行上下文的get list state方法,然后里边需要传入一个参数,传的是一个list statescript,就是一个当前状态的描述器,哎,那所以这个描述器呢,我们就直接拗一个吧,有一个list state Dis script,哎,那这里需要有一个泛型类型,我们当前当然就是数据的类型了,URL view count和它。
04:26
放进来,那这里边它的描述器又需要有什么样的参数呢?简单来说就是有一个当前这个状态的名称,有一个name,另外还得有一个类型的说明,这个类型其实主要就是针对我们当前啊,里边所保存的数据元素的类型来指定我们当年要保存的其实就是URL count嘛,诶,那所以这个类型也就是他。这里面我们要写的可以直接写一下啊,比方说这个就叫一个list state吧。这个名字只要是他独一无二的就可以啊,具体叫什么无所谓,就像一个变量名一样,然后另外呢,还得有一个class。
05:06
URL willout类型指定。那当然了,得到的这个get这个state之后,我们就可以把它赋值给一个本地变量,哎,那比方说可以直接把它叫做URL will count list。啊,尽管比较长,但是我们一看这个名字就知道要做什么操作,这样的话就得到了这样一个状态,但是我们会发现真正要用这个状态呢,那不是在open生命周期里面,我们是在process element里边,接下来每来一个数据,我们想把它添加到这个list state里,哎,那这个又怎么办呢?诶,那相当于我们这个状态是要针对不同的方法里边都可以去使用,都可以访问得到,那就不能直接把它定义在open里边了,那需要把它放到外面。哎,那所以我们干脆啊,在外边去做一个声明声明。列表状态。
06:02
我们直接把它放在这里。URL will count is the state啊,那它的类型就是我们想要的list state。这个类型我们也要引入,那里边有泛型,当然就是URL view了,就是当前这个列表里边每一个元素要保存的类型到底是什么样。啊,那按说呢,后边我们直接把这一部分定义直接copy过去就可以了啊,Get runtime contact.get list state,但是这样做有问题,因为这里边我们调用了获取运行时上下文的方法。那如果定义在外边呢,这里其实是我们整个类的一个静态代码块。调用这一步的时候,其实当前还没有运行至上下,就是还没有我们当前这个类的对象实力呢,它的对象实力是要等到open生命周期里边调用的时候,这才是我们当前的一个初始化的操作,哎,那所以我们gettime contacts只能放在open里边去调,这样才是有效的。那这个时候怎么办呢?也有一种方法,就是我们在外面先把它声明出来,先给一个默认的空值。
07:13
然后呢,后边再去做一个赋值操作。这样不就完成了吗?诶,所以我们可以使用这种方式啊,做一个转换,就是在外边先做一个声明,等到到了open生命周期里边的时候,可以获取到运行的上下文了,再对它进行复制。啊啊,稍微有点麻烦啊,我们先大概的了解一下这种用法,后边我们讲到状态编程就会对它更加熟悉了。好了,有了list state,有了状态之后,接下来我们的逻辑其实还是非常简单的,那就是每来一个数据。每来一个数据。就直接放入。List state中。
08:00
那对应的操作就是URL will list state,哎,它既然是一个state,肯定能调相对应的一些操作,我们看到它有一个ADD操作啊,就可以把一个元素直接添加到当前的状态里面来,我们要添加的自然就是当前传进来的这个数据了,Value把它添加进来。另外还得。注册一个一毫秒之后的定时器,注意是什么时间点,一毫秒之后呢?哎,那就是窗口结束时间一毫秒之后的定时器啊,所以接下来我们注册定时器的时候,当然是使用ctx点2TIMER service,然后register even time timer当前要注册的时间戳,那我们是基于window and window and的话,我们可以在上下文里边。去获取当前的K,哎,当然了,如果说我们不想用这个K来去表示的话,因为现在既然都有数据嘛,数据里边本身就带着对应的窗口信息啊,那所以直接点window and也是一样的啊,后边再加上一毫秒,哎,这就是我们想要注册的定时器。
09:14
那这里大家可能会有一点疑惑,诶要这么说的话,那是每来一个数据都会注册一个定时器,这个定时器会不会注册的太多了呢?诶没关系,因为我们想到当前如果说我们按照这个window and做了一个分组之后,调用到这里来的所有数据,他们的window and应该都一样。那接下来呢,他们注册的定时器也都一样,之前我们说过定时器它就是以时间戳作为唯一标志的,那如果说时间戳都一样的话,相当于这就是同一个定时器嘛啊,所以说我们重复注册其实没有关系的啊,最后相当于只有一个,而且触发的时候也会只执行一次。所以定时器我们就搞定了,但是这还没完,更加重要的是我们得定义定时器触发的时候到底要干什么,那是在on timer方法里边要实现的数据提取、排序,最后输出的整个过程,那我们自然想到了,肯定还是先。
10:16
把数据。提取出来。放到list里。然后就很方便的可以进行salt方法的调用了吧?啊,那这里边我们会发现本来它其实就在这个list state里,那如果说我们想要直接对这个list state so可以吗?哎,那不行,没有给我们提供对应的方法,所以自然是需要先把它做一个比方说to list的转换,哎,但是很可惜这里也没有toist的方法,那怎么办呢?我们可以调它的一个点get方法,我们看到会返回一个interable类型。哦,那有了这个ter类型之后,那当然接下来我们就可以toist了,但是这个toist如果要掉的话,我们需要引入scla collection convert下面的影视转换,哎,那所以我们把这个引入的话,就可以直接toist,把它转换成一个列表了,哎,那接下来呢,我们可以把它叫做URL view countt list,这个就不再是状态了,而是一般的一个列表,哎,那针对它的话,当然就可以直接进行salt和其他的一些提取操作了,哎,我们直接SBY,同样还是我们可以用下划线点,诶现在不一样了,不是元组类型,我们现在可以直接提取它的下划线点count,然后加上一个负号,表示从大到小降去排列。诶,那最后呢,我们再来做一个take,当前的N现在不是直接给二了,我们既然传进了top n的N,那就直接把它做为参数传进来就可以了。
11:54
那得到的这个呢,我们就命名叫做top n。
12:00
Les吧。把这个得到之后接下来,诶,那其实就是。结果包装输出了。这个流程其实跟我们之前就非常的相似了。我们完全可以参考之前在这里所做的一切操作,还是定义一个result,接下来我们就在这里一个一个的进行包装,只不过有些东西呢,我们需要去更改一下,因为现在其实是没有进行窗口操作的,哎,那当前的尽管也没有这个contact啊,我们知道是CTXTX里边也没有窗口啊,诶,那所以这里边就不适用这个ctx了,那我们这里想要用的其实就只能是。K里边所保存的窗口结束时间,哎,或者我们这里边不是有time STEM吗?当前定时器触发的时间啊,再减一毫秒就是窗口的结束时间,哎,那所以这里边我们干脆后面这个窗口结束时间就可以直接用time stamp再减一就可以,那当然了,对于这个起始时间呢,我们就可以直接time stamp减一,这是结束时间,然后再减去,因为我们是十秒钟的窗口嘛,所以再减去1万就可以了。
13:16
接下来还得改一些东西,我们现在要便利的是一个top n list,它的indices,然后接下来呢,提取的时候也是从它里面去找,那同样还是I加一作为当前的排序的编号,然后接下来URL是什么呢?诶,那这个其实不能叫做temp了啊,我们这里边拿到的其实是一个URL view count。哎,那所以接下来我们这里就直接给一个URL view count.url。那接下来这里也是ul count.count哎,这样的话就得到了最后我们想要的东西。这就是完整的处理流程啊,其实前面已经有了基础的话,现在只要我们知道怎么用这个列表,状态逻辑还是非常清晰的,所以接下来我们可以运行一下看看效果。
14:08
运行起来我们看一看哦,现在已经有输出了,我们看到输出了当前的窗口是什么样的啊,十秒钟一个啊,一开始同样还是啊,只有一个URL top1后边呢,又会出现两个啊,我们看到接下来这个TOP1浏览量是四,TOP2浏览量是二啊,就会出现变化,这就是top n使用k process function进行的另外一种实现。那在实际应用的过程当中,很显然第二种实现方式会更加的常用,效率也会更高。
我来说两句