2019 年 8 月 22 日,Flink 发布了 1.9 版本,社区版本的 Flink 新增 了一个 SQL DDL 的新特性,但是暂时还不支持流式的一些概念的定义,比如说水位。
可以体验一下,如果使用 ddl 的方式直接定义一个表从 kafka 中读取数据,并定义成一个表
CREATE TABLE user_visit (
user_name VARCHAR,
ts timestamp
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.10',
'connector.topic' = 'flink-test-05',
'connector.startup-mode' = 'latest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = '192.168.17.24,192.168.17.25,192.168.17.26',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = '192.168.17.26:9092,192.168.17.27:9092,192.168.17.28:9092',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
)
如上面的 sql,基本语法是 create table () with ()
with 后面是一些基本的属性,比如 connector.type 描述了 从 kafka 中读取数据
connector.version 描述了 使用的是哪个版本的 kafka
connector.topic 描述了 从 哪个 topic 中读取数据
connector.startup-mode 描述了 从 哪个位置开始读取数据 等等。
使用 tableEnvironment.sqlUpdate(sql) 就可以把这个表注册到 环境之中,后面就可以直接使用了
另外,还可以定义一些 sink 表,比如 sink 到 mysql 中
mysql 的表结构为:
flink sql 为:
CREATE TABLE pvuv_sink (
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://192.168.17.24:3306/flink_test',
'connector.table' = 'pvuv_sink',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.max-rows' = '1'
)
使用 jdbc sink 前,需要在 maven 中依赖 flink-jdbc 的子项目
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-jdbc -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.9.0</version>
</dependency>
还可以定义一个计算的 sql,计算每个小时的 pv 和 uv,结果写入 到 mysql 的 pvuv_sink 表中
INSERT INTO pvuv_sink
SELECT
time_convert(ts) dt,
COUNT(*) AS pv,
COUNT(DISTINCT user_name) AS uv
FROM user_visit
GROUP BY time_convert(ts)
这里,我们还注册了一个 udf,用来把 Timestamp 格式转换为 String 的格式
class TimeUDF extends ScalarFunction {
def eval(ts: Timestamp): String = {
val df = new SimpleDateFormat("yyyy-MM-dd HH");
df.format(ts)
}
}
tEnv.registerFunction("time_convert",new TimeUDF
完整的代码:
package com.dsj361.flinksql
import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction
/**
*
*
* @author wangkai
* @date 2019/12/12 15:52
*/
object FlinkDDLTest extends App {
class TimeUDF extends ScalarFunction {
def eval(ts: Timestamp): String = {
val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
df.format(ts)
}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
// 注册一个 udf
tEnv.registerFunction("time_convert",new TimeUDF)
val create_sql =
"""
| CREATE TABLE user_visit (
| user_name VARCHAR,
| ts timestamp
|) WITH (
| 'connector.type' = 'kafka',
| 'connector.version' = '0.10',
| 'connector.topic' = 'flink-test-05',
| 'connector.startup-mode' = 'latest-offset',
| 'connector.properties.0.key' = 'zookeeper.connect',
| 'connector.properties.0.value' = '192.168.17.24,192.168.17.25,192.168.17.26',
| 'connector.properties.1.key' = 'bootstrap.servers',
| 'connector.properties.1.value' = '192.168.17.26:9092,192.168.17.27:9092,192.168.17.28:9092',
| 'update-mode' = 'append',
| 'format.type' = 'json',
| 'format.derive-schema' = 'true'
|)
|""".stripMargin
println(create_sql)
val sink_sql =
"""
|CREATE TABLE pvuv_sink (
| dt VARCHAR,
| pv BIGINT,
| uv BIGINT
|) WITH (
| 'connector.type' = 'jdbc',
| 'connector.url' = 'jdbc:mysql://192.168.17.24:3306/flink_test',
| 'connector.table' = 'pvuv_sink',
| 'connector.username' = 'root',
| 'connector.password' = '123456',
| 'connector.write.flush.max-rows' = '1'
|)
|""".stripMargin
tEnv.sqlUpdate(create_sql)
tEnv.sqlUpdate(sink_sql)
tEnv.sqlUpdate(
"""
|INSERT INTO pvuv_sink
|SELECT
| time_convert(ts) dt,
| COUNT(*) AS pv,
| COUNT(DISTINCT user_name) AS uv
|FROM user_visit
|GROUP BY time_convert(ts)
|
|""".stripMargin)
env.execute("Job_name")
}
由于当前版本(Flink 1.9)的 dll 还不支持 时间以及水位相关语义的定义,所以在使用 窗口的时候,需要使用 api 来定义表
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tEnv = StreamTableEnvironment.create(env)
tEnv.registerFunction("Utc2Local",new Utc2Local)
// 不知如何去在 ddl 中定义一个窗口,只能用 api 来 定义了
val kafkaConf = KafkaConfiguration.getConfig(ModeEnum.DEV)
val prop:Properties = new Properties
prop.put("bootstrap.servers", "192.168.17.26:9092,192.168.17.27:9092,192.168.17.28:9092")
prop.put("group.id","Flink_kafka_test_5")
prop.put("enable.auto.commit","true")
prop.put("auto.commit.interval.ms","100")
val consumer = new FlinkKafkaConsumer010[ObjectNode]("flink-test-05", new JsonNodeDeserializationSchema, prop)
val ds = env.addSource(consumer)
// 数据结构为:user_name:String, ts:TIMESTAMP
.map(f => {
(f.get("user_name").asText(), DateUtils.parseDateNewFormat(f.get("ts").asText()).getTime)
})
.assignAscendingTimestamps(f => f._2)
tEnv.registerDataStream("my_table", ds, 'user_name, 'ts.rowtime)
如上面的代码,定义了 从 kafka 的 topic flink-test-05 中读取数据,并使用 JsonNodeDeserializationSchema 反序列化成 ObjectNode
使用 ts 字段作为 rowtime 字段,每10s钟一个窗口
使用窗口函数,计算
select Utc2Local(tumble_end(ts,INTERVAL '10' second)) as time_end,
count(*) as cnt
from my_table
group by tumble(ts,INTERVAL '10' second)