作者:王知无
By 暴走大数据
场景描述:Flink在处理实时数据时,假如其中一条数据是脏数据,例如格式错误,字段缺少等会报错,这时候该怎么处理呢?
关键词:Flink 脏数据
声明:本文不含工作纪律要求的保密信息,严格遵循公司关于数据资产的保密规定,图文都做过脱敏处理。
这是我最近在调试一个Flink任务中出现的问题。
问题描述
我们线上的一个任务今天报错,业务场景是:
Flink消费消息队列中的消息然后做简单的维表联合查询。今天报警发现类似如下错误:
任务fail-over重试几次,然后失败。
报错很明显,出现了NumberFormatException,null不能转为Long。
解决办法
解决办法更简单。
这个问题在Spark和Flink中都会存在,最直接的办法就是过滤掉。
阿里云上的Blink同样给出了文档,如下:
使用:
或者:
在数据源头直接过滤掉,不要参与计算。
自定义一个UDF
按照上面的处理办法,在SQL中处理当然没有问题,但是我们在实际环境中会遇到非常多的这种情况,我个人建议自定义一个UDF,这个UDF的作用就是专门处理null或者空串或者其他各种异常情况的。
官方给出的一个经典的UDF案例如下:
public class HashCode extends ScalarFunction {
private int factor = 12;
public HashCode(int factor) {
this.factor = factor;
}
public int eval(String s) {
return s.hashCode() * factor;
}
}
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// register the function
tableEnv.registerFunction("hashCode", new HashCode(10));
// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");
// use the function in SQL API
tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");
照着实现一个自己的处理逻辑即可!