Building a Real-Time Data Warehouse on HBase & Phoenix (5): Real-Time Data Synchronization with Kafka Connect

This post demonstrates installing and configuring Kafka Connect plugins to implement real-time data synchronization from MySQL to HBase. For the prerequisite environment, see the earlier posts in this column. The relevant software versions are:

  • JDK: 11.0.22
  • MySQL: 8.0.16
  • HBase: 2.5.7
  • debezium-connector-mysql: 2.4.2
  • kafka-connect-hbase: 2.0.13

I. Overall Architecture

The overall architecture is shown in the figure below.

The table below lists the processes that will run on each of the four nodes. For simplicity, all commands used during installation and deployment are executed as the operating system root user.

Node     debezium-connector-mysql     kafka-connect-hbase
node1
node2    *                            *
node3    *                            *
node4    *                            *

In addition, MySQL is installed on 172.18.16.156 with two instances set up for master-slave replication: the master instance on port 3306 and the slave instance on port 3307.

Required installation packages: mysql-8.0.16-linux-glibc2.12-x86_64.tar.xz, debezium-debezium-connector-mysql-2.4.2.zip, confluentinc-kafka-connect-hbase-2.0.13.zip.

The Debezium connector version used here requires JDK 11 or later. In an environment with multiple JDK versions installed, the alternatives command can be used to select the required version:

[root@vvgg-z2-music-mysqld~]#alternatives --config java

There are 5 programs which provide 'java'.

  Selection    Command
-----------------------------------------------
   1           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
   2           /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java
 + 3           /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64/jre/bin/java
   4           /usr/lib/jvm/jre-1.5.0-gcj/bin/java
*  5           /usr/lib/jvm/jdk-11-oracle-x64/bin/java

Enter to keep the current selection[+], or type selection number: 5
[root@vvgg-z2-music-mysqld~]#java -version
java version "11.0.22" 2024-01-16 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.22+9-LTS-219)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.22+9-LTS-219, mixed mode)
[root@vvgg-z2-music-mysqld~]#
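If JDK 11 is not yet registered with alternatives, it can be added before selecting it. A minimal sketch, assuming the JDK is unpacked under /usr/lib/jvm/jdk-11-oracle-x64 (adjust the path and priority to your environment):

# Register JDK 11 as an alternative for java with priority 5, then select it
alternatives --install /usr/bin/java java /usr/lib/jvm/jdk-11-oracle-x64/bin/java 5
alternatives --config java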

Add the Kafka cluster hostnames to the /etc/hosts file on 172.18.16.156:

# Edit the file
vim /etc/hosts

Add the following entries:

172.18.4.126    node1
172.18.4.188    node2
172.18.4.71    node3
172.18.4.86    node4

II. Installing and Configuring MySQL

Install and configure two MySQL instances, one master and one slave.

1. Create the mysql user

# Run as root
useradd mysql
passwd mysql

2. Create the directories used by MySQL

# Create the data directory; make sure the mysqldata directory is empty
mkdir -p /data/3306/mysqldata

# Create the binlog directory
mkdir -p /data/3306/dblog

# Create the temporary directory
mkdir -p /data/3306/tmp

# Change the owner of the directories to mysql
chown -R mysql:mysql /data

# Perform the following installation steps as the mysql user
su - mysql

3. Unpack the installation package

# Enter the installation directory
cd ~

# Extract the files from the tar package
tar xvf mysql-8.0.16-linux-glibc2.12-x86_64.tar.xz

# Create a symbolic link
ln -s mysql-8.0.16-linux-glibc2.12-x86_64 mysql-8.0.16

4. Configure environment variables

# Add the directory containing the MySQL executables to the $PATH environment variable
# Edit the file
vim ~/.bash_profile

# Modify or add the following two lines
PATH=$PATH:$HOME/.local/bin:$HOME/bin:/home/mysql/mysql-8.0.16/bin
export PATH

# Make the configuration take effect
source ~/.bash_profile

5. Create the MySQL configuration file

# Edit the file
vim /home/mysql/my_3306.cnf

The file contents are as follows:

[mysqld]
max_allowed_packet=1G
log_timestamps=system
binlog_transaction_dependency_tracking  = WRITESET
transaction_write_set_extraction        = XXHASH64

binlog_expire_logs_seconds=259200
lower_case_table_names=1
secure_file_priv=''
log_bin_trust_function_creators=on
character-set-server = utf8mb4
default_authentication_plugin=mysql_native_password
basedir=/home/mysql/mysql-8.0.16-linux-glibc2.12-x86_64
datadir=/data/3306/mysqldata
socket=/data/3306/mysqldata/mysql.sock

wait_timeout=30
innodb_buffer_pool_size = 16G
max_connections = 1000

default-time-zone = '+8:00'

port = 3306
skip-name-resolve 
user=mysql

innodb_print_all_deadlocks=1
log_output='table'
slow_query_log = 1
long_query_time = 1

tmp_table_size = 32M

# Enable binlog
log-bin=/data/3306/dblog/mysql-bin
log-bin-index = /data/3306/dblog/mysql-bin.index 

tmpdir = /data/3306/tmp

server-id = 1563306

innodb_data_file_path = ibdata1:1G:autoextend
innodb_data_home_dir = /data/3306/mysqldata

innodb_log_buffer_size = 16M
innodb_log_file_size = 1G
innodb_log_files_in_group = 3
innodb_log_group_home_dir=/data/3306/dblog
innodb_max_dirty_pages_pct = 90
innodb_lock_wait_timeout = 120

gtid-mode = on
enforce_gtid_consistency=true

local_infile=0
log_error='/data/3306/mysqldata/master.err'
skip_symbolic_links=yes

[mysqldump]
quick
max_allowed_packet = 1G

[mysqld_safe]
open-files-limit = 8192
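
MySQL 8.0.16 introduced the mysqld --validate-config option, which can be used to sanity-check the file before initializing; a quick check (assuming the paths above):

# Parse the configuration without starting the server; no output plus exit code 0 means it is valid
mysqld --defaults-file=/home/mysql/my_3306.cnf --validate-config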

6. Initialize the MySQL system

mysqld --defaults-file=/home/mysql/my_3306.cnf --initialize

7. Start the MySQL server

mysqld_safe --defaults-file=/home/mysql/my_3306.cnf &

8. Create a dba user

# Connect to the MySQL server
mysql -u root -p -S /data/3306/mysqldata/mysql.sock

-- Change the root password
alter user user() identified by "123456";

-- Create a new dba account
create user 'dba'@'%' identified with mysql_native_password by '123456';
grant all on *.* to 'dba'@'%' with grant option;

Repeat steps 2 through 8, replacing 3306 with 3307, to create the slave instance.
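
For reference, a minimal sketch of the lines that would differ in my_3307.cnf for the slave instance (inferred from the master's file above; everything else can stay the same):

[mysqld]
port = 3307
server-id = 1563307
datadir=/data/3307/mysqldata
socket=/data/3307/mysqldata/mysql.sock
tmpdir = /data/3307/tmp
log-bin=/data/3307/dblog/mysql-bin
log-bin-index = /data/3307/dblog/mysql-bin.index
innodb_data_home_dir = /data/3307/mysqldata
innodb_log_group_home_dir=/data/3307/dblog
log_error='/data/3307/mysqldata/master.err'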

III. Configuring MySQL Master-Slave Replication

On the 3306 master instance, execute:

-- Check the replication position
show master status;
-- Create the replication user and grant privileges
create user 'repl'@'%' identified with mysql_native_password by '123456';
grant replication client,replication slave on *.* to 'repl'@'%';
-- Create a test database, table, and data
create database test;
create table test.t1 (
  id bigint(20) not null auto_increment,
  remark varchar(32) default null comment '备注',
  createtime timestamp not null default current_timestamp comment '创建时间',
  primary key (id));
insert into test.t1 (remark) values ('第一行:row1'),('第二行:row2'),('第三行:row3');

Output:

mysql> show master status;
+------------------+----------+--------------+------------------+------------------------------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                        |
+------------------+----------+--------------+------------------+------------------------------------------+
| mysql-bin.000001 |      977 |              |                  | ba615057-e11c-11ee-b80e-246e961c91f8:1-3 |
+------------------+----------+--------------+------------------+------------------------------------------+
1 row in set (0.00 sec)

mysql> create user 'repl'@'%' identified with mysql_native_password by '123456';
Query OK, 0 rows affected (0.01 sec)

mysql> grant replication client,replication slave on *.* to 'repl'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> create database test;
Query OK, 1 row affected (0.00 sec)

mysql> create table test.t1 (
    ->   id bigint(20) not null auto_increment,
    ->   remark varchar(32) default null comment '备注',
    ->   createtime timestamp not null default current_timestamp comment '创建时间',
    ->   primary key (id));
Query OK, 0 rows affected (0.01 sec)

mysql> insert into test.t1 (remark) values ('第一行:row1'),('第二行:row2'),('第三行:row3');
Query OK, 3 rows affected (0.00 sec)
Records: 3  Duplicates: 0  Warnings: 0

On the 3307 slave instance, execute:

change master to
master_host='172.18.16.156',
master_port=3306,
master_user='repl',
master_password='123456',
master_log_file='mysql-bin.000001',
master_log_pos=977;

start slave;
show slave status\G
select user,host from mysql.user;
select * from test.t1;

Output:
mysql> change master to
    -> master_host='172.18.16.156',
    -> master_port=3306,
    -> master_user='repl',
    -> master_password='123456',
    -> master_log_file='mysql-bin.000001',
    -> master_log_pos=977;
Query OK, 0 rows affected, 2 warnings (0.00 sec)

mysql> start slave;
Query OK, 0 rows affected (0.01 sec)

mysql> show slave status\G
*************************** 1. row ***************************
               Slave_IO_State: Waiting for master to send event
                  Master_Host: 172.18.16.156
                  Master_User: repl
                  Master_Port: 3306
                Connect_Retry: 60
              Master_Log_File: mysql-bin.000001
          Read_Master_Log_Pos: 2431
               Relay_Log_File: vvgg-z2-music-mysqld-relay-bin.000002
                Relay_Log_Pos: 1776
        Relay_Master_Log_File: mysql-bin.000001
             Slave_IO_Running: Yes
            Slave_SQL_Running: Yes
              Replicate_Do_DB: 
          Replicate_Ignore_DB: 
           Replicate_Do_Table: 
       Replicate_Ignore_Table: 
      Replicate_Wild_Do_Table: 
  Replicate_Wild_Ignore_Table: 
                   Last_Errno: 0
                   Last_Error: 
                 Skip_Counter: 0
          Exec_Master_Log_Pos: 2431
              Relay_Log_Space: 1999
              Until_Condition: None
               Until_Log_File: 
                Until_Log_Pos: 0
           Master_SSL_Allowed: No
           Master_SSL_CA_File: 
           Master_SSL_CA_Path: 
              Master_SSL_Cert: 
            Master_SSL_Cipher: 
               Master_SSL_Key: 
        Seconds_Behind_Master: 0
Master_SSL_Verify_Server_Cert: No
                Last_IO_Errno: 0
                Last_IO_Error: 
               Last_SQL_Errno: 0
               Last_SQL_Error: 
  Replicate_Ignore_Server_Ids: 
             Master_Server_Id: 1563306
                  Master_UUID: ba615057-e11c-11ee-b80e-246e961c91f8
             Master_Info_File: mysql.slave_master_info
                    SQL_Delay: 0
          SQL_Remaining_Delay: NULL
      Slave_SQL_Running_State: Slave has read all relay log; waiting for more updates
           Master_Retry_Count: 86400
                  Master_Bind: 
      Last_IO_Error_Timestamp: 
     Last_SQL_Error_Timestamp: 
               Master_SSL_Crl: 
           Master_SSL_Crlpath: 
           Retrieved_Gtid_Set: ba615057-e11c-11ee-b80e-246e961c91f8:4-8
            Executed_Gtid_Set: ba615057-e11c-11ee-b80e-246e961c91f8:4-8,
c2df1946-e11c-11ee-8026-246e961c91f8:1-3
                Auto_Position: 0
         Replicate_Rewrite_DB: 
                 Channel_Name: 
           Master_TLS_Version: 
       Master_public_key_path: 
        Get_master_public_key: 0
            Network_Namespace: 
1 row in set (0.00 sec)

mysql> select user,host from mysql.user;
+------------------+-----------+
| user             | host      |
+------------------+-----------+
| dba              | %         |
| repl             | %         |
| mysql.infoschema | localhost |
| mysql.session    | localhost |
| mysql.sys        | localhost |
| root             | localhost |
+------------------+-----------+
6 rows in set (0.00 sec)

mysql> select * from test.t1;
+----+------------------+---------------------+
| id | remark           | createtime          |
+----+------------------+---------------------+
|  1 | 第一行:row1     | 2024-03-20 10:25:32 |
|  2 | 第二行:row2     | 2024-03-20 10:25:32 |
|  3 | 第三行:row3     | 2024-03-20 10:25:32 |
+----+------------------+---------------------+
3 rows in set (0.00 sec)

For details on the MySQL replication settings involved, see the earlier post "Configuring Asynchronous Replication".
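
Incidentally, since GTID mode is enabled on both instances, an equivalent GTID-based setup would avoid hard-coding the binlog file and position. A sketch of what could replace the position-based change master above on the 3307 slave:

-- Let replication position itself from GTIDs instead of a fixed file/position
change master to
master_host='172.18.16.156',
master_port=3306,
master_user='repl',
master_password='123456',
master_auto_position=1;
start slave;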

IV. Installing and Deploying the Kafka Connectors

Perform the following steps on node2.

1. Create the plugin directory

mkdir $KAFKA_HOME/plugins

2. Unzip the files into the plugin directory

# debezium-connector-mysql
unzip debezium-debezium-connector-mysql-2.4.2.zip -d $KAFKA_HOME/plugins/
# kafka-connect-hbase
unzip confluentinc-kafka-connect-hbase-2.0.13.zip -d $KAFKA_HOME/plugins/

3. Configure Kafka Connect

(1) Configure the properties file

# Edit the connect-distributed.properties file
vim $KAFKA_HOME/config/connect-distributed.properties

The contents are as follows:

bootstrap.servers=node2:9092,node3:9092,node4:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=3
offset.flush.interval.ms=10000
plugin.path=/root/kafka_2.13-3.7.0/plugins
(2) Distribute to the other nodes

scp $KAFKA_HOME/config/connect-distributed.properties node3:$KAFKA_HOME/config/
scp $KAFKA_HOME/config/connect-distributed.properties node4:$KAFKA_HOME/config/
scp -r $KAFKA_HOME/plugins node3:$KAFKA_HOME/
scp -r $KAFKA_HOME/plugins node4:$KAFKA_HOME/
(3) Start in distributed mode

Execute on all three nodes, starting one worker process on each, for fault tolerance and load balancing.

connect-distributed.sh -daemon $KAFKA_HOME/config/connect-distributed.properties
# Check the log for ERROR entries
grep ERROR ~/kafka_2.13-3.7.0/logs/connectDistributed.out
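
Besides grepping the log, each worker's REST endpoint can confirm it is up; for example:

# A healthy worker answers on port 8083 with its version, commit, and Kafka cluster id
curl -s http://node2:8083/ | jq
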
(4) Verify the connector plugins and auto-created topics

List the connector plugins:

curl -X GET http://node2:8083/connector-plugins | jq

The output shows that Kafka Connect has recognized the HBase sink and MySQL source plugins:

[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connector-plugins | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   494  100   494    0     0   4111      0 --:--:-- --:--:-- --:--:--  4116
[
  {
    "class": "io.confluent.connect.hbase.HBaseSinkConnector",
    "type": "sink",
    "version": "2.0.13"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "2.4.2.Final"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "3.7.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "3.7.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "3.7.0"
  }
]
[root@vvml-yz-hbase-test~]#

List the topics:

kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092

The output shows that Kafka Connect automatically created the connect-configs, connect-offsets, and connect-status topics at startup:

[root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
__consumer_offsets
connect-configs
connect-offsets
connect-status
test-1
test-3
[root@vvml-yz-hbase-test~]#

4. Create the source connector

(1) Create the MySQL source configuration file

# Edit the file
vim $KAFKA_HOME/plugins/source-mysql.json

The contents are as follows:

{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "topic.prefix": "mysql-hbase-test",
    "database.hostname": "172.18.16.156",
    "database.port": "3307",
    "database.user": "dba",
    "database.password": "123456",
    "database.server.id": "1563307",
    "database.server.name": "dbserver1",
    "database.include.list": "test",
    "schema.history.internal.kafka.bootstrap.servers": "node2:9092,node3:9092,node4:9092",
    "schema.history.internal.kafka.topic": "schemahistory.mysql-hbase-test"
  }
}
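
Optionally, the configuration can be validated against the plugin before creating the connector, using Kafka Connect's REST validate endpoint (it takes the flat config map, i.e. the value of the "config" key):

# Report the number of configuration errors (0 means the config is accepted)
curl -s -X PUT -H 'Content-Type: application/json' \
  'http://node2:8083/connector-plugins/io.debezium.connector.mysql.MySqlConnector/config/validate' \
  -d '{"connector.class":"io.debezium.connector.mysql.MySqlConnector","topic.prefix":"mysql-hbase-test","database.hostname":"172.18.16.156","database.port":"3307","database.user":"dba","database.password":"123456","database.server.id":"1563307","database.include.list":"test","schema.history.internal.kafka.bootstrap.servers":"node2:9092,node3:9092,node4:9092","schema.history.internal.kafka.topic":"schemahistory.mysql-hbase-test"}' | jq '.error_count'
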
(2) Create the MySQL source connector

# Create the connector
curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"
# Check the connector status
curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq
# List the topics
kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092

The output shows that mysql-source-connector is in the RUNNING state and that three topics were created automatically:

[root@vvml-yz-hbase-test~]#curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/source-mysql.json"
HTTP/1.1 201 Created
Date: Wed, 20 Mar 2024 02:31:30 GMT
Location: http://node2:8083/connectors/mysql-source-connector
Content-Type: application/json
Content-Length: 579
Server: Jetty(9.4.53.v20231009)

{"name":"mysql-source-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","topic.prefix":"mysql-hbase-test","database.hostname":"172.18.16.156","database.port":"3307","database.user":"dba","database.password":"123456","database.server.id":"1563307","database.server.name":"dbserver1","database.include.list":"test","schema.history.internal.kafka.bootstrap.servers":"node2:9092,node3:9092,node4:9092","schema.history.internal.kafka.topic":"schemahistory.mysql-hbase-test","name":"mysql-source-connector"},"tasks":[],"type":"source"}
[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/mysql-source-connector/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   182  100   182    0     0  20726      0 --:--:-- --:--:-- --:--:-- 22750
{
  "name": "mysql-source-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.18.4.188:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.18.4.188:8083"
    }
  ],
  "type": "source"
}
[root@vvml-yz-hbase-test~]#kafka-topics.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
__consumer_offsets
connect-configs
connect-offsets
connect-status
mysql-hbase-test
mysql-hbase-test.test.t1
schemahistory.mysql-hbase-test
test-1
test-3
[root@vvml-yz-hbase-test~]#
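
The raw Debezium change events can also be inspected straight from the table's topic; for example:

# Dump the captured change events for test.t1 from the beginning (Ctrl+C to stop)
kafka-console-consumer.sh --bootstrap-server node2:9092,node3:9092,node4:9092 \
  --topic mysql-hbase-test.test.t1 --from-beginning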

5. Create the sink connector

(1) Create the HBase sink configuration file

# Edit the file
vim $KAFKA_HOME/plugins/sink-hbase.json

The contents are as follows:

{
  "name": "hbase-sink-connector",
  "config": {
    "topics": "mysql-hbase-test.test.t1",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.hbase.HBaseSinkConnector",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "node2:9092,node3:9092,node4:9092",
    "confluent.topic.replication.factor":3,
    "hbase.zookeeper.quorum": "node2,node3,node4",
    "hbase.zookeeper.property.clientPort": "2181",
    "auto.create.tables": "true",
    "auto.create.column.families": "true",
    "table.name.format": "example_table"
  }
}
(2) Create the HBase sink connector

# Create the connector
curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/sink-hbase.json"
# Check the connector status
curl -X GET http://node2:8083/connectors/hbase-sink-connector/status | jq
# List the consumer groups
kafka-consumer-groups.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092

The output shows that hbase-sink-connector is RUNNING and that a consumer group was created automatically:

[root@vvml-yz-hbase-test~]#curl -X POST -H 'Content-Type: application/json' -i 'http://node2:8083/connectors' -d @"/root/kafka_2.13-3.7.0/plugins/sink-hbase.json"
HTTP/1.1 201 Created
Date: Wed, 20 Mar 2024 02:33:11 GMT
Location: http://node2:8083/connectors/hbase-sink-connector
Content-Type: application/json
Content-Length: 654
Server: Jetty(9.4.53.v20231009)

{"name":"hbase-sink-connector","config":{"topics":"mysql-hbase-test.test.t1","tasks.max":"1","connector.class":"io.confluent.connect.hbase.HBaseSinkConnector","key.converter":"org.apache.kafka.connect.storage.StringConverter","value.converter":"org.apache.kafka.connect.storage.StringConverter","confluent.topic.bootstrap.servers":"node2:9092,node3:9092,node4:9092","confluent.topic.replication.factor":"3","hbase.zookeeper.quorum":"node2,node3,node4","hbase.zookeeper.property.clientPort":"2181","auto.create.tables":"true","auto.create.column.families":"true","table.name.format":"example_table","name":"hbase-sink-connector"},"tasks":[],"type":"sink"}
[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/hbase-sink-connector/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   176  100   176    0     0  23084      0 --:--:-- --:--:-- --:--:-- 25142
{
  "name": "hbase-sink-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.18.4.71:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "172.18.4.71:8083"
    }
  ],
  "type": "sink"
}
[root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --list --bootstrap-server node2:9092,node3:9092,node4:9092
connect-hbase-sink-connector
[root@vvml-yz-hbase-test~]#

6. Automatic synchronization of existing data

The sink connector automatically created the example_table table in HBase and synchronized the three test rows inserted earlier while configuring MySQL replication:

[root@vvml-yz-hbase-test~]#hbase shell
HBase Shell
Use "help" to get list of supported commands.
Use "exit" to quit this interactive shell.
For Reference, please visit: http://hbase.apache.org/2.0/book.html#shell
Version 2.5.7-hadoop3, r6788f98356dd70b4a7ff766ea7a8298e022e7b95, Thu Dec 14 16:16:10 PST 2023
Took 0.0012 seconds                                                                                                                       
hbase:001:0> list
TABLE                                                                                                                                     
SYSTEM:CATALOG                                                                                                                            
SYSTEM:CHILD_LINK                                                                                                                         
SYSTEM:FUNCTION                                                                                                                           
SYSTEM:LOG                                                                                                                                
SYSTEM:MUTEX                                                                                                                              
SYSTEM:SEQUENCE                                                                                                                           
SYSTEM:STATS                                                                                                                              
SYSTEM:TASK                                                                                                                               
example_table                                                                                                                             
test                                                                                                                                      
10 row(s)
Took 0.3686 seconds                                                                                                                       
=> ["SYSTEM:CATALOG", "SYSTEM:CHILD_LINK", "SYSTEM:FUNCTION", "SYSTEM:LOG", "SYSTEM:MUTEX", "SYSTEM:SEQUENCE", "SYSTEM:STATS", "SYSTEM:TASK", "example_table", "test"]
hbase:002:0> describe 'example_table'
Table example_table is ENABLED                                                                                                            
example_table, {TABLE_ATTRIBUTES => {METADATA => {'hbase.store.file-tracker.impl' => 'DEFAULT'}}}                                         
COLUMN FAMILIES DESCRIPTION                                                                                                               
{NAME => 'mysql-hbase-test.test.t1', INDEX_BLOCK_ENCODING => 'NONE', VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING =
> 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', IN_MEMORY => 'false', COMPRESSION => 'NON
E', BLOCKCACHE => 'true', BLOCKSIZE => '65536 B (64KB)'}                                                                                  

1 row(s)
Quota is disabled
Took 0.1173 seconds                                                                                                                       
hbase:003:0> scan 'example_table',{FORMATTER=>'toString'}
ROW                                 COLUMN+CELL                                                                                           
 {"id":1}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.587, value={"before":null,"
                                    after":{"id":1,"remark":"第一行:row1","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin
                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"first","db":"test"
                                    ,"sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"
                                    thread":null,"query":null},"op":"r","ts_ms":1710901892115,"transaction":null}                         
 {"id":2}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.593, value={"before":null,"
                                    after":{"id":2,"remark":"第二行:row2","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin
                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"true","db":"test",
                                    "sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"t
                                    hread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}                          
 {"id":3}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.596, value={"before":null,"
                                    after":{"id":3,"remark":"第三行:row3","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin
                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"last","db":"test",
                                    "sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"t
                                    hread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}                          
3 row(s)
Took 0.0702 seconds                                                                                                                       
hbase:004:0> 

By default, debezium-connector-mysql snapshots existing data into Kafka at startup. When building a real-time data warehouse, this makes it possible to synchronize existing and incremental data in a single step, which greatly simplifies the CDC (Change Data Capture) process.
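
This initial snapshot is governed by Debezium's snapshot.mode property, which defaults to initial. If only changes made after startup are wanted, the source connector config could instead set, for example (see the Debezium 2.4 documentation for the full list of modes):

"snapshot.mode": "schema_only"

With schema_only, Debezium captures only the table schemas at startup and then streams new changes from the binlog.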

7. Real-time data synchronization test

Make data changes on the MySQL master:

insert into test.t1 (remark) values ('第四行:row4');
update test.t1 set remark = '第五行:row5' where id = 4;
delete from test.t1 where id =1;

View the data changes in HBase:

hbase:004:0> scan 'example_table',{FORMATTER=>'toString'}
ROW                                 COLUMN+CELL                                                                                           
 {"id":1}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.587, value={"before":null,"
                                    after":{"id":1,"remark":"第一行:row1","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin
                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"first","db":"test"
                                    ,"sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"
                                    thread":null,"query":null},"op":"r","ts_ms":1710901892115,"transaction":null}                         
 {"id":2}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.593, value={"before":null,"
                                    after":{"id":2,"remark":"第二行:row2","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin
                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"true","db":"test",
                                    "sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"t
                                    hread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}                          
 {"id":3}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:33:13.596, value={"before":null,"
                                    after":{"id":3,"remark":"第三行:row3","createtime":"2024-03-20T02:25:32Z"},"source":{"version":"2.4.2.Fin
                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710901892000,"snapshot":"last","db":"test",
                                    "sequence":null,"table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000001","pos":2465,"row":0,"t
                                    hread":null,"query":null},"op":"r","ts_ms":1710901892117,"transaction":null}                          
 {"id":4}                           column=mysql-hbase-test.test.t1:KAFKA_VALUE, timestamp=2024-03-20T10:38:18.788, value={"before":null,"
                                    after":{"id":4,"remark":"第四行:row4","createtime":"2024-03-20T02:38:18Z"},"source":{"version":"2.4.2.Fin
                                    al","connector":"mysql","name":"mysql-hbase-test","ts_ms":1710902298000,"snapshot":"false","db":"test"
                                    ,"sequence":null,"table":"t1","server_id":1563306,"gtid":"ba615057-e11c-11ee-b80e-246e961c91f8:9","fil
                                    e":"mysql-bin.000001","pos":2679,"row":0,"thread":49,"query":null},"op":"c","ts_ms":1710902298665,"tra
                                    nsaction":null}                                                                                       
4 row(s)
Took 0.0091 seconds                                                                                                                       
hbase:005:0> 

The delete and update executed in MySQL were not synchronized to HBase.

Check the consumption status:

[root@vvml-yz-hbase-test~]#kafka-consumer-groups.sh --group connect-hbase-sink-connector --describe --bootstrap-server node2:9092,node3:9092,node4:9092

Consumer group 'connect-hbase-sink-connector' has no active members.

GROUP                        TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
connect-hbase-sink-connector mysql-hbase-test.test.t1 0          3               7               4               -               -               -
[root@vvml-yz-hbase-test~]#

All of the data changes were written to Kafka, but not all of them were consumed.

Check the sink connector status:

[root@vvml-yz-hbase-test~]#curl -X GET http://node2:8083/connectors/hbase-sink-connector/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  2168  100  2168    0     0   368k      0 --:--:-- --:--:-- --:--:--  423k
{
  "name": "hbase-sink-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.18.4.71:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "172.18.4.71:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table: \n\tat io.confluent.connect.bigtable.client.BigtableClient.handleWriteErrors(BigtableClient.java:515)\n\tat io.confluent.connect.bigtable.client.BigtableClient.insert(BigtableClient.java:287)\n\tat io.confluent.connect.bigtable.client.InsertWriter.writeInserts(InsertWriter.java:58)\n\tat io.confluent.connect.bigtable.client.InsertWriter.write(InsertWriter.java:48)\n\tat io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:100)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)\n\t... 11 more\nCaused by: org.apache.kafka.connect.errors.ConnectException: Row with specified row key already exists.\n\t... 16 more\n"
    }
  ],
  "type": "sink"
}
[root@vvml-yz-hbase-test~]#

Check the log file ~/kafka_2.13-3.7.0/logs/connectDistributed.out on node3; the error message is as follows:

[2024-03-20 10:38:18,794] ERROR [hbase-sink-connector|task-0] WorkerSinkTask{id=hbase-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table:  (org.apache.kafka.connect.runtime.WorkerSinkTask:630)
org.apache.kafka.connect.errors.ConnectException: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table: 
    at io.confluent.connect.bigtable.client.BigtableClient.handleWriteErrors(BigtableClient.java:515)
    at io.confluent.connect.bigtable.client.BigtableClient.insert(BigtableClient.java:287)
    at io.confluent.connect.bigtable.client.InsertWriter.writeInserts(InsertWriter.java:58)
    at io.confluent.connect.bigtable.client.InsertWriter.write(InsertWriter.java:48)
    at io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:100)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Row with specified row key already exists.
    ... 16 more
[2024-03-20 10:38:18,794] ERROR [hbase-sink-connector|task-0] WorkerSinkTask{id=hbase-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error inserting record in topic mysql-hbase-test.test.t1 with offset 4 and partition 0 to table example_table: 
    at io.confluent.connect.bigtable.client.BigtableClient.handleWriteErrors(BigtableClient.java:515)
    at io.confluent.connect.bigtable.client.BigtableClient.insert(BigtableClient.java:287)
    at io.confluent.connect.bigtable.client.InsertWriter.writeInserts(InsertWriter.java:58)
    at io.confluent.connect.bigtable.client.InsertWriter.write(InsertWriter.java:48)
    at io.confluent.connect.bigtable.BaseBigtableSinkTask.put(BaseBigtableSinkTask.java:100)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    ... 11 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Row with specified row key already exists.
    ... 16 more

The reported error is: Row with specified row key already exists.

The reason is that the sink connector turns MySQL update and delete operations into HBase inserts, using a row key automatically derived from the MySQL table's primary key. Since a row with that row key already exists, the insert fails (the io.confluent.connect.bigtable class names in the stack trace indicate that the HBase sink shares its implementation with the Bigtable sink). This synchronization behavior needs to be kept in mind.
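
After the underlying conflict is resolved (for example, by recreating the sink connector against an empty target table), the failed task can be restarted through the Kafka Connect REST API rather than restarting the whole worker; a sketch:

# Restart only the failed task (task 0) of the sink connector
curl -X POST http://node2:8083/connectors/hbase-sink-connector/tasks/0/restart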
