首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >我可以只看一个字段在Couchbase与卡夫卡-连接(CDC)?

我可以只看一个字段在Couchbase与卡夫卡-连接(CDC)?
EN

Stack Overflow用户
提问于 2022-07-01 09:02:47
回答 2查看 199关注 0票数 2

我们正在尝试将数据库从mysql转移到couchbase,并实现一些CDC ()逻辑,以便将数据复制到我们的新数据库。

所有环境的设置和运行。mysql,debezium,kafka,couchbase,kubernetes,管线等,我们还为debezium安装了kafka源连接器.下面是:

代码语言:javascript
运行
复制
      - name: "our-connector"
        config:
          connector.class: "io.debezium.connector.mysql.MySqlConnector"
          tasks.max: "1"
          group.id: "our-connector"
          database.server.name: "our-api"
          database.hostname: "******"
          database.user: "******"
          database.password: "******"
          database.port: "3306"
          database.include.list: "our_db"
          column.include.list: "our_db.our_table.our_field"
          table.include.list: "our_db.our_table"
          database.history.kafka.topic: "inf.our_table.our_db.schema-changes"
          database.history.kafka.bootstrap.servers: "kafka-cluster-kafka-bootstrap.kafka:9092"
          value.converter: "org.apache.kafka.connect.json.JsonConverter"
          value.converter.schemas.enable: "false"
          key.converter: "org.apache.kafka.connect.json.JsonConverter"
          key.converter.schemas.enable: "false"
          snapshot.locking.mode: "none"
          tombstones.on.delete: "false"
          event.deserialization.failure.handling.mode: "ignore"
          database.history.skip.unparseable.ddl: "true"
          include.schema.changes: "false"
          snapshot.mode: "initial"
          transforms: "extract,filter,unwrap"
          predicates: "isOurTableChangeOurField"
          predicates.isOurTableChangeOurField.type: "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"
          predicates.isOurTableChangeOurField.pattern: "our-api.our_db.our_table"
          transforms.filter.type: "com.redhat.insights.kafka.connect.transforms.Filter"
          transforms.filter.if: "!!record.value() && record.value().get('op') == 'u' && record.value().get('before').get('our_field') != record.value().get('after').get('our_field')"
          transforms.filter.predicate: "isOurTableChangeOurField"
          transforms.unwrap.type: "io.debezium.transforms.ExtractNewRecordState"
          transforms.unwrap.drop.tombstones: "false"
          transforms.unwrap.delete.handling.mode: "drop"
          transforms.extract.type: "org.apache.kafka.connect.transforms.ExtractField{{.DOLLAR_SIGN}}Key"
          transforms.extract.field: "id"

这个配置将这个信息发布给卡夫卡。从库尔抓来的。

如您所见,我们有原始记录、id和更改的字段、新值。

到目前为止没问题。实际上,我们有问题:)我们的字段是mysql中的DATETIME类型,但debezium将其发布为unixtime。

第一个问题,我们如何用格式化的日期时间(例如YYYY:ii:mm)发布这个问题?

让我们继续前进。

这里是实际问题。我们已经搜索了很多,但所有的例子都是记录全部数据到couchbase。但是我们已经在couchbase中创建了这个记录,只是想要最新的数据。事实上,我们也操纵了数据。

以下是couchbase的示例数据

我们只想更改couchbase中的bill.dateAccepted字段。尝试了一些yaml配置,但在接收器上没有成功。

这里是接收器配置

代码语言:javascript
运行
复制
      - name: "our-sink-connector-1"
        config:
          connector.class: "com.couchbase.connect.kafka.CouchbaseSinkConnector"
          tasks.max: "2"
          topics: "our-api.our_db.our_table"
          couchbase.seed.nodes: "dev-couchbase-couchbase-cluster.couchbase.svc.cluster.local"
          couchbase.bootstrap.timeout: "10s"
          couchbase.bucket: "our_bucket"
          couchbase.topic.to.collection: "our-api.our_db.our_table=our_bucket._default.ourCollection"
          couchbase.username: "*******"
          couchbase.password: "*******"
          key.converter: "org.apache.kafka.connect.storage.StringConverter"
          key.converter.schemas.enable: "false"
          value.converter: "org.apache.kafka.connect.json.JsonConverter"
          value.converter.schemas.enable: "false"
          connection.bucket : "our_bucket"
          connection.cluster_address: "couchbase://couchbase-srv.couchbase"
          couchbase.document.id: "${/id}"
EN

回答 2

Stack Overflow用户

发布于 2022-07-02 07:31:47

部分回答你的第一个问题。一种方法是可以使用SPI转换器将unixdatetime转换为string。如果要转换所有的日期时间,并且输入消息包含许多日期时间字段,则只需查看JDBCType并执行https://debezium.io/documentation/reference/stable/development/converters.html转换即可。

在提取I/U时,您可以编写一个自定义SMT (单消息转换),它有记录前后的记录,也有操作类型(I/U/D),并比较字段前后提取的增量。在过去,当我尝试做一些事情时,我偶然发现了下面的东西,这些东西作为一个参考非常有用。这样,您就有了一个delta字段和一个键,它可以只是更新而不是更新完整的文档(尽管接收器必须支持它,但在某个时候会出现)。

https://github.com/michelin/kafka-connect-transforms-qlik-replicate

票数 1
EN

Stack Overflow用户

发布于 2022-07-06 16:33:15

Couchbase 连接器不支持监视单个字段。通常,Couchbase源连接器比用于更改数据捕获更适合复制。请参阅交货担保文档中提到的注意事项。

Couchbase Kafka 接收器连接器通过内置SubDocumentSinkHandlerN1qlSinkHandler支持部分文档更新。您可以通过配置couchbase.sink.handler连接器配置属性来选择接收器处理程序,并使用子文档Sink Handler配置选项自定义其行为。

下面是一个配置片段,它告诉连接器用Kafka记录的整个值更新bill.dateAccepted属性。(还需要使用单个消息转换从源记录中提取此字段。)

代码语言:javascript
运行
复制
couchbase.sink.handler=com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler
couchbase.subdocument.path=/bill/dateAccepted

如果内置接收器处理程序不够灵活,可以使用示例作为模板编写自己的自定义接收器处理程序。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72826851

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档