
Flink 1.12 fails to serialize an Avro generic record to Kafka with com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException

Stack Overflow user
Asked on 2021-04-05 10:43:04

I have a DataStream[GenericRecord]:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

val consumer = new FlinkKafkaConsumer[String]("input_csv_topic", new SimpleStringSchema(), properties)

// Load the Avro schema once, outside the map closure, so it is also in scope
// for the producer below (in the original snippet schemaStr was local to the closure).
val schemaUrl = "" // avro schema link, standard .avsc file format
val schemaStr = scala.io.Source.fromURL(schemaUrl).mkString.stripLineEnd

val stream = senv.
    addSource(consumer).
    map(line => {
        val arr = line.split(",")

        import org.codehaus.jettison.json.{JSONArray, JSONObject}
        val schemaFields: JSONArray = new JSONObject(schemaStr).optJSONArray("fields")

        val genericDevice: GenericRecord = new GenericData.Record(new Schema.Parser().parse(schemaStr))

        // Map each CSV column onto the schema field at the same index;
        // toInt/toLong are my own Option-returning parse helpers.
        for (i <- 0 until arr.length) {
            val fieldObj: JSONObject = schemaFields.optJSONObject(i)
            val columnName = fieldObj.optString("name")
            val columnType = fieldObj.optString("type")

            if (columnType.contains("string")) {
                genericDevice.put(columnName, arr(i))
            } else if (columnType.contains("int")) {
                genericDevice.put(columnName, toInt(arr(i)).getOrElse(0).asInstanceOf[Number].intValue)
            } else if (columnType.contains("long")) {
                genericDevice.put(columnName, toLong(arr(i)).getOrElse(0).asInstanceOf[Number].longValue)
            }
        }

        genericDevice
    })

val kafkaSink = new FlinkKafkaProducer[GenericRecord](
    "output_avro_topic",
    new MyKafkaAvroSerializationSchema[GenericRecord](classOf[GenericRecord], "output_avro_topic", "this is the key", schemaStr),
    properties,
    FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)

stream.addSink(kafkaSink)

Here's the implementation of MyKafkaAvroSerializationSchema:

import java.io.ByteArrayOutputStream
import java.lang

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter}
import org.apache.avro.io.{BinaryEncoder, EncoderFactory}
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

class MyKafkaAvroSerializationSchema[T](avroType: Class[T], topic: String, key: String, schemaStr: String) extends KafkaSerializationSchema[T] {

    // Parsed lazily so the schema is rebuilt on the task manager after the
    // operator is deserialized, rather than shipped as part of the closure.
    lazy val schema: Schema = new Schema.Parser().parse(schemaStr)

    override def serialize(element: T, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {

        val cl = Thread.currentThread().getContextClassLoader()
        val genericData = new GenericData(cl)
        val writer = new GenericDatumWriter[T](schema, genericData)

        // Other writers I also tried, with the same result:
        // val writer = new ReflectDatumWriter[T](schema)
        // val writer = new SpecificDatumWriter[T](schema)
        val out = new ByteArrayOutputStream()
        val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
        writer.write(element, encoder)
        encoder.flush()
        out.close()

        new ProducerRecord[Array[Byte], Array[Byte]](topic, key.getBytes, out.toByteArray)
    }
}

Here's the stack trace:

    com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
    Serialization trace:
    reserved (org.apache.avro.Schema$Field)
    fieldMap (org.apache.avro.Schema$RecordSchema)
    schema (org.apache.avro.generic.GenericData$Record)

How do I serialize Avro generic records to Kafka with Flink? I tested the different writers shown above, but still got com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException. Thanks for any input.


1 Answer

Stack Overflow user

Answered on 2021-04-05 17:00:20

You can simply add the flink-avro module to your project and use the already provided AvroSerializationSchema, which can be used for both SpecificRecord and GenericRecord once you supply the schema.
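
A minimal sketch of that approach, assuming the org.apache.flink:flink-avro dependency is on the classpath and reusing schemaStr, properties, and stream from the question. flink-avro also ships GenericRecordAvroTypeInfo, which keeps GenericRecord elements on Avro serialization inside the job instead of the Kryo fallback that produces the exception above:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.formats.avro.AvroSerializationSchema
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

val schema: Schema = new Schema.Parser().parse(schemaStr)

// Make this implicit visible where the map(...) above is defined, so the
// Scala API types the stream with Avro's serializer instead of Kryo.
implicit val grTypeInfo: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(schema)

// AvroSerializationSchema.forGeneric produces the binary Avro payload;
// no hand-rolled KafkaSerializationSchema is needed.
val avroSink = new FlinkKafkaProducer[GenericRecord](
    "output_avro_topic",
    AvroSerializationSchema.forGeneric(schema),
    properties)

stream.addSink(avroSink)

Note that this constructor serializes the record value only; if the Kafka message key matters, keep a KafkaSerializationSchema for the key/value pair but build the value bytes with AvroSerializationSchema.forGeneric(schema).serialize(element).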

Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/66947688
