首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Key转换器的Kafka连接与主题名策略

Key转换器的Kafka连接与主题名策略
EN

Stack Overflow用户
提问于 2022-06-03 12:32:02
回答 1查看 483关注 0票数 2

我正在尝试设置一个Debezium MySQL源连接器。我的目标是为每个数据库有一个主题,所以我正在研究利用主题的可能性,这样一个主题可以包含不同的消息类型,并且它们的模式可以存储在中。

按照这里的几个答案,我已经将密钥和值转换器主题名策略设置为io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

要将来自同一模式的所有消息重新路由到同一个主题,我将使用以下配置:

代码语言:javascript
运行
复制
{
  "name": "aws-db-connector",
  "config": {
    "group.id": "aws-db-group",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "secret-pw",
    "database.server.id": "184054",
    "database.server.name": "aws-db",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.aws-db",
    "database.include.list": "db1,db2",
    "transforms": "unwrap,Reroute",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "db,table,op,source.ts_ms",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)",
    "transforms.Reroute.topic.replacement": "$2_schema",
    "transforms.Reroute.key.field.name": "table",
    "transforms.Reroute.key.field.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)",
    "transforms.Reroute.key.field.replacement": "$3"
  }
}

在我的docker-compose文件中,我设置了:

代码语言:javascript
运行
复制
- CONNECT_KEY_CONVERTER_KEY_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- CONNECT_VALUE_CONVERTER_VALUE_SUBJECT_NAME_STRATEGY=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
- CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081
- CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
- CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://registry:8081

对于价值观来说,这是完美无缺的。我可以看到,我的模式注册中心包含多个主题,格式为<TopicName>-<RecordName>-Value,其中TopicName是我要重路由这个数据的主题的名称。RecordName是Debezium以server_name.database_name.table_name格式创建的“旧”主题名称。

对于Keys来说,不幸的是,这个策略并不像预期的那样工作,而且我只有一个模式主题:看起来RecordName包含的是新的主题名,而不是原始的主题名称。如果一个字段名在不同的表中具有不同的类型,则会导致冲突和不兼容错误。

是否有任何方法在生成关键主题时提供适当的RecordName

编辑-添加示例:

假设我的数据库包含三个表,table1table2table3

Table1:

代码语言:javascript
运行
复制
CREATE TABLE `table1` (
    `id` INT NOT NULL AUTO_INCREMENT,
    `name` TEXT,
    PRIMARY KEY (`id`)
);

Table2:

代码语言:javascript
运行
复制
CREATE TABLE `table2` (
    `id` INT NOT NULL AUTO_INCREMENT,
    `name` BINARY,
    PRIMARY KEY (`id`)
);

Table3:

代码语言:javascript
运行
复制
CREATE TABLE `table3` (
    `id` BINARY NOT NULL,
    `name` INT,
    PRIMARY KEY (`id`)
);

使用上述配置运行Debezium,它在模式注册表中创建以下值主题:

  • db1_schema.db1-aws-db.table1-Value
  • db1_schema.db1-aws-db.table2-Value

以及以下主要主题:

  • db1_schema.db1_schema-Key

当轮到table3时,Debezium连接器就会失败,因为id列在模式注册表主题中注册了int类型,并且与table3中的bytes类型不兼容。因此,我得到了这个错误:

正在注册的

架构与早期的模式不兼容;错误代码: 409

我所期望的是,也会为关键问题建立单独的主题:

  • db1_schema.aws-db-db1.table1-Key
  • db1_schema.aws-db-db1.table2-Key
  • db1_schema.aws-db-db1.table3-Key

这样就可以将具有不同关键架构的消息存储在同一个主题中。

EN

回答 1

Stack Overflow用户

发布于 2022-06-07 08:36:28

这似乎是Debezium默认的工作方式。它只会为每个主题创建一个关键模式,但是会创建不同的值模式,因此重路由到主题的所有消息都应该共享相同的键结构。

为了解决这个问题,应该使用RegexRouter。在重路由之前应用InsertField转换,还会将原始主题名添加到键中,并且可以从中提取表名。

代码语言:javascript
运行
复制
"transforms": "InsertField,Reroute",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Key",
"transforms.InsertField.topic.field": "table"
"transforms.Reroute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.Reroute.regex": "(.*\\S)\\.(.*\\S)\\.(.*\\S)",
"transforms.Reroute.replacement": "$2_schema",
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72489617

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档