作者:fallinjava链接:https://juejin.im/post/5ca1d76c6fb9a05e1a7a999b
在我们使用mysql和elasticsearch结合使用的时候,可能会有一些同步的需求,想要数据库和elasticsearch同步的方式其实有很多。
可以使用canal,它主要是监听mysql的binlog 日志,可以监听数据的一些变化,如果数据发生了变化我们需要做什么逻辑,这些都是可以人为实现的,它是将自己模拟成一个slave节点,当master节点的数据发生变化是,它能够看到数据的变化。但是缺点也很明显,由于是java实现的,所以比较重,还需要使用zookeeper等集群管理工具来管理canal节点,所以本文暂时不介绍这种方式。
本文主要介绍使用Logstash JDBC的方式来实现同步,这个方式同步比较简单。当然它有一些缺点,就是有点耗内存(内存大就当我没说?)。
GET /myapp/_search{ "_source": "id"}
响应结果:
{ "took": 5, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 3, "max_score": 1, "hits": [ { "_index": "myapp", "_type": "doc", "_id": "2", "_score": 1, "_source": { "id": 2 } }, { "_index": "myapp", "_type": "doc", "_id": "1", "_score": 1, "_source": { "id": 1 } }, { "_index": "myapp", "_type": "doc", "_id": "3", "_score": 1, "_source": { "id": 3 } } ] }}
mysql> select * from user;+------+----------+-------------+------------+-------+------------+---------+| id | name | phone | password | title | content | article |+------+----------+-------------+------------+-------+------------+---------+| 1 | zhnagsan | 181222 | 123123 | ??? | ?????? | ???IE || 2 | lishi | 181222113 | 232123123 | 23??? | 234?????? | 4???IE || 3 | wangwu | 18111214204 | 1337547531 | ????? | lc content | Java |+------+----------+-------------+------------+-------+------------+---------+3 rows in set (0.00 sec)mysql>
现在我们执行一个sql向里面添加一条数据
mysql> insert into user (id, name, phone, password, title, content, article) values (4, "lc", "123456789", "123456789", "测试", "测试内容", "Java") Query OK, 1 row affected (0.00 sec)mysql>
GET /myapp/_search{ "_source": "id"}
响应结果
{ "took": 2, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": 4, "max_score": 1, "hits": [ { "_index": "myapp", "_type": "doc", "_id": "2", "_score": 1, "_source": { "id": 2 } }, { "_index": "myapp", "_type": "doc", "_id": "4", "_score": 1, "_source": { "id": 4 } }, { "_index": "myapp", "_type": "doc", "_id": "1", "_score": 1, "_source": { "id": 1 } }, { "_index": "myapp", "_type": "doc", "_id": "3", "_score": 1, "_source": { "id": 3 } } ] }}
Virtual machine:VMware 11.0.2 Operator System:CentOS release 6.9 (Final) ElasticSearch:6.4.0 Kibana版本:6.4.0 LogStash版本:6.6.1 JDK版本:1.8.0_181 MySQL版本: 5.1.73(这个版本是用yum直接安装的,其实这个教程和mysql版本没有多大关系,因为到时候是使用jdbc的驱动包来连接数据库的) logstash jdbc驱动包版本 5.1.46
Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。
Logstash是一个开源的服务器端数据处理管道,可以同时从多个数据源获取数据,并对其进行转换,然后将其发送到你最喜欢的“存储”。(当然,我们最喜欢的是Elasticsearch)
数据往往以各种各样的形式,或分散或集中地存在于很多系统中。Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。 Logstash 能够动态地转换和解析数据,不受格式或复杂度的影响:
尽管 Elasticsearch 是我们的首选输出方向,能够为我们的搜索和分析带来无限可能,但它并非唯一选择。Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。
首先,让我们通过最基本的Logstash管道来测试一下刚才安装的Logstash Logstash管道有两个必需的元素,输入和输出,以及一个可选元素过滤器。输入插件从数据源那里消费数据,过滤器插件根据你的期望修改数据,输出插件将数据写入目的地。
接下来,从命令行输入如下命令
bin/logstash -e 'input { stdin {} } output { stdout {} }'选项 -e 的意思是允许你从命令行指定配置
当启动完成时,会等待你的输入,你可以输入hello world
试试,它会给你一下信息的回馈。
[root@localhost logstash-6.6.1]# unzip mysql-connector-java-5.1.46.zip
input { jdbc { # jdbc驱动包位置 jdbc_driver_library => "/mnt/logstash-6.6.1/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar" # 要使用的驱动包类,有过java开发经验的应该很熟悉这个了,不同的数据库调用的类不一样。 jdbc_driver_class => "com.mysql.jdbc.Driver" # myqsl数据库的连接信息 jdbc_connection_string => "jdbc:mysql://0.0.0.0:3306/myapp" # mysql用户 jdbc_user => "root" # mysql密码 jdbc_password => "root" # 定时任务, 多久执行一次查询, 默认一分钟,如果想要没有延迟,可以使用 schedule => "* * * * * *" schedule => "* * * * *" # 你要执行的语句 statement => "select * from user" } }
output { # 将数据输出到ElasticSearch中 elasticsearch { # es ip加端口 hosts => ["0.0.0.0:9200"] # es文档索引 index => "myusreinfo" # es文档数据的id,%{id}代表的是用数据库里面记录的id作为文档的id document_id => "%{id}" } }
上面我们已经生成了这个mysqlsyn.conf这个文件,接下来我们就使用logstash来进行数据同步吧,同步之前先看下我们的数据库的user表的数据。
myusreinfo
这个索引,可以从图中看到我们只有myapp
这个索引。
[root@localhost logstash-6.6.1]# ./bin/logstash -f config/mysqlsyn.conf-f 指定配置文件启动
启动成功,并且已经在同步数据了,这个同步默认是每分钟执行一次,可以看到5分钟执行了5次
docs.count
的数量就是我们刚才数据库里面数据的数量。
yinya
到目前为止,所有google,stackoverflow,elastic.co,github上面搜索的插件和实时同步的信息,告诉我们:目前同步delete还没有好的解决方案。 折中的解决方案如下:
https://link.juejin.im/?target=https%3A%2F%2Fdiscuss.elastic.co%2Ft%2Fdelete-elasticsearch-document-with-logstash-jdbc-input%2F47490%2F9
https://link.juejin.im/?target=http%3A%2F%2Fstackoverflow.com%2Fquestions%2F34477095%2Felasticsearch-replication-of-other-system-data%2F34477639%2334477639