我正在尝试将avro字节流反序列化为scala case类对象。基本上,我有一个带有avro编码数据流的kafka流,现在向模式中添加了一个,我正在尝试更新scala case类以包括新字段。case类如下所示
/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
sw_version: String,
timestamp: String,
reading: Double,
new_field: Option[String] = None
) {this() = this("na","na","na",0,None) }
avro模式如下所示:
{
"type": "record",
"name": "some_name",
"namespace": "some_namespace",
"fields": [
{
"name": "deviceId",
"type": "string"
},
{
"name": "sw_version",
"type": "string"
},
{
"name": "timestamp",
"type": "string"
},
{
"name": "reading",
"type": "double"
},
{
"name": "new_field",
"type": ["null", "string"],
"default": null
}]}当接收到数据时,我得到以下异常:
java.lang.RuntimeException: java.lang.InstantiationException我可以很好地接收用python编写的消费者的数据,这样我就知道数据以正确的格式被正确地流式传输。我怀疑问题出在case类构造函数的创建上,我尝试过这样做:
/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
sw_version: String,
timestamp: String,
reading: Double,
new_field: Option[String]
) {
this() = this("na", "na", "na", 0, some("na"))
}但没那么走运。
反序列化程序代码是(摘录):
// reader and decoder for reading avro records
private var reader: DatumReader[T] = null
private var decoder : BinaryDecoder = null
decoder = DecoderFactory.get.binaryDecoder(message, decoder)
reader.read(null.asInstanceOf[T], decoder)我找不到任何其他用于反序列化avro的case类的构造函数的例子,我去年在java.lang.NoSuchMethodException for init method in Scala case class上发布了一个相关的问题,根据响应,我能够实现我当前的代码,从那以后一直运行得很好。
发布于 2019-06-02 04:32:33
我按照一种完全不同的方法解决了这个问题。我使用了这个示例https://github.com/jfrazee/schema-registry-examples/tree/master/src/main/scala/io/atomicfinch/examples/flink中提供的Confluent Kafka客户端。我还有一个合流的模式注册表,使用kafka附带的容器化的all in one解决方案和一个模式注册表https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html,设置起来非常容易。
我必须在我的pom.xml文件中添加合流的依赖项和存储库。这将放在存储库部分。
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>这将在依赖项部分中列出:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<!-- For Confluent Platform 5.2.1 -->
<version>5.2.1</version>
</dependency>使用https://github.com/jfrazee/schema-registry-examples/blob/master/src/main/scala/io/atomicfinch/examples/flink/ConfluentRegistryDeserializationSchema.scala中提供的代码,我能够与Confluent模式注册表通信,然后基于avro消息头部中的模式id,这将从模式reg中下载模式,并返回给我一个GenericRecord对象,我可以从该对象轻松地创建感兴趣的任何和所有字段,并创建DeviceData对象的新DataStream。
val kafka_consumer = new FlinkKafkaConsumer010("prod.perfwarden.minute",
new ConfluentRegistryDeserializationSchema[GenericRecord](classOf[GenericRecord], "http://localhost:8081"),
properties)
val device_data_stream = env
.addSource(kafka_consumer)
.map({x => new DeviceData(x.get("deviceId").toString,
x.get("sw_version").toString,
x.get("timestamp").toString,
x.get("reading").toString.toDouble,
x.get("new_field").toString)})confluent kafka客户端负责按照模式对avro字节流进行反序列化,包括默认值。设置模式注册表和使用confluent kafka客户端可能只需要一点时间来适应,但从长远来看,这可能是更好的解决方案。
https://stackoverflow.com/questions/56369848
复制相似问题