Streaming Data Changes from MySQL to Elasticsearch

Author: 程序猿杜小头
Published: 2022-12-01 21:35:26

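The MySQL binary log records complete events for the DDL (Data Definition Language) and DML (Data Manipulation Language) operations executed against a database, and it is widely used for data replication and data recovery. This article presents a technique that builds on the MySQL binary log to synchronize incremental data to Elasticsearch in near real time. The binary log alone is not enough for incremental synchronization; we also need a change data capture (CDC) tool. Alibaba's open-source Canal probably comes to mind first, but this article introduces a different open-source tool: Debezium. Debezium is built on top of Kafka and provides Kafka Connect source connectors tailored to a range of databases, including MySQL, MongoDB, PostgreSQL, Oracle, and Cassandra. First, the source connector captures, in real time, the data change events triggered by INSERT, UPDATE, and DELETE operations; next, it sends them to a Kafka topic; finally, a sink connector synchronizes the change events in the topic to Elasticsearch. The overall flow is: MySQL -> Debezium source connector -> Kafka topic -> Elasticsearch sink connector -> Elasticsearch.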

Debezium supports several MySQL topologies, including standalone, primary and replica, high-availability clusters, and multi-primary.

1 Installing MySQL

1.1 Extraction and Configuration

tar -xzvf mysql-8.0.21-el7-x86_64.tar.gz -C /root/debezium/

In the root of the mysql-8.0.21-el7-x86_64 directory, create a text file named my.cnf and copy the following content into it.

[client]
port=3306
socket=/root/debezium/mysql-8.0.21-el7-x86_64/mysql.sock

[mysqld]
port=3306
socket=/root/debezium/mysql-8.0.21-el7-x86_64/mysql.sock
key_buffer_size=16M
max_allowed_packet=128M
basedir=/root/debezium/mysql-8.0.21-el7-x86_64
datadir=/root/debezium/mysql-8.0.21-el7-x86_64/data
server-id=101
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL
binlog_expire_logs_seconds=86400

[mysqldump]
quick
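
The binlog-related options are the part of this file that change data capture depends on. As a rough sanity check, the sketch below parses an excerpt of the my.cnf above and reports anything that differs from the values used in this walkthrough. The checklist and the function name are assumptions made for illustration; they are not an official Debezium validation.

```python
import configparser

# Excerpt of the my.cnf above, inlined so the sketch is self-contained.
MY_CNF = """
[mysqld]
port=3306
server-id=101
log-bin=mysql-bin
binlog-format=ROW
binlog-row-image=FULL

[mysqldump]
quick
"""

# Values this walkthrough sets explicitly (a hypothetical checklist, not an official one).
EXPECTED = {"binlog-format": "ROW", "binlog-row-image": "FULL"}


def check_binlog_settings(text):
    """Return a list of problems found in the [mysqld] section of a my.cnf string."""
    # allow_no_value accepts bare keys such as "quick".
    parser = configparser.ConfigParser(allow_no_value=True, interpolation=None)
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return ["[mysqld] section is missing"]
    # MySQL treats "-" and "_" in option names as equivalent, so normalize them.
    options = {k.replace("_", "-"): v for k, v in parser.items("mysqld")}
    problems = []
    if not options.get("server-id"):
        problems.append("server-id is not set")
    if "log-bin" not in options:
        problems.append("log-bin is not set")
    for key, expected in EXPECTED.items():
        actual = (options.get(key) or "").upper()
        if actual != expected:
            problems.append(f"{key} should be {expected}, found {actual or 'nothing'}")
    return problems


print(check_binlog_settings(MY_CNF))
```

An empty list means the excerpt matches the settings used here; each string in a non-empty list names one option to review.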

1.2 Initialization

./bin/mysqld --defaults-file=/root/debezium/mysql-8.0.21-el7-x86_64/my.cnf --initialize

After the initialization finishes, look closely at the console output: a temporary password has been generated for the root account.

2020-12-28T02:55:20.774965Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: &,Yot7iMeT_T

1.3 Starting MySQL Server

Initialization does not start MySQL Server, so you need to start it manually.

./bin/mysqld --defaults-file=/root/debezium/mysql-8.0.21-el7-x86_64/my.cnf --user=root

1.4 Resetting the 'root' Account Password

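-- Run ALTER USER first: while the temporary password is still in effect, MySQL may reject other statements.
ALTER USER 'root'@'localhost' IDENTIFIED BY 'new-root-password';
FLUSH PRIVILEGES;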

1.5 Enabling Remote Access

USE mysql;
-- The table name is lowercase: table names are case-sensitive on Linux by default.
UPDATE user SET host = '%' WHERE user = 'root';
FLUSH PRIVILEGES;

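2 Installing Kafka

Kafka configuration is not covered in detail here; refer to the Kafka documentation. The Kafka version used here depends on ZooKeeper and therefore ships with a bundled copy, which we can use directly without a separate download.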

nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties>/dev/null 2>&1 &
nohup ./bin/kafka-server-start.sh config/server.properties>/dev/null 2>&1 &

3 Installing Debezium

3.1 Extracting the Source Connector and Sink Connector

tar -xzvf debezium-connector-mysql-1.4.2.Final-plugin.tar.gz -C /root/debezium/connector-plugins
unzip confluentinc-kafka-connect-elasticsearch-11.0.3.zip -d /root/debezium/connector-plugins

3.2 Kafka Connect

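To make integrating Kafka with other data systems easier and more standardized, Kafka provides Kafka Connect, which defines the source connector and sink connector interface specifications. To move data from another system into Kafka, you implement the source connector interface; to move data from Kafka into another system, you implement the sink connector interface. Kafka Connect also exposes a REST API that makes managing connectors more convenient.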

3.2.1 Configuration

#################### connect-distributed.properties ###################
bootstrap.servers=localhost:9092

group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets.
# This topic should have many partitions and be replicated and compacted.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

# Topic to use for storing connector and task configurations.
# This topic should be a single partition, highly replicated.
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses. 
# This topic can have multiple partitions and should be replicated and compacted.
status.storage.topic=connect-status
status.storage.replication.factor=1
 
# Hostname & Port for the REST API to listen on. 
rest.host.name=localhost
rest.port=8083

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# The list should consist of top level directories. 
# Directory that holds the source connector and sink connector plugins
plugin.path=/root/debezium/connector-plugins/

3.2.2 Creating the Topics

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic connect-offsets --partitions 3 --replication-factor 1 --config cleanup.policy=compact
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic connect-configs --partitions 1 --replication-factor 1 --config cleanup.policy=compact
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic connect-status --partitions 3 --replication-factor 1 --config cleanup.policy=compact

3.2.3 Startup

nohup ./bin/connect-distributed.sh config/connect-distributed.properties>/dev/null 2>&1 &

3.3 Registering the Debezium Source Connector

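| Parameter | Description | Default |
| --- | --- | --- |
| include.schema.changes | If true, the source connector publishes schema change events to Kafka; the topic is named after database.server.name | true |
| tombstones.on.delete | If true, the source connector emits an additional tombstone event for each delete operation | true |
| database.server.id | Numeric ID the connector uses when it joins MySQL as a replication client. This walkthrough sets it to the same value as MySQL's server_id (101); Debezium's documentation asks for an ID that is unique across all database processes in the cluster, so a distinct value is the safer choice | |
| database.include.list | Names of the databases to capture; separate multiple databases with commas | |
| database.history.kafka.topic | Name of the topic that stores the MySQL schema history; this topic should be consumed only by Debezium itself | |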

{
    "name": "debezium-mysql-source-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "include.schema.changes": true,
        "tombstones.on.delete": true,
        "database.hostname": "10.254.9.82",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "Nz_3@sMw7P",
        "database.server.id": "101",
        "database.server.name": "master",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "master.schema.history"
    }
}
curl -X POST -H 'Content-Type: application/json' -d '@/root/debezium/connector-configs/debezium-mysql-source-connector.json' http://localhost:8083/connectors
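
The same registration call can be scripted. The sketch below builds the POST request that the curl command above sends, using only the Python standard library. The connector dictionary is abbreviated for readability and would not register successfully as is; in practice you would load the full JSON file shown earlier. The function name is a hypothetical helper, and the request is only sent if you uncomment the last lines against a running Kafka Connect worker.

```python
import json
import urllib.request

# Assumed endpoint: the REST host and port from connect-distributed.properties above.
CONNECT_URL = "http://localhost:8083/connectors"


def build_register_request(connector, url=CONNECT_URL):
    """Build (but do not send) the POST request that registers a connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Abbreviated configuration; the full version is the JSON document shown above.
connector = {
    "name": "debezium-mysql-source-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.server.name": "master",
        "database.include.list": "inventory",
    },
}
request = build_register_request(connector)
print(request.get_method(), request.full_url)

# To send it for real:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```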

Once the source connector has been registered, list the topics again and you will find two new ones: master and master.schema.history. The table below shows how they differ.

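| Topic name | Content | Consumer |
| --- | --- | --- |
| master | Schema change events, limited to the databases listed in database.include.list | Third-party consumers |
| master.schema.history | Schema change events for all databases | Debezium |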
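
Besides these two topics, row-level change events go to one topic per captured table, which is why the sink connector in the next section subscribes to master.inventory.customers. A minimal sketch of the naming convention follows; the helper names are hypothetical, and the pattern serverName.databaseName.tableName is the one described in the Debezium MySQL connector documentation.

```python
def change_topic(server_name, database, table):
    """Topic that carries row-level change events for one table."""
    return f"{server_name}.{database}.{table}"


def schema_change_topic(server_name):
    """Topic that carries schema change events (include.schema.changes=true)."""
    return server_name


print(change_topic("master", "inventory", "customers"))
print(schema_change_topic("master"))
```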

3.4 Registering the Confluent Sink Connector

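| Parameter | Description | Default |
| --- | --- | --- |
| key.ignore | If false, the Elasticsearch document ID is taken from the record key, so it matches the MySQL primary key | false |
| schema.ignore | If false, Elasticsearch dynamic mapping is not used; field data types are defined from the record schema instead | false |
| write.method | If UPSERT, Elasticsearch performs an INSERT or an UPDATE depending on whether the document already exists | INSERT |
| behavior.on.null.values | If DELETE, the sink connector deletes the document identified by the record's document ID when the record value is null | FAIL |
| transforms.unwrap.type | ExtractNewRecordState flattens the change event. The events Debezium produces are multi-level structures, which are awkward to store in Elasticsearch, so they need to be flattened | |
| transforms.unwrap.drop.tombstones | If false, tombstone events are not dropped | true |
| transforms.unwrap.delete.handling.mode | Debezium generates a delete event and a tombstone event for each DELETE operation; if set to none, the delete event is kept rather than dropped (tombstones are governed by drop.tombstones) | drop |
| transforms.key.type | ExtractField$Key extracts a specific field value from the key of a Debezium change event | |
| transforms.key.field | The field to extract | |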

{
    "name": "confluent-elasticsearch-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "master.inventory.customers",
        "batch.size": "500",
        "key.ignore": false,
        "schema.ignore": false,
        "write.method": "UPSERT",
        "connection.url": "http://10.254.8.14:9200",
        "connection.username": "elastic",
        "connection.password": "Qwe123!@cmss",
        "connection.timeout.ms": "3000",
        "read.timeout.ms": "5000",
        "behavior.on.null.values": "DELETE",
        "transforms": "unwrap, key",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": false,
        "transforms.unwrap.delete.handling.mode": "none",
        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "id"
    }
}
curl -X POST -H 'Content-Type: application/json' -d '@/root/debezium/connector-configs/confluent-elasticsearch-sink-connector.json' http://localhost:8083/connectors

After registering both the source connector and the sink connector, you can list the currently registered connectors through the REST API that Kafka Connect exposes:

curl --location --request GET 'http://10.254.8.14:8083/connectors'
-----------------------------------
[
    "confluent-elasticsearch-sink-connector",
    "debezium-mysql-source-connector"
]
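
A connector that appears in this list is registered, but it is not necessarily running. Kafka Connect also serves GET /connectors/<name>/status, and the sketch below shows one way to interpret that response. The sample payload is illustrative only (its worker_id values are made up), and is_healthy is a hypothetical helper, not part of Kafka Connect.

```python
import json

# Illustrative payload shaped like the response of GET /connectors/<name>/status.
SAMPLE_STATUS = json.loads("""
{
    "name": "debezium-mysql-source-connector",
    "connector": {"state": "RUNNING", "worker_id": "10.254.8.14:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "10.254.8.14:8083"}],
    "type": "source"
}
""")


def is_healthy(status):
    """True when the connector has at least one task and everything reports RUNNING."""
    tasks = status.get("tasks", [])
    states = [status["connector"]["state"]] + [task["state"] for task in tasks]
    return bool(tasks) and all(state == "RUNNING" for state in states)


print(is_healthy(SAMPLE_STATUS))
```

Requiring at least one task is a deliberate choice in this sketch: a connector whose tasks have not started yet is treated as not healthy.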

Kafka Connect transformations

  • Source connectors pass records through the transformation before writing to the Kafka topic.
  • Sink connectors pass records through the transformation before writing to the sink.
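
To make the two transforms configured on the sink connector more concrete, here is a simplified model of what they do to one record. It is a sketch under stated assumptions, not the Debezium or Kafka Connect implementation: the functions are hypothetical stand-ins for ExtractNewRecordState and ExtractField$Key, and the event is trimmed to the fields that matter here.

```python
def unwrap(value):
    """Stand-in for ExtractNewRecordState: keep only the 'after' row state."""
    if value is None:  # tombstone: nothing to unwrap
        return None
    return value.get("after")  # None for delete events


def extract_key_field(key, field):
    """Stand-in for ExtractField$Key: pull one field out of the key struct."""
    return key[field]


def apply_transforms(key, value, key_field="id"):
    """Apply both transforms to one record and return the new (key, value) pair."""
    return extract_key_field(key, key_field), unwrap(value)


# Hypothetical update event, trimmed to the relevant fields.
event_key = {"id": 1001}
event_value = {
    "before": {"id": 1001, "email": "optimus@prime.com"},
    "after": {"id": 1001, "email": "optimus_prime@transformers.com"},
    "op": "u",
}
print(apply_transforms(event_key, event_value))
```

In this model a delete event comes out with a null value, which is the case that behavior.on.null.values=DELETE handles on the Elasticsearch side.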

3.5 Verification

3.5.1 Inserting Data

INSERT INTO `inventory`.`customers`
            (`id`,
             `first_name`,
             `last_name`,
             `email`,
             `create_time`,
             `update_time`)
VALUES      ( 1001,
              'optimus',
              'prime',
              'optimus@prime.com',
              '2021-03-03 13:55:07.000000',
              '2021-03-21 13:55:11.000000' ); 
GET /master.inventory.customers/_search
{
    "query": {
        "match_all": {}
    }
}
-----------------------------------
{
    "took": 750,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "master.inventory.customers",
                "_type": "_doc",
                "_id": "1001",
                "_score": 1.0,
                "_source": {
                    "id": 1001,
                    "first_name": "optimus",
                    "last_name": "prime",
                    "email": "optimus@prime.com",
                    "create_time": 1614779707000000,
                    "update_time": 1616334911000000
                }
            }
        ]
    }
}
-----------------------------------
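
Note that create_time and update_time arrive in Elasticsearch as large integers rather than date strings. Assuming the columns are DATETIME(6), which Debezium represents as microseconds since the Unix epoch, the values can be converted back as sketched below. The column type is an assumption, since the article does not show the table definition, and micros_to_datetime is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def micros_to_datetime(micros):
    """Convert microseconds since the Unix epoch to a timezone-aware UTC datetime."""
    return EPOCH + timedelta(microseconds=micros)


print(micros_to_datetime(1614779707000000).isoformat())  # 2021-03-03T13:55:07+00:00
print(micros_to_datetime(1616334911000000).isoformat())  # 2021-03-21T13:55:11+00:00
```

The converted values match the literals used in the INSERT statement above.
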

3.5.2 Deleting Data

DELETE FROM `inventory`.`customers` WHERE  `id` = 1002 
GET /master.inventory.customers/_search
{
    "query": {
        "match_all": {}
    }
}
-----------------------------------
{
    "took": 435,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 0,
            "relation": "eq"
        },
        "max_score": null,
        "hits": []
    }
}
-----------------------------------

3.5.3 Updating Data

UPDATE `inventory`.`customers`
SET    `email` = 'optimus_prime@transformers.com'
WHERE  `id` = 1001 
GET /master.inventory.customers/_search
{
    "query": {
        "match_all": {}
    }
}
-----------------------------------
{
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "master.inventory.customers",
                "_type": "_doc",
                "_id": "1001",
                "_score": 1.0,
                "_source": {
                    "id": 1001,
                    "first_name": "optimus",
                    "last_name": "prime",
                    "email": "optimus_prime@transformers.com",
                    "create_time": 1614779707000000,
                    "update_time": 1616334911000000
                }
            }
        ]
    }
}
-----------------------------------

3.5.4 Updating the Primary Key

UPDATE `inventory`.`customers`
SET    `id` = 1002
WHERE  `id` = 1001 
GET /master.inventory.customers/_search
{
    "query": {
        "match_all": {}
    }
}
-----------------------------------
{
    "took": 2,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "master.inventory.customers",
                "_type": "_doc",
                "_id": "1002",
                "_score": 1.0,
                "_source": {
                    "id": 1002,
                    "first_name": "optimus",
                    "last_name": "prime",
                    "email": "optimus_prime@transformers.com",
                    "create_time": 1614779707000000,
                    "update_time": 1616334911000000
                }
            }
        ]
    }
}
-----------------------------------
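
The document ID changes from 1001 to 1002 because, according to the Debezium documentation, a primary key change is not emitted as an update: the connector emits a delete event for the old key (followed by a tombstone when tombstones.on.delete is true) and a create event for the new key. The sketch below replays such a sequence against a dictionary that stands in for the Elasticsearch index. It is a simplified model of the sink settings used above, with hypothetical names and only three of the row's fields.

```python
def apply_to_index(index, key, value):
    """Mimic the sink settings used above: a null value deletes, anything else upserts."""
    doc_id = str(key)
    if value is None:
        index.pop(doc_id, None)  # behavior.on.null.values=DELETE
    else:
        index[doc_id] = value    # write.method=UPSERT
    return index


row = {"id": 1001, "first_name": "optimus", "last_name": "prime"}
index = {"1001": row}

# Records the sink would see for UPDATE ... SET id = 1002 WHERE id = 1001,
# after the unwrap and key transforms have been applied.
records = [
    (1001, None),                 # delete event for the old key
    (1001, None),                 # tombstone for the old key
    (1002, {**row, "id": 1002}),  # create event for the new key
]
for key, value in records:
    apply_to_index(index, key, value)

print(index)
```

Deleting an ID that is already gone is a no-op in this model, so keeping both the delete event and the tombstone does no harm.
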

3.5.5 Adding a Column

ALTER TABLE `inventory`.`customers` ADD COLUMN `address` VARCHAR ( 255 ) NULL;

INSERT INTO `inventory`.`customers`
            (`id`,
             `first_name`,
             `last_name`,
             `email`,
             `create_time`,
             `update_time`,
             `address`
             )
VALUES      ( 1001,
              'optimus',
              'prime',
              'optimus@prime.com',
              '2021-03-03 13:55:07.000000',
              '2021-03-21 13:55:11.000000',
              '77 Massachusetts Avenue'); 
GET /master.inventory.customers/_search
{
    "query": {
        "match_all": {}
    }
}
-----------------------------------
{
    "took": 1,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "master.inventory.customers",
                "_type": "_doc",
                "_id": "1001",
                "_score": 1.0,
                "_source": {
                    "id": 1001,
                    "first_name": "optimus",
                    "last_name": "prime",
                    "email": "optimus@prime.com",
                    "create_time": 1614779707000000,
                    "update_time": 1616334911000000,
                    "address": "77 Massachusetts Avenue"
                }
            }
        ]
    }
}
-----------------------------------

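4 Summary

This article presented a Debezium-based approach to streaming incremental data in near real time. When you add a row to MySQL with INSERT, a corresponding document is added to Elasticsearch; when you update a row with UPDATE, the document is updated; when you delete a row with DELETE, the document is deleted. Debezium also copes well with two further scenarios: primary key updates and newly added columns. If you want to copy existing data into Elasticsearch, the author recommends using Logstash together with Kafka.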

5 References

  1. https://debezium.io/
  2. https://docs.confluent.io/kafka-connect-elasticsearch/current/index.html
  3. https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz
  4. https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.4.2.Final/debezium-connector-mysql-1.4.2.Final-plugin.tar.gz
  5. https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-elasticsearch/versions/11.0.3/confluentinc-kafka-connect-elasticsearch-11.0.3.zip
Originally published on 2021-03-22 via the 程序猿杜小头 WeChat official account.
