维表关联系列目录: 一、维表服务与Flink异步IO 二、Mysql维表关联:全量加载 三、Hbase维表关联:LRU策略 四、Redis维表关联:实时查询 五、kafka维表关联:广播方式 六、自定义异步查询
LRU(Least Recently Used),最近最少使用缓存淘汰算法,认为最近访问过的数据在将来被访问的概率也比较大,当内存达到上限去淘汰那些最近访问较少的数据。
在Flink中做维表关联时,如果维表的数据比较大,无法一次性全部加载到内存中,而在业务上也允许一定数据的延时,那么就可以使用LRU策略加载维表数据。但是如果一条维表数据一直都被缓存命中,这条数据永远都不会被淘汰,这时维表的数据已经发生改变,那么将会在很长时间或者永远都无法更新这条改变,所以需要设置缓存超时时间TTL,当缓存时间超过ttl,会强制性使其失效重新从外部加载进来。接下来介绍两种比较常见的LRU使用:
cache = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(100, TimeUnit.MILLISECONDS)
.build();
表示最大缓存容量为1000,数据的过期时间为100s。
实现思路:
引入pom.xml 依赖:
<dependency>
<groupId>org.hbase</groupId>
<artifactId>asynchbase</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>
demo版:
class HbaseAsyncLRU(zk: String, tableName: String, maxSize: Long, ttl: Long) extends RichAsyncFunction[String, String] {
private var hbaseClient: HBaseClient = _
private var cache: Cache[String, String] = _
override def open(parameters: Configuration): Unit = {
hbaseClient = new HBaseClient(zk)
cache = CacheBuilder.newBuilder()
.maximumSize(maxSize)
.expireAfterWrite(ttl, TimeUnit.SECONDS)
.build()
}
override def asyncInvoke(input: String, resultFuture: async.ResultFuture[String]): Unit = {
val key = parseKey(input)
val value = cache.getIfPresent(key)
if (value != null) {
val newV: String = fillData(input, value)
resultFuture.complete(Collections.singleton(newV))
return
}
val get = new GetRequest(tableName, key)
hbaseClient.get(get).addCallbacks(new Callback[String, util.ArrayList[KeyValue]] {
override def call(t: util.ArrayList[KeyValue]): String = {
val v = parseRs(t)
cache.put(key, v)
resultFuture.complete(Collections.singleton(v))
""
}
}, new Callback[String, Exception] {
override def call(t: Exception): String = {
t.printStackTrace()
resultFuture.complete(null)
""
}
})
}
private def parseKey(input: String): String = {
""
}
private def fillData(input: String, value: String): String = {
""
}
private def parseRs(t: util.ArrayList[KeyValue]): String = {
""
}
}
对于查询hbase, 需要合理设计rowKey,为了避免查询热点,例如rowKey通过md5方式散列。