Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >数据订阅案例

数据订阅案例

原创
作者头像
宗文
发布于 2018-06-27 08:37:38
发布于 2018-06-27 08:37:38
83800
代码可运行
举报
文章被收录于专栏:数据订阅数据订阅
运行总次数:0
代码可运行

数据订阅原理

我们会通过模拟从库向主库获取对应 binlog 内容进行分析,大概架构图如下,我们会通过解析 binlog ,按照订阅通道配置的库表进行分析,所以几乎对主库没有影响。

注意:

  • 目前我们对订阅的消息内容默认会保留最近 3 天。
  • 另外如果订阅整库的话,后续新增的表也是会在原有订阅通道出现,不需要对原订阅通道进行新增配置操作。
  • 目前只支持 CDB For MySQL5.6,后续会很快支持 CDB For MySQL5.7。
  • 数据订阅暂不支持view,触发器和外键。
  • 数据订阅初次配置需要对相关 binlog_row_image 参数做调整,会根据符合条件自动 kill 老的 session 使参数立即生效。
  • 数据订阅目前支持的字符集包括 latin1,utf8,utf8mb4。

本文将以一个简单案例来说明数据订阅中拉取对应表到 Kafka 的功能,并且提供简易 KaflkaDemo下载 。以下操作将在 Centos 操作系统中完成。

配置环境

  1. Java环境配置

    yum install java-1.8.0-openjdk-devel

  2. 相关下载

安装 Kafka

具体请参考 http://kafka.apache.org/quickstart 启动之后创建一个 testtop 主题

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[root@VM_71_10_centos kafka_2.11-1.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testtop
Created topic "testtop".

获取密钥

登录 腾讯云控制台,单击导航条中的【云产品】>【管理工具】>【云 API 密钥】,或直接点击进入 云数据库控制台

选择数据订阅

  1. 登录 数据传输DTS控制台,选择左侧的【数据订阅】,进入数据订阅页面。
  2. 选择需同步的 CDB 实例名,然后点击启动,再返回数据订阅,点击你所创建的数据订阅。 详细介绍请参考 如何获取数据订阅
  3. 查看对应的 DTS 通道、 IP 和 Port,然后结合之前的密钥填写到对应 KafkaDemo.java 里面。

    // 从云API获取密钥,填写到此处 final String TOPIC = "testtop"; 订阅的主题 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 输入你的kafka对应ip:port props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); final Producer<String, String> producer = new KafkaProducer<String, String>(props); context.setSecretId("AKIDfdsfdsfsdt1331431sdfds"); 请填写你从云API获取的secretID。 context.setSecretKey("test111usdfsdfsddsfRkeT"); 请填写 你从云API获取的secretKey. // 在数据迁移服务里面通过数据订阅获取到对应的ip,port,填写到此处 context.setServiceIp("10.66.112.181"); 请填写你从数据订阅配置获取到的IP context.setServicePort(7507); 请填写你从数据订阅配置获取到的PORT final DefaultSubscribeClient client = new DefaultSubscribeClient(context); // 填写对应要同步的数据库和表名,并修改对应要落地存储的文件名. final String targetDatabase = "test"; 填写你所要订阅的库名 client.addClusterListener(listener); // 通过数据迁移订阅的配置选项获取到dts-channel的配置信息,填写到此处. client.askForGUID("dts-channel-e4FQxtYV3It4test"); 请填写你从数据订阅获取的通道dts的名称。 client.start();

编译操作与检验

javac -classpath binlogsdk-2.6.0-release.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar:kafka-clients-1.1.0.jar -encoding UTF-8 KafkaDemo.java

  1. 执行启动,如果没有异常报错就是正常在服务。

    java -XX:-UseGCOverheadLimit -Xms2g -Xmx2g -classpath .:binlogsdk-2.6.0-release.jar:kafka-clients-1.1.0.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar KafkaDemo

  2. 通过对表 alantest 插入一条数据,发现在 Kafka 订阅的 testtop 里面能看到已经有数据过来了。
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
MySQL [test]> insert into alantest values(123456,'alan');
Query OK, 1 row affected (0.02 sec)

[root@VM_71_10_centos kafka_2.11-1.1.0]#  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtop --from-beginning 
checkpoint:144251@3@1275254@1153089
record_id:00000100000000001198410000000000000001
record_encoding:utf8
fields_enc:latin1,utf8
gtid:4f21864b-3bed-11e8-a44c-5cb901896188:5552
source_category:full_recorded
source_type:mysql
table_name:alantest
record_type:INSERT
db:test
timestamp:1524649133
primary:id

Field name: id
Field type: 3
Field length: 6
Field value: 123456
Field name: name
Field type: 253
Field length: 4
Field value: alan

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
MySQL数据迁移TcaplusDB实践
随着业务数据量的剧增,传统MySQL在数据存储上变得越来越吃力,NoSQL因其良好的性能、扩展性、稳定性逐渐成为业务选型的首要考虑。TcaplusDB是腾讯云推出的一款全托管NoSQL数据库服务,旨在为客户提供极致的数据据存储体验,详细信息请参考官方文档。本文主要介绍如何将MySQL数据迁移到TcaplusDB。
温球良
2020/08/14
2.4K0
MySQL数据迁移TcaplusDB实践
在springboot中对kafka进行读写操作
只需要在dependencies中增加 spring-kafka的配置即可。完整效果如下:
冬天里的懒猫
2020/08/03
3.2K0
微服务同时接入多个Kafka
kafkaOneTemplate 定义第一个Kafka的高级模板,用来发送消息 kafkaOneContainerFactory 消费监听容器,配置在@KafkaListener中, producerFactory 生产者工厂 consumerFactory 消费者工厂 producerConfigs 生产者配置 consumerConfigs 消费者配置
阿提说说
2022/11/18
1.3K0
微服务同时接入多个Kafka
String boot with Apache kafka 完整的发布订阅例子
本文节选自电子书《Netkiller Java 手札》地址 http://www.netkiller.cn/ 5.21.7. 完整的发布订阅实例 上面的例子仅仅是做了一个热身,现在我们将实现 一个完整的例子。 例 5.5. Spring boot with Apache kafka. SpringApplication package cn.netkiller; import org.springframework.boot.SpringApplication; import org.spring
netkiller old
2018/03/05
8840
springboot kafka集成(实现producer和consumer)
本文介绍如何在springboot项目中集成kafka收发message。 1、先解决依赖 springboot相关的依赖我们就不提了,和kafka相关的只依赖一个spring-kafka集成包 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</ve
用户1225216
2018/03/05
3.5K0
Kafka单机环境配置及基本使用详解
Broker 是一个Kafka的Server,一台单物理机或者集群都可以拥有多个broker一个broker可以容纳多个主题,这个与复制因子、主题的分区都有关系。
星哥玩云
2022/07/26
9750
JavaWeb项目架构之Kafka分布式日志队列
架构、分布式、日志队列,标题自己都看着唬人,其实就是一个日志收集的功能,只不过中间加了一个Kafka做消息队列罢了。
小柒2012
2018/03/30
1.5K4
JavaWeb项目架构之Kafka分布式日志队列
5.Java Kafka Demo
到此一个最简单的demo 就可以运行起来了,当然,看起来简单,内部还有很多深层次的东西,我们会在后续谈到!
ParkJun
2020/07/14
2.5K0
CKafka系列学习文章 - 云上消息队列它香不香?(十七)
一、先买买买,一个 1、供上购买链接:https://buy.cloud.tencent.com/ckafka?rid=1 2、先创建一个Ckafka实例 image.png image.png im
发哥说消息队列
2020/06/15
1.5K0
【真实生产案例】SpringBoot 整合 Kafka 实现数据高吞吐
在上篇文章中,我们详细的介绍了 kafka 的架构模型,在集群环境中,kafka 可以通过设置分区数来加快数据的消费速度。
Java极客技术
2022/12/02
1.2K0
Kafka集群( 2.8.0版本)之四:Java简单实现生产者和消费者
(1)在IDEA底部找到Terminal (2)确认项目根目录,执行mvn clean package (3)编译成功,看到BUILD SUCCESS (4)找到编译好的jar包
程裕强
2021/09/09
9570
超详细的Kafka教程-从部署到开发到原理都有讲解
在说Kafka之前,假设你有一定的消息队列的知识。知道消息队列的模式(点对点模式,发布/订阅模式),也知道消息队列的优点,如果不知道没关系,去百度或者Google搜索都有相关详细的资料。那么我们接下来说说Kafka。
Lvshen
2022/05/05
10K0
超详细的Kafka教程-从部署到开发到原理都有讲解
搭建单体SpringBoot项目 集成Kafka消息队列
通过@Configuration、@EnableKafka,声明Config并且打开KafkaTemplate能力。
郭顺发
2023/07/07
5540
爬虫架构|利用Kafka处理数据推送问题(2)
在前一篇文章爬虫架构|利用Kafka处理数据推送问题(1)中对Kafka做了一个介绍,以及环境搭建,最后是选择使用阿里云的Kafka,这一篇文章继续说使用阿里云的Kafka的一些知识。 一、发布者最佳实践 发布的完整代码(根据自己的业务做相应处理): package com.yimian.controller.kafka; import java.util.Date; import java.util.Properties; import java.util.concurrent.Future; impo
黄小怪
2018/05/21
1.6K0
如何使用StreamSets实时采集Kafka并入库Kudu
温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。 Fayson的github:https://github.com/fayson/cdhproject 提示:代码块部分可以左右滑动查看噢 1.文档编写目的 ---- Fayson在前面的文章《如何使用StreamSets实现MySQL中变化数据实时写入Kudu》,本篇文章主要介绍如何使用StreamSets实时采集Kafka的数据并将采集的数据写入Kudu。 内容概述 1.测试环境准备 2.准备生产Kafka数据脚本 3.配置StreamSet
Fayson
2018/07/12
2.7K0
【Spring底层原理高级进阶】Spring Kafka:实时数据流处理,让业务风起云涌!️
Spring Kafka 是 Spring Framework 提供的一个集成 Apache Kafka 的库,用于构建基于 Kafka 的实时数据流处理应用程序。Apache Kafka 是一个高性能、分布式的流数据平台,广泛用于构建可扩展的、实时的数据处理管道。
苏泽
2024/03/10
1.1K0
【Spring底层原理高级进阶】Spring Kafka:实时数据流处理,让业务风起云涌!️
Kafka从入门到进阶
如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给大家。
java架构师
2018/12/28
1.1K0
聊聊spring对kafka的集成方式
除了官方的java api类库外,spring生态中又额外包装了很多,这里一一简单介绍下。
code4it
2018/09/17
3K0
Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
操作步骤 Maven依赖 核心依赖 kafka-clients <dependency> <groupId>org.apache.kafkagroupId>
小小工匠
2021/08/17
5690
kafka 结合springboot实战--第三节
kafka 消费者可以将消费到的消息转发到指定的主题中去,比如一条消息需要经过多次流转加工才能走完整个业务流程,需要多个consumer来配合完成。转发代码示例如下:
六个核弹
2022/12/23
4310
推荐阅读
相关推荐
MySQL数据迁移TcaplusDB实践
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验