专栏首页M莫的博客在confluent上测试connect source和sink

在confluent上测试connect source和sink

  • 测试目标

为了实现分库分表前期的安全操作, 希望分表的数据还是能够暂时合并到原表中, 使用基于kafka connect实现, debezium做connect source, kafka-jdbc-connector-sink做sink.

实现步骤

开启binlog的MySQL

  • 创建测试数据库test 1create database test;
  • 初始化表 ``` create table if not exists tx_refund_bill( id bigint unsigned auto_increment comment ‘主键’ primary key, order_id bigint not null comment ‘订单id’, bill_type tinyint not null comment ‘11’ )comment ‘退款费用明细’ charset=utf8;

CREATE TABLE test_new1 LIKE tx_refund_bill;

12

- 数据测试sql

INSERT INTO tx_refund_bill (order_id, bill_type) VALUES (1,3);

update tx_refund_bill set order_id = 3 where id = 1;

select * from tx_refund_bill;

select * from test_new1;

123456789101112131415161718192021

# 在confluent快速搭建kafka connect - [download confluent](https://www.confluent.io/download/) - quick local start - 创建confluent配置目录 ``` mkdir ~/.confluent ``` - 设置confluent环境 ``` export CONFLUENT_HOME=/home/xingwang/service/confluent-5.4.0 export PATH=$CONFLUENT_HOME/bin:$PATH ``` - 安装debezium - [下载](https://www.confluent.io/hub/debezium/debezium-connector-mysql) - 解压后复制到/home/xingwang/service/confluent-5.4.0/share/java - 安装kafka-connect-jdbc - confluent默认带了kafka-connect-jdbc,只需要额外下载mysql-connector-java-5.1.40.jar放到/home/xingwang/service/confluent-5.4.0/share/java/kafka-connect-jdbc就可以了 - start confluent

confluent local start

1234567891011

- log位置 log在/tmp/下 - confluent 管理页面 [http://172.17.228.163:9021/](http://172.17.228.163:9021/) # 配置connect(配置可以直接在http client中执行(.http))

查看connectors

GET http://172.17.228.163:8083/connectors

delete connnector

curl -XDELETE ‘http://172.17.228.163:8083/connectors/debezium’

创建source debezium connector

curl -H “Content-Type:application/json” -XPUT ‘http://172.17.228.163:8083/connectors/debezium/config’ -d ‘ { “connector.class”: “io.debezium.connector.mysql.MySqlConnector”, “tasks.max”: “1”, “database.hostname”: “localhost”, “database.port”: “3306”, “database.user”: “root”, “database.password”: “[email protected]”, “database.server.id”: “19991”, “database.server.name”: “test_0”, “database.whitelist”: “test”, “include.schema.changes”: “false”, “snapshot.mode”: “schema_only”, “snapshot.locking.mode”: “none”, “database.history.kafka.bootstrap.servers”: “localhost:9092”, “database.history.kafka.topic”: “dbhistory”, “decimal.handling.mode”: “string”, “table.whitelist”: “test.tx_refund_bill”, “database.history.store.only.monitored.tables.ddl”:”true”, “database.history.skip.unparseable.ddl”:”true” }’

查看source debezium connector status

GET http://172.17.228.163:8083/connectors/debezium/status

delete connnector

curl -XDELETE ‘http://172.17.228.163:8083/connectors/jdbc-sink’

创建sink jdbc connector

curl -H “Content-Type:application/json” -XPUT ‘http://172.17.228.163:8083/connectors/jdbc-sink/config’ -d ‘ { “connector.class”: “io.confluent.connect.jdbc.JdbcSinkConnector”, “connection.url”: “jdbc:mysql://localhost:3306/test?nullCatalogMeansCurrent=true”, “connection.user”: “root”, “connection.password”: “[email protected]”, “tasks.max”: “1”, “topics”: “test_0.test.tx_refund_bill”, “table.name.format”: “test_new1”,

1234567

"insert.mode": "upsert", "pk.fields": "id", "pk.mode": "record_value", "transforms": "ExtractField", "transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value", "transforms.ExtractField.field": "after" }'

查看connectors status

GET http://172.17.228.163:8083/connectors/jdbc-sink/status

```

实验

  • 在tx_refund_bill表中insert数据,观察test_new1的变化
  • 在tx_refund_bill表中执行update语句,观察test_new1的变化

reference

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Mysql实时数据变更事件捕获kafka confluent之debezium

    如果你的后端应用数据存储使用的MySQL,项目中如果有这样的业务场景你会怎么做呢?

    XING辋
  • Elasticsearch自定义分词

    XING辋
  • 业务代码抽象原则

    XING辋
  • 谈谈对云原生应用的理解

      微服务后时代是什么?炒得最火的就是Cloud Native。顾名思义,云原生就是面向云设计的应用,自从2013年Matt Stine提出概念后,更多是一套技...

    王昂
  • Topology的构建

    用户3003813
  • java学习与应用(3.1)--对象与部分类

    Object类:所有类的父类,其常用的方法如: toString方法,默认获得对象的地址值。一般重写后,可以根据开发需求自行使用其它用途,如输出对象属性(可通过...

    嘘、小点声
  • 精选25道Mysql面试题,快来测测你的数据库水平吧

    存储过程是用户定义的一系列sql语句的集合,涉及特定表或其它对象的任务,用户可以调用存储过程,而函数通常是数据库已定义的方法,它接收参数并返回某种类型的值并且不...

    吾非同
  • GNS3模拟ASA

    2 解压Unpack-0.1_win.zip(如解压到F盘,解压后会生成unpack目录)

    py3study
  • 代码美化的艺术

    原本只是想简单的聊一下代码格式化的问题,无奈本文拖沓了很久,在此期间,我又思考了很多,我越来越觉得代码格式化是一门艺术。为了衬托“艺术”二字,可能叫“代码美化”...

    叙帝利
  • 代码美化的艺术

    原本只是想简单的聊一下代码格式化的问题,无奈本文拖沓了很久,在此期间,我又思考了很多,我越来越觉得代码格式化是一门艺术。为了衬托“艺术”二字,可能叫“代码美化”...

    ConardLi

扫码关注云+社区

领取腾讯云代金券