我正在尝试使用spark structured streaming (2.3.1版)处理来自kafka的avro数据流,所以我尝试使用this示例来反序列化。仅当主题value
部件包含StringType
时才有效,但在我的示例中,架构包含long and integers
,如下所示:
public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"variables\","
+ "\"fields\":["
+ " { \"name\":\"time\", \"type\":\"long\" },"
+ " { \"name\":\"thnigId\", \"type\":\"string\" },"
+ " { \"name\":\"controller\", \"type\":\"int\" },"
+ " { \"name\":\"module\", \"type\":\"int\" }"
+ "]}";
因此,它在
sparkSession.udf().register("deserialize", (byte[] data) -> {
GenericRecord record = recordInjection.invert(data).get(); //throws error at invert method.
return RowFactory.create(record.get("time"), record.get("thingId").toString(), record.get("controller"), record.get("module"));
}, DataTypes.createStructType(type.fields()));
说
Failed to invert: [B@22a45e7
Caused by java.io.IOException: Invalid int encoding.
因为我在模式long and int
类型中使用了time, controller and module
。
我猜这是字节数组byte[] data
的某种编码和解码格式错误。
发布于 2019-06-12 03:30:18
你有没有看过这个https://issues.apache.org/jira/browse/AVRO-1650。它专门讨论了你可能会遇到的问题。默认的UTF-8编码可能会在编码/解码过程中导致丢失。
我还建议,如果您正在处理二进制编码的数据,请使用Base64编码来保存/传输数据,因为这利用了ISO-8859-1,这是上面链接使用的正确编码。
发布于 2020-11-07 23:22:28
我也遇到过这种情况,我猜你可能配置了你的kafka值--反序列化器是默认的字符串反序列化器,你可以试着把反序列化器改成org.apache.kafka.common.serialization.ByteArrayDeserializer.
这就是我的解决方案。
希望能帮到你
https://stackoverflow.com/questions/55914401
复制相似问题