首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >在jdbc同步连接器中转换/指定表名

在jdbc同步连接器中转换/指定表名
EN

Stack Overflow用户
提问于 2020-12-11 01:48:39
回答 1查看 631关注 0票数 1

我需要使用confluent/kafka将Server数据库从on位置迁移到GoogleCloud。

我确实有一个源debezium连接器

代码语言:javascript
运行
复制
{
  "name": "mssql_src",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    ...
    ...
    ...
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,table,source.ts_ms",
    "transforms.unwrap.delete.handling.mode": "rewrite",

    "transforms": "Reroute",
    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "source_dbname.dbo(.*)",
    "transforms.Reroute.topic.replacement": "target_dbname$1"
  }
}

重路由转换不适用于解包的连接,我仍然获得source_dbname.dbo.*主题而不是target_dbname.*

我需要插入数据到target_dbname数据库jdbc同步连接器有以下配置

代码语言:javascript
运行
复制
{
  "name": "mssql_trg",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics.regex": "source_dbname.dbo.*",
    "table.name.format": "${topic}",
    "connection.url": "jdbc:sqlserver://xxx.xxx.xxx.xxx:1433;DatabaseName=rocketlawyer3",
    "connection.user": "sqlserver",
    "connection.password": "sqlserver",
    "dialect.name": "SqlServerDatabaseDialect",
    "insert.mode": "upsert",
    "auto.create": true,
    "auto.evolve": true,
    "pk.mode": "record_value"
  }
}

显然,它失败了,因为所有SQL操作都将表引用为source_database_name.dbo.table_name。

以下是两个问题:

  1. 如何仅为table_name使用table.name.format或其他选项更改此字符串table.name.format
  2. 如何使转换(重新路由和展开)在源连接器中工作
EN

回答 1

Stack Overflow用户

发布于 2020-12-11 23:38:31

您需要使用链式变换。只需将unwrapRerout SMT组合在一起:

代码语言:javascript
运行
复制
{
  "name": "mssql_src",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    ...
    ...
    ...
    "transforms": "unwrap, Reroute",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,table,source.ts_ms",
    "transforms.unwrap.delete.handling.mode": "rewrite",

    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.Reroute.topic.replacement": "$3"
  }
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65244686

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档