前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >真实案例 | Flink实时计算处理脏数据问题

真实案例 | Flink实时计算处理脏数据问题

作者头像
大数据真好玩
发布2019-09-12 11:31:51
3.3K0
发布2019-09-12 11:31:51
举报
文章被收录于专栏:暴走大数据暴走大数据

作者:王知无

By 暴走大数据

场景描述:Flink在处理实时数据时,假如其中一条数据是脏数据,例如格式错误,字段缺少等会报错,这时候该怎么处理呢?

关键词:Flink 脏数据

声明:本文不含工作纪律要求的保密信息,严格遵循公司关于数据资产的保密规定,图文都做过脱敏处理。

这是我最近在调试一个Flink任务中出现的问题。

问题描述

我们线上的一个任务今天报错,业务场景是:

Flink消费消息队列中的消息然后做简单的维表联合查询。今天报警发现类似如下错误:

任务fail-over重试几次,然后失败。

报错很明显,出现了NumberFormatException,null不能转为Long。

解决办法

解决办法更简单。

这个问题在Spark和Flink中都会存在,最直接的办法就是过滤掉。

阿里云上的Blink同样给出了文档,如下:

使用:

  1. select avg(id) from SOURCE
  2. where test_id IS NOT NULL

或者:

  1. select avg(case when id IS Null
  2. then 0
  3. else test_id
  4. end as test_id)
  5. from SOURCE

在数据源头直接过滤掉,不要参与计算。

自定义一个UDF

按照上面的处理办法,在SQL中处理当然没有问题,但是我们在实际环境中会遇到非常多的这种情况,我个人建议自定义一个UDF,这个UDF的作用就是专门处理null或者空串或者其他各种异常情况的。

官方给出的一个经典的UDF案例如下:

代码语言:javascript
复制
public class HashCode extends ScalarFunction {
  private int factor = 12;
  
  public HashCode(int factor) {
      this.factor = factor;
  }
  
  public int eval(String s) {
      return s.hashCode() * factor;
  }
}

BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

// register the function
tableEnv.registerFunction("hashCode", new HashCode(10));

// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");

// use the function in SQL API
tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");

照着实现一个自己的处理逻辑即可!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-09-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档