数据订阅案例

数据订阅原理

我们会通过模拟从库向主库获取对应 binlog 内容进行分析,大概架构图如下,我们会通过解析 binlog ,按照订阅通道配置的库表进行分析,所以几乎对主库没有影响。

注意:

  • 目前我们对订阅的消息内容默认会保留最近 3 天。
  • 另外如果订阅整库的话,后续新增的表也是会在原有订阅通道出现,不需要对原订阅通道进行新增配置操作。
  • 目前只支持 CDB For MySQL5.6,后续会很快支持 CDB For MySQL5.7。
  • 数据订阅暂不支持view,触发器和外键。
  • 数据订阅初次配置需要对相关 binlog_row_image 参数做调整,会根据符合条件自动 kill 老的 session 使参数立即生效。
  • 数据订阅目前支持的字符集包括 latin1,utf8,utf8mb4。

本文将以一个简单案例来说明数据订阅中拉取对应表到 Kafka 的功能,并且提供简易 KaflkaDemo下载 。以下操作将在 Centos 操作系统中完成。

配置环境

  1. Java环境配置

    yum install java-1.8.0-openjdk-devel

  2. 相关下载

安装 Kafka

具体请参考 http://kafka.apache.org/quickstart 启动之后创建一个 testtop 主题

[root@VM_71_10_centos kafka_2.11-1.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtop
Created topic "testtop".

获取密钥

登录 腾讯云控制台,单击导航条中的【云产品】>【管理工具】>【云 API 密钥】,或直接点击进入 云数据库控制台

选择数据订阅

  1. 登录 数据传输DTS控制台,选择左侧的【数据订阅】,进入数据订阅页面。
  2. 选择需同步的 CDB 实例名,然后点击启动,再返回数据订阅,点击你所创建的数据订阅。 详细介绍请参考 如何获取数据订阅
  3. 查看对应的 DTS 通道、 IP 和 Port,然后结合之前的密钥填写到对应 KafkaDemo.java 里面。

    // 从云API获取密钥,填写到此处 final String TOPIC = "testtop"; 订阅的主题 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 输入你的kafka对应ip:port props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); final Producer<String, String> producer = new KafkaProducer<String, String>(props); context.setSecretId("AKIDfdsfdsfsdt1331431sdfds"); 请填写你从云API获取的secretID。 context.setSecretKey("test111usdfsdfsddsfRkeT"); 请填写 你从云API获取的secretKey. // 在数据迁移服务里面通过数据订阅获取到对应的ip,port,填写到此处 context.setServiceIp("10.66.112.181"); 请填写你从数据订阅配置获取到的IP context.setServicePort(7507); 请填写你从数据订阅配置获取到的PORT final DefaultSubscribeClient client = new DefaultSubscribeClient(context); // 填写对应要同步的数据库和表名,并修改对应要落地存储的文件名. final String targetDatabase = "test"; 填写你所要订阅的库名 client.addClusterListener(listener); // 通过数据迁移订阅的配置选项获取到dts-channel的配置信息,填写到此处. client.askForGUID("dts-channel-e4FQxtYV3It4test"); 请填写你从数据订阅获取的通道dts的名称。 client.start();

编译操作与检验

javac -classpath binlogsdk-2.6.0-release.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar:kafka-clients-1.1.0.jar -encoding UTF-8 KafkaDemo.java

  1. 执行启动,如果没有异常报错就是正常在服务。

    java -XX:-UseGCOverheadLimit -Xms2g -Xmx2g -classpath .:binlogsdk-2.6.0-release.jar:kafka-clients-1.1.0.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar KafkaDemo

  2. 通过对表 alantest 插入一条数据,发现在 Kafka 订阅的 testtop 里面能看到已经有数据过来了。
MySQL [test]> insert into alantest values(123456,'alan');
Query OK, 1 row affected (0.02 sec)

[root@VM_71_10_centos kafka_2.11-1.1.0]#  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtop --from-beginning 
checkpoint:144251@3@1275254@1153089
record_id:00000100000000001198410000000000000001
record_encoding:utf8
fields_enc:latin1,utf8
gtid:4f21864b-3bed-11e8-a44c-5cb901896188:5552
source_category:full_recorded
source_type:mysql
table_name:alantest
record_type:INSERT
db:test
timestamp:1524649133
primary:id

Field name: id
Field type: 3
Field length: 6
Field value: 123456
Field name: name
Field type: 253
Field length: 4
Field value: alan

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏实战docker

Docker下HBase学习,三部曲之二:集群HBase搭建

上一章《Docker下HBase学习,三部曲之一:极速体验》我们快速体验了单机版HBase提供的基础服务,接下来我们实战HBase集群环境的搭建。 集群规划 首...

2878
来自专栏Play & Scala 技术分享

PlayScala 2.5.x - 实现完全异步非阻塞的流数据导出

2634
来自专栏Java成神之路

GEF入门实例_总结_03_显示菜单和工具栏

还记得上一节我们新建的类: ApplicationActionBarAdvisor 吗,这个类继承自 ActionBarAdvisor。

882
来自专栏零基础使用Django2.0.1打造在线教育网站

零基础使用Django2.0.1打造在线教育网站(二):开发环境配置

努力与运动兼备~有任何问题可以加我好友或者关注微信公众号,欢迎交流,我们一起进步!

1735
来自专栏PHP实战技术

ThinkPHP3.2中英文切换!

最近公司项目版本升级,小梦已经忙成了狗,无暇顾及文章,今天抽时间写一篇助助兴!

2058
来自专栏高爽的专栏

探究InnoDB可重复读

在RC(Read Committed)和RR(Repeatable Read)两种事务隔离级别下,InnoDB存在两种数据读取方式: 快照读(Snapshot ...

1920
来自专栏大数据和云计算技术

hadoop运行环境搭建

森哥/洋哥hadoop系列,非常适合初学者: Hive 元数据表结构详解 HDFS学习:HDFS机架感知与副本放置策略 Yarn【label-based sch...

34811
来自专栏Hadoop实操

如何使用HAProxy实现HiveServer2负载均衡

前面Fayson介绍了《如何使用HAProxy实现Impala的负载均衡》,本文主要介绍如何使用HAProxy实现Hive服务的负载均衡。

4084
来自专栏深度学习自然语言处理

有关vi(vim)的常用命令

导读 vi(vim)是上Linux非常常用的编辑器,很多Linux发行版都默认安装了vi(vim)。vi(vim)命令繁多但是如果使用灵活之后将会大大提高效率。...

2816
来自专栏Angular&服务

homebrew 安装指定版本gradle(软件)安装源软件版本切换

可以看到,我这里是已经安装了 gradle 4.9 版本,第一行显示了 brew 使用的 bottled 里的 gradle 为 4.9 版本,在第 4 行中有...

1192

扫码关注云+社区