Background: product data needs to be synchronized from the M side to the B side, and the existing PG primary/standby replication setup cannot meet this business requirement, so another tool is needed. We use Red Hat's open-source Debezium component (for an introduction, see Tutorial :: Debezium Documentation), which streams changes from the source database into Kafka, together with Confluent's Kafka JDBC Connector, which writes the data from Kafka into the target database.
The official tutorial goes from MySQL to Kafka, i.e. it only reads data from the database into the message broker and never writes it to the target database. I tried the official demo as well, and it did not get the data written into PG either. By working through the Debezium documentation and the JDBC Connector (Source and Sink) for Confluent Platform | Confluent Platform 5.4.0 documentation, I eventually got a working configuration; the configuration details are shared below.
The environment is defined in the following docker-compose.yml:

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:1.8
    container_name: zookeeper
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: debezium/kafka:1.8
    container_name: kafka
    ports:
      - 9092:9092
      - 29092:29092
    links:
      - zookeeper:zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=LISTENER_EXT://localhost:29092,LISTENER_INT://kafka:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_INT:PLAINTEXT,LISTENER_EXT:PLAINTEXT
      - KAFKA_LISTENERS=LISTENER_INT://0.0.0.0:9092,LISTENER_EXT://0.0.0.0:29092
      - KAFKA_INTER_BROKER_LISTENER_NAME=LISTENER_INT
  postgres:
    image: postgres:11
    container_name: postgres
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  postgres-sink:
    image: postgres:11
    container_name: postgres-sink
    ports:
      - 5431:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  connect:
    image: debezium/connect:1.8
    container_name: connect
    ports:
      - 8083:8083
      - 5005:5005
    links:
      - zookeeper:zookeeper
      - kafka:kafka
      - postgres:postgres
      - postgres-sink:postgres-sink
    environment:
      - GROUP_ID=1
      - BOOTSTRAP_SERVERS=kafka:9092
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
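Assuming the file above is saved as docker-compose.yml, the whole stack can be brought up and checked with the standard commands:

# Start zookeeper, kafka, both postgres instances, and connect in the background
docker-compose up -d

# All five containers should show "Up"
docker-compose ps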
One setting on the source database needs to be changed: in postgresql.conf, wal_level = logical # minimal, replica, or logical. The initial value is replica and it must be changed to logical. I do not know what effect this change has on PG's existing mechanisms; a DBA should judge that. The file lives inside the postgres container at /var/lib/postgresql/data/postgresql.conf. Copy it out with docker cp, edit it, copy it back, and then restart PG; because the container uses a data volume, the previous data survives the restart. Alternatively, the change can be made with the command ALTER SYSTEM SET wal_level = 'logical', which also requires a restart afterwards.
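A short sketch of both approaches, using the container names from the compose file above:

# Approach 1: copy the file out, set wal_level = logical, copy it back
docker cp postgres:/var/lib/postgresql/data/postgresql.conf .
# ... edit postgresql.conf locally: wal_level = logical ...
docker cp postgresql.conf postgres:/var/lib/postgresql/data/postgresql.conf
docker restart postgres

# Approach 2: change it through SQL, then restart
docker exec -it postgres psql -U postgres -c "ALTER SYSTEM SET wal_level = 'logical';"
docker restart postgres

# Either way, verify the value after the restart
docker exec -it postgres psql -U postgres -c "SHOW wal_level;"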
Because the Kafka JDBC connector is used while Kafka Connect itself is deployed from the Debezium image, Connect will fail with a class-not-found error for the sink connector. The fix is to copy the Kafka JDBC connector jars into the container under /kafka/connect/confluentinc-kafka-connect-jdbc/ (this directory has to be created manually inside the container), as sketched below.
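A sketch of that workaround, assuming the connector zip has already been downloaded from Confluent Hub and unpacked locally (the jar file name and version below are placeholders; use the jars you actually downloaded):

# Create the plugin directory inside the Connect container
docker exec -it connect mkdir -p /kafka/connect/confluentinc-kafka-connect-jdbc

# Copy the connector jars in
docker cp kafka-connect-jdbc-10.2.5.jar connect:/kafka/connect/confluentinc-kafka-connect-jdbc/
# If the sink later complains about a missing JDBC driver, copy the
# PostgreSQL driver jar into the same directory as well

# Restart Connect so the new plugin is picked up
docker restart connect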
With the base setup above in place, the next step is to configure the connectors. The Debezium source configuration:

{
  "name": "student-connector-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "publication.name": "my_publication_source",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "studentdb",
    "database.server.name": "dbserver100",
    "publication.autocreate.mode": "all_tables",
    "snapshot.mode": "always",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
Parameter reference: Debezium connector for PostgreSQL :: Debezium Documentation
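As a minimal sketch (the file name source.json is my own choice), the source connector is registered through the Kafka Connect REST API, which the compose file exposes on localhost:8083:

# Register the source connector
curl -X POST -H "Content-Type: application/json" \
     --data @source.json http://localhost:8083/connectors

# The connector and its task should both report RUNNING
curl http://localhost:8083/connectors/student-connector-source/status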
The JDBC sink configuration:

{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://postgres-sink:5432/studentdb?user=postgres&password=postgres",
    "dialect.name": "PostgreSqlDatabaseDialect",
    "tasks.max": "1",
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_key",
    "topics": "dbserver100.public.students",
    "table.name.format": "students",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "auto.create": "true",
    "errors.log.enable": "true"
  }
}
Parameter reference: JDBC Sink Connector Configuration Properties | Confluent Documentation
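The sink is registered the same way (sink.json is again my own file name). Once both connectors report RUNNING, a quick smoke test is to change a row on the source side and read it back from postgres-sink; the students table with id and name columns is only an assumption here:

# Register the sink connector and check its status
curl -X POST -H "Content-Type: application/json" \
     --data @sink.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/jdbc-sink/status

# Insert on the source database ...
docker exec -it postgres psql -U postgres -d studentdb \
  -c "INSERT INTO students (id, name) VALUES (1, 'test');"

# ... and the row should appear on the sink shortly afterwards
docker exec -it postgres-sink psql -U postgres -d studentdb \
  -c "SELECT * FROM students;"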
注意:source会读取所有的表的数据变化,sink只会针对你需要的表进行数据备份,所以有多个表要做数据迁移的情况下,需要配置多个sink connector,但是source只需要配置一个。如果要配置多个库,则需要多个source。source也可以设置表的白名单。
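For the whitelist, a hedged sketch: in current Debezium versions the property is table.include.list (older versions call it table.whitelist), and an existing connector's config can be updated in place through the REST API. The table names below are placeholders:

# Add e.g. "table.include.list": "public.students,public.teachers" to the
# source config, then PUT the bare config object (without the "name" wrapper):
curl -X PUT -H "Content-Type: application/json" \
     --data @source-config-with-include-list.json \
     http://localhost:8083/connectors/student-connector-source/config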