前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >是时候改变你数仓的增量同步方案了

是时候改变你数仓的增量同步方案了

作者头像
用户2936994
发布2022-07-21 13:47:28
4920
发布2022-07-21 13:47:28
举报
文章被收录于专栏:祝威廉

经过一段时间的演化,spark-binlogdelta-plus慢慢进入正轨。spark-binlog可以将MySQL binlog作为标准的Spark数据源来使用,目前支持insert/update/delete 三种事件的捕捉。 delta-plus则是对Delta Lake的一个增强库,譬如在Delta Plus里实现了将binlog replay进Detla表,从而保证Delta表和数据库表接近实时同步。除此之外,detla-plus还集成了譬如布隆过滤器等来提升数据更新速度,解决更新导致的文件数不可控问题等等。更多特性可参考我写的专栏。

数据湖Delta Lake 深入解析

有了这两个库,加上Spark,我们就能通过两行代码完成库表的同步。

以前如果要做数据增量同步,大概需要这么个流程:

问题很明显,Pipeline长,涉及到技术多,中间转存其实也挺麻烦的,难做到实时。我们希望可以更简单些,比如最好是这样:

然后我可能只要写如下代码就可以搞定:

代码语言:javascript
复制
val spark: SparkSession = ???

val df = spark.readStream.
format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource").
option("host","127.0.0.1").
option("port","3306").
option("userName","xxxxx").
option("password","xxxxx").
option("databaseNamePattern","mlsql_console").
option("tableNamePattern","script_file").
option("bingLogNamePrefix","mysql-bi-bin")
optioin("binlogIndex","4").
optioin("binlogFileOffset","4").
load()


df.writeStream.
format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").  
option("__path__","/tmp/sync/tables").
option("mode","Append").
option("idCols","id").
option("duration","5").
option("syncType","binlog").
checkpointLocation("/tmp/cpl-binlog2")
.mode(OutputMode.Append).save("{db}/{table}")

读和写,非常简单。读你需要提供MySQL binlog信息,写的时候指定主键,以及表的存储路径。

如果使用MLSQL则更简单,下面是一个完整的流式同步脚本:

代码语言:javascript
复制
set streamName="binlog";

load binlog.`` where 
host="127.0.0.1"
and port="3306"
and userName="xxxx"
and password="xxxxxx"
and bingLogNamePrefix="mysql-bin"
and binlogIndex="4"
and binlogFileOffset="4"
and databaseNamePattern="mlsql_console"
and tableNamePattern="script_file"
as table1;

save append table1  
as rate.`mysql_{db}.{table}` 
options mode="Append"
and idCols="id"
and duration="5"
and syncType="binlog"
and checkpointLocation="/tmp/cpl-binlog2";

因为是增量同步,所以第一次需要先全量同步一次,用MLSQL也很简单:

代码语言:javascript
复制
connect jdbc where
 url="jdbc:mysql://127.0.0.1:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false"
 and driver="com.mysql.jdbc.Driver"
 and user="xxxxx"
 and password="xxxx"
 as db_cool;
 
load jdbc.`db_cool.script_file`  as script_file;
save overwrite script_file as delta.`mysql_mlsql_console.script_file` ;

load delta.`mysql_mlsql_console.script_file`  as output;

如果你使用了Console则可在编辑器里直接运行:

如果你安装了binlog2delta插件, 则可享受向导便利:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-11-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档