kafka源码系列之mysql数据增量同步到kafka

一,架构介绍

生产中由于历史原因web后端,mysql集群,kafka集群(或者其它消息队列)会存在一下三种结构。

1,数据先入mysql集群,再入kafka

数据入mysql集群是不可更改的,如何再高效的将数据写入kafka呢?

A),在表中存在自增ID的字段,然后根据ID,定期扫描表,然后将数据入kafka。

B),有时间字段的,可以按照时间字段定期扫描入kafka集群。

C),直接解析binlog日志,然后解析后的数据写入kafka。

2,web后端同时将数据写入kafka和mysql集群

3,web后端将数据先入kafka,再入mysql集群

这个方式,有很多优点,比如可以用kafka解耦,然后将数据按照离线存储和计算,实时计算两个模块构建很好的大数据架构。抗高峰,便于扩展等等。

二,实现步骤

1,mysql安装准备

安装mysql估计看这篇文章的人都没什么问题,所以本文不具体讲解了。

A),假如你单机测试请配置好server_id

B),开启binlog,只需配置log-bin

[root@localhost ~]# cat /etc/my.cnf

[mysqld]

server_id=1

datadir=/var/lib/mysql

socket=/var/lib/mysql/mysql.sock

user=mysql

# Disabling symbolic-links is recommended to prevent assorted security risks

symbolic-links=0

log-bin=/var/lib/mysql/mysql-binlog

[mysqld_safe]

log-error=/var/log/mysqld.log

pid-file=/var/run/mysqld/mysqld.pid

创建测试库和表

create database school character set utf8 collate utf8_general_ci;

create table student(

name varchar(20) not null comment '姓名',

sid int(10) not null primary key comment '学员',

majora varchar(50) not null default '' comment '专业',

tel varchar(11) not null unique key comment '手机号',

birthday date not null comment '出生日期'

);

2,binlog日志解析

两种方式:

一是扫面binlog文件(有需要的话请联系浪尖)

二是通过复制同步的方式

暂实现了第二种方式,样例代码如下:

MysqlBinlogParse mysqlBinlogParse = new MysqlBinlogParse(args[0],Integer.valueOf(args[1]),args[2],args[3]){ @Override public void processDelete(String queryType, String database, String sql) { try { String jsonString = SqlParse.parseDeleteSql(sql); JSONObject jsonObject = JSONObject.fromObject(jsonString); jsonObject.accumulate("database", database); jsonObject.accumulate("queryType", queryType); System.out.println(sql); System.out.println(" "); System.out.println(" "); System.out.println(jsonObject.toString()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void processInsert(String queryType, String database, String sql) { try { String jsonString = SqlParse.parseInsertSql(sql); JSONObject jsonObject = JSONObject.fromObject(jsonString); jsonObject.accumulate("database", database); jsonObject.accumulate("queryType", queryType); System.out.println(sql); System.out.println(" "); System.out.println(" "); System.out.println(jsonObject.toString()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void processUpdate(String queryType, String database, String sql) { String jsonString; try { jsonString = SqlParse.parseUpdateSql(sql); JSONObject jsonObject = JSONObject.fromObject(jsonString); jsonObject.accumulate("database", database); jsonObject.accumulate("queryType", queryType); System.out.println(sql); System.out.println(" "); System.out.println(" "); System.out.println(jsonObject.toString()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }; mysqlBinlogParse.setServerId(3); mysqlBinlogParse.start();

3,sql语法解析

从原始的mysql 的binlog event中,我们能解析到的信息,主要的也就是mysql的database,query类型(INSERT,DELETE,UPDATE),具体执行的sql。我这里封装了三个重要的方法。只暴露了这三个接口,那么我们要明白的事情是,我们入kafka,然后流式处理的时候希望的到的是跟插入mysql后一样格式的数据。这个时候我们就要自己做sql的解析,将query的sql解析成字段形式的数据,供流式处理。解析的格式如下:

A),INSERT

B),DELETE

C),UPDATE

最终浪尖是将解析后的数据封装成了json,然后我们自己写kafka producer将消息发送到kafka,后端就可以处理了。

三,总结

最后,浪尖还是建议web后端数据最好先入消息队列,如kafka,然后分离线和实时将数据进行解耦分流,用于实时处理和离线处理。

消息队列的订阅者可以根据需要随时扩展,可以很好的扩展数据的使用者。

消息队列的横向扩展,增加吞吐量,做起来还是很简单的。这个用传统数据库,分库分表还是很麻烦的。

由于消息队列的存在,也可以帮助我们抗高峰,避免高峰时期后端处理压力过大导致整个业务处理宕机。

具体源码球友可以在知识星球获取。

欢迎大家进入知识星球,学习更多更深入的大数据知识,面试经验,获取更多更详细的资料。

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2018-06-12

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏NetCore

微信快速开发框架(二) -- 快速开发微信公众平台框架---简介

年底了,比较忙,大家都在展望未来,对于30+的我来说,发展和稳定是个难以取舍的问题。最近发了些求职信,鸟无音讯,没事做,做点帮助大家的东西吧。 之前做了个微信公...

25010
来自专栏从零开始学自动化测试

python接口自动化13-data和json参数傻傻分不清

前言 在发post请求的时候,有时候body部分要传data参数,有时候body部分又要传json参数,那么问题来了:到底什么时候该传json,什么时候该传da...

3664
来自专栏osc同步分享

在springBoot项目中使用activiti

依赖: 新建springBoot项目时勾选activiti,或者在已建立的springBoot项目添加以下依赖: <dependency> <groupId>...

2.8K7
来自专栏哲学驱动设计

三层的困惑

这个问题困惑我已经很久了,从开始学习。NET到现在…… 写三层的时候,遇到这样的情况怎么办? User{     int UserId,     Strin...

1905
来自专栏菩提树下的杨过

silverlight获取外部数据的另一种选择:FluorineFx

Silverlight从其它系统获取外部数据的常规途径无非下面2种: 1、直接远程加载文本或xml文件 (直接请求ashx/aspx,然后在ashx/aspx上...

1865
来自专栏王清培的专栏

.NET框架设计(高级框架架构模式)—钝化程序、逻辑冻结、冻结程序的延续、瞬间转移

阅读目录: 1.开篇介绍 2.程序书签(代码书签机制) 2.1ProgramBookmark 实现(使用委托来锚点代码书签) 2.2ProgramBookma...

21510
来自专栏james大数据架构

【C#|.NET】lock(this)其实是个坑

来自 【C#|.NET】lock(this)其实是个坑 这里不考虑分布式或者多台负载均衡的情况只考虑单台机器,多台服务器可以使用分布式锁。出于线程安全的原因,很...

3967
来自专栏.NET后端开发

深入理解DIP、IoC、DI以及IoC容器

摘要 面向对象设计(OOD)有助于我们开发出高性能、易扩展以及易复用的程序。其中,OOD有一个重要的思想那就是依赖倒置原则(DIP),并由此引申出IoC、DI以...

3358
来自专栏SeanCheney的专栏

Python模拟登陆 —— 征服验证码 8 微信网页版

微信登录界面 微信网页版使用了UUID含义是通用唯一识别码来保证二维码的唯一性。 先用一个伪造的appid获得uuid。 params = { ...

4719
来自专栏领域驱动设计DDD实战进阶

DDD实战进阶第一波(四):开发一般业务的大健康行业直销系统(搭建支持DDD的轻量级框架三)

上一篇文章我们讲了经典DDD架构对比传统三层架构的优势,以及经典DDD架构每一层的职责后,本篇文章将介绍基础结构层中支持DDD的轻量级框架的主要代码。 这里需要...

5575

扫码关注云+社区