首页
学习
活动
专区
圈层
工具
发布
49 篇文章
1
YARN
2
Hadoop前世今生
3
AI分类
4
人工智能综述
5
随机森林
6
【HBase】HBase之what
7
【HBase】HBase之how
8
HBase篇--HBase常用优化
9
Hbase优化
10
flink源码从头分析第一篇之WordCount DataStream操作
11
大数据Flink-Java学习之旅第一篇
12
flink(12)-flink on yarn
13
Flink学习——Flink概述
14
Flink学习笔记:2、Flink介绍
15
Flink学习笔记(2) -- Flink部署
16
Flink入门(一)——Apache Flink介绍
17
Flink1.4 Flink程序剖析
18
Flink SQL 优化实战 - 维表 JOIN 优化
19
flink sql 知其所以然(十四):维表 join 的性能优化之路(上)附源码
20
Flink重点难点:维表关联理论和Join实战
21
Flink重点难点:状态(Checkpoint和Savepoint)容错与两阶段提交
22
详解flink中Look up维表的使用
23
Flink 1.11中对接Hive新特性及如何构建数仓体系
24
Flink 实时计算 - SQL 维表 Join 的实现
25
大数据技术周报第 010 期
26
实时数仓在有赞的实践
27
美团基于 Flink 的实时数仓平台建设新进展
28
基于Flink+Hive构建流批一体准实时数仓
29
实时数仓:基于流计算 Oceanus 实现 MySQL 和 HBase 维表到 ClickHouse 的实时分析
30
当 TiDB 与 Flink 相结合:高效、易用的实时数仓
31
flink维表关联系列之Mysql维表关联:全量加载
32
基于Flink的高可靠实时ETL系统
33
基于 Flink 实现的商品实时推荐系统(附源码)
34
【Flink】基于 Flink 的流式数据实时去重
35
Flink 实战 | 贝壳找房基于Flink的实时平台建设
36
Apache Hudi在华米科技的应用-湖仓一体化改造
38
Flink checkpoint
39
理解Flink checkpoint
40
flink checkpoint配置整理
41
flink checkpoint 源码分析 (二)
42
聊聊flink的checkpoint配置
43
Flink中案例学习--State与CheckPoint
44
Flink源码阅读(一)--Checkpoint触发机制
45
Flink企业级优化全面总结(3万字长文,15张图)
46
Flink高频面试题,附答案解析
47
学习Flink,看这篇就够了
48
【最全的大数据面试系列】Flink面试题大全
49
Flink SQL Client综合实战

flink维表关联系列之Mysql维表关联:全量加载

维表关联系列目录: 一、维表服务与Flink异步IO 二、Mysql维表关联:全量加载 三、Hbase维表关联:LRU策略 四、Redis维表关联:实时查询 五、kafka维表关联:广播方式 六、自定义异步查询

在维表关联中定时全量加载是针对维表数据量较少并且业务对维表数据变化的敏感程度较低的情况下可采取的一种策略,对于这种方案使用有几点需要注意:

  1. 全量加载有可能会比较耗时,所以必须是一个异步加载过程
  2. 内存维表数据需要被流表数据关联读取、也需要被定时重新加载,这两个过程是不同线程执行,为了尽可能保证数据一致性,可使用原子引用变量包装内存维表数据对象即AtomicReference
  3. 查内存维表数据非异步io过程

具体实例:广告流量统计,广告流量数据包含:广告位id,用户设备id,事件类型(点击、浏览),发生时间,现在需要统计每个广告主在每一个时间段内的点击、浏览数量,流量数据中只有广告位id, 广告位id与广告主id对应的关系在mysql 中,这是一个典型的流表关联维表过程,需要从mysql中获取该广告位id对应的广告主id, 然后在来统计。接下来看维表关联的实现代码:

代码语言:javascript
复制
class SideFlatMapFunction extends RichFlatMapFunction[AdData, AdData] {



  private var sideInfo: AtomicReference[java.util.Map[Int, Int]] = _



  override def open(parameters: Configuration): Unit = {

    sideInfo = new AtomicReference[java.util.Map[Int, Int]]()

    sideInfo.set(loadData)

    val executors=Executors.newSingleThreadScheduledExecutor()

    executors.scheduleAtFixedRate(new Runnable {

      override def run(): Unit = reload()

    },5,5, TimeUnit.MINUTES)

  }



  override def flatMap(value: AdData, out: Collector[AdData]): Unit = {

    val tid=value.tId

    val aid=sideInfo.get().get(tid)

    var newV=AdData(aid,value.tId,value.clientId,value.actionType,value.time)

    out.collect(newV)

  }





  def reload()={

    try{

      println("do reload~")

      val newData=loadData()

      sideInfo.set(newData)

      println("reload ok~")

    }catch {

      case e:Exception=>{

        e.printStackTrace()

      }

    }

  }



  def loadData(): util.Map[Int, Int] = {

    val data = new util.HashMap[Int, Int]()

    Class.forName("com.mysql.jdbc.Driver")

    val con = DriverManager.getConnection("jdbc:mysql://localhost:3306/paul", "root", "123456")

    val sql = "select aid,tid from ads"

    val statement = con.prepareStatement(sql)

    val rs = statement.executeQuery()

    while (rs.next()) {

      val aid = rs.getInt("aid")

      val tid = rs.getInt("tid")

      data.put(tid, aid)

    }

    con.close()

    data

  }

}
代码语言:javascript
复制
case class AdData(aId: Int, tId: Int, clientId: String, actionType: Int, time: Long)



object Demo1 {



  def main(args: Array[String]): Unit = {



    val env = StreamExecutionEnvironment.getExecutionEnvironment



    val kafkaConfig = new Properties();

    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");

    val consumer = new FlinkKafkaConsumer011[String]("topic1", new SimpleStringSchema(), kafkaConfig);

    env.addSource(consumer)

      .map(x => {

        val a: Array[String] = x.split(",")

        AdData(0, a(0).toInt, a(1), a(2).toInt, a(3).toLong) //默认aid为0

      })

      .flatMap(new SideFlatMapFunction)

      .print()

    env.execute()

  }



}

在kafka端生产数据:1,clientId1,1,1571646006000 控制台打印

代码语言:javascript
复制
> AdData(1,1,clientId1,1,1571646006000)

然后将MySQL维表中tid为1的aid 变为2,待一分钟后继续输入数据:1,clientId1,1,1571646006000 控制台打印

代码语言:javascript
复制
>AdData(2,1,clientId1,1,1571646006000)

说明维表数据的更新已经被加载了。

对于该demo还有两点值得思考:

  1. 异步加载过程是异步线程执行,如果异步线程加载抛出异常是无法被Task检测,也就是无法导致任务失败,那么就会导致使用的维表数据一直都是变化之前的,对于业务来说是无法容忍的,解决方式自定义一个维表关联的StreamOperator, 可获取到StreamTask, 然后再异步加载的异常处理中调用StreamTask.handleAsyncException方法,就可以导致任务失败,给用户发出警告
  2. 维表全量加载是在每个task里面执行,那么就会导致每个task里面都有一份全量的维表数据,可采取优化方式是在维表关联前根据关联字段做keyBy操作,那么就会根据关联字段hash然后对并行度取余得到相同的值就会被分配到同一个task里面,所以在加载维表数据的时候也可以在每个task加载与其对应的维表数据, 就可以减少加载的数据量。其具体计算的规则是: (MathUtils.murmurHash(key.hashCode()) % maxParallelism)*parallelism / maxParallelism 得到的值就是IndexOfThisSubtask 即task的索引,那么可使用同样的算法过滤维表数据。

end

下一篇
举报
领券