我有一个使用Kafka Streams绑定器的Spring cloud stream的项目。对于流的输出,我使用Avro,并使用Confluent提供的Serde (io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
)。
我可以在Confluent Schema Registry中使用它。序列化和反序列化是正确发生的。但是,我想看看我们是否可以使用 Spring Cloud Schema Registry Server 而不是 Confluent 的。我配置了一个独立的Schema Registry服务器,并在我的项目中将模式注册表设置为它(将
schemaRegistryClient.endpoint
和schema.registry.url
属性)。
当我尝试它时,似乎Spring Cloud能够与独立的服务器一起工作。它将资源文件夹中可用的架构注册为.avsc文件。但是,当我发送一条消息时,Confluent序列化程序似乎继续将其作为Confluent Schema Registry处理(它具有与Spring Schema Registry不同的REST端点)。结果,它得到了一个405响应码。
我们得到以下异常(部分堆栈跟踪)
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema:
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)
在我看来,有两种可能性:
content-type: application/*+avro
),而不是使用Confluent提供的本机Serde,或者有人能帮我找出是哪一个吗?如果是第二个,谁能指出哪里出了问题?
发布于 2021-02-08 10:57:53
每个模式注册表提供程序都需要一个专有的SerDe库。例如,如果您想将 AWS Glue Schema Registry 与 Kafka 集成,那么你就需要亚马逊的SerDe了。因此,Confluent的SerDe库期望Confluent的模式注册表位于schema.registry.url
属性。
https://stackoverflow.com/questions/64045814
复制相似问题