我们一直在研究kafka生态系统。让我浏览一下流程
源(SQLServer) -> Debezium(CDC) -> Kafka Broker -> Kafka流(处理、加入等) -> Mongo连接器-> Mongo DB
现在我们在最后一步,我们正在将处理过的数据插入到mongo dB中,但现在我们需要向上插入数据,而不是只插入数据。
我们能从mongo接收器连接器获得upsert(插入/更新)功能吗?至于现在,我明白这是不可能的。
发布于 2019-12-30 14:10:40
请遵循提供的链接,它有关于kafka mongo连接器的所有信息。我已经成功地实现了upsert functionilty.you,只需要仔细阅读本文档即可。
Kafka Connector - Mongodb
发布于 2020-10-21 22:33:39
实际上,这是一个upsert,如果${uniqueFieldToUpdateOn}不在mongo中,我们希望插入,如果它存在,则更新,如下所示。
根据您的用例更新/替换,有两种主要的方法可以对集合中的数据更改进行建模,如下所述:
更新
以下配置说明:
使用您要在其上模拟更新的记录所特有的字段更新${uniqueFieldToUpdateOn}。
AllowList (白名单)此字段与一起使用PartialValueStrategy允许为id策略投影自定义值字段。
UpdateOneBusinessKeyTimestampStrategy意味着只会更新上面声明的唯一字段引用的一个文档(最新时间戳胜出)。
"document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.partial.value.projection.list":"${uniqueFieldToUpdateOn}",
"document.id.strategy.partial.value.projection.type":"AllowList",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneBusinessKeyTimestampStrategy"替换
注意:这个模型是一个替换,而不是更新,但可能仍然有用
以下配置说明:
将${uniqueFieldToUpdateOn}替换为您要在其上模拟替换的记录的唯一字段。
AllowList (白名单)此字段与一起使用PartialValueStrategy允许为id策略投影自定义值字段。
ReplaceOneBusinessKeyStrategy意味着只有一个由上面声明的唯一字段引用的文档将被替换。
"document.id.strategy":"com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.partial.value.projection.list":"${uniqueFieldToUpdateOn}",
"document.id.strategy.partial.value.projection.type":"AllowList",
"writemodel.strategy":"com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy"https://stackoverflow.com/questions/59083430
复制相似问题