前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >FlinkSQL实时计算Demo

FlinkSQL实时计算Demo

作者头像
用户8483969
发布2021-04-09 11:35:29
2.9K0
发布2021-04-09 11:35:29
举报
文章被收录于专栏:bgmonkeybgmonkey

Flink实时消费业务数据Demo

Debezium监控MySQL用FlinkSQL实时消费

1、环境准备

代码语言:javascript
复制
## 各组件版本
MySQL:5.7.21-log ## 开启binlog
kafka_2.11-2.4.1 ## Kafka
Flink:1.12.0 ## Flink_1.12.0官方推荐使用Kafka_2.4.1
Zookeeper:3.4.6
## 所需组件下载地址
## kafka_2.11-2.4.1.tgz
链接:https://pan.baidu.com/s/1-YUvHj8B10VG_LA7O_akPA 提取码:pv7f 

## flink-1.12.0-bin-scala_2.11.tgz
链接:https://pan.baidu.com/s/1GDmKNbaEmq9fpCx93a41pg 提取码:hz5b 

## debezium-connector-mysql-1.3.1.Final-plugin.tar.gz
链接:https://pan.baidu.com/s/1AtR9buds1AvfRnJ4JU-v6g 提取码:lkm2 

## 所需jar包
链接:https://pan.baidu.com/s/1HFLuMcEdQN48DJplCx_e8A 提取码:5ipk 

2、环境部署

前提:开启MySQL并启用binlog 启动zookeeper、kafka、flink

2.1、在kafka环境下安装debezium连接器
代码语言:javascript
复制
在kafka目录下新建plugins目录
将debezium-connector-mysql-1.3.1.Final-plugin.tar.gz解压到plugins下
2.2、编辑kafka-connect配置信息

connect-distribute.properties

代码语言:javascript
复制
## 修改如下内容
bootstrap.servers=master:9092,slave1:9092,slave2:9092
## 重点配置 plugin.path,注意:路径为连接器解压路径的父级目录
plugin.path=/user/kafka/plugins
2.3、开启kafka-connect服务
代码语言:javascript
复制
## 启动
bin/connect-distributed.sh config/connect-distributed.properties

## 后台启动
bin/connect-distributed.sh -daemon config/connect-distributed.properties

## 测试是否启动成功
curl -H "Accept:application/json" master:8083/

## 查看connectors下已有的连接器
curl -H "Accept:application/json" localhost:8083/connectors/
2.4、注册MySQL的监听器

详细信息在Debezium官网都能找到详细解释 地址: https://debezium.io/documentation/reference/1.3/connectors/mysql.html#configure-the-mysql-connector_debezium

代码语言:javascript
复制
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" master:8083/connectors/ -d '{ 
"name": "inventory-connector", 
"config": { 
"connector.class": "io.debezium.connector.mysql.MySqlConnector", 
"tasks.max": "1", 
"database.hostname": "master", 
"database.port": "3306", 
"database.user": "root", 
"database.password": "root", 
"database.server.id": "184054", 
"database.server.name": "dbserver1", 
"database.include.list": "test", 
"table.include.list":"test.customers", 
"database.history.kafka.bootstrap.servers": "master:9092", "database.history.kafka.topic": "dbhistory.master" }
}'


## 配置解读:
name:在Kafka Connect服务中注册时的连接器名称
connector.class:连接器的类名
database.hostname:MySQL服务器地址
database.server.id:该数据库客户端的数字ID,在MySQL集群中所有当前正在运行的数据库进程中,该ID必须唯一。该连接器作为另一个服务器(具有此唯一ID)加入MySQL数据库集群,因此它可以读取binlog。默认情况下,尽管我们建议设置一个显式值,但是会在5400和6400之间生成一个随机数。

database.server.name:MySQL服务器或群集的逻辑名称
database.include.list:数据库的列表
table.include.list:表名
database.history.kafka.bootstrap.servers:连接器将用于建立与Kafka群集的初始连接的主机/端口对的列表。该连接将用于检索先前由连接器存储的数据库架构历史,并用于写入从源数据库读取的每个DDL语句。这应该指向Kafka Connect进程使用的同一Kafka群集。
database.history.kafka.topic:连接器将在其中存储数据库架构历史记录的Kafka主题的全名
2.5、查看Kafka的Topic
代码语言:javascript
复制
真正存储binlog的topic:dbserver1.test.customers
2.6、配置FlinkSQL连接Kafka源表
代码语言:javascript
复制
-- 开启FlinkSQL
./bin/sql-client.sh embedded 

-- MySQL中建表语句
CREATE TABLE customers(
id int,
first_name varchar(255),
last_name varchar(255),
email varchar(255)
);

-- FlinkSQL客户端连接Kafka
CREATE TABLE customers(
id int,
first_name varchar(255),
last_name varchar(255),
email varchar(255)
) WITH (
 'connector' = 'kafka',
 'topic' = 'dbserver1.test.customers',
 'properties.bootstrap.servers' = 'master:9092',
 'properties.group.id' = 'testGroup',
 'debezium-json.schema-include'='true',
 'debezium-json.ignore-parse-errors'='true',
'scan.startup.mode' = 'earliest-offset',
 'format' = 'debezium-json'
);

-- FlinkSQL结果sink到mysql
CREATE TABLE datashow (
  first_name varchar(255),
  nums bigint,
  PRIMARY KEY (first_name) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/test',
   'table-name' = 'datashow',
   'username' = 'root',
   'password' = 'root'
);

-- 统计每个姓名出现的次数
insert into datashow 
select first_name, count(1) cnt from customers group by first_name;

提交统计SQL未执行,原因是我提交了一条空记录,查看日志发现报错: You can set job configuration ‘table.exec.sink.not-null-enforcer’=‘drop’

代码语言:javascript
复制
## 设置参数将key为null的值过滤掉 
##在FlinkSQL客户端执行命令
set table.exec.sink.not-null-enforcer=drop 

## 再次提交统计SQL即可
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-04-07 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flink实时消费业务数据Demo
    • Debezium监控MySQL用FlinkSQL实时消费
      • 1、环境准备
      • 2、环境部署
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档