
Loading and consuming MySQL table data through Kafka/Flink: quick installation and configuration

Original post by 用户7689089, last modified 2021-04-29 20:42:16

Note: many data migration tools wrap Kafka and Flink, so out of curiosity I downloaded both and tried deploying them myself. This post is a quick record of the installation process; there were quite a few pitfalls along the way, and I will share the more typical errors once I have sorted them out. Everything here is single machine, single instance, single node; no distributed or cluster configuration is involved.

This post has four parts:

1. MySQL installation

2. Kafka installation and configuration

3. Kafka consumption and testing

4. Flink reading the MySQL table through the sql-client

========== Software versions:

Operating system: CentOS 7.4

1. mysql-5.7.22-linux-glibc2.12-x86_64

Link: https://pan.baidu.com/s/1KlR-rRoHC97aX2j2ha05ng

Extraction code: ksi9

=======================

2. apache-zookeeper-3.5.6-bin.tar.gz

Link: https://pan.baidu.com/s/1zOSeOK_ZiPmNzP8EuwwTBA

Extraction code: k1is

=====================

3. confluentinc-kafka-connect-jdbc-10.1.1.zip

Link: https://pan.baidu.com/s/1jTOUiXNdNOBQnTiuuiDcOA

Extraction code: spwr

====================

4. kafka_2.11-2.4.0.tgz

Link: https://pan.baidu.com/s/1u3Q_4F1nQSFWj7qG6ESDZA

Extraction code: x2oy

=================

5. flink-1.12.2-bin-scala_2.11.tgz

Link: https://pan.baidu.com/s/1tPhpAmLlEhbeV8y-hNnb_A

Extraction code: qswm

=========== Java version and jars used:

mysql-connector-java-8.0.23.jar

Link: https://pan.baidu.com/s/1XDQkUMzJm7dRn-L74Ar-PQ

Extraction code: shfy

===================================

flink-sql-connector-mysql-cdc-1.0.0.jar

Link: https://pan.baidu.com/s/13z5ocJaebmOM71TXKOCnLQ

Extraction code: 2ine

=============================

[root@localhost ~]# java -version

openjdk version "1.8.0_131"

OpenJDK Runtime Environment (build 1.8.0_131-b12)

OpenJDK 64-Bit Server VM (build 25.131-b12, mixed mode)

====================================================================

1. MySQL installation and initialization

I will just paste the main steps here; all the software directories go under /usr/local.

[root@localhost ~]# useradd mysql

[root@localhost ~]# passwd mysql

[root@localhost ~]# tar -xf mysql-5.7.22-linux-glibc2.12-x86_64.tar.gz

[root@localhost ~]# mv mysql-5.7.22-linux-glibc2.12-x86_64 /usr/local/mysql

[root@localhost ~]# mkdir -p /usr/local/mysql/data

[root@localhost ~]# chown mysql.mysql /usr/local/mysql/ -R

Configure MySQL's default parameter file:

[root@localhost ~]# vim /etc/my.cnf

[root@localhost ~]# cat /etc/my.cnf

[mysqld]

basedir=/usr/local/mysql

datadir=/usr/local/mysql/data

socket=/tmp/mysql.sock

port=3306

server-id=100

log-bin

[root@localhost ~]# chown mysql.mysql /etc/my.cnf
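One note for later: the Flink CDC connector used in part 4 reads the MySQL binlog and needs row-based logging. MySQL 5.7 already defaults binlog_format to ROW, but to be explicit the following lines could also be added under [mysqld] (optional; they are not in the original config):

binlog_format=ROW
binlog_row_image=FULL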

[root@localhost ~]# su - mysql

Last login: Thu Apr 29 16:41:25 CST 2021 on pts/1


[mysql@localhost ~]$ vim .bash_profile

Add the MySQL paths so the mysql client programs can be invoked directly:

export MYSQL_HOME=/usr/local/mysql

export PATH=$PATH:$MYSQL_HOME/bin

[mysql@localhost ~]$ source .bash_profile

Initialize MySQL and start the database:

[mysql@localhost ~]$ mysqld --initialize --basedir=/usr/local/mysql --datadir=/usr/local/mysql/data --socket=/tmp/mysql.sock --port=3306

2021-04-29T08:44:26.802747Z 0 [Warning] Gtid table is not ready to be used. Table 'mysql.gtid_executed' cannot be opened.

2021-04-29T08:44:26.805748Z 1 [Note] A temporary password is generated for root@localhost: FvfKr?zGg3B9

[mysql@localhost ~]$

[mysql@localhost ~]$ mysqld_safe --defaults-file=/etc/my.cnf &

[1] 4135

[mysql@localhost ~]$ jobs

[1]+ Running mysqld_safe --defaults-file=/etc/my.cnf &

Change the initial root password, create a test database db1, and create a remote-login account root@'%':

[mysql@localhost ~]$ mysql -uroot -p'FvfKr?zGg3B9'

mysql: [Warning] Using a password on the command line interface can be insecure.

Welcome to the MySQL monitor. Commands end with ; or \g.

Your MySQL connection id is 2

Server version: 5.7.22-log

mysql> alter user root@localhost identified by '123123';

Query OK, 0 rows affected (0.00 sec)

mysql> flush privileges;

Query OK, 0 rows affected (0.00 sec)

mysql> create user root@'%' identified by '123123';

mysql> grant all privileges on *.* to root@'%';

mysql> create database db1;   -------- the database Kafka will later read from and consume

mysql> create table db1.t1(id int primary key, name varchar(20), time1 timestamp default now());   ------- the table Kafka will read and consume
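Since log-bin was enabled in my.cnf and the Flink CDC connector in part 4 relies on the binlog, it is worth a quick sanity check (not in the original post) that binary logging is actually active:

mysql> show variables like 'log_bin';
mysql> show master status;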

==================

2. Quick Kafka configuration

This part is done as the root OS user.

Kafka needs ZooKeeper as the registry for its brokers: ZooKeeper stores the meta information, consumer state and offsets, and group management data. So first unpack and start ZooKeeper.

[root@localhost ~]# tar -xf apache-zookeeper-3.5.6-bin.tar.gz

[root@localhost ~]# mv apache-zookeeper-3.5.6 /usr/local/zookeeper

[root@localhost ~]# cd /usr/local/zookeeper/

[root@localhost zookeeper]# ls

bin conf docs lib LICENSE.txt NOTICE.txt README.md README_packaging.txt

[root@localhost zookeeper]# cd conf/

[root@localhost conf]# ls

configuration.xsl log4j.properties zoo_sample.cfg

ZooKeeper reads zoo.cfg by default. The distribution ships a zoo_sample.cfg, so we can simply cp it to that name.

The other zk settings can be left at their defaults.

[root@localhost conf]# cp zoo_sample.cfg zoo.cfg

[root@localhost conf]#
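For reference, the relevant defaults shipped in zoo_sample.cfg look roughly like this; note that dataDir points at /tmp, which is fine for a throwaway test but not durable across reboots:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181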

Next, unpack Kafka and edit its configuration files.

[root@localhost ~]# tar -xf kafka_2.11-2.4.0.tgz

[root@localhost ~]# mv kafka_2.11-2.4.0 /usr/local/kafka

[root@localhost ~]# cd /usr/local/kafka/

[root@localhost kafka]# ls

bin config libs LICENSE NOTICE site-docs

[root@localhost kafka]# cd config/   ---------- switch to the config directory and find the configuration file whose name starts with "server"

[root@localhost config]# ls

connect-console-sink.properties connect-file-sink.properties connect-mirror-maker.properties log4j.properties tools-log4j.properties

connect-console-source.properties connect-file-source.properties connect-standalone.properties producer.properties trogdor.conf

connect-distributed.properties connect-log4j.properties consumer.properties server.properties zookeeper.properties

[root@localhost config]#

[root@localhost config]# vim server.properties

Configure the listener addresses. With a single NIC, the internal and advertised listeners can use the same address, or the advertised listener can be left unset.

################################################

listeners=PLAINTEXT://192.168.100.10:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,

# it uses the value for "listeners" if configured. Otherwise, it will use the value

# returned from java.net.InetAddress.getCanonicalHostName().

#advertised.listeners=PLAINTEXT://your.host.name:9092

advertised.listeners=PLAINTEXT://192.168.100.10:9092

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).

# This is a comma separated host:port pairs, each corresponding to a zk

# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

# You can also append an optional chroot string to the urls to specify the

# root directory for all kafka znodes.

zookeeper.connect=localhost:2181   ------------ default ZooKeeper address and port

# Timeout in ms for connecting to zookeeper

zookeeper.connection.timeout.ms=6000

Unpack the connector plugin and configure Kafka Connect's configuration files:

[root@localhost ~]# unzip confluentinc-kafka-connect-jdbc-10.1.1.zip

[root@localhost ~]# cd /usr/local/kafka/

Move it into the Kafka directory and rename it connect-jdbc, as shown below:
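The exact command was not captured in the original log; unzip normally extracts to a confluentinc-kafka-connect-jdbc-10.1.1 directory, so the move would look something like this:

[root@localhost ~]# mv ~/confluentinc-kafka-connect-jdbc-10.1.1 /usr/local/kafka/connect-jdbc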

[root@localhost kafka]# ls

bin config connect-jdbc libs LICENSE logs NOTICE site-docs

[root@localhost ~]# cd /usr/local/kafka/config

Edit the standalone Connect configuration file:

[root@localhost config]# vim connect-standalone.properties

The changes are as follows:

# These are defaults. This file just demonstrates how to override some settings.

bootstrap.servers=192.168.100.10:9092

# Examples:

# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,

# Directories containing the plugins to load; declare Kafka's own libs directory and the plugin lib directory we just moved in

plugin.path=/usr/local/kafka/libs,/usr/local/kafka/connect-jdbc/lib

[root@localhost lib]# pwd

/usr/local/kafka/connect-jdbc/lib

[root@localhost lib]#

This plugin directory ships the JDBC driver jars that Kafka Connect uses to talk to databases:

[root@localhost lib]# ls

checker-qual-3.5.0.jar mssql-jdbc-8.4.1.jre8.jar oraclepki-19.7.0.0.jar postgresql-42.2.19.jar ucp-19.7.0.0.jar

common-utils-6.0.0.jar ojdbc8-19.7.0.0.jar orai18n-19.7.0.0.jar simplefan-19.7.0.0.jar xdb-19.7.0.0.jar

jtds-1.3.1.jar ojdbc8-production-19.7.0.0.pom osdt_cert-19.7.0.0.jar slf4j-api-1.7.30.jar xmlparserv2-19.7.0.0.jar

kafka-connect-jdbc-10.1.1.jar ons-19.7.0.0.jar osdt_core-19.7.0.0.jar sqlite-jdbc-3.25.2.jar

[root@localhost lib]#
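Note that the listing above contains drivers for Oracle, SQL Server, PostgreSQL and SQLite but none for MySQL; the mysql-connector-java-8.0.23.jar downloaded earlier has to be placed in this directory (or another directory on plugin.path) so the source connector can reach MySQL. Assuming the jar is in root's home directory:

[root@localhost lib]# cp ~/mysql-connector-java-8.0.23.jar /usr/local/kafka/connect-jdbc/lib/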

Next, configure the connector's properties file:

[root@localhost etc]# pwd

/usr/local/kafka/connect-jdbc/etc

[root@localhost etc]# ls

sink-quickstart-sqlite.properties source-quickstart-sqlite.properties

[root@localhost etc]# cp source-quickstart-sqlite.properties /usr/local/kafka/config/mysql.properties

[root@localhost config]# vim mysql.properties

The configuration is as follows:

# a table called 'users' will be written to the topic 'test-sqlite-jdbc-users'.

#connection.url=jdbc:sqlite:test.db

connection.url=jdbc:mysql://192.168.100.10:3306/db1?user=root&password=123123   # -- note the format: database db1 followed by "?"; the user and password parameters are joined with "&"

mode=incrementing

incrementing.column.name=id   -- the column used to detect new rows incrementally

topic.prefix=mysql-   -- Kafka topic names will start with this prefix

table.whitelist=t1,t2,t3   -- whitelist of MySQL tables to read and load

# Define when identifiers should be quoted in DDL and DML statements.
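Only the modified lines are shown above. For reference, a complete mysql.properties along these lines might look as follows; the name value is chosen to match the source-mysql-jdbc-autoincrement task id that appears in the Connect logs later, so treat this as an illustrative sketch rather than the exact file used:

name=source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://192.168.100.10:3306/db1?user=root&password=123123
mode=incrementing
incrementing.column.name=id
topic.prefix=mysql-
table.whitelist=t1,t2,t3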

==================

3. Kafka consumption and testing

Next, start the ZooKeeper and Kafka services.

Before starting the services, configure path variables for each component under root so the commands are easier to invoke. In production, setting these variables for the root superuser is not recommended; a wrong path can affect the whole system.

[root@localhost ~]# vim .bash_profile

export ZK_HOME=/usr/local/zookeeper

export KAFKA_HOME=/usr/local/kafka

export FLINK_HOME=/usr/local/flink   ------ flink is set up in advance here; it will be used later

export PATH=$PATH:$ZK_HOME/bin:$KAFKA_HOME/bin:$FLINK_HOME/bin

[root@localhost ~]#

[root@localhost ~]# source .bash_profile

Start the ZooKeeper service:

[root@localhost ~]# zkServer.sh start

/usr/bin/java

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg   ---- the zoo.cfg configuration file is loaded by default

Starting zookeeper ... STARTED

[root@localhost ~]# ps -ef|grep zookeeper   # -- check the service process

Start the Kafka service:

[root@localhost ~]# kafka-server-start.sh /usr/local/kafka/config/server.properties &

[1] 19309

[root@localhost ~]#

[2021-04-29 19:49:54,740] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)

[2021-04-29 19:49:55,351] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)

[2021-04-29 19:49:55,352] INFO starting (kafka.server.KafkaServer)

[root@localhost ~]# jobs

[1]+ Running kafka-server-start.sh /usr/local/kafka/config/server.properties &

[root@localhost ~]#

3.1 Testing data loading and consumption

Use kafka-topics.sh to create a test topic first, to verify the service is usable:

[root@localhost ~]# kafka-topics.sh --create --zookeeper 192.168.100.10:2181 --replication-factor 1 --partitions 3 --topic test-topic

Created topic test-topic.

List the topics that have been created:

[root@localhost ~]# kafka-topics.sh --list --zookeeper 192.168.100.10:2181

test-topic
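Optionally, the broker can be smoke-tested end to end with the console producer and consumer before wiring in the JDBC connector (not part of the original log; type a few messages into the producer, then read them back and exit with Ctrl+C):

[root@localhost ~]# kafka-console-producer.sh --broker-list 192.168.100.10:9092 --topic test-topic
[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server 192.168.100.10:9092 --topic test-topic --from-beginning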

Start the Kafka-to-MySQL connector and verify that it can load table t1 in database db1:

[root@localhost ~]#

[root@localhost ~]# connect-standalone.sh /usr/local/kafka/config/connect-standalone.properties /usr/local/kafka/config/mysql.properties &

If the db1.t1 table in MySQL is loaded successfully, output like the following appears:

[2021-04-29 19:59:19,874] INFO WorkerSourceTask{id=source-mysql-jdbc-autoincrement-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:209)

[2021-04-29 19:59:19,877] INFO Begin using SQL query: SELECT * FROM `db1`.`t1` WHERE `db1`.`t1`.`id` > ? ORDER BY `db1`.`t1`.`id` ASC (io.confluent.connect.jdbc.source.TableQuerier:164)
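The standalone Connect worker also exposes a REST interface, on port 8083 by default, which can be used to confirm the connector and its task are running. The connector name below is inferred from the task id in the log above, so treat this as an illustrative check rather than part of the original session:

[root@localhost ~]# curl http://localhost:8083/connectors
[root@localhost ~]# curl http://localhost:8083/connectors/source-mysql-jdbc-autoincrement/status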

Reading the MySQL table data loaded into Kafka

Next, start a consumer to consume the data Kafka has already loaded from the MySQL source. First check which topics have been created:

[root@localhost config]# kafka-topics.sh --list --zookeeper 192.168.100.10:2181

__consumer_offsets

mysql-t1   --- this name comes from the topic.prefix=mysql- setting defined in mysql.properties

test-topic

==========================

Insert some rows into MySQL and check them:

mysql> select * from db1.t1;

(screenshot: contents of MySQL table db1.t1)
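The screenshot is not reproduced here; judging from the JSON payloads consumed below, the inserted rows were roughly the following (reconstructed for illustration, not the original statements):

mysql> insert into db1.t1(id,name) values (10,'test-kafka-consumer');
mysql> insert into db1.t1(id,name) values (20,'');
mysql> insert into db1.t1(id,name) values (30,'xxxxxxxxxxxx');
mysql> insert into db1.t1(id,name) values (40,'AAAAAAAAAAA');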

[root@localhost config]# kafka-console-consumer.sh --bootstrap-server 192.168.100.10:9092 --topic mysql-t1 --from-beginning

The consumer prints the rows in JSON format:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":10,"name":"test-kafka-consumer","time1":1619719214000}} --timestap 这里转换为时间戳值

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":20,"name":"","time1":1619721861000}}

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":30,"name":"xxxxxxxxxxxx","time1":1619721869000}}

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int64","optional":false,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"time1"}],"optional":false,"name":"t1"},"payload":{"id":40,"name":"AAAAAAAAAAA","time1":1619721920000}}

4. Using Flink to load the MySQL data

For Flink we only need to unpack the distribution and then use the flink-sql-connector-mysql-cdc-1.0.0.jar connector to capture MySQL binlog changes, so data changes are refreshed dynamically.

As follows:

[root@localhost ~]# tar -xf flink-1.12.2-bin-scala_2.11.tgz

[root@localhost ~]# mv flink-1.12.2 /usr/local/flink
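The original post does not show where the CDC connector jar ends up, but the SQL client must be able to load it; one simple way (assuming the jar sits in root's home directory) is to copy it into Flink's lib directory before starting the cluster:

[root@localhost ~]# cp ~/flink-sql-connector-mysql-cdc-1.0.0.jar /usr/local/flink/lib/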

[root@localhost ~]#

[root@localhost ~]# start-cluster.sh   ---- start the Flink services directly

Starting cluster.

Starting standalonesession daemon on host localhost.

Starting taskexecutor daemon on host localhost.

[root@localhost ~]#

[root@localhost ~]# sql-client.sh embedded   --- launch the Flink SQL client

No default environment specified.

Searching for '/usr/local/flink/conf/sql-client-defaults.yaml'...found.

Reading default environment from: file:/usr/local/flink/conf/sql-client-defaults.yaml

No session environment specified.

Command history file path: /root/.flink-sql-history

Flink SQL> CREATE TABLE flink_tab( id int primary key, name string, time1 string)   -- the column names must match those of t1 in the MySQL database

> WITH (

> 'connector' = 'mysql-cdc', -- connector type

> 'hostname' = '192.168.100.10', -- MySQL host address

> 'port' = '3306', -- MySQL port

> 'username' = 'root', -- MySQL user

> 'password' = '123123', -- MySQL password

> 'database-name' = 'db1', -- database name

> 'table-name' = 't1' -- table name

> );

[INFO] Table has been created.

Flink SQL>

Flink SQL> select * from flink_tab;   ------- query the table

(screenshot: the Flink SQL client showing the mapped MySQL table t1)
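To see the CDC behaviour in action, a new row can be inserted on the MySQL side while the query above is running; it should appear in the Flink result within a few seconds (an extra check not in the original post; the values are arbitrary):

mysql> insert into db1.t1(id,name) values (50,'flink-cdc-test');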

The errors I ran into will be covered in a follow-up. The software packages used are shared above; if you are interested, try it yourself, and the underlying principles can be picked up from the official docs and other write-ups.

Flink official documentation on the SQL client configuration: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html

Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

If there is any infringement, please contact cloudcommunity@tencent.com for removal.

