首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Kafka Connect -从HDFS中的JSON记录到Avro文件

Kafka Connect -从HDFS中的JSON记录到Avro文件
EN

Stack Overflow用户
提问于 2022-11-28 17:53:57
回答 1查看 22关注 0票数 0

我当前的设置包含Kafka、HDFS、Kafka Connect和Schema注册表,它们都位于网络对接容器中。

Kafka主题包含没有模式的简单JSON数据:

代码语言:javascript
运行
复制
{
    "repo_name": "ironbee/ironbee"
}

Schema注册表包含一个JSON模式,描述Kafka主题中的数据:

代码语言:javascript
运行
复制
{"$schema": "https://json-schema.org/draft/2019-09/schema",
"$id": "http://example.com/example.json",
"type": "object",
"title": "Root Schema",
"required": [
    "repo_name"
],
"properties": {
    "repo_name": {
        "type": "string",
        "default": "",
        "title": "The repo_name Schema",
        "examples": [
            "ironbee/ironbee"
        ]
    }
}}

我试图实现的是一个连接,从一个主题读取JSON数据并将其转储到HDFS (Avro或Parquet)中的文件中。

代码语言:javascript
运行
复制
{
"name": "kafka to hdfs",
"connector.class": "io.confluent.connect.hdfs3.Hdfs3SinkConnector",
"topics": "repo",
"hdfs.url": "hdfs://namenode:9000",
"flush.size": 3,
"confluent.topic.bootstrap.servers": "kafka-1:19092,kafka-2:29092,kafka-3:39092",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "false",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}

如果我尝试通过StringConverter读取原始JSON值(没有使用模式)并将其转储到Avro文件中,它就会工作,因此

代码语言:javascript
运行
复制
Key=null   Value={my json} touples

所以根本没有可用的结构。

当我试图通过JsonSchemaConverter使用我的模式时,我会得到以下错误

代码语言:javascript
运行
复制
“Converting byte[] to Kafka Connect data failed due to serialization error of topic”
“Unknown magic byte”

我认为我的连接配置有问题,但经过一周的尝试,我的谷歌技能已经达到了极限。

所有的代码都在这里可用:https://github.com/SDU-minions/7-Scalable-Systems-Project/tree/dev/Kafka

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-11-28 19:55:20

原始JSON值通过StringConverter (没有使用模式)

schemas.enable属性只存在于JSONConverter上的。字符串没有模式。JSONSchema总是有一个模式,所以属性也不存在。

当我试图通过JsonSchemaConverter使用我的模式时,我会得到错误

您的生产者需要使用合流JSONSchema序列化程序。否则,它不会被发送给卡夫卡与你的错误中提到的“魔法字节”。

我个人还没有尝试在Connect中将JSON模式记录直接转换为Avro。通常模式是直接生成Avro,或者在ksqlDB中转换,例如转换为一个新的Avro主题,然后由Connect使用。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74604635

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档