维表关联系列目录: 一、维表服务与Flink异步IO 二、Mysql维表关联:全量加载 三、Hbase维表关联:LRU策略 四、Redis维表关联:实时查询 五、kafka维表关联:广播方式 六、自定义异步查询
维度或者是维表概念熟知应该从数据仓库维度建模开始了解的,区别于事实表业务真实发生的数据,通常用来表示业务属性,比喻订单业务中,商品属性、商家属性都可以称之为维度表。在flink 流处理实时分析中或者实时数仓中,同样需要使用维表来完成一些数据过滤或者字段补齐操作,但是我们所需要的维度数据通常存储在Mysql/Redis/Hbase/Es这样的外部数据库中,并且可能是会随时变动的,根据业务要求数据的时效性,需要不同程度的感知维表数据的变化,在实际使用中常常会有以下几种方案可供选择:
二、Flink 异步IO
flink异步IO用于对外部访问的一种优化手段,可参考http://wuchong.me/blog/2017/05/17/flink-internals-async-io 阿里云邪大牛对flink 异步IO的介绍,里面详细介绍了异步IO相对于同步处理的性能优化与有序、无序原理实现,在这里分析一些源码帮助理解。
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
if (timeout > 0L) {
// register a timeout for this AsyncStreamRecordBufferEntry
long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
timeoutTimestamp,
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
userFunction.timeout(element.getValue(), streamRecordBufferEntry);
}
});
// Cancel the timer once we've completed the stream record buffer entry. This will remove
// the register trigger task
streamRecordBufferEntry.onComplete(
(StreamElementQueueEntry<Collection<OUT>> value) -> {
timerFuture.cancel(true);
},
executor);
}
addAsyncBufferEntry(streamRecordBufferEntry);
userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
}
代码入口是AsyncWaitOperator算子processElement 方法,表示处理元素方法,每个处理的元素都会被封装成为StreamRecordQueueEntry对象,该对象会被放入内部有序或者无序的队列中,Emitter则负责从队列里面取数据,那么如何判断已经进入的元素已经完成异步IO操作了呢?答案就在StreamRecordQueueEntry里面:
在AsyncFunction函数中还有一个timeout方法,在异步调用超时的情况下会被触发。接下来看下其实现原理:
在AsyncFunction函数中默认timeout方法仅仅是会抛出Async function call has timed out.异常,我们也可以重写该方法,获取更多的信息。