前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink维表关联系列之Mysql维表关联:全量加载

flink维表关联系列之Mysql维表关联:全量加载

作者头像
Flink实战剖析
发布2022-04-18 11:48:50
2.3K0
发布2022-04-18 11:48:50
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

维表关联系列目录: 一、维表服务与Flink异步IO 二、Mysql维表关联:全量加载 三、Hbase维表关联:LRU策略 四、Redis维表关联:实时查询 五、kafka维表关联:广播方式 六、自定义异步查询

在维表关联中定时全量加载是针对维表数据量较少并且业务对维表数据变化的敏感程度较低的情况下可采取的一种策略,对于这种方案使用有几点需要注意:

  1. 全量加载有可能会比较耗时,所以必须是一个异步加载过程
  2. 内存维表数据需要被流表数据关联读取、也需要被定时重新加载,这两个过程是不同线程执行,为了尽可能保证数据一致性,可使用原子引用变量包装内存维表数据对象即AtomicReference
  3. 查内存维表数据非异步io过程

具体实例:广告流量统计,广告流量数据包含:广告位id,用户设备id,事件类型(点击、浏览),发生时间,现在需要统计每个广告主在每一个时间段内的点击、浏览数量,流量数据中只有广告位id, 广告位id与广告主id对应的关系在mysql 中,这是一个典型的流表关联维表过程,需要从mysql中获取该广告位id对应的广告主id, 然后在来统计。接下来看维表关联的实现代码:

代码语言:javascript
复制
class SideFlatMapFunction extends RichFlatMapFunction[AdData, AdData] {



  private var sideInfo: AtomicReference[java.util.Map[Int, Int]] = _



  override def open(parameters: Configuration): Unit = {

    sideInfo = new AtomicReference[java.util.Map[Int, Int]]()

    sideInfo.set(loadData)

    val executors=Executors.newSingleThreadScheduledExecutor()

    executors.scheduleAtFixedRate(new Runnable {

      override def run(): Unit = reload()

    },5,5, TimeUnit.MINUTES)

  }



  override def flatMap(value: AdData, out: Collector[AdData]): Unit = {

    val tid=value.tId

    val aid=sideInfo.get().get(tid)

    var newV=AdData(aid,value.tId,value.clientId,value.actionType,value.time)

    out.collect(newV)

  }





  def reload()={

    try{

      println("do reload~")

      val newData=loadData()

      sideInfo.set(newData)

      println("reload ok~")

    }catch {

      case e:Exception=>{

        e.printStackTrace()

      }

    }

  }



  def loadData(): util.Map[Int, Int] = {

    val data = new util.HashMap[Int, Int]()

    Class.forName("com.mysql.jdbc.Driver")

    val con = DriverManager.getConnection("jdbc:mysql://localhost:3306/paul", "root", "123456")

    val sql = "select aid,tid from ads"

    val statement = con.prepareStatement(sql)

    val rs = statement.executeQuery()

    while (rs.next()) {

      val aid = rs.getInt("aid")

      val tid = rs.getInt("tid")

      data.put(tid, aid)

    }

    con.close()

    data

  }

}
代码语言:javascript
复制
case class AdData(aId: Int, tId: Int, clientId: String, actionType: Int, time: Long)



object Demo1 {



  def main(args: Array[String]): Unit = {



    val env = StreamExecutionEnvironment.getExecutionEnvironment



    val kafkaConfig = new Properties();

    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");

    val consumer = new FlinkKafkaConsumer011[String]("topic1", new SimpleStringSchema(), kafkaConfig);

    env.addSource(consumer)

      .map(x => {

        val a: Array[String] = x.split(",")

        AdData(0, a(0).toInt, a(1), a(2).toInt, a(3).toLong) //默认aid为0

      })

      .flatMap(new SideFlatMapFunction)

      .print()

    env.execute()

  }



}

在kafka端生产数据:1,clientId1,1,1571646006000 控制台打印

代码语言:javascript
复制
> AdData(1,1,clientId1,1,1571646006000)

然后将MySQL维表中tid为1的aid 变为2,待一分钟后继续输入数据:1,clientId1,1,1571646006000 控制台打印

代码语言:javascript
复制
>AdData(2,1,clientId1,1,1571646006000)

说明维表数据的更新已经被加载了。

对于该demo还有两点值得思考:

  1. 异步加载过程是异步线程执行,如果异步线程加载抛出异常是无法被Task检测,也就是无法导致任务失败,那么就会导致使用的维表数据一直都是变化之前的,对于业务来说是无法容忍的,解决方式自定义一个维表关联的StreamOperator, 可获取到StreamTask, 然后再异步加载的异常处理中调用StreamTask.handleAsyncException方法,就可以导致任务失败,给用户发出警告
  2. 维表全量加载是在每个task里面执行,那么就会导致每个task里面都有一份全量的维表数据,可采取优化方式是在维表关联前根据关联字段做keyBy操作,那么就会根据关联字段hash然后对并行度取余得到相同的值就会被分配到同一个task里面,所以在加载维表数据的时候也可以在每个task加载与其对应的维表数据, 就可以减少加载的数据量。其具体计算的规则是: (MathUtils.murmurHash(key.hashCode()) % maxParallelism)*parallelism / maxParallelism 得到的值就是IndexOfThisSubtask 即task的索引,那么可使用同样的算法过滤维表数据。

end

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-10-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档