前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >利用logstash将mysql多表数据增量同步到es

利用logstash将mysql多表数据增量同步到es

作者头像
黎明大大
发布2020-09-08 15:35:11
3.7K0
发布2020-09-08 15:35:11
举报
文章被收录于专栏:java相关资料java相关资料

同步原理:

代码语言:javascript
复制
第一次发送sql请求查询,修改时间参数值是为系统最开始的时间(1970年),可以查询的
到所有大于1970年的数据,并且会将最后一条数据的update_time时间记录下来,
作为下一次定时查询的条件

一、启动es + kibana

代码语言:javascript
复制
如何安装,以及如何运行,这里就不做描述,没有装过的,可以参考我的这篇文章
https://www.jianshu.com/p/f52d9c843bd8

二、安装mysql

查询mysql版本
代码语言:javascript
复制
docker search mysql
通过docker下载MySQL5.7版本
代码语言:javascript
复制
如何安装docker,不是本文重点,这里不做多描述
docker pull mysql:5.7  (这里选择的是第一个mysql镜像, :5.7选择的5.7版本)
docker pull mysql       # 拉取最新版mysql镜像
运行mysql
代码语言:javascript
复制
docker run -p 3306:3306 --name mysql -e MYSQL_ROOT_PASSWORD=123456 -d mysql:5.7

账号:root
密码:123456
通过工具连接mysql
新建数据库
代码语言:javascript
复制
DROP DATABASE IF EXISTS `test`;
CREATE DATABASE `test` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

USE test;

#商品表
DROP TABLE IF EXISTS `goods`;
CREATE TABLE `goods` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `goods` VALUES (1, '黎明电脑', '2020-07-01 00:40:19');
INSERT INTO `goods` VALUES (2, '黎明手机', '2020-07-01 00:40:32');

#用户表
DROP TABLE IF EXISTS `user`;
CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `user` VALUES (1, '黎明1', '2020-07-01 00:40:01');
INSERT INTO `user` VALUES (2, '黎明2', '2020-07-01 00:40:09');

现在已经启动了3台容器了

三、下载logstash源码包

代码语言:javascript
复制
官方地址
https://www.elastic.co/cn/downloads/logstash

国内加速下载网址
https://www.newbe.pro/Mirrors/Mirrors-Logstash/

下载地址
wget https://mirrors.huaweicloud.com/logstash/6.7.2/logstash-6.7.2.zip

下载zip命令解压
yum -y install zip

解压
unzip logstash-6.7.2.zip

四、下载mysql驱动

可能会有人疑问?为什么要下载mysql驱动 因为logstash需要连接mysql,并查询表数据,才确定是否同步数据

如下,是maven仓库,所有版本mysql驱动连接

代码语言:javascript
复制
https://mvnrepository.com/artifact/mysql/mysql-connector-java

我的数据库是5.7版本,我这里下载5.1.47的驱动了,当然如果你们的数据库是8.0以上的版本,那么就下相应的版本就行

现在两种下载方式 1.下载到本地,然后通过ftp工具上传到服务器 2.在服务器上下载,右击复制链接地址,通过wget命令下载即可

五、进入logstash目录,安装同步插件

代码语言:javascript
复制
安装会有点慢,大概2分钟左右吧
 bin/logstash-plugin install logstash-input-jdbc
 bin/logstash-plugin install logstash-output-elasticsearch

六、添加Mysql与ES同步配置

代码语言:javascript
复制
进入logstash/config目录下,新建 user.conf文件
vim user.conf


添加内容
input {
  jdbc {
    jdbc_driver_library => "/usr/local/software/my/mysql-connector-java-5.1.47.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.137.11:3306/test"
    jdbc_user => "root"
    jdbc_password => "123456"
    schedule => "* * * * *"
    statement => "SELECT * FROM user WHERE update_time >= :sql_last_value"
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "update_time"
    last_run_metadata_path => "syncpoint_table"
  }
}


output {
    elasticsearch {
        # ES的IP地址及端口
        hosts => ["192.168.137.11:9200"]
        # 索引名称 可自定义
        index => "user"
        # 需要关联的数据库中有有一个id字段,对应类型中的id
        document_id => "%{id}"
        document_type => "user"
    }
    stdout {
        # JSON格式输出
        codec => json_lines
    }
}

进入bin目录启动

代码语言:javascript
复制
./logstash -f ../config/user.conf

可以看到下图,如我标记的地方,logstash在第一次进行同步数据,会先从1970年开始,进行一次同步数据

之后每隔一分钟,会以最后的update_time作为条件,查询是否同步数据,如果查询的结果update_time时间大于所记录的update_time时间,则会继续同步数据,接下来在记录最后一次同步的update_time时间,依次类推

然后我们通过kibana,查询一下我们的索引结果

七、多表同步

到此,我们的单表同步已经完成,接下来我们开始实现多表同步
代码语言:javascript
复制
规则如下:
一个表,一个配置
多个表,多个配置
需要同步多少表,就需要加多少配置
当然配置的内容都差不多,改的地方是查询的表名,和es的索引以及类型的名称

添加第二张表的配置,配置就是上面的配置,稍微改动即可

进入logstash/config目录,修改配置文件
代码语言:javascript
复制
vim pipelines.yml
编辑文件

直接到最后,添加配置

代码语言:javascript
复制
- pipeline.id: table1
  path.config: "/usr/local/software/my/logstash-6.7.2/config/user.conf"
- pipeline.id: table2
  path.config: "/usr/local/software/my/logstash-6.7.2/config/goods.conf"

启动方式稍有改变

代码语言:javascript
复制
进入bin目录
./logstash

这里goods同步,为什么不是1970年呢,因为之前同步一次过,logstash会帮你记录,所以就以logstash最后一次同步时间计算

现在商品表也同步数据了

那如何证明,能够多表同步呢,很简单,我们修改两个表的数据,看是否都能查询的到,如下图,就可以证明商品表和用户表,都是根据各自表的最后时间进行同步的数据的

注意:有数据才会创建索引哦

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 同步原理:
  • 一、启动es + kibana
  • 二、安装mysql
  • 三、下载logstash源码包
  • 四、下载mysql驱动
  • 五、进入logstash目录,安装同步插件
  • 六、添加Mysql与ES同步配置
  • 七、多表同步
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档