前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink table窗口聚合的open函数未调用的bug分析

flink table窗口聚合的open函数未调用的bug分析

作者头像
Spark学习技巧
发布2019-12-25 11:51:59
2.1K0
发布2019-12-25 11:51:59
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

今天分析一下,flink table聚合udf AggregateFunction的open函数未被调用的bug。

情景一:

当然,对于udf的聚合操作,在flink里面有两种用法,一种是不用窗口的分组聚合类似于

代码语言:javascript
复制
Table table = tEnv.sqlQuery("select  DateUtil(rowtime,'yyyyMMddHH'),WeightedAvg(number,number) from source group by DateUtil(rowtime,'yyyyMMddHH')");

情景二:

一种是使用窗口的分组聚合操作,例如:

代码语言:javascript
复制
tEnv.sqlUpdate("insert into sink select fruit,WeightedAvg(number,number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from source group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");

表面上看是是同一个类型的udf,底层执行逻辑应该一样。但是flink内部coden的时候,被完全解析成了不同的聚合函数。

假设我们定义一个AggregateFunction的udf叫做WeightedAvg,主要进行求平均值,其中有一个变量 flag,初始值为1 ,我们想我在open的时候更改为100.

代码语言:javascript
复制
package org.table.agg;

import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionContext;

import java.util.Iterator;

/**
 * Weighted Average user-defined aggregate function.
 */
public  class WeightedAvg extends AggregateFunction<Integer, WeightedAvgAccum> {
    @Override
    public void open( FunctionContext context) throws Exception, Exception {
        this.flag =100;
    }

    private int flag =1;
    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    @Override
    public Integer getValue(WeightedAvgAccum acc) {
        System.out.println("value of flag  is : "+flag);
        if (acc.count == 0) {
            return null;
        } else {
            int i = acc.sum / acc.count;
            return i;
        }
    }

    public void accumulate(WeightedAvgAccum acc, int iValue, int iWeight) {
        acc.sum += iValue * iWeight;
        acc.count += iWeight;
    }

    public void retract(WeightedAvgAccum acc, int iValue, int iWeight) {
        acc.sum -= iValue * iWeight;
        acc.count -= iWeight;
    }

    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
        Iterator<WeightedAvgAccum> iter = it.iterator();
        while (iter.hasNext()) {
            WeightedAvgAccum a = iter.next();
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }

    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.count = 0;
        acc.sum = 0;
    }
}

package org.table.agg;

/**
 * Accumulator for WeightedAvg.
 */
public class WeightedAvgAccum {
    public int sum = 0;
    public int count = 0;
}

分别执行两个sql之后,你会发现:

情景一:value of flag is : 100

情景二:value of flag is : 1

之所以会情景二没有被更改为 100 主要原因是open函数没有调用,显然这种情况下,在AggregateFunction的open函数里初始化外部客户端,比如mysql,redis等客户端初始化,或者通过open的context参数传递一些参数到AggregateFunction,比如权重阈值等,都变的行不通了。

直接给出大致结论,主要原因是:

情景一对应DataStream的GroupAggProcessFunction。

情景二对应DataStream的AggregateFunction,而该函数并没有open方法。仅仅说的是滚动窗口,还有其它窗口AggregateUtil。

解决办法是有很多,比如使用构造函数在注册的时候传参并初始化,比如使用readobject()|writeObject()方法等。

如代码,可以给WeightedAvg加入构造函数:

代码语言:javascript
复制
 public WeightedAvg(int flag) {
        this.flag = flag;
  }

然后注册udf的时候直接初始化:

代码语言:javascript
复制
tEnv.registerFunction("WeightedAvg",new WeightedAvg(100));

哎,只能说flink的坑太多,有待改进。但是这个也体现出了我们码农的存在的必要性。

本文举例仅仅是一种窗口操作,更多的窗口聚合是否会调用aggregateFunction的open方法,可以仔细阅读AggregateUtil。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-12-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档