我正在尝试设置一个Debezium MySQL源连接器。我的目标是为每个数据库有一个主题,所以我正在研究利用主题的可能性,这样一个主题可以包含不同的消息类型,并且它们的模式可以存储在中。
按照这里的几个答案,我已经将密钥和值转换器主题名策略设置为io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
。
要将来自同一模式的所有消息重新路由到同一个主题,我将使用以下配置:
{
"name": "aws-db-connector",
"config": {
"group.id": "aws-db-group",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "root",
"database.password": "secret-pw",
"database.server.id": "184054",
"database.server.name": "aws-db",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.aws-db",
"database.include.list": "db1,db2",
"transforms": "unwrap,Reroute",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "db,table,op,source.ts_ms",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)",
"transforms.Reroute.topic.replacement": "$2_schema",
"transforms.Reroute.key.field.name": "table",
"transforms.Reroute.key.field.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)",
"transforms.Reroute.key.field.replacement": "$3"
}
}
在我的docker-compose
文件中,我设置了:
- CONNECT_KEY_CONVERTER_KEY_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081
- CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081
对于价值观来说,这是完美无缺的。我可以看到,我的模式注册中心包含多个主题,格式为<TopicName>-<RecordName>-Value
,其中TopicName
是我要重路由这个数据的主题的名称。RecordName
是Debezium以server_name.database_name.table_name
格式创建的“旧”主题名称。
对于Keys来说,不幸的是,这个策略并不像预期的那样工作,而且我只有一个模式主题:看起来RecordName
包含的是新的主题名,而不是原始的主题名称。如果一个字段名在不同的表中具有不同的类型,则会导致冲突和不兼容错误。
是否有任何方法在生成关键主题时提供适当的RecordName
?
编辑-添加示例:
假设我的数据库包含三个表,table1
、table2
和table3
。
Table1:
CREATE TABLE `table1` (
`id` INT NOT NULL AUTO_INCREMENT,
`name` TEXT,
PRIMARY KEY (`id`)
);
Table2:
CREATE TABLE `table2` (
`id` INT NOT NULL AUTO_INCREMENT,
`name` BINARY,
PRIMARY KEY (`id`)
);
Table3:
CREATE TABLE `table3` (
`id` BINARY NOT NULL,
`name` INT,
PRIMARY KEY (`id`)
);
使用上述配置运行Debezium,它在模式注册表中创建以下值主题:
以及以下主要主题:
当轮到table3时,Debezium连接器就会失败,因为id
列在模式注册表主题中注册了int
类型,并且与table3中的bytes
类型不兼容。因此,我得到了这个错误:
正在注册的
架构与早期的模式不兼容;错误代码: 409
我所期望的是,也会为关键问题建立单独的主题:
这样就可以将具有不同关键架构的消息存储在同一个主题中。
发布于 2022-06-07 08:36:28
这似乎是Debezium默认的工作方式。它只会为每个主题创建一个关键模式,但是会创建不同的值模式,因此重路由到主题的所有消息都应该共享相同的键结构。
为了解决这个问题,应该使用RegexRouter
。在重路由之前应用InsertField
转换,还会将原始主题名添加到键中,并且可以从中提取表名。
"transforms": "InsertField,Reroute",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Key",
"transforms.InsertField.topic.field": "table"
"transforms.Reroute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.Reroute.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)",
"transforms.Reroute.replacement": "$2_schema",
https://stackoverflow.com/questions/72489617
复制相似问题