前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink维表关联系列之kafka维表关联:广播方式

flink维表关联系列之kafka维表关联:广播方式

作者头像
Flink实战剖析
发布2022-04-18 11:54:55
9530
发布2022-04-18 11:54:55
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

维表关联系列目录:

一、维表服务与Flink异步IO 二、Mysql维表关联:全量加载 三、Hbase维表关联:LRU策略 四、Redis维表关联:实时查询 五、kafka维表关联:广播方式 六、自定义异步查询

Flink中广播状态

假设存在这样一种场景,一个是用户行为数据,一个是规则数据,要求通过规则去匹配用户行为找到符合规则的用户,并且规则是可以实时变更的,在用户行为匹配中也能根据规则的实时变更作出相应的调整。这个时候就可以使用广播状态,将用户行为数据看做是一个流userActionStream,规则数据也看做是一个流ruleStream,将ruleStream流中数据下发到userActionStream流中,使得在userActionStream流中每一个Task都能获取到ruleStream流中所有数据,这种行为在Flink中称之为广播,ruleStream流称之为广播流,userActionStream称之为非广播流,流入到userActionStream流中的rule数据称之为广播数据,放入到Flink的状态中就称之为广播状态。

定义一条广播流:

代码语言:javascript
复制
 val broadcastStateDesc=new MapStateDescriptor[String,String]("broadcast-state",BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO)

    val broadcastRuleStream=ruleStream.broadcast()
 

broadcastStateDesc定义了一个广播状态的描述,只能是 MapStateDescriptor类型,在后续的处理中可通过该描述获取到广播状态;广播流通过broadcast方式定义,其内部实现实际上是定义了该流数据分区方式为广播方式,由BroadcastPartitioner来对数据进行分区,在数据选择分区channel 会选择所有的channel, 也就是一条数据会发送到下游所有的Task中

广播流使用:

代码语言:javascript
复制
val connectedStream=userActionStream.connect(broadcastRuleStream) 

通过connect方式连接一条广播流,那么广播流broadcastRuleStream就会被广播到userActionStream非广播流中,得到的是一个BroadcastConnectedStream的流,该流包含两个输入流broadcastRuleStream与userActionStream,之后可以通过:

代码语言:javascript
复制
connectedStream.process(...) 

process中可为KeyedBroadcastProcessFunction或者BroadcastProcessFunction这两种类型的function, 取决于userActionStream的类型,如果为KeyedStream,则需要使用KeyedBroadcastProcessFunction,否则BroadcastProcessFunction。这两个function的区别在于BroadcastProcessFunction无法提供定时注册,因为定时注册只能在keyedStream中,在使用上都有两个方法:processElement处理非connected流数据并且只可读取广播状态,processBroadcastElement处理connectedStream流数据并且可读写广播状态。

在这里思考一个问题:在KeyedStream中状态都是与具体的key绑定的,在keyedStream中广播状态很显然是非key绑定的,否则就没法全局有效了,看下普通keyed状态存储类型:StateTable<K, N, SV>, SV表示具体的状态 ,可以是value/map/list任意类型,但是都与K有绑定关系,看下广播状态存储类型:HeapBroadcastState中Map<K, V>,是一个普通的map存储结构,其类型就是我们定义的broadcastStateDesc的类型,并没有具体的key绑定,所在在非broadcast流key切换对其并不产生影响,仍然可以读取全局的广播数据。

广播状态用于维表关联

如果需求上存在要求低延时感知维表数据的更新,而又担心实时查询对外部存储维表数据的影响,那么就可以使用广播方式将维表数据广播出去,既能满足实时性、又能满足不对外部存储产生影响,仍然以用户行为规则匹配为例,其实现步骤如下:

  1. 上层业务在规则数据变更的同时发送一条变更数据到kafka,或者直接通过binlog方式发送到kafka中
  2. 将规则数据流定义成为广播流,广播到用户行为数据流中
  3. 定义一个广播状态存储规则数据,在用户行为处理中查询广播数据进行规则匹配,符合要求则发送出去。

代码实现如下:

代码语言:javascript
复制
val env=StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(60000)

    val kafkaConfig = new Properties();

    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");

    val ruleConsumer = new FlinkKafkaConsumer011[String]("topic1", new SimpleStringSchema(), kafkaConfig)



    val ruleStream=env.addSource(ruleConsumer)

      .map(x=>{

        val a=x.split(",")

        Rule(a(0),a(1).toBoolean)

      })

    val broadcastStateDesc=new MapStateDescriptor[String,Rule]("broadcast-state",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint[Rule] {}))

    val broadcastRuleStream=ruleStream.broadcast()



    val userActionConsumer = new FlinkKafkaConsumer011[String]("topic2", new SimpleStringSchema(), kafkaConfig)



    val userActionStream=env.addSource(userActionConsumer).map(x=>{

      val a=x.split(",")

      UserAction(a(0),a(1),a(2))

    }).keyBy(_.userId)

    val connectedStream=userActionStream.connect(broadcastRuleStream)



    connectedStream.process(new KeyedBroadcastProcessFunction[String,UserAction,Rule,String] {



      override def processElement(value: UserAction, ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#ReadOnlyContext, out: Collector[String]): Unit = {



        val state=ctx.getBroadcastState(broadcastStateDesc)

        if(state.contains(value.actionType))

          {

            out.collect(Tuple4.apply(value.userId,value.actionType,value.time,"true").toString())

          }

      }

      override def processBroadcastElement(value: Rule, ctx: KeyedBroadcastProcessFunction[String, UserAction, Rule, String]#Context, out: Collector[String]): Unit = {



        ctx.getBroadcastState(broadcastStateDesc).put(value.actionType,value)

      }

    })



    env.execute()
 

以上就是简易版使用广播状态来实现维表关联的实现,由于将维表数据存储在广播状态中,但是广播状态是非key的,而rocksdb类型statebackend只能存储keyed状态类型,所以广播维表数据只能存储在内存中,因此在使用中需要注意维表的大小以免撑爆内存。

end

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-10-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flink中广播状态
  • 广播状态用于维表关联
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档