专栏首页腾讯云流计算Flink SQL 自定义函数指南 - 以读取 GBK 编码的数据库为例
原创

Flink SQL 自定义函数指南 - 以读取 GBK 编码的数据库为例

背景介绍

近期我们遇到了一位客户提出的问题:MySQL 建表时,数据库表定义的字符集是 latin1,里面的数据是以 GBK 编码的方式写入的。当 Flink 的 JDBC Connector 在读取此维表时,输出数据的中文出现了乱码现象,如下图:

中文数据乱码

原因分析

对于 Oceanus 平台而言,内部的数据处理都是以 Unicode 为标准的。对于非 Unicode 的字符集,在 JDBC Connector 读取时,可能会出现各种异常情况,即使 JDBC 连接 URL 参数中指定了characterEncoding也无法避免中文乱码问题。

对于 MySQL 数据而言,最怕的不是数据乱码,而是变成问号 (????)。通常来讲,如果遇到了全是问号的情况,则数据基本无法还原了;而对于乱码来说,很可能源数据还在,只是编码选错了,通过恰当的解码方式,还是有希望恢复原有的数据。

因此我们需要编写一个 UDF(用户自定义函数),将 JDBC Connector 读到的 Latin1(这里实际上是 GBK)数据进行解码。

首先我们来看一下数据库中的原始数据(首先需要将终端的编码改为 GBK,否则显示的仍然是乱码):

数据库中的原始数据

以 id 为 1 的数据为例,这里的 GBK 编码是0xDF 0xF7

那问题来了,既然 Flink 并没有报类型错误,说明输入输出还是当作字符串看待的,只是字符串没有经过妥善解码。那 Flink 将其读取成了什么呢?我们来写一个 UDF 自定义函数看看。

UDF 编写

对于这种编解码的场景,适合使用 Flink 的标量函数(Scalar Function),即单条记录进,单条记录出,无需聚合等复杂操作。

在当前的流计算 Oceanus 版本中,已经支持通过CREATE TEMPORARY SYSTEM FUNCTION的方式来 声明 UDF。声明 UDF 后,在 程序包管理 界面,可以上传具体的实现类 JAR 包。

我们先编写一个打印出 String 里每个 Char 内容的函数,类名为DecodeLatin1.

初步代码

请先在 pom.xml 中引入 Flink 相关依赖,随后可以开始编写 UDF:

package com.tencent.cloud.oceanus.udf;

import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.stream.IntStream;

public class DecodeLatin1 extends ScalarFunction {
	private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLatin1.class);

	public String eval(String input) {
		char[] inputCharArray = input.toCharArray();  // 不能用 getBytes() 方法, 否则原始内容会被再次编码

		IntStream.range(0, inputCharArray.length).forEach(i -> {
			LOGGER.info("{}", Integer.toHexString(inputCharArray[i]));
		});

		return input;
	}
}

编写完成并打包后,可以将程序包上传(对于自建的 Flink 集群,则是放入 Flink 的 lib 目录):

上传程序包

随后可以在 SQL 代码中,引用这个程序包:

作业中引用该程序包

作业提交运行后,我们可以尝试读取 id=1 的数据,发现打印出来的日志里,字符串中实际上保留了原始字符的 GBK 编码,只是没有经过妥善解码,导致输出时误当作 Unicode 处理了。

日志片段

另外还注意到,对于原始 Latin1 而言,每个字符占 1 个字节,而这里 Java String 中使用的是 Char 结构,每个字符占了 2 个字节,且高位字节恒为 0. 此猜想在 这篇 MySQL 官方文档 中得到了验证。

那么给我们的启示是:可以直接将 char[] 数组转为等长的 byte[] 数组,而不能按照传统思路,创建一个长度为 char[] 数组两倍的 byte[] 数组。

改版后的代码

按照上面的思路,我们重新实现了一版,该版本可以实现解码并重新生成正确 String。

package com.tencent.cloud.oceanus.udf;

import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.util.stream.IntStream;

/**
 * 如果 JDBC 数据库的 VARCHAR 为 Latin1 (或 GBK 等) 编码
 * 可以使用这个函数转换为标准字符串
 *
 * SQL 代码声明方式:
 * CREATE TEMPORARY SYSTEM FUNCTION DECODE_LATIN1 AS 'com.tencent.cloud.oceanus.udf.DecodeLatin1' LANGUAGE JAVA;
 */
public class DecodeLatin1 extends ScalarFunction {
	private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLatin1.class);

	public String eval(String input) {
		return eval(input, "latin1");
	}

	public String eval(String input, String fromCharset) {
		char[] inputCharArray = input.toCharArray();

		// JDBC Driver 读取的 Latin1 字符, 高 8 位都是 0x00, 因此只考虑低 8 位即可, byte 和 char 数据部分等长, 长度无需乘以二
		byte[] inputBytes = new byte[inputCharArray.length];

		IntStream.range(0, inputCharArray.length).forEach(i -> {
			inputBytes[i] = (byte) inputCharArray[i];
			LOGGER.debug("{}", String.format("0x%02X ", inputBytes[i]));
		});

		try {
			return new String(inputBytes, fromCharset);
		} catch (UnsupportedEncodingException e) {
			// 与 GET_JSON_OBJECT 的异常处理方式保持一致, 遇到异常数据时输出 null, 避免日志过量打印
			LOGGER.debug("Unsupported charset {} for input string {}", fromCharset, input, e);
			return null;
		}

	}
}

上传新版的 UDF,然后再次运行(注意本次增加了一个新字段FromCharset,表示解码使用的实际字符集):

上传新版本,并修改调用方式,再次运行

然后我们再读取数据库中 id 为 1 的数据,现在输出就正常了:

中文数据正常解析

总结

在遇到数据乱码等原生 Flink 无法轻易解决的问题时,可以尝试自定义函数来定位和排查,一旦确认问题根源,可以同样使用自定义函数来对数据进行校正。大大扩展了 Flink SQL 的功能。

另外,程序包可以分版本在不同的作业之间复用,基础包(UDF)和业务包(调用 UDF 的主程序)可以实现解耦。如果有更优化的实现,可以只更新基础包,避免对业务包的改动引入的风险。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 如何应对飞速增长的状态?Flink State TTL 概述

    在流计算作业中,经常会遇到一些状态数不断累积,导致状态量越来越大的情形。例如,作业中定义了超长的时间窗口,或者在动态表上应用了无限范围的 GROUP BY 语句...

    KyleMeow
  • Flink 类型和序列化机制简介

    使用 Flink 编写处理逻辑时,新手总是容易被林林总总的概念所混淆,本文将逐步解密 Flink 的类型和序列化机制。

    KyleMeow
  • Flink SQL 状态越来越多?Idle State Retention Time 特性概览

    在上一篇文章中,介绍了 Flink State TTL 机制,这项机制对于应对通用的状态暴增特别有效。然而,这个特性也有其缺陷,例如不能保证一定可以及时清理掉失...

    KyleMeow
  • 【r<-ROC|包】分析与可视化ROC——plotROC、pROC

    在【r<-绘图|ROC】ROC的计算与绘制这篇文章中我讲了ROC曲线的本质以及如何计算和绘制ROC曲线。注意,我这里谈到的ROC并未曾涉及机器学习模型的拟合与预...

    生信技能树
  • tensorflow实现将ckpt转pb文件的方法

    本博客实现将自己训练保存的ckpt模型转换为pb文件,该方法适用于任何ckpt模型,当然你需要确定ckpt模型输入/输出的节点名称。

    砸漏
  • java的poi技术读取和导入Excel

    http://www.cnblogs.com/hongten/gallery/image/111987.html

    Hongten
  • CPU 100% 异常排查实践与总结

    1、问题背景 昨天下午突然收到运维邮件报警,显示数据平台服务器cpu利用率达到了98.94%,而且最近一段时间一直持续在70%以上,看起来像是硬件资源到瓶颈需要...

    用户1177713
  • 用reStructuredText来写博客(测试)

    它是一个类似于MarkDown的标记语言,具体可参考这里:http://zh.wikipedia.org/wiki/ReStructuredText, 手册在这...

    the5fire
  • Linux 系统 CPU 100% 异常排查实践与总结

    昨天下午突然收到运维邮件报警,显示数据平台服务器cpu利用率达到了98.94%,而且最近一段时间一直持续在70%以上,看起来像是硬件资源到瓶颈需要扩容了,但仔细...

    马哥linux运维
  • 数组函数 array_column

    array_column 函数简介传入一个参数,返回二维数组中指定列传入一个参数,指定列不一定存在的情况传入两个参数,且两个参数对应的列都存在且不重复如果第二个...

    写PHP的老王

扫码关注云+社区

领取腾讯云代金券