专栏首页腾讯云流计算 OceanusFlink SQL 自定义函数指南 - 以读取 GBK 编码的数据库为例
原创

Flink SQL 自定义函数指南 - 以读取 GBK 编码的数据库为例

背景介绍

近期我们遇到了一位客户提出的问题:MySQL 建表时,数据库表定义的字符集是 latin1,里面的数据是以 GBK 编码的方式写入的。当 Flink 的 JDBC Connector 在读取此维表时,输出数据的中文出现了乱码现象,如下图:

中文数据乱码

原因分析

对于 Oceanus 平台而言,内部的数据处理都是以 Unicode 为标准的。对于非 Unicode 的字符集,在 JDBC Connector 读取时,可能会出现各种异常情况,即使 JDBC 连接 URL 参数中指定了characterEncoding也无法避免中文乱码问题。

对于 MySQL 数据而言,最怕的不是数据乱码,而是变成问号 (????)。通常来讲,如果遇到了全是问号的情况,则数据基本无法还原了;而对于乱码来说,很可能源数据还在,只是编码选错了,通过恰当的解码方式,还是有希望恢复原有的数据。

因此我们需要编写一个 UDF(用户自定义函数),将 JDBC Connector 读到的 Latin1(这里实际上是 GBK)数据进行解码。

首先我们来看一下数据库中的原始数据(首先需要将终端的编码改为 GBK,否则显示的仍然是乱码):

数据库中的原始数据

以 id 为 1 的数据为例,这里的 GBK 编码是0xDF 0xF7

那问题来了,既然 Flink 并没有报类型错误,说明输入输出还是当作字符串看待的,只是字符串没有经过妥善解码。那 Flink 将其读取成了什么呢?我们来写一个 UDF 自定义函数看看。

UDF 编写

对于这种编解码的场景,适合使用 Flink 的标量函数(Scalar Function),即单条记录进,单条记录出,无需聚合等复杂操作。

在当前的流计算 Oceanus 版本中,已经支持通过CREATE TEMPORARY SYSTEM FUNCTION的方式来 声明 UDF。声明 UDF 后,在 程序包管理 界面,可以上传具体的实现类 JAR 包。

我们先编写一个打印出 String 里每个 Char 内容的函数,类名为DecodeLatin1.

初步代码

请先在 pom.xml 中引入 Flink 相关依赖,随后可以开始编写 UDF:

package com.tencent.cloud.oceanus.udf;

import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.stream.IntStream;

public class DecodeLatin1 extends ScalarFunction {
	private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLatin1.class);

	public String eval(String input) {
		char[] inputCharArray = input.toCharArray();  // 不能用 getBytes() 方法, 否则原始内容会被再次编码

		IntStream.range(0, inputCharArray.length).forEach(i -> {
			LOGGER.info("{}", Integer.toHexString(inputCharArray[i]));
		});

		return input;
	}
}

编写完成并打包后,可以将程序包上传(对于自建的 Flink 集群,则是放入 Flink 的 lib 目录):

上传程序包

随后可以在 SQL 代码中,引用这个程序包:

作业中引用该程序包

作业提交运行后,我们可以尝试读取 id=1 的数据,发现打印出来的日志里,字符串中实际上保留了原始字符的 GBK 编码,只是没有经过妥善解码,导致输出时误当作 Unicode 处理了。

日志片段

另外还注意到,对于原始 Latin1 而言,每个字符占 1 个字节,而这里 Java String 中使用的是 Char 结构,每个字符占了 2 个字节,且高位字节恒为 0. 此猜想在 这篇 MySQL 官方文档 中得到了验证。

那么给我们的启示是:可以直接将 char[] 数组转为等长的 byte[] 数组,而不能按照传统思路,创建一个长度为 char[] 数组两倍的 byte[] 数组。

改版后的代码

按照上面的思路,我们重新实现了一版,该版本可以实现解码并重新生成正确 String。

package com.tencent.cloud.oceanus.udf;

import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.util.stream.IntStream;

/**
 * 如果 JDBC 数据库的 VARCHAR 为 Latin1 (或 GBK 等) 编码
 * 可以使用这个函数转换为标准字符串
 *
 * SQL 代码声明方式:
 * CREATE TEMPORARY SYSTEM FUNCTION DECODE_LATIN1 AS 'com.tencent.cloud.oceanus.udf.DecodeLatin1' LANGUAGE JAVA;
 */
public class DecodeLatin1 extends ScalarFunction {
	private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLatin1.class);

	public String eval(String input) {
		return eval(input, "latin1");
	}

	public String eval(String input, String fromCharset) {
		char[] inputCharArray = input.toCharArray();

		// JDBC Driver 读取的 Latin1 字符, 高 8 位都是 0x00, 因此只考虑低 8 位即可, byte 和 char 数据部分等长, 长度无需乘以二
		byte[] inputBytes = new byte[inputCharArray.length];

		IntStream.range(0, inputCharArray.length).forEach(i -> {
			inputBytes[i] = (byte) inputCharArray[i];
			LOGGER.debug("{}", String.format("0x%02X ", inputBytes[i]));
		});

		try {
			return new String(inputBytes, fromCharset);
		} catch (UnsupportedEncodingException e) {
			// 与 GET_JSON_OBJECT 的异常处理方式保持一致, 遇到异常数据时输出 null, 避免日志过量打印
			LOGGER.debug("Unsupported charset {} for input string {}", fromCharset, input, e);
			return null;
		}

	}
}

上传新版的 UDF,然后再次运行(注意本次增加了一个新字段FromCharset,表示解码使用的实际字符集):

上传新版本,并修改调用方式,再次运行

然后我们再读取数据库中 id 为 1 的数据,现在输出就正常了:

中文数据正常解析

总结

在遇到数据乱码等原生 Flink 无法轻易解决的问题时,可以尝试自定义函数来定位和排查,一旦确认问题根源,可以同样使用自定义函数来对数据进行校正。大大扩展了 Flink SQL 的功能。

另外,程序包可以分版本在不同的作业之间复用,基础包(UDF)和业务包(调用 UDF 的主程序)可以实现解耦。如果有更优化的实现,可以只更新基础包,避免对业务包的改动引入的风险。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Apache Flink 1.10.0 重磅发布,年度最大规模版本升级!

    Apache Flink 社区迎来了激动人心的两位数位版本号,Flink 1.10.0 正式宣告发布!作为 Flink 社区迄今为止规模最大的一次版本升级,Fl...

    Spark学习技巧
  • Apache Flink 1.10.0 重磅发布,年度最大规模版本升级!

    Apache Flink 社区迎来了激动人心的两位数位版本号,Flink 1.10.0 正式宣告发布!作为 Flink 社区迄今为止规模最大的一次版本升级,Fl...

    Fayson
  • Flink从1.7到1.12版本升级汇总

    最进再看官方flink提供的视频教程,发现入门版本因为时间关系都是基于1.7.x讲解的. 在实际操作中跟1.12.x版本还是有差距的, 所以整理一下从1.7 版...

    王知无-import_bigdata
  • SQL注入的几种类型和原理

    在上一章节中,介绍了SQL注入的原理以及注入过程中的一些函数,但是具体的如何注入,常见的注入类型,没有进行介绍,这一章节我想对常见的注入类型进行一个了解,能够自...

    天钧
  • Flink最锋利的武器:Flink SQL入门和实战 | 附完整实现代码

    Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

    AI科技大本营
  • Flink 最锋利的武器:Flink SQL 入门和实战

    Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

    王知无-import_bigdata
  • Flink SQL 客户端如何使用

    Flink 的 Table & SQL API 可以处理 SQL 语言编写的查询语句,但是这些查询需要嵌入用 Java 或 Scala 编写的 Table 程序...

    smartsi
  • Hive 终于等来了 Flink

    其实比较也没啥意义,不同社区发展的目标总是会有差异,而且 Flink 在真正的实时流计算方面投入的精力很多。不过笔者想表达的是,Apache Hive 已经成为...

    Fayson
  • Flink重点难点:Flink Table&SQL必知必会(一)

    Flink本身是批流统一的处理框架,所以Table API和SQL,就是批流统一的上层处理API。目前功能尚未完善,处于活跃的开发阶段。

    王知无-import_bigdata
  • 干货 | 五千字长文带你快速入门FlinkSQL

    最近几天因为工作比较忙,已经几天没有及时更新文章了,在这里先给小伙伴们说声抱歉…临近周末,再忙再累,我也要开始发力了。接下来的几天,菌哥将为大家...

    大数据梦想家
  • SQL Stream Builder概览

    Cloudera的流分析中除了包括Flink,还包括SQL Stream Builder创建对数据流的连续查询。我们在该系列的第一部分介绍了《Cloudera中...

    大数据杂货铺
  • 我说Java基础重要,你不信?来试试这几个问题

    代码生成技术广泛应用于现代的数据库系统中。代码生成是将用户输入的表达式、查询、存储过程等现场编译成二进制代码再执行,相比解释执行的方式,运行效率要高很多。尤其是...

    王知无-import_bigdata
  • 十分钟入门Fink SQL

    Flink 本身是批流统一的处理框架,所以 Table API 和 SQL,就是批流统一的上层处理 API。目前功能尚未完善,处于活跃的开发阶...

    大数据老哥
  • 14-Flink-Table-&-SQL实战

    Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非...

    王知无-import_bigdata
  • 基于Flink打造实时计算平台为企业赋能

    随着互联网技术的广泛使用,信息的实时性对业务的开展越来越重要,特别是业务的异常信息,没滞后一点带来的就是直接的经济损失。所以实时信息处理能力,越来越成为企业的重...

    王知无-import_bigdata
  • 网站渗透测试出漏洞该如何修复

    国庆即将到来,前一期讲到获取网站信息判断所属环境以及各个端口的用处和弱口令密码利用方法,这期仍有很多客户找到我们Sine安全想要了解针对于SQL注入攻击的测试方...

    网站安全专家
  • 网站被攻击渗透测试出问题怎么处理

    国庆即将到来,前一期讲到获取网站信息判断所属环境以及各个端口的用处和弱口令密码利用方法,这期仍有很多客户找到我们Sine安全想要了解针对于SQL注入攻击的测试方...

    技术分享达人
  • Flink Table&SQL必知必会(干货建议收藏)

    Flink本身是批流统一的处理框架,所以Table API和SQL,就是批流统一的上层处理API。目前功能尚未完善,处于活跃的开发阶段。

    大数据老哥
  • Flink or Spark?实时计算框架在K12场景的应用实践

    如今,越来越多的业务场景要求 OLTP 系统能及时得到业务数据计算、分析后的结果,这就需要实时的流式计算如Flink等来保障。例如,在 TB 级别数据量的数据库...

    芋道源码

扫码关注云+社区

领取腾讯云代金券