00:00
除了value之外呢,我们还看到了在。State里边还可以定义各种各样其他的一些状态啊,我们在这儿可以统一的做一个简单测试。比方说,我们可以有list。Li said,同样里边也是需要去传入一个泛型表示列表里边每一个状态元素的,它的类型到底是什么啊?那比方说我们还是保存当前的。这个类型放进去这my list。那同样对应的还有。Map的话,哎,那干脆这个我们就是每来一个event,我们干脆就对它做一个计数吧,哎,我们这里面直接每来一个event,就把它对应的这个user后边加一个长整型的count值,相当于word count一样啊呃,或者说我们的那个呃,UVPV的那个统计一样,直接来了一个用户的点击,我们直接加一,那这个就非常的明确了啊,这个是一个my maps。
01:08
而它对应的还有这个reducing state。同样,它里边只能保存一个值,一个类型的值,那就是只有一个event了,那就是。把它定义出来,My reducing state。最后还有一个。Aggregating state。我们看到every state的话,它就比较多一点,它有in和OUT2个值啊,那就是表示我们当前的输入输出的类型了,当前我们也可以非常简单啊,直接硬是event,然后输出的话直接给一个spring得了。MY。Eighteen state。这就是我们定义出来所有可以使用的state,那然后在这个open生命周期方法里边,我们可以去。运行上下文里获取到当前的状态句柄,这个状态句柄的获取其实跟前面value state的这个获取是非常类似的。所以我们干脆。
02:10
直接。把它都copy下来就可以了。先copy下来,然后再进行更改my list state,哎,那这里我们就知道就不应该调get state方法了,只要去get,把它改成叫一个get list state方法就可以了啊。那当然里边这个描述器也不能是value state的描述器了,而是list script,然后里边这里需要注意的是,我们需要把同一段代码里边,同一个算子里边定义的状态命名成不同的名称,因为我们知道当前这一个内嘛,这个名称描述器里边的名称就代表在运行时环境里边对应状态的那个变量名嘛。我们在同一个作用域范围内,当然不能有相同名称的状态,就是类型你可以一样,但是名称显然不能一样啊,那所以这里边比方说我这个就叫做my list。
03:09
这个是可以啊,那后面这个还是英文的类型,那是因为我们当前list state本身的泛型也是英文的类型,这是可以的。这就像我们你可以定义一个呃浪类型的A,也可以再定义一个浪类型的B一样,但是你不能定义了浪类型的A之后,再去定义一个string类型的A啊,名称一定是单单独出现的啊,不能出现重复。那同样后边我们可以把这个map也做一个重写,那这里就应该是get。Map里边同样传入的描述器应该是一个map script里边必须把这个改掉,MY,那我们看这里面还不对啊,因为什么呢?因为map我们知道它是有UKUV2个。
04:00
不同的类型的啊,就是当前你得指定它的key和value嘛,哎,那所以首先我们指定K的类型。Strip。另外再指定value是long.plus。这样的话,这个value就定义出来了,那上面这个是我们比较熟悉的这些类型,值类型,或者列表类型,或者是映射类型,那接下来这两种呢,是属于。聚合的状态类型,这个可能相对比较特殊一点,它特殊就特殊在我们得做聚合,你除了基本的这个名称和类型定义之外呢,还得去定义一个聚合函数啊,那我们首先来看一下这个reducing state啊,My reducing state。在定义的过程当中,哎,那这里就应该是get reducing state了,里边同样也应该传入的是一个reducing state。它里面的参数呢,前面我们也说过了,它多了一个参数,除了最初的name和最后面的类型之外,中间还多了一个reduce function啊reduce function,这reduce function跟我们前面讲过的完全一样,就是表明它到底该怎么聚合,所以这个我们也是轻车熟路,呃,注意还是当前的这个。
05:20
名称需要改一下my reduce啊,那类型的话,这个可以不变,那关键就在于中间我们这里需要去。你有一个reduce方式。那这里这个reduce function的话,我们也可以非常的简单。之前已经聚合出来的结果和现在新来的某一个数据啊,那得到一个什么样的东西呢?我们就new一个event,这个event我选取的是,比方说我用VALUE1的user,里面不就是user URL和time吗?啊,那我就用VALUE1的user和URL,但是用VALUE2的time。
06:00
然后看到也就是说把把这个时间戳啊,只是做一个更新就可以了,那这样的话就可以看到我最终的聚合结果,这是我们对于reducing state的一个定义,当然看起来好像没什么特别的含义啊,那后边我们还有一个。Adreg state。定义其实也非常的类似。把这个调用方法的时候定义成get。Aggre state这样的一个方法,里边的描述器也是定义成。Aggregating state,同样它里边中间多了一个aggregate function啊,那那这里边我们同样还是。呃,我需要我我们这里啊。大家会看到这里边需要传入的是string,然后呃,Aggregate function和class,那这个class要传的是什么呢?是我们前面已经给入的当前。Aggreg aggre state,这里面出现的这个in和OUT2个输入输出类型吗?注意这里不是啊,这里需要入的。
07:07
其实是ACC,也就是我们要保存的那个状态的类型,因为它是状态描述器对吧,所以这里面我们关键就是你要保存的状态数据类型到底是什么,所以我们看ACC是在这儿出现,那我们就可以自己去做一个单独的定义了啊,就是到底要保存什么样的一个类型的ACC呢?啊,比如说我们也可以说干脆我就直接把这个ACC也是每来一个就加一啊,那这样的话,那就也变成了一个。长整型了,浪。我们也是做一个类似于work啊,呃,Uvpva这样的一个统计,那接下来我们在这儿。就需要去new一个aggregate方式。那这里边我们需要传入。长征。这个方式。我们在。
08:01
在讲到窗口的聚合函数里边,所得所使用的那个aggregate function是完全一样的啊。啊,那当然了,这里面的类型不对,就是我们前面的这个类型也得改。长整性,这样的话类型就匹配了,然后接下来我们做要做的操作也非常简单了,就是实现对应的这几个抽象方法啊呃,接口里边的这个方法,那一个是create,当前我创建一个初始的时候啊,这个状态应该是多少呢?当然是零了,长整性0L a每来一个元素的时候怎么办呢?啊,那当然是要Q加一,每来一个就加一。那当前这个get result是怎么办呢?我们现在是把它定义成stream也没关系吧,我干脆这里边直接一个不就完了吗?我们直接return当前的count值是多少?加上啊,当前的cuumulator做一个字符串序列化之后的输出啊,那这里面word的话,其实我们不需要做太多的事情啊,也可以把这个A加B写在这里,这就是一个非常简单的aggregating的定义,它其实也可以实现前面我们。
09:12
对应的想想要做到这个map的这个功能啊,也可以实现啊,那当然了,接下来我们可以在代码里边再去做一个调用,我们看一下这些。对应的状态怎么样去做访问和更新,那这个list的话就非常简单了啊,这里边我们直接把它做一个啊,然后去做一个value,去做一个添加就可以了,你如果想获取里边的元素的话,那就调它的get方法,把那个list获取出来,去打印啊,去去访问,去输出,这这个就非常简单的集合类型的操作,这个我们就不说了,那接下来我们看一下。Map呢啊,那map当然就是每来一个我们应该要的是value的user对吧,这是当前的K,然后那么它的值呢,诶,那它的值我们可以把。
10:02
里边我们先去get。当前K的值,然后获取了之后再去加一啊。啊,那当然这里面有一个风险,就是一开始第一个我们这里边并没有那个get default对吧,那假如说一开始。没有任何数据的时候,Get之后得到的当然是一个nu nu去加一,这个显然是不对的啊,那所以这里面我们可能需要去做一个判断。就是它是否等于,那如果是的话就用零,不是的话就用他自己啊,这样的话,我们当前就不会出现控制人异常的这种情况。然后我们还应该要去看一下啊。接下来我们去把它做一个。输出my map去get一下当前的。对应的这个值,然后去把它做一个打印输出,那前面我们还可以把这个加上。我们可以把当前的key。
11:02
也一起输出。这样的话就可以看到整个的全貌。啊,那对于同样的这一个操作而言,如果想要用这个aggreating state实现的话,那就简单多了,我们直接ADD一个,诶,当前来了一个,新来了一个元素嘛,直接调ADD方法,然后里边自动就会加一,这样的话就更加的简单啊啊那同样如果说我们想要直接得到最后的结果的话,啊,那这个就非常简单了,因为我们最终它的输出就是一个string吧,哎,那可以直接掉它的get方法,它它的get方法拿到的不就是聚合的值吗?我们聚合出来的值本身。就是最后的那个stream嘛,呃,就是调的那个get result get result方法得到的就是最终聚合出来的值嘛,所以这里边就可以直接得到最最后的结果。所以这个还是比较简单的啊呃,另外还有一个reducing state,这个就非常的简单了,那就是。
12:06
My reducing state。My reducing state。可以直接调一个爱的方法,我们可以直接把当前的这个value传进来啊,那也可以看一下。当前的reducing state。是长什么样啊,可以把它做一个。Get,然后拿出来,因为我们知道它里面本身就都是一个值嘛。聚合之后结果就是一个值,类似于y state,所以直接调get方法拿出来就可以。这是我们能够看到的这样的一个调用的方式,呃,那么为了让我们看的更加的明显一点,在这里我们可以把这个my value state这个先注掉,然后可以看到后边的统计。好,那接下来我们就可以来测试一下,看一看这些状态他的行为到底是什么样的了啊,那当然了,在前面我们会发现其实还有一些小的地方,我们其实定义的时候是有问题的,比如这里的aggregating state,我们说过在同一段代码里边,同一个任务的实例里边,所有的状态是不能出现重名的情况,哎,那我们看到前面我们都记得更改了,那后边这个APP state呢,也需要更改才对啊,那这里边我们把它改成MYA这样的话就可以了。
13:28
然后另外在下边如果说我们想要看到更加清楚的输出的话,我们可以把。Value state的打印先,然后后边state也可以加上这样的一句输出,我们就叫my state。上面我们这个叫MY。Maps。另外这里还有一个细节,就是前边我们这里其实做这个操作的时候,Map做更新的时候不应该是把它更新成零,初始的时候不应该更新成零,因为我们是直接put它加一之后的这个效果吧,啊,那所以如果来了一个之后,应该是直接把它put成一,这样的话就完全没问题了啊,那所以接下来我们可以再看一看运行的结果到底是什么样的。
14:26
我们可以看到,每来一条数据之后,对应的都会有三条状态对应的打印结果输出,我们可以具体来看一下。我们可以看到第一条数据,很明显来了一条Alice的数据,哎,那么map state输出的是ALICE1,那么a state呢?Aggreg state当然也是COUNT1。那后面又来了一条这个carry瑞的数据的时候,我们看到他跟前面Alice的数据完全不矛盾,完全不冲突,他直接统计CARRY1,那么count也是一,再来一条Alice数据的时候呢,Alice是二,Count是二啊,那我们看到Alice的这个reducing it,它前面的URL都不变,只是把time stamp更新成当前最新的time stamp而已。
15:14
所以这就看的非常的明显。那如果说我们还想更加明显的去做一个对比的话,在这里也可以去增加一个本地变量,我们可以看一下。当前的set和本地变量的区别到底在哪里?我们增加一个本地变量进行对比。这个就非常简单啊,我们就来一个长整型的本地变量,就叫做count吧。初始值。当然是零。接下来。每来一条数据调调到这里的方法,那接下来我们自然就是每来一条数据嘛,直接在这里count加加就可以了。
16:01
康康佳佳啊,那接下来我们再输出一下。这个是当前的count值。加上count。我们重新运行一下,看一看当前的count值跟其他的状态值又会有什么样的区别?啊,我们看到COUNT1后边我们看Bob的数据来了之后,是直接在后边增大的。我们看本身别的状态那都是各自处理各自的啊,CARRY1 bob1,然后是BOB2,然后是ALICE1,这个我们定义的aggregating state,那都是这个count是按照当前K去逐渐增长的。而我们这里边定义的抗这个本地变量呢,每来一个数据就增长一次,一次,每来一个数据就增长一次,它跟K没关系,所有的。K的数据都可以访问到同一份count这个变量,所以它相当于这就是他们的一个共享变量了,每来一个都会加价,在这里边我们针对这个本地变量和K的区别就可以看的非常的清楚。
17:12
这就是关于hit的基本的使用方法。
我来说两句