我们正在尝试将数据库从mysql转移到couchbase,并实现一些CDC ()逻辑,以便将数据复制到我们的新数据库。
所有环境的设置和运行。mysql,debezium,kafka,couchbase,kubernetes,管线等,我们还为debezium安装了kafka源连接器.下面是:
- name: "our-connector"
config:
connector.class: "io.debezium.connector.mysql.MySqlConnector"
tasks.max: "1"
group.id: "our-connector"
database.server.name: "our-api"
database.hostname: "******"
database.user: "******"
database.password: "******"
database.port: "3306"
database.include.list: "our_db"
column.include.list: "our_db.our_table.our_field"
table.include.list: "our_db.our_table"
database.history.kafka.topic: "inf.our_table.our_db.schema-changes"
database.history.kafka.bootstrap.servers: "kafka-cluster-kafka-bootstrap.kafka:9092"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable: "false"
key.converter: "org.apache.kafka.connect.json.JsonConverter"
key.converter.schemas.enable: "false"
snapshot.locking.mode: "none"
tombstones.on.delete: "false"
event.deserialization.failure.handling.mode: "ignore"
database.history.skip.unparseable.ddl: "true"
include.schema.changes: "false"
snapshot.mode: "initial"
transforms: "extract,filter,unwrap"
predicates: "isOurTableChangeOurField"
predicates.isOurTableChangeOurField.type: "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"
predicates.isOurTableChangeOurField.pattern: "our-api.our_db.our_table"
transforms.filter.type: "com.redhat.insights.kafka.connect.transforms.Filter"
transforms.filter.if: "!!record.value() && record.value().get('op') == 'u' && record.value().get('before').get('our_field') != record.value().get('after').get('our_field')"
transforms.filter.predicate: "isOurTableChangeOurField"
transforms.unwrap.type: "io.debezium.transforms.ExtractNewRecordState"
transforms.unwrap.drop.tombstones: "false"
transforms.unwrap.delete.handling.mode: "drop"
transforms.extract.type: "org.apache.kafka.connect.transforms.ExtractField{{.DOLLAR_SIGN}}Key"
transforms.extract.field: "id"
这个配置将这个信息发布给卡夫卡。从库尔抓来的。
如您所见,我们有原始记录、id和更改的字段、新值。
到目前为止没问题。实际上,我们有问题:)我们的字段是mysql中的DATETIME类型,但debezium将其发布为unixtime。
第一个问题,我们如何用格式化的日期时间(例如YYYY:ii:mm)发布这个问题?
让我们继续前进。
这里是实际问题。我们已经搜索了很多,但所有的例子都是记录全部数据到couchbase。但是我们已经在couchbase中创建了这个记录,只是想要最新的数据。事实上,我们也操纵了数据。
以下是couchbase的示例数据
我们只想更改couchbase中的bill.dateAccepted字段。尝试了一些yaml配置,但在接收器上没有成功。
这里是接收器配置
- name: "our-sink-connector-1"
config:
connector.class: "com.couchbase.connect.kafka.CouchbaseSinkConnector"
tasks.max: "2"
topics: "our-api.our_db.our_table"
couchbase.seed.nodes: "dev-couchbase-couchbase-cluster.couchbase.svc.cluster.local"
couchbase.bootstrap.timeout: "10s"
couchbase.bucket: "our_bucket"
couchbase.topic.to.collection: "our-api.our_db.our_table=our_bucket._default.ourCollection"
couchbase.username: "*******"
couchbase.password: "*******"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
key.converter.schemas.enable: "false"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable: "false"
connection.bucket : "our_bucket"
connection.cluster_address: "couchbase://couchbase-srv.couchbase"
couchbase.document.id: "${/id}"
发布于 2022-07-02 07:31:47
部分回答你的第一个问题。一种方法是可以使用SPI转换器将unixdatetime转换为string。如果要转换所有的日期时间,并且输入消息包含许多日期时间字段,则只需查看JDBCType并执行https://debezium.io/documentation/reference/stable/development/converters.html转换即可。
在提取I/U时,您可以编写一个自定义SMT (单消息转换),它有记录前后的记录,也有操作类型(I/U/D),并比较字段前后提取的增量。在过去,当我尝试做一些事情时,我偶然发现了下面的东西,这些东西作为一个参考非常有用。这样,您就有了一个delta字段和一个键,它可以只是更新而不是更新完整的文档(尽管接收器必须支持它,但在某个时候会出现)。
https://github.com/michelin/kafka-connect-transforms-qlik-replicate
发布于 2022-07-06 16:33:15
Couchbase 源连接器不支持监视单个字段。通常,Couchbase源连接器比用于更改数据捕获更适合复制。请参阅交货担保文档中提到的注意事项。
Couchbase Kafka 接收器连接器通过内置SubDocumentSinkHandler
或N1qlSinkHandler
支持部分文档更新。您可以通过配置couchbase.sink.handler
连接器配置属性来选择接收器处理程序,并使用子文档Sink Handler配置选项自定义其行为。
下面是一个配置片段,它告诉连接器用Kafka记录的整个值更新bill.dateAccepted
属性。(还需要使用单个消息转换从源记录中提取此字段。)
couchbase.sink.handler=com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler
couchbase.subdocument.path=/bill/dateAccepted
如果内置接收器处理程序不够灵活,可以使用示例作为模板编写自己的自定义接收器处理程序。
https://stackoverflow.com/questions/72826851
复制相似问题