我可以看到这已经被讨论了几次here for instance,但我认为由于Elasticsearch中的破坏性变化,解决方案已经过时了。
我正在尝试将我的Kafka主题中Json中的长/纪元字段转换为通过连接器推送的Elasticsearch日期类型。
当我尝试添加动态映射时,我的Kafka connect更新失败,因为我正在尝试将两个映射应用于一个字段,_doc和kafkaconnect。这是版本6的一个突破性变化,我相信每个索引只能有一个映射。
{
"index_patterns": [ "depart_details" ],
"mappings": {
"dynamic_templates": [
{
"scheduled_to_date": {
"match": "scheduled",
"mapping": {
"type": "date"
}
}
}
]
}}现在,我将重点放在如何通过将字段更改为时间戳、时间或日期来翻译连接器中的源消息。
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field" : "scheduled",
"transforms.TimestampConverter.target.type": "Timestamp"但是,我尝试通过此转换器发送的任何消息都会失败,并显示
Caused by: org.apache.kafka.connect.errors.DataException: Java class class java.util.Date does not have corresponding schema type.
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:604)
at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:668)
at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:574)
at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:324)
at io.confluent.connect.elasticsearch.DataConverter.getPayload(DataConverter.java:181)
at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:163)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.tryWriteRecord(ElasticsearchWriter.java:285)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:270)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:169)这似乎是一件很常见的事情,但我不知道如何在版本7中通过这个连接器将日期或时间字段放入Elastic中?
发布于 2019-09-24 17:04:36
Confluent文档指出ES连接器is currently not supported与ES 7。
根据this issue的说法,在连接器配置中将type.name=kafkaconnect更改为type.name=_doc可能就足够了。
https://stackoverflow.com/questions/58076599
复制相似问题