我当前的设置包含Kafka、HDFS、Kafka Connect和Schema注册表,它们都位于网络对接容器中。
Kafka主题包含没有模式的简单JSON数据:
{
"repo_name": "ironbee/ironbee"
}
Schema注册表包含一个JSON模式,描述Kafka主题中的数据:
{"$schema": "https://json-schema.org/draft/2019-09/schema",
"$id": "http://example.com/example.json",
"type": "object",
"title": "Root Schema",
"required": [
"repo_name"
],
"properties": {
"repo_name": {
"type": "string",
"default": "",
"title": "The repo_name Schema",
"examples": [
"ironbee/ironbee"
]
}
}}
我试图实现的是一个连接,从一个主题读取JSON数据并将其转储到HDFS (Avro或Parquet)中的文件中。
{
"name": "kafka to hdfs",
"connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
"topics": "repo",
"hdfs.url": "hdfs://namenode:9000",
"flush.size": 3,
"confluent.topic.bootstrap.servers": "kafka-1:19092,kafka-2:29092,kafka-3:39092",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "false",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
如果我尝试通过StringConverter读取原始JSON值(没有使用模式)并将其转储到Avro文件中,它就会工作,因此
Key=null Value={my json} touples
所以根本没有可用的结构。
当我试图通过JsonSchemaConverter使用我的模式时,我会得到以下错误
“Converting byte[] to Kafka Connect data failed due to serialization error of topic”
“Unknown magic byte”
我认为我的连接配置有问题,但经过一周的尝试,我的谷歌技能已经达到了极限。
所有的代码都在这里可用:https://github.com/SDU-minions/7-Scalable-Systems-Project/tree/dev/Kafka
发布于 2022-11-28 19:55:20
原始JSON值通过StringConverter (没有使用模式)
schemas.enable属性只存在于JSONConverter上的。字符串没有模式。JSONSchema总是有一个模式,所以属性也不存在。
当我试图通过JsonSchemaConverter使用我的模式时,我会得到错误
您的生产者需要使用合流JSONSchema序列化程序。否则,它不会被发送给卡夫卡与你的错误中提到的“魔法字节”。
我个人还没有尝试在Connect中将JSON模式记录直接转换为Avro。通常模式是直接生成Avro,或者在ksqlDB中转换,例如转换为一个新的Avro主题,然后由Connect使用。
https://stackoverflow.com/questions/74604635
复制相似问题