首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >将字节流反序列化为Scala case类对象时发生java.lang.Instantiation异常

将字节流反序列化为Scala case类对象时发生java.lang.Instantiation异常
EN

Stack Overflow用户
提问于 2019-05-30 07:38:48
回答 1查看 451关注 0票数 1

我正在尝试将avro字节流反序列化为scala case类对象。基本上,我有一个带有avro编码数据流的kafka流,现在向模式中添加了一个,我正在尝试更新scala case类以包括新字段。case类如下所示

代码语言:javascript
复制
/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                sw_version: String,
                timestamp: String,
                reading: Double,
                new_field: Option[String] = None
               )  {

this() = this("na","na","na",0,None) }

avro模式如下所示:

代码语言:javascript
复制
{
  "type": "record",
  "name": "some_name",
  "namespace": "some_namespace",
  "fields": [
    {
      "name": "deviceId",
      "type": "string"
    },
    {
      "name": "sw_version",
      "type": "string"
    }, 
    {
      "name": "timestamp",
      "type": "string"
    },
    {
      "name": "reading",
      "type": "double"
    },
    {
      "name": "new_field",
     "type": ["null", "string"],
      "default": null
    }]}

当接收到数据时,我得到以下异常:

代码语言:javascript
复制
java.lang.RuntimeException: java.lang.InstantiationException

我可以很好地接收用python编写的消费者的数据,这样我就知道数据以正确的格式被正确地流式传输。我怀疑问题出在case类构造函数的创建上,我尝试过这样做:

代码语言:javascript
复制
/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                sw_version: String,
                timestamp: String,
                reading: Double,
                new_field: Option[String]
               )  {
this() = this("na", "na", "na", 0, some("na"))
}

但没那么走运。

反序列化程序代码是(摘录):

代码语言:javascript
复制
// reader and decoder for reading avro records
private var reader: DatumReader[T] = null
private var decoder : BinaryDecoder = null
decoder = DecoderFactory.get.binaryDecoder(message, decoder)
reader.read(null.asInstanceOf[T], decoder)

我找不到任何其他用于反序列化avro的case类的构造函数的例子,我去年在java.lang.NoSuchMethodException for init method in Scala case class上发布了一个相关的问题,根据响应,我能够实现我当前的代码,从那以后一直运行得很好。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-06-02 04:32:33

我按照一种完全不同的方法解决了这个问题。我使用了这个示例https://github.com/jfrazee/schema-registry-examples/tree/master/src/main/scala/io/atomicfinch/examples/flink中提供的Confluent Kafka客户端。我还有一个合流的模式注册表,使用kafka附带的容器化的all in one解决方案和一个模式注册表https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html,设置起来非常容易。

我必须在我的pom.xml文件中添加合流的依赖项和存储库。这将放在存储库部分。

代码语言:javascript
复制
<repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
</repository>

这将在依赖项部分中列出:

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-confluent-registry</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <!-- For Confluent Platform 5.2.1 -->
    <version>5.2.1</version>
</dependency>

使用https://github.com/jfrazee/schema-registry-examples/blob/master/src/main/scala/io/atomicfinch/examples/flink/ConfluentRegistryDeserializationSchema.scala中提供的代码,我能够与Confluent模式注册表通信,然后基于avro消息头部中的模式id,这将从模式reg中下载模式,并返回给我一个GenericRecord对象,我可以从该对象轻松地创建感兴趣的任何和所有字段,并创建DeviceData对象的新DataStream。

代码语言:javascript
复制
val kafka_consumer = new FlinkKafkaConsumer010("prod.perfwarden.minute",
  new ConfluentRegistryDeserializationSchema[GenericRecord](classOf[GenericRecord], "http://localhost:8081"),
  properties)
val device_data_stream = env
  .addSource(kafka_consumer)
  .map({x => new DeviceData(x.get("deviceId").toString,
    x.get("sw_version").toString,
    x.get("timestamp").toString,
    x.get("reading").toString.toDouble,
    x.get("new_field").toString)})

confluent kafka客户端负责按照模式对avro字节流进行反序列化,包括默认值。设置模式注册表和使用confluent kafka客户端可能只需要一点时间来适应,但从长远来看,这可能是更好的解决方案。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56369848

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档