00:00
了解了广播状态的概念和基本的使用方法,那接下来我们就来看一个具体的应用实例,那这个例子呢,就是我们所说的,在电商里面,我们往往是需要判断用户先后发生的一组行为的,那所以我们可以定义一些用户的行为模式,行为的组合模式,比如说先登录,然后再下单,或者先登录,然后做了一个支付操作,我们检测出这些连续的行为之后,然后再做一些统计,就可以了解到当前平台的一个运转的状况。另外还可以了解到用户的行为习惯,可以进行用户画像,然后去做精准推荐啊,那这个在当前的推荐系统以及用户画像的这个。构建过程当中其实都是非常有用的,那在这个过程当中,有可能我们的这个组合模式就会不停的发生改变,如果说我们当前这个组合模式发生改变,实时的变化,我们怎么能让它更加高效,更加实时的体现在计算结果里边呢?那这个策略就是使用广播流,接下来我们就在代码里边对这个需求做一个实现。
01:16
还是创建一个class,当前我们就叫做用户的行为behavior pattern行为模式的检测。Detect。Example。首先我们还是把整个的框架先写出来。Throws exception。呃,那接下来还是先去创建。执行环境。Get。叫做。同样还是把全局的并行度先设成一,方便我们进行测试,那接下来我们需要去定义两个流浪,我们应该想到当前的流应该有一个是用户。用户的行为数据流。
02:03
这是我们真正意义上的数据流,然后另外呢,还应该有一个。还应该有一个行为模式流。接下来我们应该要把它构建,基于它去构建一个广播流。广播流。这是我们基本的思路,那这里边既然涉及到用户的行为模式。呃,那还有这个用户的行为,我们可能要单独的做一个定义了,用户的行为的话,我们非常简单啊,这个就叫做action,那可能有一个用户的ID,然后再有一个,它到底做了一个什么行为就可以了,然后对于这个行为模式呢,我们可能也要单独的把它定义出来,先发生什么,发生什么,我们现在如果只考虑比较简单的两个行为的话,那其实就是ACTION1 action2。为了方便后边做这个代码里边的操作,我们不要用元组的方式把它定义了,在后边我们干脆把对应的po类都定义出来,我们首先先定义。
03:07
用户行为和。模式的。后者类。就是用户行为。事件。和模式的欧洲。那首先我们定义一个就都定义成static,就放在当前的这个文件里面。所以我们这里边直接定义一个action。这里面需要有两个字段了。Strip user ID。而那另外。有一个。Action。对应的这个行为,我们把它定义出来,那当然了,如果说考虑到对于这个flink里边我们应该要求的那个po类型啊,那应该是要有,呃,就是当前的这个属性字段应该是public的,或者是要有公有的这个呃,读写方法啊,那这个过程我们为了避免麻烦,干脆直接把它定义成public就可以了。
04:12
我们把它定义成public。然后接下来需要。有一个。无参的构造方法啊,那另外当然了,带参数的也应该要有对应的实现。呃,那最后还应该有一个to string方法,我们也方便的把它写出来,直接让这个idea帮我们自动生成就可以了。然后接下来还应该有一个行为模式的类,也是非常的类似static。定义这个叫pattern。那public?我们定义一个string类型的吧,一。以及ACTION2。那整个的这个过程其实跟前面也是非常类似的啊,Pattern,我们需要去。
05:04
创建一个。烘参的构造方法。然后带参数的也要生成出来。最后再生成一个to string方法,这样的话我们基本的类型就定义出来了。啊,然后接下来我们就可以去定义当前的这个数据流了,我们只是做一个简单的测试,所以就直接from好了,那就。烟威。直接from element。在这里边我们可以new一个action,当前是用户的行为。啊,那这个action我们需要有一个user ID啊。就叫爱丽丝吧,其实这是一个username嘛,就把它当成ID也是一样的。首先action,我们给一个log这样的一个。登陆行为。当然还可以定义一些其他的行为了,我们快速的把它定义出来,后边可以有一个登录之后,比方说做了一个配操作啊,那后边我们还可以定义。
06:08
登录,然后Bob登录之后,比方说做了一个下单的操作,这样都是可以的。定义出来之后,当前的这个流就是。Action。是我们当前的用户行为的事件。然后接下来呢,我们还应该定义一个行为模式的流,那所以当前是env,也是from element去做一个定义这个行为模式的话,我们就定义两个,做一个更新就好了。那首先是log。登录,然后检测登录之后的配这样一个事件。然后接下来。我们再来一个pattern。检测登陆之后。这样一个事件,这就是我们基本的一个定义。这条流就叫做。
07:02
TRY。用户的行为模式流。然后接下来那就需要基于它构建一个广播流了,所以这个时候我们构建广播流的时候,就需要用到广播状态。定义。广播状态。描述器。啊,那接下来我们需要一个。Mapstate。Script里边的泛型啊,那是当前的key和value value的话我们想到了当然是一个pattern了,就当前我们定义的规则嘛,当前的值,那当前的key应该比什么呢?K主要是方便我们在后边获取当前广播状态的时候用什么样的。键值把它提取出来,哎,那这里面我们如果这个东西并不重要的话,我们就一个这样的一个广播。
08:00
状态的话,哎,那就相当于用什么都行啊,那简单起见,我们干脆直接给一个空得了。空类型其实也是可以的。Void。啊,然后那么它的值类型的话,当然是P。里边需要去传入当前的名称以及类型啊,那当前的名称就给一个pattern。类型的话,类型那是。Void。值的类型,如果我们都用types去写的话,那应该是一个pole了。pole里边是。He。这样的话,就把当前的这个描述器定义好了,我们可以把它叫做一个script。然后接下来当然就是构建广播流浪,哎,所以这里面我们应该是基于前面的pattern stream,然后去调用一个broadcast方法,把当前的描述器传进来,这样的话就得到了一个broadcast string,我们可以把它叫做broadcast。
09:14
前面的内容我们都已经处理完了,接下来就是要连接两条流。我们原始的数据流action stream以及当前的广播broadcast。所以接下来是。连。连接。两条流进行处理。去调一个当前的方法,当然了,在调用之前我们还要确认一下当前的这个行为的判定应该是基于每一个user来的,哎,所以这里边我们还是应该先做一个KBY。黑白。当然是data.user ID。
10:01
然后接下来那就是去做一个connect,传入的是broadcast,现在得到的是一个所谓的。广播连接流broadcast connected啊,基于它呢,又可以调用process方法啊,那所以接下来我们就直接调process方法里边需要去传入一个对应的。Broadcast。Process方式啊,当然了,现在既然做了key,那应该是一个kid broadcast process方式,这是比较长的一个名称,我们这里可以自定义,这就叫做pattern。Detector。这里得到的结果,我们可以直接把它叫做当前的一个。Mates。最后当然就是把这个matches我们就定义,还是把它定义成string好啊啊,如果说我们最后得到对应的这个结果的话,呃,得到一个string,把它做一个打印输出,但是这里可能考虑到如果要明确的看清楚的话,应该得看清楚到底是在什么样的规则下的一个输出,一个匹配,哎,那所以这个我们干脆把当前输出的结果定义成一个二元组吧。
11:22
这样的话就可以看到,首先看到一个当前最终结果的输出,然后呢,还知道它是基于哪一个规则,哪一个pattern得到这样的一个结论。当然了,不管是什么二元组,我们也可以直接把它作为一个打印输出print出来就。最后因为Q执行起来,这是整个的框架。然后接下来的关键在于需要去实现。实现自定义的。这个是一个。说。Broadcast。
12:00
Process。方式。这个是非常长的一段啊,这样的一个类类的名称,那public static class。Pat detector。Detect。我们现在需要去。Kid。Broadcast process。Function,那么对于这样一个process function,它里边就有了四个,那这里边比之前的kid process function多了一个型,因为我们有两条流嘛,所以input的有两个,那同样前面还有一个KKS,当然就是当前K的类型啊,最后还有输出的类型,所以我们当前的。这一个process function,它里面的泛型应该是首先K类型user ID,那是然后第一条流的类型,数据流action。
13:01
第二条流啊,广播流的类型,那应该里边内部的类型是pattern,然后接下来输出的类型是这里的。二元组。所以我们接下来就是要实现它里边的对应的方法。主要就是element和broadcast element。那在这个过程当中最核心的是什么呢?呃,其实我们知道这个broadcast element,这个比较简单,那就是得当前的这一条流里边来了新数据的话,说明规则要更新了嘛,那我们就直接定义一个对应的啊。就是直接把当前广播流里边的对应的那个广播状态做一个更新就完事了。关键在于process element里边我们需要判断当前到底是否得到了一组匹配,这个匹配的话,不仅是要考虑当前的这个行为,当前这个行为还应该考虑他之前的一个行为,如果两个行为都对上了的话,这才是完整的一组匹配。
14:08
所以我们发现,如果你要跟当前的广播状态那两个行为去做对比的话,有当前行为还不够,我还得保存下当前用户的上一个行为。那所以上一个行为就需要额外定义一个一个。状态来保存了啊,那所以这里面我们需要去。定义一个。Head state。因为我们是K之后的嘛,跟当前的用户有关,所以我们定义一个k state保存。上一次用户的行为。或者说用户当前用户的上一个行为。这里面当然就是一个state了,只要一个直状态就可以。上一个行为,那我们把它叫做previous action。
15:02
加上一个state,表示这是一个value state,那既然定义了单独定义了一个k state,那就需要在open生命周期里边再把它啊,它的这个状态句柄要获取到啊,在运行时上下文里边去get,所以这里边我们previous。State应该等于get runtime contact,然后get it,这里要你一个value state的script里边给一个当前的名称,叫last action。类型的话,当然就是STEM。这是我们对于整个框架里面用到的状态,考虑到要用到状态的一个基本的定义。然后接下来我们先来处理process broadcast的list,因为这个是广播数据里面的数据嘛,这个比较简单,那我们其实就是非常简单啊,就是从上下文。
16:04
上下文中获取。广播。状态。并用。当前数据。更新状态。因为广播流里面的数据就是我们当前最新的规则嘛,当然是要把广播状态更新的,所有的规则都在广播状态,那接下来首先我要获取这个广播状态,获取广播状态的话,当然就是ctx,然后去做一个get,我们看到当前的这一个上下文就只能get broadcast。因为当前你是广播流里面的上下文嘛。获取当前的这个状态里边当然还要传入一个map state啊,那当前的这个定义其实跟前面我们在上面去初始定义的这个过程当中是完全一样的,所以直接可以把上面的这个定义copy下。
17:00
直接放在这里。得到了这个广播状态。那接下来我们可以。给它做一个命名,就把它叫做就叫做patternon吧。然后接下来。自然就是要更新当前的广播状态,或者我们把它叫做state。当前的这个状态。就用当前值去做一个更新,那就是put对应的K啊,那K的话是空满,那然后把当前传进来的value更新到这个pattern state里面,这样的话规则就改变了。然后接下来的关键就在于啊,那数据流里边来了一个数据之后,怎么判断它是否符合我们定义的规则呢?那同样还是应该要把当前的。规则状,当前的这个规则先从广播状态里边拿出来。
18:01
从。广播状态中。获取。匹配规则,匹配模式或者叫规则。那这个过程其实跟前面这个是完全一样的。只不过前边我们在broad process broadcast element里边可以获取广播状态,也可以更新广播状态,但是在这儿呢,诶,我们。在当前的process element里边,那就不能直接去更新当前的状态,而只能去获取当前的状态。我们会发现直接copy过来这里还在报错,原因就在于我们当前得到的不是不再是一个broadst state,而是一个read only broadcast state。所以得到这个之后不能更新,只能去做一个获取当前状态去做一个处理啊,那所以接下来其实我们可以得到当前的P。
19:04
这个就是直接从状态里边拿出来不就完了吗?啊,那直接get now,当前的是now,这样的话就可以得到匹配的模式了。得到了匹配模式。那接下来呢,我们还应该。拿到上一次的行为。获取用户上一次的行为,这个行为也是在状态里面,只不过这个状态不是在广播状态里,而是在我们自己定义的state那个。直状态里面,所以我们可以定义一个上一次的行为。Action。等于previous action state value直接把它拿到好,那接下来我们就要去做判断了。判断是否匹配。因为现在规则也已经拿到了,然后上一次的行为也拿到了,当前一次的行为也拿到了,那自然我就可以判断是否匹配,那这个是否匹配的话,首先得判断判断一下,那就是当前的上一次行为不能适当,Patternon也不能适当,这两个至少保证不为当,后面我们才不会抛出异常,所以。
20:13
先来一个pattern,不为空。并且previous action也不为空。在这个前提下,然后再去做一个判断。if。如果pattern的ACTION1。Equals。一次的行为。t.ACTION2EQUALS当前的行为。value.action哎,那如果这两个条件都满足的话,那就直接输出结果了,Out点。当前我们需要的是一个。二元组。他二。
21:02
呃,我们这里其实不需要。当前的当前的类型都已经定义好了,所以不需要再单独把它做一个确定了啊,里边直接传值就可以了,首先应该传一个string啊,那这里的string的话,其实主要是看到底是哪个用户啊,那所以这里面我们干脆就看一下这个CX。Get current拿到这个当前的用户ID就可以了,然后接下来呢?哎,那当前的规则是什么呢?他做了一个什么样的行为呢?直接把Python传进来就完事。这就是我们整个的判定是否符合模式的这样的一个过程。而在最后。不要忘记已经。处理完判断完了之后,如果当前没有直接匹配完成的话,那还应该更新状态,那就是要把上一次的这一个状态更新成当前的状态啊,那所以。
22:02
一定要注意状态的更新。所以是previous action去做一个update操作。把当前value action。这就是完整的处理流程。我们可以来运行一下,看一看执行的结果。我们可以看到可以完整的匹配出先后,匹配出不同用户,他们对应的行为到底符合哪种模式啊,那爱丽丝当然就是登录之后有支付行为,而Bob呢,是登录之后有一个下单的行。这就是广播流和广播状态的应用实例。
我来说两句