如何使用StreamSets实现MySQL中变化数据实时写入Kudu

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

Fayson的github:https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的


在前面Fayson介绍了《如何在CDH中安装和使用StreamSets》和《如何使用StreamSets从MySQL增量更新数据到Hive》,通过StreamSets实现数据采集,在实际生产中需要实时捕获MySQL、Oracle等其他数据源的变化数据(简称CDC)将变化数据实时的写入大数据平台的Hive、HDFS、HBase、Solr、Elasticserach等。在《如何使用StreamSets从MySQL增量更新数据到Hive》中,使用受限于表需要主键或者更新字段,我们在本篇文章主要介绍如何将MySQL Binary Log作为StreamSets的源,来实时捕获MySQL变化数据并将变化数据存入Kudu。

StreamSets实现的流程如下:

  • 内容概述

1.环境准备

2.创建StreamSets的Pipeline流程

3.Pipeline流程测试

4.总结

  • 测试环境

1.StreamSets版本为3.1.2.0

2.CM和CDH版本为5.13.1

3.MariaDB版本为5.5.56

2.环境准备


1.开启MariaDB的Binlog日志

修改/etc/my.conf文件,在配置文件[mysqld]下增加如下配置

server-id=1
log-bin=mysql-bin
binlog_format=ROW

(可左右滑动)

注意:MySQL Binlog支持多种数据更新格式包括Row、Statement和mix(Row和Statement的混合),这里建议使用Row模式的Binlog格式,可以更加方便实时的反应行级别的数据变化。

修改完MariaDB的配置后重启服务。

[root@ip-172-31-16-68 ~]# systemctl restart mariadb
[root@ip-172-31-16-68 ~]# systemctl status mariadb

(可左右滑动)

登录MariaDB创建同步账号

GRANT ALL on maxwell.* to 'maxwell'@'%' identified by '123456';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
FLUSH PRIVILEGES;

(可左右滑动)

2.StreamSets安装MySQL驱动

将MySQL的JDBC驱动拷贝至

/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib目录

3.在MariaDB数据库中创建测试表

create database test;
create table cdc_test (
       id int,
       name varchar(32)
);

(可左右滑动)

4.使用Hue创建Kudu表

create table cdc_test (
       id int,
       name String,
       primary key(id)
)
       PARTITION BY HASH PARTITIONS 16
STORED AS KUDU; 

(可左右滑动)

3.创建StreamSets的Pipline


1.登录StreamSets,创建一个新的Pipline

2.选择Origins类别,搜索MySQL Binary Log

配置MySQL Binary Log

配置MySQL信息

配置同步账号信息

高级配置,根据自己的需要进行配置

到此MySQL Binary Log的配置完成。

3.添加表过滤的Stream Selector

Stream Selector基本配置

配置分流条件

4.添加插入类型分流的Stream Selector

Stream Selector基本配置

配置分流条件

5.添加处理Delete类型日志的JavaScript Evaluator

该JavaScript Evaluator主要用于解析DELETE类型的Binary Log 日志

配置基本属性

配置JavaScript脚本,脚本如下:

for(var i = 0; i < records.length; i++) {
  try { 
    var newRecord = sdcFunctions.createRecord(true);
    newRecord.value = records[i].value['OldData'];
    newRecord.value.Type = records[i].value['Type'];
    newRecord.value.Database = records[i].value['Database'];
    newRecord.value.Table = records[i].value['Table'];
    log.info(records[i].value['Type'])
    output.write(newRecord);
  } catch (e) {
    // Send record to error
    error.write(records[i], e);
  }
}

(可左右滑动)

6.添加处理INSRET和UPDATE类型日志的JavaScript Evaluator

该JavaScript Evaluator主要用于解析INSERT和UPDATE类型的日志

配置基本属性

配置JavaScript脚本,脚本如下:

for(var i = 0; i < records.length; i++) {
  try { 
    var newRecord = sdcFunctions.createRecord(true);
    newRecord.value = records[i].value['Data'];
    newRecord.value.Type = records[i].value['Type'];
    newRecord.value.Database = records[i].value['Database'];
    newRecord.value.Table = records[i].value['Table'];
    log.info(records[i].value['Type'])
    output.write(newRecord);
  } catch (e) {
    // Send record to error
    error.write(records[i], e);
  }
}

(可左右滑动)

7.为JavaScript Evaluator-DELETE添加Kudu

配置Kudu基本属性

配置Kudu环境

Kudu的高级配置,Fayson这里使用的是默认配置

8.为JavaScript Evaluator-UPSERT添加Kudu

配置基础属性

配置Kudu环境

Kudu高级配置

9.流程创建完成后,启动该Pipelines

4.Pipeline流程测试


1.登录MariaDB数据库,向cdc_test表中插入数据

insert into cdc_test values(1, 'fayson');

(可左右滑动)

查看StreamSets的Pipeline实时状态

可以看到Kudu-Upsert成功的处理了一条数据

使用Hue查看Kudu表数据

数据成功的插入到Kudu的cdc_test表中。

2.登录MariaDB数据库修改cdc_test表中数据

update cdc_test set name='fayson-update' where id=1;

(可左右滑动)

查看StreamSets的Pipeline实时状态

可以看到Kudu-Upsert成功处理了两条数据,这两条数据分别是INSERT和UPDATE

使用Hue查看Kudu的cdc_test表

3.登录MariaDB数据,删除cdc_test表中数据

delete from cdc_test where id=1;

(可左右滑动)

查看StreamSets的Pipeline实时状态

可以看到Kudu-Delete成功处理一条日志

使用Hue查看Kudu的cdc_test表,id为1的数据已不存在

5.总结


  • 实现MySQL CDC的前提是需要开启MySQL的Binary Log日志,并且需要创建复制账号,SreamSets中MySQL-Binary Log实际充当的为MySQL的一个Slave。
  • 向Kudu实时写入数据的前提是Kudu的表已存在,否则无法正常写入数据。
  • JavaScript脚本需要注意在解析每一条Record是需要使用其内置的Function,在示例中Fayson将MySQL Binary Log复杂的JSON数据解析重组为简单的Map对象,这里就省去了Kudu入库时“Field to Column Mapping”的映射,需要去确保组装的Map数据中Key与Kudu表中的column字段一致。
  • 在Kudu插入数据时指定Kudu表名需要注意,如果使用Impala创建的表,则需要加上impala的前缀格式impala:<database>:<table>。

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。 温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原文发布于微信公众号 - Hadoop实操(gh_c4c535955d0f)

原文发表时间:2018-04-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏加米谷大数据

技术干货 | Hadoop3.0稳定版安装攻略来啦!

第一次安装Apache Hadoop3.0.0是不是状况百出?没关系安装攻略来啦! ? Apache Hadoop 3.0.0在前一个主要发行版本(hadoop...

5239
来自专栏Spark学习技巧

Spark部署模式另类详解

一, Spark的运行模式讲解 Spark运行模式有很多种,本文主要是将local,Standalone,yarn。因为平时生产中用的最多的也是...

3185
来自专栏Hadoop实操

如何使用StreamSets实时采集Kafka数据并写入Hive表

8542
来自专栏Hadoop实操

如何在CDH集群上部署Python3运行环境及运行Python作业

当前有很多工具辅助大数据分析,但最受欢迎的就是Python。Python简单易用,语言有着直观的语法并且提供强大的科学计算和集群学习库。借着最近人工智能,深度学...

8634
来自专栏用户画像

Hive 内表与外表的区别

②创建外部表多了external关键字说明以及location ‘/home/wyp/external’

1123
来自专栏架构师小秘圈

HDFS极简教程

HDFS(Hadoop Distributed File System )Hadoop分布式文件系统。是根据google发表的论文翻版的。论文为GFS(Goog...

3596
来自专栏运维前线

Cloudera(CDH) 简介和在线安装

实验背景 笔者需要维护线上的hadoop集群环境,考虑在本地搭建一套类似的hadoop集群,便于维护与管理。 Cloudera 简介 经过搜索发现Clo...

9217
来自专栏我是攻城师

Hive使用ORC格式存储离线表

51610
来自专栏Hadoop实操

如何通过Cloudera Manager为Kafka启用Kerberos及使用

在CDH集群中启用了Kerberos认证,那么我们的Kafka集群能否与Kerberos认证服务集成呢?本篇文章主要讲述如何通过Cloudera Manager...

6429
来自专栏Hadoop实操

如何为Presto集成Kerberos环境下的Hive

5233

扫码关注云+社区

领取腾讯云代金券