近期我们遇到了一位客户提出的问题:MySQL 建表时,数据库表定义的字符集是 latin1,里面的数据是以 GBK 编码的方式写入的。当 Flink 的 JDBC Connector 在读取此维表时,输出数据的中文出现了乱码现象,如下图:
对于 Oceanus 平台而言,内部的数据处理都是以 Unicode 为标准的。对于非 Unicode 的字符集,在 JDBC Connector 读取时,可能会出现各种异常情况,即使 JDBC 连接 URL 参数中指定了characterEncoding
也无法避免中文乱码问题。
对于 MySQL 数据而言,最怕的不是数据乱码,而是变成问号 (????)。通常来讲,如果遇到了全是问号的情况,则数据基本无法还原了;而对于乱码来说,很可能源数据还在,只是编码选错了,通过恰当的解码方式,还是有希望恢复原有的数据。
因此我们需要编写一个 UDF(用户自定义函数),将 JDBC Connector 读到的 Latin1(这里实际上是 GBK)数据进行解码。
首先我们来看一下数据库中的原始数据(首先需要将终端的编码改为 GBK,否则显示的仍然是乱码):
以 id 为 1 的数据为例,这里喵的 GBK 编码是0xDF 0xF7
。
那问题来了,既然 Flink 并没有报类型错误,说明输入输出还是当作字符串看待的,只是字符串没有经过妥善解码。那 Flink 将其读取成了什么呢?我们来写一个 UDF 自定义函数看看。
对于这种编解码的场景,适合使用 Flink 的标量函数(Scalar Function),即单条记录进,单条记录出,无需聚合等复杂操作。
在当前的流计算 Oceanus 版本中,已经支持通过CREATE TEMPORARY SYSTEM FUNCTION
的方式来 声明 UDF。声明 UDF 后,在 程序包管理 界面,可以上传具体的实现类 JAR 包。
我们先编写一个打印出 String 里每个 Char 内容的函数,类名为DecodeLatin1
.
请先在 pom.xml 中引入 Flink 相关依赖,随后可以开始编写 UDF:
package com.tencent.cloud.oceanus.udf;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.stream.IntStream;
public class DecodeLatin1 extends ScalarFunction {
private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLatin1.class);
public String eval(String input) {
char[] inputCharArray = input.toCharArray(); // 不能用 getBytes() 方法, 否则原始内容会被再次编码
IntStream.range(0, inputCharArray.length).forEach(i -> {
LOGGER.info("{}", Integer.toHexString(inputCharArray[i]));
});
return input;
}
}
编写完成并打包后,可以将程序包上传(对于自建的 Flink 集群,则是放入 Flink 的 lib 目录):
随后可以在 SQL 代码中,引用这个程序包:
作业提交运行后,我们可以尝试读取 id=1 的数据,发现打印出来的日志里,字符串中实际上保留了原始字符的 GBK 编码,只是没有经过妥善解码,导致输出时误当作 Unicode 处理了。
另外还注意到,对于原始 Latin1 而言,每个字符占 1 个字节,而这里 Java String 中使用的是 Char 结构,每个字符占了 2 个字节,且高位字节恒为 0. 此猜想在 这篇 MySQL 官方文档 中得到了验证。
那么给我们的启示是:可以直接将 char[] 数组转为等长的 byte[] 数组,而不能按照传统思路,创建一个长度为 char[] 数组两倍的 byte[] 数组。
按照上面的思路,我们重新实现了一版,该版本可以实现解码并重新生成正确 String。
package com.tencent.cloud.oceanus.udf;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.util.stream.IntStream;
/**
* 如果 JDBC 数据库的 VARCHAR 为 Latin1 (或 GBK 等) 编码
* 可以使用这个函数转换为标准字符串
*
* SQL 代码声明方式:
* CREATE TEMPORARY SYSTEM FUNCTION DECODE_LATIN1 AS 'com.tencent.cloud.oceanus.udf.DecodeLatin1' LANGUAGE JAVA;
*/
public class DecodeLatin1 extends ScalarFunction {
private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLatin1.class);
public String eval(String input) {
return eval(input, "latin1");
}
public String eval(String input, String fromCharset) {
char[] inputCharArray = input.toCharArray();
// JDBC Driver 读取的 Latin1 字符, 高 8 位都是 0x00, 因此只考虑低 8 位即可, byte 和 char 数据部分等长, 长度无需乘以二
byte[] inputBytes = new byte[inputCharArray.length];
IntStream.range(0, inputCharArray.length).forEach(i -> {
inputBytes[i] = (byte) inputCharArray[i];
LOGGER.debug("{}", String.format("0x%02X ", inputBytes[i]));
});
try {
return new String(inputBytes, fromCharset);
} catch (UnsupportedEncodingException e) {
// 与 GET_JSON_OBJECT 的异常处理方式保持一致, 遇到异常数据时输出 null, 避免日志过量打印
LOGGER.debug("Unsupported charset {} for input string {}", fromCharset, input, e);
return null;
}
}
}
上传新版的 UDF,然后再次运行(注意本次增加了一个新字段FromCharset
,表示解码使用的实际字符集):
然后我们再读取数据库中 id 为 1 的数据,现在输出就正常了:
在遇到数据乱码等原生 Flink 无法轻易解决的问题时,可以尝试自定义函数来定位和排查,一旦确认问题根源,可以同样使用自定义函数来对数据进行校正。大大扩展了 Flink SQL 的功能。
另外,程序包可以分版本在不同的作业之间复用,基础包(UDF)和业务包(调用 UDF 的主程序)可以实现解耦。如果有更优化的实现,可以只更新基础包,避免对业务包的改动引入的风险。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。