这是关于卡夫卡-连接-线轴连接器的CSV.我想知道是否有办法避免硬编码模式,让连接器动态创建模式?我有很多csv文件要处理,比如说每天几百GB,有时是几个字节的csv。有时,一些csv文件有新的列,而有些文件被删除。
我能够成功地阅读csv和写到弹性搜索,我跟踪了您的文章。https://www.confluent.io/blog/ksql-in-action-enriching-csv-events-with-data-from-rdbms-into-AWS/,所以现在我不想使用值模式和键模式。
在链接https://docs.confluent.io/current/connect/kafka-connect-spooldir/connectors/csv_source_connector.html中,我认为可以将schema.generation.enabled设置为true。
下面是REST调用,包括连接器配置
$curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://xxx:000/connectors/ -d '{
"name":"csv1",
"config":{
"tasks.max":"1",
"connector.class":"com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
"input.file.pattern":"^.*csv$",
"halt.on.error":"false",
"topic":"order",
"schema.generation.enabled":"true",
"schema.generation.key.name":"orderschema",
"schema.generation.value.name":"orderdata",
"csv.first.row.as.header":"true",
"csv.null.field.indicator":"EMPTY_SEPARATORS",
"batch.size" : "5000",
}
}
'当我提交这个文件时,我会得到以下错误。{“名称”:“顺序”,“连接器”:{“状态”:“失败”,"worker_id":"localhost:000",“跟踪”:为输入模式找到多个模式。\nSchema: {\"name\":\"com.github.jcustenborder.kafka.connect.model.Value\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":
这有什么解决办法?
发布于 2020-05-14 01:51:54
我现在能够解析所有的数据了。诀窍是先处理一个文件,any,然后检查add,随机添加另一个。看起来是这样的,它自动地更新模式。(就像这样称呼它)之后,将所有文件添加到文件夹中,就会处理得很好。耶!
https://stackoverflow.com/questions/61763185
复制相似问题