前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink Broadcast State实战案例:电商平台用户行为模式分析

Flink Broadcast State实战案例:电商平台用户行为模式分析

作者头像
PP鲁
发布2020-02-17 17:09:42
9800
发布2020-02-17 17:09:42
举报
文章被收录于专栏:皮皮鲁的AI星球皮皮鲁的AI星球

Broadcast State是Flink 1.5引入的功能,本文将跟大家分享Broadcast State的潜在使用场景,并使用电商用户行为分析的例子来演示Broadcast State的使用方法。关于Flink状态的基本原理,Keyed State和Operator State的使用方法,可以参考我之前的文章:Flink状态详解

Broadcast State使用场景

无论是分布式批处理还是流处理,将部分数据同步到所有实例上是一个十分常见的需求。例如,我们需要依赖一个不断变化的控制规则来处理主数据流的数据,主数据流数据量比较大,只能分散到多个算子实例上,控制规则数据相对比较小,可以分发到所有的算子实例上。Broadcast State与直接在时间窗口进行两个数据流的Join的不同点在于,控制规则数据量较小,可以直接放到每个算子实例里,这样可以大大提高主数据流的处理速度。

我们继续使用电商平台用户行为分析为例,不同类型的用户往往有特定的行为模式,有些用户购买欲望强烈,有些用户反复犹豫才下单,有些用户频繁爬取数据,有盗刷数据的嫌疑,电商平台运营人员为了提升商品的购买转化率,保证平台的使用体验,经常会进行一些用户行为模式分析。基于这个场景,我们可以构建一个Flink作业,实时监控识别不同模式的用户。为了避免每次更新规则模式后重启部署,我们可以将规则模式作为一个数据流与用户行为数据流connect在一起,并将规则模式以Broadcast State的形式广播到每个算子实例上。

电商用户行为识别案例

下面开始具体构建一个实例程序。第一步,我们定义一些必要的数据结构来描述这个业务场景,包括用户行为和规则模式两个数据结构。

代码语言:javascript
复制
/**
    * 用户行为
    * categoryId为商品类目ID
    * behavior包括点击(pv)、购买(buy)、加购物车(cart)、喜欢(fav)
    * */
case class UserBehavior(userId: Long,
                        itemId: Long,
                        categoryId: Int,
                        behavior: String,
                        timestamp: Long)
/**
    * 行为模式
    * 整个模式简化为两个行为
    * */
case class BehaviorPattern(firstBehavior: String, secondBehavior: String)

然后我们在主逻辑中读取两个数据流:

代码语言:javascript
复制
// 主数据流
val userBehaviorStream: DataStream[UserBehavior] = ...
// BehaviorPattern数据流
val patternStream: DataStream[BehaviorPattern] = ...

目前Broadcast State只支持使用Key-Value形式,需要使用MapStateDescriptor来描述。这里我们使用一个比较简单的行为模式,因此Key是一个空类型。当然我们也可以根据业务场景,构造复杂的Key-Value对。然后,我们将模式流使用broadcast方法广播到所有算子子任务上。

代码语言:javascript
复制
// Broadcast State只能使用 Key->Value 结构,基于MapStateDescriptor
val broadcastStateDescriptor =
new MapStateDescriptor[Void, BehaviorPattern]("behaviorPattern", classOf[Void], classOf[BehaviorPattern])
val broadcastStream: BroadcastStream[BehaviorPattern] = patternStream
.broadcast(broadcastStateDescriptor)

用户行为模式流先按照用户ID进行keyBy,然后与广播流合并:

代码语言:javascript
复制
// 生成一个KeyedStream
val keyedStream =  userBehaviorStream.keyBy(user => user.userId)
// 在KeyedStream上进行connect和process
val matchedStream = keyedStream
  .connect(broadcastStream)
  .process(new BroadcastPatternFunction)

BroadcastPatternFunctionKeyedBroadcastProcessFunction的具体实现,它基于Broadcast State处理主数据流,生成(Long, BehaviorPattern),分别表示用户ID和命中的行为模式。下面的代码展示了具体的使用方法。

代码语言:javascript
复制
/**
    * 四个泛型分别为:
    * 1. KeyedStream中Key的数据类型
    * 2. 主数据流的数据类型
    * 3. 广播流的数据类型
    * 4. 输出类型
    * */
class BroadcastPatternFunction
extends KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)] {
  // 用户上次性能状态句柄,每个用户存储一个状态
  private var lastBehaviorState: ValueState[String] = _
  // Broadcast State Descriptor
  private var bcPatternDesc: MapStateDescriptor[Void, BehaviorPattern] = _
  override def open(parameters: Configuration): Unit = {
    lastBehaviorState = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("lastBehaviorState", classOf[String])
    )
    bcPatternDesc = new MapStateDescriptor[Void, BehaviorPattern]("behaviorPattern", classOf[Void], classOf[BehaviorPattern])
  }
  // 当BehaviorPattern流有新数据时,更新BroadcastState
  override def processBroadcastElement(pattern: BehaviorPattern,
                                       context: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#Context,
                                       collector: Collector[(Long, BehaviorPattern)]): Unit = {
    val bcPatternState: BroadcastState[Void, BehaviorPattern] = context.getBroadcastState(bcPatternDesc)
    // 将新数据更新至Broadcast State,这里使用一个null作为Key
    // 在本场景中所有数据都共享一个Pattern,因此这里伪造了一个Key
    bcPatternState.put(null, pattern)
  }
  override def processElement(userBehavior: UserBehavior,
                              context: KeyedBroadcastProcessFunction[Long, UserBehavior, BehaviorPattern, (Long, BehaviorPattern)]#ReadOnlyContext,
                              collector: Collector[(Long, BehaviorPattern)]): Unit = {
    // 获取最新的Broadcast State
    val pattern: BehaviorPattern = context.getBroadcastState(bcPatternDesc).get(null)
    val lastBehavior: String = lastBehaviorState.value()
    if (pattern != null && lastBehavior != null) {
      // 用户之前有过行为,检查是否符合给定的模式
      if (pattern.firstBehavior.equals(lastBehavior) &&
          pattern.secondBehavior.equals(userBehavior.behavior))
      // 当前用户行为符合模式
      collector.collect((userBehavior.userId, pattern))
    }
    lastBehaviorState.update(userBehavior.behavior)
  }
}

总结下来,使用Broadcast State需要进行下面三步:

  1. 接收一个普通数据流,并使用broadcast方法将其转换为BroadcastStream,因为Broadcast State目前只支持Key-Value结构,需要使用MapStateDescriptor描述它的数据结构。
  2. BroadcastStream与一个DataStreamKeyedStream使用connect方法连接到一起。
  3. 实现一个ProcessFunction,如果主流是DataStream,则需要实现BroadcastProcessFunction;如果主流是KeyedStream,则需要实现KeyedBroadcastProcessFunction。这两种函数都提供了时间和状态的访问方法。

KeyedBroadcastProcessFunction个函数类中,有两个函数需要实现:

  • processElement:处理主数据流(非Broadcast流)中的每条元素,输出零到多个数据。ReadOnlyContext 可以获取时间和状态,但是只能以只读的形式读取Broadcast State,不能修改,以保证每个算子实例上的Broadcast State都是相同的。
  • processBroadcastElement:处理流入的广播流,可以输出零到多个数据,一般用来更新Broadcast State。

此外,在KeyedBroadcastProcessFunction中可以注册Timer,并在onTimer方法中实现回调逻辑。本例中为了保持代码简洁,没有使用,一般可以用来清空状态,避免状态无限增长下去。

小结

本文解释了Broadcast State的原理和使用场景,并以电商平台用户行为分析为例演示了具体的使用方法。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-02-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 皮皮鲁的AI星球 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Broadcast State使用场景
  • 电商用户行为识别案例
  • 小结
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档