专栏首页小晨讲FlinkApache Flink 中广播状态的实用指南

Apache Flink 中广播状态的实用指南

来源:ververica.cn

作者 | Fabian Hueske

翻译 | 王柯凝  校对 | 邱从贤(山智)

Via:https://flink.apache.org/2019/06/26/broadcast-state.html 自版本 Flink 1.5.0 以来,Apache Flink 提供了一种新的状态类型,称为广播状态(Broadcast State)。在本文中,将解释什么是广播状态,并通过示例演示如何将广播状态应用在评估基于事件流的动态模式的应用程序,并指导大家学习广播状态的处理步骤和相关源码,以便在今后的实践中能实现此类的应用。

什么是广播状态

广播状态可以用于通过一个特定的方式来组合并共同处理两个事件流。第一个流的事件被广播到另一个 operator 的所有并发实例,这些事件将被保存为状态。另一个流的事件不会被广播,而是发送给同一个 operator 的各个实例,并与广播流的事件一起处理。广播状态非常适合两个流中一个吞吐大,一个吞吐小,或者需要动态修改处理逻辑的情况。我们将使用后者的一个具体实例来解释广播状态,并在本文的其余部分里对详细的 API 加以说明。

使用广播状态的动态模型评估

假设电子商务类型的网站获取了所有用户的操作行为数据作为用户的操作流,网站的运营团队致力于分析用户的操作,来提高销售额,改善用户体验,并监测和预防恶意行为。网站期望实现一个流应用程序,用于检测用户事件流中的模式,但需要避免在每次模式有变化的时候还要修改和重新部署应用程序,因此我们使用另外一个特征流来读取、更新当前特征,接下来我们通过一个实例逐步阐述如何通过 Apache Flink 中的广播状态来完成相应工作。

实例的程序获取两个数据流,第一个流提供了网站上的用户操作行为数据,如上图左上方所示,一个用户的交互事件由操作的类型(用户登录、用户注销、添加到购物车或者完成付款等)和用户的 ID(按颜色编码的)组成。图中的用户操作事件流包含用户 1001 的“登出”操作,然后是用户 1003 的“支付完成”事件,以及用户 1002 的“添加到购物车”操作。

第二个流提供了应用程序要评估的用户操作模式,模式是由两个连续的操作组成的。在上图中,模式流包含了以下两种:

  • 模式1:用户登录并立即登出,并没有点击网站上其它的页面;
  • 模式2:用户将商品添加到购物车,然后登出,而并没有完成购买操作;

这样的模式有助于企业更好地分析用户行为、检测恶意行为和提高网站体验。例如,如果只是将商品添加到购物车里而没有完成后续的支付,那么网站可以采取合适的方法,更好地了解用户没有购买的原因,并采取一定的措施以提高网站的购买转化率(例如提供优惠券、免运费等)。

在上图右侧,显示了一个 operator 的三个并发实例,这些实例获取模式和用户操作行为的数据流,评估数据流上的模式,并向下游发出模式匹配事件。为了简便起见,我们的例子中的 operator 只对一个进行两次后续操作行为的模式进行评估。当从模式流中获取到新模式的时候,将替换当前活动的模式。原则上,该 operator 也可以实现评估更复杂的模式或多个模式,这些模式可以单独添加或是删除。

我们将描述负责模式匹配的程序如何处理用户的操作和模式流。

首先,向 operator 发送一个模式,该模式被广播给这个 operator 的三个并发实例,接着,每个并发实例将模式存储在广播状态中,由于广播状态只能使用广播数据来进行更新,因此所有并发实例的状态都应该是相同的。

接下来,第一个用户信息流会基于用户 ID 进行划分,并发送给 operator 的实例,分区会确保同一用户的所有操作都由同一并发实例处理。上图显示了在 operator 实例处理了第一个模式和前三个操作行为事件之后应用程序的状态。

当任务接收到新的用户操作数据时,它通过查看用户最新的和历史的操作记录来评估当前的活动模式。对于每个用户,operator 都在 keyed state 中存储用户的上一个操作。到目前为止,由于上图中的任务只为每个用户接收一个操作(我们刚刚启动了应用程序),因此不需要评估模式。最后,keyed state 中用户的上一个操作将更新为最新的操作,以便在同一用户的下一个操作行为到达时能够进行查找。

在前三个操作行为被处理了之后,下一个事件,即用户 1001 的注销操作,将被发送到处理用户 1001 的并发实例中。当并发实例接收到用户操作的数据时,它从广播状态和用户 1001 的上一个操作中查找当前的模式。由于这两个操作符合模式匹配,因此会往下游发送匹配事件。最后,该任务会通过使用最新的操作来覆盖前一个事件以更新其 keyed state。

当一个新模式进入了模式流,它会被广播给所有任务,并且每个并发实例通过使用新模式替换当前模式来更新其广播状态。

一旦广播状态更新为新模式,那么匹配逻辑将像以前一样继续执行,即用户操作行为事件按键(key)进行分区,并由负责的并发实例进行评估。

如何实现广播状态的应用程序?

到目前为止,我们在概念上讨论了应用程序,并解释了如何使用广播状态来评估事件流上的动态模式。接下来,我们将展示如何使用 Flink 的 DataStream API 和广播状态功能实现该实例的程序代码。

让我们从程序的输入数据开始。有两个数据流:操作行为流和模式流,在这一点上,我们并不关心数据流从何而来,这些流可以从 Apache Kafka、Kinesis 或任何其它系统中获取。

DataStream<Action> actions = ???
DataStream<Pattern> patterns = ???

Action 和 Pattern 都是 POJO,每个都含有两个字段:

  • Action的字段:Long userId, String action
  • Pattern的字段:String firstAction, String secondAction

作为第一步,我们将 userId 作为操作行为流上的键:

KeyedStream<Action, Long> actionsByUser = actions
  .keyBy((KeySelector<Action, Long>) action -> action.userId);

接下来,我们准备广播状态,广播状态通常表示为 MapState,这是 Flink 提供的最通用的状态接口类。

MapStateDescriptor<Void, Pattern> bcStateDescriptor = 
  new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));

由于这个应用程序一次只评估和存储一个 Pattern,所以我们将广播状态配置成具有键类型 Void 和值类型 Pattern 的 MapState。MapState 的键永远为 null。

BroadcastStream<Pattern> bcedPatterns = patterns.broadcast(bcStateDescriptor);

以 MapStateDescriptor 为参数,调用模式流上的 Broadcast 转换操作,得到一个  BroadcastStream 对象 bcedPatterns。

DataStream<Tuple2<Long, Pattern>> matches = actionsByUser
 .connect(bcedPatterns)
 .process(new PatternEvaluator());

在获得了 keyedstreamactionsByUser 和广播流 bcedPatterns 之后,我们对两个流使用了 connect() 方法,并在连接的流上调用了 PatternEvaluator 类(见下面 PatternEvaluator 的代码)。PatternEvaluator 是实现 KeyedBroadcastProcessFunction 接口的自定义类。它调用了我们之前讨论过的模式匹配逻辑,并发出 Tuple2 的记录,其中包含用户 ID 和匹配的模式。

public static class PatternEvaluator
    extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {
 
  // handle for keyed state (per user)
  ValueState<String> prevActionState;
  // broadcast state descriptor
  MapStateDescriptor<Void, Pattern> patternDesc;
 
  @Override
  public void open(Configuration conf) {
    // initialize keyed state
    prevActionState = getRuntimeContext().getState(
      new ValueStateDescriptor<>("lastAction", Types.STRING));
    patternDesc = 
      new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
  }

  /**
   * Called for each user action.
   * Evaluates the current pattern against the previous and
   * current action of the user.
   */
  @Override
  public void processElement(
     Action action, 
     ReadOnlyContext ctx, 
     Collector<Tuple2<Long, Pattern>> out) throws Exception {
   // get current pattern from broadcast state
   Pattern pattern = ctx
     .getBroadcastState(this.patternDesc)
     // access MapState with null as VOID default value
     .get(null);
   // get previous action of current user from keyed state
   String prevAction = prevActionState.value();
   if (pattern != null && prevAction != null) {
     // user had an action before, check if pattern matches
     if (pattern.firstAction.equals(prevAction) && 
         pattern.secondAction.equals(action.action)) {
       // MATCH
       out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
     }
   }
   // update keyed state and remember action for next pattern evaluation
   prevActionState.update(action.action);
 }

 /**
  * Called for each new pattern.
  * Overwrites the current pattern with the new pattern.
  */
 @Override
 public void processBroadcastElement(
     Pattern pattern, 
     Context ctx, 
     Collector<Tuple2<Long, Pattern>> out) throws Exception {
   // store the new pattern by updating the broadcast state
   BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
   // storing in MapState with null as VOID default value
   bcState.put(null, pattern);
 }
}

KeyedBroadcastProcessFunction 接口提供了三种方法来处理数据记录和发出的结果:

  • processBroadcastElement() 方法:每次收到广播流的记录时会调用。在 PatternEvaluator 类中,我们只需使用 null 键将接收到的 Pattern 记录放入广播状态中(记住,我们只在 MapState 中存储一个模式);
  • processElement() 方法:接受到用户行为流的每条消息时会调用,并能够对广播状态进行只读操作,以防止导致跨越类中多个并发实例的不同广播状态的修改。PatternEvaluator 类的 processElement() 方法从广播状态中获取当前模式,并从 keyed state 中获取用户的前一个操作。如果两者都存在,它会检查前一个和当前的操作行为是否与模式匹配,如果是这样,则会发出模式匹配记录。最后,它将 keyed state 更新为当前用户操作;
  • onTimer() 方法:当之前注册过的计时器触发时被调用。计时器可以在processElement 方法中定义,用于执行计算或是清除状态。为了保持代码的简洁性,我们没有在例子中实现这个方法,但当用户在某段时间内没有操作时,它可以用来删除最后一个操作,以避免由于非活动用户而导致状态增长;

你可能注意到了 KeyedBroadcastProcessFunction 类方法的上下文对象,提供了对其它功能的访问方法,例如:

  • 广播状态(读写或只读,取决于方法)
  • TimerService,允许访问记录的时间戳、当前的水印,并可以注册计时器
  • 当前键(仅在 processElement() 方法中可用)
  • 一种将函数应用于每个已注册键的 keyed state 的方法(仅在 processBroadcastElement() 方法中可用) KeyedBroadcastProcessFunction 类与其它任何 ProcessFunction 类一样,完全可以调用 Flink 的状态和时间功能,因此可以用于实现复杂的程序逻辑。广播状态被设计成了多功能,能够适应不同的场景和用例,虽然我们只讨论了一个比较简单的应用程序,但是你可以通过多个方式使用广播状态来实现应用的需求。

结论

在本文中,我们通过学习一个应用程序的实例,来解释 Apache Flink 的广播状态是什么,以及如何应用它来评估事件流上的动态模式,除此之外本文还讨论了广播状态的 API,并展示了相关源代码。

原文链接:https://ververica.cn/developers/apache-flink-%e4%b8%ad%e5%b9%bf%e6%92%ad%e7%8a%b6%e6%80%81%e7%9a%84%e5%ae%9e%e7%94%a8%e6%8c%87%e5%8d%97/

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink 使用 Broadcast State 的4个注意事项

    在 Apache Flink 1.5.0 中引入了广播状态(Broadcast State)。本文将描述什么是广播状态模式,广播状态与其他的 Operator ...

    smartsi
  • 最新 | Flink1.9来袭,Kafka x Flink Meetup深圳站精华(附PPT下载)

    虽然夏日已过,但是由 Apache Kafka 与 Apache Flink 联合举办的 Meetup 深圳站如火如荼的开展并在8月的最后一天落下帷幕。

    王知无-import_bigdata
  • Apache Hudi 0.8.0版本重磅发布

    自从Hudi 0.7.0版本支持Flink写入后,Hudi社区又进一步完善了Flink和Hudi的集成。包括重新设计性能更好、扩展性更好、基于Flink状态索引...

    ApacheHudi
  • 理解Flink checkpoint

    Checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的...

    神秘的寇先森
  • 我们在学习Flink的时候,到底在学习什么?

    后台很多小伙伴都在问Flink的学习路径,那么我们在学习Flink的时候,到底重点学习哪些东西呢?

    王知无-import_bigdata
  • [源码分析] 从实例和源码入手看 Flink 之广播 Broadcast

    对黑名单中的IP进行检测过滤。IP黑名单的内容会随时增减,因此是可以随时动态配置的。

    罗西的思考
  • A Practical Guide to Broadcast State in Apache Flink

    从版本1.5.0开始,Apache Flink具有一种称为广播状态的新型状态。 在这篇文章中,我们解释了广播状态是什么,并展示了如何将其应用于评估事件流上的动态...

    yiduwangkai
  • 基石 | Flink Checkpoint-轻量级分布式快照

    前面两篇,一篇是spark的driver的Checkpoint细节及使用的时候注意事项。一篇是flink的Checkpoint的一些上层解释。本文主要是将fli...

    Spark学习技巧
  • 最新消息!Cloudera 全球发行版正式集成 Apache Flink

    摘要:近期 Cloudera Hadoop 大神 Arun 在 Twitter 上宣布 Cloudera Data Platform 正式集成了 Flink 作...

    Fayson
  • 如何在Apache Flink中管理RocksDB内存大小

    原文:https://www.ververica.com/blog/manage-rocksdb-memory-size-apache-flink 翻译:zha...

    大数据技术与应用实战
  • 推荐10本大数据领域必读的经典好书(火速收藏)

    写博客也已经快一年了,从去年的1024到现在金秋10月已纷至沓来。回顾这一年所发布的原创文章,基本都是与大数据主流或者周边的技术为主。本篇博客,...

    大数据梦想家
  • ApacheFlink深度解析-FaultTolerance

    本系列文章来自云栖社区,对Flink的解析兼具广度和深度,适合对Flink有一定研究的同学学习。

    王知无-import_bigdata
  • 【转】分布式数据流的轻量级异步快照

    本篇翻译自论文:Lightweight Asynchronous Snapshots for Distributed Dataflows,Flink的容错快照模...

    yiduwangkai
  • 用Python进行实时计算——PyFlink快速入门

    在最新版本的Flink 1.10中,PyFlink支持Python用户定义的函数,使您能够在Table API和SQL中注册和使用这些函数。但是,听完所有这些后...

    实时计算
  • Flink面试通关手册

    2019 年是大数据实时计算领域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (内部的 Flink 分支版本)开源,大数据领域一夜间从 Spark ...

    大数据真好玩
  • Flink面试通关手册

    2019 年是大数据实时计算领域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (内部的 Flink 分支版本)开源,大数据领域一夜间从 Spark ...

    王知无-import_bigdata
  • 独家 | 一文读懂Apache Flink技术

    本文来自9月1日在成都举行的Apache Flink China Meetup,分享来自于云邪。

    数据派THU
  • 大数据面试杀招 | Flink,大数据时代的“王者”

    近几年Flink发展得异常的火热,对Flink还不太了解的朋友可以先去看看博主的上一篇文章?《简单告诉你,为什么要学 Flink,Flink 优...

    大数据梦想家
  • 硬核! 逛了4年Github ,一口气把我收藏的 Java 开源项目分享给你!

    Great Java project on Github(Github 上非常棒的 Java 开源项目).持续完善中。地址:https://github.com...

    Guide哥

扫码关注云+社区

领取腾讯云代金券