00:00
了解了广播状态的基本概念和用法,那接下来呢,我们就来举一个具体的应用案例了,我们准备来写代码了,呃,那我们还是考虑电商的应用场景,现在呢,我们考虑的就要稍微的复杂一点,因为只考虑比方说用户点击了哪个页面的话,这个没有办法去涉及到规则的变化啊。呃,比如说我们如果想要去对用户的行为去进行更加深入分析的话,那往往要判断什么呢?往往要判断用户先后发生的不同类型的行为,他们之间有什么样的关系啊,也就是要设置一个行为的组合模式。啊,比如说用户先做了一个登录,然后又做了一个下单操作啊,或者说用户做了一个浏览,然后又做了一个下单这样的一个操作,那这些先后发生的行为的组合,往往就能代表用户的一些使用习惯,那把他们检测出来之后进行统计分析,往往就可以找到一些。
01:05
提升用户下单概率啊,支付概率的这样的一些方法,好,那所以接下来我们看一看这个例子到底是要做一件什么事,那在这个例子里边呢,我们首先是要定义用户的行为啊,那这个行为呢,我们可以借鉴之前我们所说的用户点击某个页面这样一个行为,那现在呢,就不光只有点点击了,比如说哎,我们这里可能考虑的就是诶用户先登录,然后登录了之后呢,诶可能他做了一个下单的操作。比如说我们就把这个行为叫做buy啊,然后呢,下单之后他可能还有对应的支付操作啊,我们把这个行为叫做配,这样的话我们会发现啊,现在我们对于用户的行为其实就不关注其他的信息了,甚至我们连时间戳都可以不要了,我们只要提取这样的两个字段就可以,就是哪一个用户,然后产生了一个什么样的行为,所以我们把这样的一个数据结构可以包装成一个样例类,就叫做action啊,所以我们可以单独的把这样的样例类定义出来,直接就叫做user ID和action,这两个都是string类型的字段啊,就是一个用户,然后对应的一个行为。
02:19
这就是我们的一个基本的事件,然后接下来呢,这就相当于是我们要处理的基本数据了啊,然后接下来呢,我们还要定义一个想要检测的行为模式,这就是我们所说的pattern pattern指的是什么呢?哎,我们这里简化啊,就指定两个行为。先发生什么,后发生什么,ACTION1ACTION2啊,比如我们一开始定义的就是先检测用户是否有登录行为啊,就是有这个login。然后呢,接下来看用户是否有配有这个支付的行为啊,那如果是这样一个行为模式的话,我们就应该能把爱丽丝检测到。
03:01
那如果说之后呢,我们这个规则还会变,诶,所以接下来当前的这个pattern啊,行为模式检测改变了,我想要检测先做log,先做登录之后直接就有一个BY下单购买的这样一个行为啊,那这个时候的话,呃,我们检测到的应该就是Bo的这两条数据,对应的是我们当前能够匹配上的数据。所以按照这个分析,我们也已经能够发现啊,我们的数据流本身就是用户所有的action啊,就是这里的action stream,这就是用户行为事件流,这是这是我们所要去检测统计的数据。然后呢,另外还有一条对应的规则流,就是我们到底按照什么样的模式去匹配用户的行为,那这个规则流呢,我们可以用另外一个pattern string来进行定义啊,他所定义的其实就是先发生什么事,后发生什么事啊,比方说一开始我们要检测的是先log in后配,然后接下来呢,要检测的是先log后办,这样的话,当第二条数据到来的时候,我们当前这个模式流里边啊,第二条数据到来之后,现在我们要检测数据的行为模式就发生改变。
04:14
所以接下来我们看一看啊,怎么样用广播状态广播流去处理这样一个需求。我们还是在当前的包下边去创建一个skyla的object。现在我们主要是要测试广播状态。我们就叫做。Broadcast state。Example。没方法先写出来,那首先还是main方法里边stream execution environment get一下,获取当前的运行时环境。好,执行环境我们先拿到,然后接下来EV全局的并行度设成一,方便测试,然后我们就是读取数据流了,哎,那在读取之前,我们还是先把这个样例类先定义出来吧。
05:03
声名样例类。那这里case class。一个是我们想要的action。Action里边需要有一个user ID string类型。另外还需要有一个action。String练习啊,除了action之外呢,另外我们还应该有一个pattern。行为模式,哎,那这个的话其实就是两个action了,ACTION1。ACTION2。好,先把这个定义好,然后接下来我们自然就是先定义数据流了。定义数据流读取。用户行为事件,所以我们可以直接把这条流叫做action stream。它就可以直接用env去做一个from elements里面的话,哎,那当然就是直接生成action对应的对象就可以了啊,比方说我们这里就来一个Alice。
06:12
好。第一个行为老。这是一条数据啊,那剩下的我们就不再去重复去写了,直接从文档里边把这个copy过来就可以了。后边爱丽丝又来了一条数据,是配,然后是Bob的两条数据,Logging和B。啊,那对应的用户行为事件,我们这里就有四条数据啊,都放在这里了,这是我们的测试数据,然后接下来呢,诶,那就要定义。模式流,或者说这是我们的规则流。读取指定的。行为模式好,那这条流我们可以直接叫做pattern stream。因为。哎,同样还是from elements,这里面我们需要传入的是pattern。
07:01
的对象实例啊,那这里面我们其实就是。可以跟文档里面实验的一样啊,先做一个log in的检测,然后检测一下配这样的事件啊,那同样道理。接下来我们可以更改一下规则。Log之后要检测的是Y这样一个事件啊,这样的话读取这条流第二个数据来了之后,当前的检测规则就发生变化了。好,接下来我们要做的当然就是使用广播状态,然后连接两条流,哎,然后去处理这样的一个广播连接流了,所以接下来我们要首先定义。广播状态的描述器。这个描述器主要就是用来去创建广播流的啊,那所以这里可以直接new一个map state script。这里边需要有UKUV2个类型参数,哎,我们这里的,呃,那K到底是什么呢?其实我们知道啊,当前的这个广播状态里边主要要保存的其实就是一个pattern,就是要保存我们当前这个规则嘛,哎,所以其实现在也无所谓K不K啊,那所以这个给什么样类型都可以啊,比方说我甚至可以直接给一个空的,这个是完全可以的,直接给一个unit类型,然后。
08:20
哎,具体的值类型当然就是pattern啊,把它放在这里,然后接下来我们给一个名称啊,当前名称我们就叫patterns。然后有对应的类型,那就是class of unit和class of pattern。啊,把这个先定义好。我们可以直接把它叫做patterns啊,其实我们知道它本身是一个script啊,本身是一个描述器,然后接下来呢,诶,我们得去创建一个广播流,这个广播流那是需要。本身当前的这个规则数据流,或者说我们的这个配置数据流做一个。
09:00
Broadcast这样的一个方法调用传入当前定义好的描述器得到的。这就是一个。Broadcast stream,这就是一个广播流,那得到了广播流之后。接下来当然就是把广播流和原始的数据流连接起来,得到一个广播连接流进行处理了。所以就是。连接两条流进行处理。注意这个顺序不能乱啊,一定是原始的数据流去connect广播流啊,所以broadcast stream放在这里。这里还需要注意的一点是,我们当前去进行行为检测的时候,诶,到底是所有的数据都放在一起检测,比方说就是Alice的logging来了之后,然后报不来了,一个配这个也算吗?诶就是不同用户的不同行为,它也可以匹配在一起吗?诶,当然是不行的,我们检测的时候肯定是以某个用户为准,所以显然还得做一个KBY分组,按照用户进行分组,哎,所以在进行connect之前。
10:11
这个action stream首先是要做一个key by操作,我们要以当前user ID做一个分组,然后再去连接广播流啊,那我们知道了,后边接下来啊,要去执行的操作,我们看这个广播连接流就只能去调一个process方法,那这个process方法里边要传的。我们先给一个名称吧,比方说这个就是行为模式的一个检测处理啊,我们就叫pattern。Evaluate evaluation吧啊,就是做模式检测之后的计算啊,然后最后得到的结果,我们就可以把它做一个print打印了。VE执行起来,那这里边process要实现的,那就变成了一个key的broadcast processt方式啊,那因为我们现在是按照user ID已经做了一个分组操作,后边我们要处理的状态,当然也是跟当前的K有关的。
11:13
所以我们需要实现自定义的。Kid broadcast process function。那我们这里可以class pattern valuations。T的。Provide the cost process function,然后我们可以看一下它需要的泛型参数到底有几个啊,我们看到这里边有四个啊,其实这个也很好理解,本身broadcast,呃,本身这个process function啊,首先就应该是有input output2个嘛,然后如果要是broadcast process function的话,因为它是经过了两条流进行连接之后的,那当然就是INPUT1 input2,然后out。再加上如果还是key的broadcast process function的话,那肯定还需要有一个当前key对应的类型啊,那所以接下来我们就是key的类型user ID啊,是string,然后接下来INPUT1,那应该是用户的行为,哎,那就是action啊,本身的数据流,那INPUT2第二条流当然就是所谓的广播流了,广播流里的数据都是pattern嘛。
12:25
Patternon放在这儿,最后还有out,我们要输出什么信息呢?啊,简单想的话,我们可以直接输出一个string啊,啊,那另外我们想一想,这个string也不需要输出什么特别的东西,其实就是只要知道哪一个用户你检测到匹配的话,那不就是按照这个当前的pattern对应的有这样的一组行为吗?诶,那所以我干脆就把这个Python输出就行了,所以直接输出一个二元组就完了呗,就是哪一个用户。对应的什么样的一个pattern检测到,那前面用户名的话,当然还是string后边的pattern给一个pattern,这就是我们想要输出的内容。
13:01
在这个K的broadcast process方式里边,必须要实现两个方法,一个是process element,这是处理数据流的,每一个数据流里面的数据到了的时候都会调用它,那另外还有一个process broadcast element,那是我们的广播流,广播流里的每一个数据到了之后都会调这里。接下来呢,我们就要考虑一下具体的处理逻辑了,那现在具体逻辑其实我们知道这个广播流里边啊,其实就是当前的一个模式,一个规则嘛,这个其实发生变化的频次应该是比较低的,哎,那所以呢,我们可以把它当成配置读进来之后,那应该就直接用就完了,真正要处理的是process element,这里是我们的正常要处理的数据不停的来,那这个数据来了之后,我们到底是不是已经匹配到了当前的模式呢?诶,那。当前的行为知道是什么,但是呢,我们还应该知道前一次行为是否已经发生了,所以我们在判断当前到来的这个数据是否已经匹配了某一种模式的时候,首先我们应该能保存之前已经到来的一个事件啊,那这个事件用什么去保存呢?啊,最简单的方式当然就是用一个状态了。所以。
14:19
这里我们只要用一个直状态就可以了。定义直状态。保存其实就是上一次的用户行为。上一次用户行为。啊,那这个我们还是用lazy的方式去做一个定义吧,啊,我们就把它叫做previous action。State。啊,那它的类型当然是一个value。啊,这里面我们要保存的当然就是action类型了。好,Get run time contacts,我们直接去get state里边去new一个value state script啊,那里边给一个名称,当前就叫previous action。
15:02
另外给一个类型就是action啊,那其实我们也会发现啊,上一次用户的行为啊,其实我们这个action里面还包含了那个用户名嘛,呃,这里面我们要保存的那个行为这个字段,其实就是一个string啊,那所以我们这里面直接去指定是action类型也行,或者我们想简单一点的话,直接给一个string都可以,这个是完全没有问题的,只要保存到这个用户上一次干了件什么事就可以了啊,所以我们就直接这样去定义吧。然后接下来那就得看这个process element了。来了一个用户的行为事件,哎,那我们现在就要判断上一次的用户行为到底是什么,那然后呢,还要跟当前的那个行为模式,行为规则去进行一个对比,那这这里又有另外一个问题,行为规则我们不是动态要变化的吗?那这个规则在哪去拿呢?哎,所以这个规则我们知道它是从另外一条流里传递过来,那传递过来之后保存到哪了呢?那就保存到了当前的广播状态中。所以我们现在是要从。
16:11
广播状态中获取。行为模板。啊,就是我们所定义的这个行为模式啊,Pattern来,那我们定义出来这个pattern就等于怎么样去获取呢?哎,既然是要处理状态,那我们想到这肯定就得找。对应的上下文了吗?这里有一个参数CX,它是kid broadcast process function,这个里边给我们定义出来的一个read only context啊,只能读的一个上下文啊,为什么只能读呢?因为我们这里边想要去获取当前的广播状态,那我在这儿获取出这个广播状态的控制句柄来之后,难道还能改这个广播状态吗?不能,我们当前只是数据流,这个广播状态想要改,那得到哪去改呢?很显然,那得到广播流对应的元素里边处理的方法里边才能去改啊,那所以更改广播状态,那是要到下面这个方法里面去了。
17:13
所以接下来我们这里pattern啊,就是直接CX去调用一个get,我们看有一个broadcast。直接就有这样一个方法,那当前get这个broadcast state里边传入的也是一个map state script,那这个很显然跟我们上面定义的这个就应该完全一样吧,我直接把它copy过来传到这里。这个获取到之后是一个类似于映射状态的一个状态类型,那假如说我们想要获取真正定义出来的那个pattern的话啊,那我们知道这个pattern是在这个映射状态里边是一个值嘛,那它的K呢,K就是unit啊,那所以接下来我们还可以直接让它调一个get方法直接去获取,诶,那获取unit这个K对应的值拿出来的就真正意义上是我们当前所定义的行为规则那个pattern。
18:06
好,已经拿到了。那接下来我们就可以去进行一个行为的判断了,诶首先我们可以从。直状态中获取上一次的行为。啊,那这个我们就叫做previous action。直接从这个值状态里边点value把它拿出来。接下来我们就可以去判断了,哎,就是判断这个行为如果要等于pattern里的第一个字段,而且我们当前的行为又等于pattern里的第二个字段的话啊,那相当于就已经匹配成功了,诶,当然这个前提我们应该还得确认一下,就是当前我们获取出来的previous action和pattern都不能为now啊,不能为空,所以pattern如果不等于now,并且。
19:00
Previous action不等于now。的话。这个时候我们再做一个判断,哎,那就是if。Pattern。点ACTION1第一个行为这个字段如果等于previous action,并且pattern.action2第二个行为等于当前我们所处理的这个用户的行为事件啊,那就是value.action啊,这样如果判断它相等的话,那么我们就匹配成功,直接输出alt.collect啊,我们要输出的其实就是一个二元组啊,那所以直接括起来当前的用户诶,那我们直接用ctx去get他的key。Key就是用户名吧,然后把pattern写出去,表示我们检测到了符合这种行为模式的一组行为啊。那这样的话就实现了我们想要的输出啊,那如果说没有匹配的上,没有匹配的上怎么办呢?注意我们还得做一个状态的保存好,因为如果说当前啊,就根本上次没有什么行为,我们现在就是第一条数据刚来,那这个时候很显然是要把这条数据保存在previous action state这个状态里面的,等下一条数据来了之后再做判断。所以接下来我们还应该。
20:24
保存状态。这个保存那就是previous action state去做一个update,把当前。Value的。Action。保存进去,这就是我们整个处理的这个过程,哎,那前面我们提到。在这个process element处理事件流、数据流的这个方法里边,不能去更改广播状态,不能改我们的规则啊,只能读我们是read only context的吗?啊,这个只读的一个上下文,那什么地方可以更改当前的规则呢?那很显然就是另外一个流,广播流里边数据来了的时候,这就是要更改规则了,那这个更改规则其实就是要对广播状态做一个更新操作。
21:13
哎,那所以这个方法。我们会发现要做的事情刚好跟process element里边的相反,Process element里边的我们是从上下文里边去读广播状态,然后去用,只能读不能写,那我们现在呢,那是就是要改,就是要用当前收到的这个value,这个pattern去更改对应的unit这个K所保存的pattern。啊,那所以接下来我们这里其实就是可以直接用它啊,先把它定义出来,我们就把它叫做,呃,这个是广播状态,我们就叫bc state吧。这个广播状态接下来要做一个更新操作b state啊,那就是直接做一个put put key是unit值,就是当前的value,直接做一个更新就完事了。
22:04
这就是我们完整的处理流程啊,所以接下来我们可以运行一下,看一看测试的结果怎么样。啊,这个测试的数据比较简单啊,很很显然,我们前面这条数据能够检测到Alice logging之后做了一个配,后面这条数据能够检测到Bob是login之后做了一个BY操作,所以我们看到这两条都检测到了啊,这就是我们这样一个具体的应用案例,所以。在实际使用的时候,广播状态就是用在广播连接流里边啊,我们首先是需要使用广播状态这个描述器构建一个广播流,哎,这一般情况这都是一个配置数据流或者是规则数据流,然后呢,用我们要处理的真正的那个数据流跟当前的广播流做一个连接操作,得到广播连接流之后,接下来用process方式去进行一个处理,这里我们要传入的一般就是broadcast process function或者是kid broadcast process function。
23:07
里边处理的关键就是一个process element,这里边只能读取广播状态,不能写入,而更改广播状态的时候呢,要用另外一个方法,Process broadcast element。这就是关于广播状态的使用。
我来说两句