我使用“自动”反序列化程序使用来自Kafka的Avro序列化消息,如下所示:
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroDeserializer"
);
props.put("schema.registry.url", "https://example.com");这非常有效,并且已经在https://docs.confluent.io/current/schema-registry/serializer-formatter.html#serializer的文档中找到了。
我面临的问题是,我实际上只是想转发这些消息,但为了进行路由,我需要一些来自内部的元数据。一些技术限制意味着我不能可行地编译生成的类文件来使用KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG => true,所以我使用一个常规的解码器,而不是绑定到Kafka中,特别是将字节作为Array[Byte]读取并将它们传递给手动构造的反序列化程序:
var maxSchemasToCache = 1000;
var schemaRegistryURL = "https://example.com/"
var specificDeserializerProps = Map(
"schema.registry.url"
-> schemaRegistryURL,
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG
-> "false"
);
var client = new CachedSchemaRegistryClient(
schemaRegistryURL,
maxSchemasToCache
);
var deserializer = new KafkaAvroDeserializer(
client,
specificDeserializerProps.asJava
);消息是一种“容器”类型,真正有趣的部分是union { A, B, C } msg记录字段中大约25种类型中的一种:
record Event {
timestamp_ms created_at;
union {
Online,
Offline,
Available,
Unavailable,
...
...Failed,
...Updated
} msg;
}因此,我成功地将Array[Byte]读取到record中,并将其提供给反序列化程序,如下所示:
var genericRecord = deserializer.deserialize(topic, consumerRecord.value())
.asInstanceOf[GenericRecord];
var schema = genericRecord.getSchema();
var msgSchema = schema.getField("msg").schema();然而,问题是我无法通过联合来识别、区分或“解析”msg字段的“类型”:
System.out.printf(
"msg.schema = %s msg.schema.getType = %s\n",
msgSchema.getFullName(),
msgSchema.getType().name());
=> msg.schema = union msg.schema.getType = union如何区分此场景中的类型?confluent注册表知道,这些东西有名称,它们有“类型”,即使我把它们当作GenericRecords,
我在这里的目标是知道record.msg是“类型”Online | Offline | Available,而不仅仅是知道它是一个union。
发布于 2020-01-21 23:33:23
在研究了AVRO Java库的实现之后,我认为可以肯定地说,根据当前的API,这是不可能的。我已经找到了以下在解析时提取类型的方法,使用自定义的GenericDatumReader子类,但在我在生产代码中使用类似以下内容之前,需要进行大量的改进:D
下面是这个子类:
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.ResolvingDecoder;
import java.io.IOException;
import java.util.List;
public class CustomReader<D> extends GenericDatumReader<D> {
private final GenericData data;
private Schema actual;
private Schema expected;
private ResolvingDecoder creatorResolver = null;
private final Thread creator;
private List<Schema> unionTypes;
// vvv This is the constructor I've modified, added a list of types
public CustomReader(Schema schema, List<Schema> unionTypes) {
this(schema, schema, GenericData.get());
this.unionTypes = unionTypes;
}
public CustomReader(Schema writer, Schema reader, GenericData data) {
this(data);
this.actual = writer;
this.expected = reader;
}
protected CustomReader(GenericData data) {
this.data = data;
this.creator = Thread.currentThread();
}
protected Object readWithoutConversion(Object old, Schema expected, ResolvingDecoder in) throws IOException {
switch (expected.getType()) {
case RECORD:
return super.readRecord(old, expected, in);
case ENUM:
return super.readEnum(expected, in);
case ARRAY:
return super.readArray(old, expected, in);
case MAP:
return super.readMap(old, expected, in);
case UNION:
// vvv The magic happens here
Schema type = expected.getTypes().get(in.readIndex());
unionTypes.add(type);
return super.read(old, type, in);
case FIXED:
return super.readFixed(old, expected, in);
case STRING:
return super.readString(old, expected, in);
case BYTES:
return super.readBytes(old, expected, in);
case INT:
return super.readInt(old, expected, in);
case LONG:
return in.readLong();
case FLOAT:
return in.readFloat();
case DOUBLE:
return in.readDouble();
case BOOLEAN:
return in.readBoolean();
case NULL:
in.readNull();
return null;
default:
return super.readWithoutConversion(old, expected, in);
}
}
}我在代码中为有趣的部分添加了注释,因为它主要是样板。
然后,您可以像这样使用这个自定义阅读器:
List<Schema> unionTypes = new ArrayList<>();
DatumReader<GenericRecord> datumReader = new CustomReader<GenericRecord>(schema, unionTypes);
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(eventFile, datumReader);
GenericRecord event = null;
while (dataFileReader.hasNext()) {
event = dataFileReader.next(event);
}
System.out.println(unionTypes);这将为每个解析的union输出该union的类型。请注意,您必须根据记录中有多少个联合等来确定该列表中的哪个元素是您感兴趣的。
tbh :D不太好
发布于 2020-01-22 03:14:15
经过大量的研究,我提出了一个一次性的解决方案:
val records: ConsumerRecords[String, Array[Byte]] = consumer.poll(100);
for (consumerRecord <- asScalaIterator(records.iterator)) {
var genericRecord = deserializer.deserialize(topic, consumerRecord.value()).asInstanceOf[GenericRecord];
var msgSchema = genericRecord.get("msg").asInstanceOf[GenericRecord].getSchema();
System.out.printf("%s \n", msgSchema.getFullName());打印com.myorg.SomeSchemaFromTheEnum并在我的用例中完美地工作。
令人困惑的是,由于使用了GenericRecord,.get("msg")返回了Object,一般来说,我没有办法安全地进行类型转换。在这种有限的情况下,我知道演员是安全的。
在我有限的用例中,上面5行中的解决方案是合适的,但对于更一般的解决方案,https://stackoverflow.com/users/124257/fresskoma发布的答案https://stackoverflow.com/a/59844401/119669似乎更合适。
使用DatumReader还是GenericRecord可能是一个偏好问题,是否考虑到了Kafka生态系统,单独使用Avro我可能更喜欢DatumReader解决方案,但在这种情况下,我可以在代码中使用Kafak风格的命名法。
发布于 2020-04-21 15:54:16
要检索字段值的模式,可以使用
new GenericData().induce(genericRecord.get("msg"))https://stackoverflow.com/questions/59838118
复制相似问题