前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一文搞懂 FlinkSQL函数 LAST_VALUE 的原理

一文搞懂 FlinkSQL函数 LAST_VALUE 的原理

作者头像
shengjk1
发布2021-02-02 15:46:23
1.2K0
发布2021-02-02 15:46:23
举报
文章被收录于专栏:码字搬砖码字搬砖
背景

刚开始接触 FlinkSQL 时,对 LAST_VALUE 特别好奇,虽然工作当中有在用到,但还是特别的想知道它是怎么实现的,今天终于可以总结一下

原理

当我们写入如下类似的 sql 时,就会用到 LAST_VALUE 函数

代码语言:javascript
复制
select LAST_VALUE(status) from temp;

LAST_VALUE 函数对应的具体类为 LastValueWithRetractAggFunction。 LAST_VALUE函数之所以能够起作用最关键的是

代码语言:javascript
复制
 /** Accumulator for LAST_VALUE with retraction. */
    public static class LastValueWithRetractAccumulator<T> {
        public T lastValue = null;
        public Long lastOrder = null;
        // value timestamp
        public MapView<T, List<Long>> valueToOrderMap = new MapView<>();
        // timestamp value
        public MapView<Long, List<T>> orderToValueMap = new MapView<>();

       ......
    }

    @SuppressWarnings("unchecked")
    public void accumulate(LastValueWithRetractAccumulator<T> acc, Object value) throws Exception {
        if (value != null) {//传进来的是 null 不做任何操作
            T v = (T) value;
            Long order = System.currentTimeMillis();
            List<Long> orderList = acc.valueToOrderMap.get(v);
            if (orderList == null) {
                orderList = new ArrayList<>();
            }
            orderList.add(order);
            acc.valueToOrderMap.put(v, orderList);
            accumulate(acc, value, order);
        }
    }

    @SuppressWarnings("unchecked")
    public void accumulate(LastValueWithRetractAccumulator<T> acc, Object value, Long order)
            throws Exception {
        if (value != null) {
            T v = (T) value;
            Long prevOrder = acc.lastOrder;// 默认是 null
            if (prevOrder == null || prevOrder <= order) {//类似链表头插法
                acc.lastValue = v;
                acc.lastOrder = order;
            }

            List<T> valueList = acc.orderToValueMap.get(order);
            if (valueList == null) {
                valueList = new ArrayList<>();
            }
            valueList.add(v);
            acc.orderToValueMap.put(order, valueList);
        }
    }

    @SuppressWarnings("unchecked")
    public void retract(LastValueWithRetractAccumulator<T> acc, Object value) throws Exception {
        if (value != null) {
            T v = (T) value;
            List<Long> orderList = acc.valueToOrderMap.get(v);// 查出所有的 timestamp
            if (orderList != null && orderList.size() > 0) {// 说明之前已经发出过了.此刻该 retract
                Long order = orderList.get(0);
                orderList.remove(0);//最早进入的那个 value 对应的 timestamp remove
                if (orderList.isEmpty()) {//说明该 value 有且仅进入了一次
                    acc.valueToOrderMap.remove(v);
                } else {
                    acc.valueToOrderMap.put(v, orderList);
                }
                retract(acc, value, order);
            }
        }
    }

    @SuppressWarnings("unchecked")
    public void retract(LastValueWithRetractAccumulator<T> acc, Object value, Long order)
            throws Exception {
        if (value != null) {
            T v = (T) value;
            List<T> valueList = acc.orderToValueMap.get(order);//取出相同 timestamp 对应的所有 value
            if (valueList == null) {
                return;
            }
            int index = valueList.indexOf(v);// 找到对应的 value 并将其删除
            if (index >= 0) {
                valueList.remove(index);
                if (valueList.isEmpty()) {
                    acc.orderToValueMap.remove(order);
                } else {
                    acc.orderToValueMap.put(order, valueList);
                }
            }
            if (v.equals(acc.lastValue)) {
                Long startKey = acc.lastOrder;
                Iterator<Long> iter = acc.orderToValueMap.keys().iterator();
                // find the maximal order which is less than or equal to `startKey`
                //找到小于要删除值对应时间戳的最大值
                Long nextKey = Long.MIN_VALUE;
                while (iter.hasNext()) {
                    Long key = iter.next();
                    if (key <= startKey && key > nextKey) {
                        nextKey = key;
                    }
                }

                if (nextKey != Long.MIN_VALUE) {
                    List<T> values = acc.orderToValueMap.get(nextKey);
                    acc.lastValue = values.get(values.size() - 1);
                    acc.lastOrder = nextKey;
                } else {
                    acc.lastValue = null;
                    acc.lastOrder = null;
                }
            }
        }
    }

首先呢是两个 MapView valueToOrderMap、orderToValueMap

valueToOrderMap 值( 此刻最终的结果 )---->消息进入accumulate 方法的系统时间戳 orderToValueMap 消息进入accumulate 方法的系统时间戳 ----->值( 此刻最终的结果 )

当 RowData( 内部使用 )对应的 rowKind 为 insert 或者 update_after 时,会进入 accumulate(LastValueWithRetractAccumulator acc, Object value) 方法。accumulate 方法相对比较简单其实就是分别对 valueToOrderMap、orderToValueMap 进行赋值。

当 RowData( 内部使用 )对应的 rowKind 为 delete 或者 update_before 时,会进入 retract(LastValueWithRetractAccumulator acc, Object value) 方法,主要是操作 valueToOrderMap 删除之前已经发出去的消息记录,然后进入 retract(LastValueWithRetractAccumulator acc, Object value, Long order),主要就是操作 orderToValueMap 删除对应时间戳的值,然后找出 不大于要删除数据对应时间戳的最大时间戳,下一步要 retract 就该它了

总结

其实就是通过 时间戳 来进行判断的

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-01-27 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 原理
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档