前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink SQL DDL 和 窗口函数实战

Flink SQL DDL 和 窗口函数实战

作者头像
kk大数据
发布2019-12-19 11:22:33
5.1K0
发布2019-12-19 11:22:33
举报
文章被收录于专栏:kk大数据kk大数据

一、Flink SQL DDL

2019 年 8 月 22 日,Flink 发布了 1.9 版本,社区版本的 Flink 新增 了一个 SQL DDL 的新特性,但是暂时还不支持流式的一些概念的定义,比如说水位。

二、定义 create table 语句 从 kafka 中读取数据

可以体验一下,如果使用 ddl 的方式直接定义一个表从 kafka 中读取数据,并定义成一个表

CREATE TABLE user_visit (
    user_name VARCHAR,
    ts timestamp
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = '0.10',
    'connector.topic' = 'flink-test-05',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = '192.168.17.24,192.168.17.25,192.168.17.26',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = '192.168.17.26:9092,192.168.17.27:9092,192.168.17.28:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
)

如上面的 sql,基本语法是 create table () with ()

with 后面是一些基本的属性,比如 connector.type 描述了 从 kafka 中读取数据

connector.version 描述了 使用的是哪个版本的 kafka

connector.topic 描述了 从 哪个 topic 中读取数据

connector.startup-mode 描述了 从 哪个位置开始读取数据 等等。

使用 tableEnvironment.sqlUpdate(sql) 就可以把这个表注册到 环境之中,后面就可以直接使用了

另外,还可以定义一些 sink 表,比如 sink 到 mysql 中

mysql 的表结构为:

flink sql 为:

CREATE TABLE pvuv_sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://192.168.17.24:3306/flink_test',
    'connector.table' = 'pvuv_sink',
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.write.flush.max-rows' = '1'
)

使用 jdbc sink 前,需要在 maven 中依赖 flink-jdbc 的子项目

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>1.9.0</version>
        </dependency>

还可以定义一个计算的 sql,计算每个小时的 pv 和 uv,结果写入 到 mysql 的 pvuv_sink 表中

INSERT INTO pvuv_sink
SELECT
 time_convert(ts) dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_name) AS uv
FROM user_visit
GROUP BY time_convert(ts)

这里,我们还注册了一个 udf,用来把 Timestamp 格式转换为 String 的格式

class TimeUDF extends ScalarFunction {
    def eval(ts: Timestamp): String = {
      val df = new SimpleDateFormat("yyyy-MM-dd HH");
      df.format(ts)
    }
  }
tEnv.registerFunction("time_convert",new TimeUDF

完整的代码:

package com.dsj361.flinksql
import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.functions.ScalarFunction
/**
 *
 *
 * @author wangkai
 * @date 2019/12/12 15:52
 */
object FlinkDDLTest extends App {
  class TimeUDF extends ScalarFunction {
    def eval(ts: Timestamp): String = {
      val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      df.format(ts)
    }
  }
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = StreamTableEnvironment.create(env)
  // 注册一个 udf
  tEnv.registerFunction("time_convert",new TimeUDF)
  val create_sql =
    """
      | CREATE TABLE user_visit (
      |    user_name VARCHAR,
      |    ts timestamp
      |) WITH (
      |    'connector.type' = 'kafka',
      |    'connector.version' = '0.10',
      |    'connector.topic' = 'flink-test-05',
      |    'connector.startup-mode' = 'latest-offset',
      |    'connector.properties.0.key' = 'zookeeper.connect',
      |    'connector.properties.0.value' = '192.168.17.24,192.168.17.25,192.168.17.26',
      |    'connector.properties.1.key' = 'bootstrap.servers',
      |    'connector.properties.1.value' = '192.168.17.26:9092,192.168.17.27:9092,192.168.17.28:9092',
      |    'update-mode' = 'append',
      |    'format.type' = 'json',
      |    'format.derive-schema' = 'true'
      |)
      |""".stripMargin
  println(create_sql)
  val sink_sql =
    """
      |CREATE TABLE pvuv_sink (
      |    dt VARCHAR,
      |    pv BIGINT,
      |    uv BIGINT
      |) WITH (
      |    'connector.type' = 'jdbc',
      |    'connector.url' = 'jdbc:mysql://192.168.17.24:3306/flink_test',
      |    'connector.table' = 'pvuv_sink',
      |    'connector.username' = 'root',
      |    'connector.password' = '123456',
      |    'connector.write.flush.max-rows' = '1'
      |)
      |""".stripMargin
  tEnv.sqlUpdate(create_sql)
  tEnv.sqlUpdate(sink_sql)
  tEnv.sqlUpdate(
    """
      |INSERT INTO pvuv_sink
      |SELECT
      | time_convert(ts) dt,
      |  COUNT(*) AS pv,
      |  COUNT(DISTINCT user_name) AS uv
      |FROM user_visit
      |GROUP BY time_convert(ts)
      |
      |""".stripMargin)
  env.execute("Job_name")
}

三、Flink SQL 窗口函数实战

由于当前版本(Flink 1.9)的 dll 还不支持 时间以及水位相关语义的定义,所以在使用 窗口的时候,需要使用 api 来定义表

val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val tEnv = StreamTableEnvironment.create(env)
  tEnv.registerFunction("Utc2Local",new Utc2Local)
  // 不知如何去在 ddl 中定义一个窗口,只能用 api 来 定义了
  val kafkaConf = KafkaConfiguration.getConfig(ModeEnum.DEV)
  val prop:Properties = new Properties
  prop.put("bootstrap.servers", "192.168.17.26:9092,192.168.17.27:9092,192.168.17.28:9092")
  prop.put("group.id","Flink_kafka_test_5")
  prop.put("enable.auto.commit","true")
  prop.put("auto.commit.interval.ms","100")
  val consumer = new FlinkKafkaConsumer010[ObjectNode]("flink-test-05", new JsonNodeDeserializationSchema, prop)
  val ds = env.addSource(consumer)
    // 数据结构为:user_name:String, ts:TIMESTAMP
    .map(f => {
      (f.get("user_name").asText(), DateUtils.parseDateNewFormat(f.get("ts").asText()).getTime)
    })
    .assignAscendingTimestamps(f => f._2)
  tEnv.registerDataStream("my_table", ds, 'user_name, 'ts.rowtime)

如上面的代码,定义了 从 kafka 的 topic flink-test-05 中读取数据,并使用 JsonNodeDeserializationSchema 反序列化成 ObjectNode

使用 ts 字段作为 rowtime 字段,每10s钟一个窗口

使用窗口函数,计算

select Utc2Local(tumble_end(ts,INTERVAL '10' second)) as time_end,
       count(*) as cnt
  from my_table
  group by tumble(ts,INTERVAL '10' second)
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-12-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 KK架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Flink SQL DDL
  • 二、定义 create table 语句 从 kafka 中读取数据
  • 三、Flink SQL 窗口函数实战
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档