前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink维表关联系列之Hbase维表关联:LRU策略

flink维表关联系列之Hbase维表关联:LRU策略

作者头像
Flink实战剖析
发布2022-04-18 11:51:44
1.1K0
发布2022-04-18 11:51:44
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

维表关联系列目录: 一、维表服务与Flink异步IO 二、Mysql维表关联:全量加载 三、Hbase维表关联:LRU策略 四、Redis维表关联:实时查询 五、kafka维表关联:广播方式 六、自定义异步查询

LRU

LRU(Least Recently Used),最近最少使用缓存淘汰算法,认为最近访问过的数据在将来被访问的概率也比较大,当内存达到上限去淘汰那些最近访问较少的数据。

在Flink中做维表关联时,如果维表的数据比较大,无法一次性全部加载到内存中,而在业务上也允许一定数据的延时,那么就可以使用LRU策略加载维表数据。但是如果一条维表数据一直都被缓存命中,这条数据永远都不会被淘汰,这时维表的数据已经发生改变,那么将会在很长时间或者永远都无法更新这条改变,所以需要设置缓存超时时间TTL,当缓存时间超过ttl,会强制性使其失效重新从外部加载进来。接下来介绍两种比较常见的LRU使用:

  1. LinkedHashMap LinkedHashMap是双向链表+hash表的结构,普通的hash表访问是没有顺序的,通过加上元素之间的指向关系保证元素之间的顺序,默认是按照插入顺序的,插入是链表尾部,取数据是链表头部,也就是访问的顺序与插入的顺序是一致的。要想其具有LRU特性,那么就将其改为访问顺序,插入还是在链表尾部,但是数据访问会将其移动达到链表的尾部,那么最近插入或者访问的数据永远都在链表尾部,被访问较少的数据就在链表的头部,给 LinkedHashMap设置一个大小,当数据大小超过该值,就直接从链表头部移除数据。 LinkedHashMap本身不具有ttl功能,就是无法知晓数据是否过期,可以通过给数据封装一个时间字段insertTimestamp,表示数据加载到内存的时间,当这条记录被命中,首先判断当前时间currentTimestamp与insertTimestamp差值是否达到ttl, 如果达到了就重新从外部存储中查询加载到内存中。
  2. guava Cache google guava下面提供了Cache缓存模块,轻量级,适合做本地缓存,能够做到以下几点: a. 可配置本地缓存大小 b. 可配置缓存过期时间 c. 可配置淘汰策略 非常适用于Flink维表关联LRU策略,使用方式:
代码语言:javascript
复制
cache = CacheBuilder.newBuilder()

                .maximumSize(1000)

                .expireAfterWrite(100, TimeUnit.MILLISECONDS)

                .build();

表示最大缓存容量为1000,数据的过期时间为100s。

LRU方式读取Hbase

实现思路:

  1. 使用Flink 异步IO RichAsyncFunction去异步读取hbase的数据,那么需要hbase 客户端支持异步读取,默认hbase客户端是同步,可使用hbase 提供的asynchbase 客户端;
  2. 初始化一个Cache 并且设置最大缓存容量与数据过期时间;
  3. 数据读取逻辑:先根据Key从Cache中查询value,如果能够查询到则返回,如果没有查询到结果则使用asynchbase查询数据,并且将查询的结果插入Cache中,然后返回

引入pom.xml 依赖:

代码语言:javascript
复制
 <dependency>

            <groupId>org.hbase</groupId>

            <artifactId>asynchbase</artifactId>

            <version>1.8.2</version>

</dependency>

<dependency>

            <groupId>com.google.guava</groupId>

            <artifactId>guava</artifactId>

            <version>28.0-jre</version>

</dependency>

demo版:

代码语言:javascript
复制
class HbaseAsyncLRU(zk: String, tableName: String, maxSize: Long, ttl: Long) extends RichAsyncFunction[String, String] {



  private var hbaseClient: HBaseClient = _

  private var cache: Cache[String, String] = _



  override def open(parameters: Configuration): Unit = {

    hbaseClient = new HBaseClient(zk)

    cache = CacheBuilder.newBuilder()

      .maximumSize(maxSize)

      .expireAfterWrite(ttl, TimeUnit.SECONDS)

      .build()

  }



  override def asyncInvoke(input: String, resultFuture: async.ResultFuture[String]): Unit = {



    val key = parseKey(input)

    val value = cache.getIfPresent(key)

    if (value != null) {

      val newV: String = fillData(input, value)

      resultFuture.complete(Collections.singleton(newV))

      return

    }

    val get = new GetRequest(tableName, key)

    hbaseClient.get(get).addCallbacks(new Callback[String, util.ArrayList[KeyValue]] {

      override def call(t: util.ArrayList[KeyValue]): String = {

        val v = parseRs(t)

        cache.put(key, v)

        resultFuture.complete(Collections.singleton(v))

        ""

      }

    }, new Callback[String, Exception] {

      override def call(t: Exception): String = {

        t.printStackTrace()

        resultFuture.complete(null)

        ""

      }

    })

  }

  private def parseKey(input: String): String = {

    ""

  }

  private def fillData(input: String, value: String): String = {

    ""

  }

  private def parseRs(t: util.ArrayList[KeyValue]): String = {

    ""

  }

}
 

对于查询hbase, 需要合理设计rowKey,为了避免查询热点,例如rowKey通过md5方式散列。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-10-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • LRU
  • LRU方式读取Hbase
相关产品与服务
TDSQL MySQL 版
TDSQL MySQL 版(TDSQL for MySQL)是腾讯打造的一款分布式数据库产品,具备强一致高可用、全球部署架构、分布式水平扩展、高性能、企业级安全等特性,同时提供智能 DBA、自动化运营、监控告警等配套设施,为客户提供完整的分布式数据库解决方案。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档