前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >零基础学Flink:Flink SQL(上)

零基础学Flink:Flink SQL(上)

作者头像
麒思妙想
发布2020-07-10 09:59:31
9770
发布2020-07-10 09:59:31
举报
文章被收录于专栏:麒思妙想麒思妙想

前面几篇内容,我们结合案例来介绍了,两流Join,热销榜,以及状态容错,今天我们依旧基于这个数据,来说说Flink SQL,如果对原理有兴趣的同学,也可以移步到《Stream SQL 的执行原理与 Flink 的实现 》,去了解相关内容。

今天我们分几步来介绍,首先什么是动态表,如何注册,数据流如何转换。本文配图主要来自官方文档。

SQL和关系代数设计的时候,并没有考虑流计算,所以流计算和关系数据的计算,有很多概念上的差异。

首先这是一张Flink官方的表

关系代数 / SQL

流计算

关系数据可以表示成一个元组的集合。

一条流是由一条无界的元组数据流组成

一条查询时,包含完整的输入数据。

计算流数据的时候,无法得到所有数据,必须要等待有合适的数据流入。

批查询在终止时,结果是有固定大小的。

流式查询会根据接收到的记录不断更新其结果,而且永远不会完。

尽管有这些不同,但是并非使用关系计算流数据变得不可能,下面我们就来详细说说。

动态表

动态表可以说是Flink Table API 和 SQL的核心,动态表可以像普通关系型数据表一样被查询,只是他吐出的数据,是一个持续的数据流。

  1. 一个流首先被定义转化成动态表
  2. 对动态表进行持续查询,然后这个查询的结果还要被定义成动态表
  3. 最后动态表还需要重新转化成流

如何定义一个动态表?

要想像传统关系查询一样处理流数据,就需要将流转换成一个表,那么如何转换这个表呢?

这个观念大家应该很好理解,我们截取一个流的片段,然后流输入的每一条,其实就相当于关系型数据的一条记录。在关系型数据库里,我们需要首先定义数据表,而流数据在处理的时候,才可以定义元数据。

下面是我们在flink里如何将流定义成动态表,

代码语言:javascript
复制
tableEnv.registerDataStream("orders", oraderStream, "rowtime.rowtime, c1,c2, dm ,v ");

orders 是我们定义的表名, orderStream 则是一条数据流, 后面的字符串参数是流里面数据的列名,这里要注意的是 rowtime.rowtime 它是定义时间窗口的时间属性,在我们的数据列里,可以将一列指定为rowtime,也可以添加一列来辅助计算。

持续查询

在我们进行普通的数据映射和过滤的时候,流和关系表的计算几乎是没什么区别,只是有界与无界的区分。

当进行聚合的时候,数据持续输入,都会对聚合结果有影响,例如下图,对用户点击进行统计的时候,随着时间增长,用户点击的发生,其点击数据是会持续增加的,这就造成了持续查询的数据在不停的更新。

下图是有时间窗口的聚合,在时间窗口内,聚合可以当成一个小的关系型聚合计算来理解。

动态表输出流

动态表输出力流有三种模式Append-only stream,Retract stream,Upsert stream

Append-only stream 只有在动态Table仅通过INSERT更改修改时才能使用此模式,即它仅附加,并且以前发出的结果永远不会更新。

Retract stream 此模式。返回值是boolean类型。它用true或false来标记数据的插入和撤回,返回true代表数据插入,false代表数据的撤回

Upsert stream 和 Retract stream最大的区别在于,更新数据的时候只使用一条编码消息,所以效率更高。

代码案例

我们还是以几篇文章使用的订单流进行。我设计了两个查询,

  • 一个是按时间窗口和商品类型对销售额进行聚合
  • 另外一个是只按商品类型对销售额进行聚合。

前面部分消费kafka的部分没有什么变化,只是在获取初始数据流的时候,将首字段设置成了Timestamp类型。并在获取流的时候,加入watermarker。

然后就是前文提到的

代码语言:javascript
复制
tableEnv.registerDataStream("orders", oraderStream, "rowtime.rowtime, c1,c2, dm ,v ");

将流注册成动态表,并设置元数据,注意

代码语言:javascript
复制
rowtime.rowtime

这个写法,前文有详细说明。

在有时间聚合的动态表转换的时候,我使用了

代码语言:javascript
复制
toAppendStream

没有时间聚合的情况,使用了

代码语言:javascript
复制
toRetractStream

下面是完整代码:

代码语言:javascript
复制
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.io.IOException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
public class App {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Map properties= new HashMap();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
//        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("topicOrder", "order");
        ParameterTool parameterTool = ParameterTool.fromMap(properties);
        FlinkKafkaConsumer010 consumer010Order = new FlinkKafkaConsumer010(
                parameterTool.getRequired("topicOrder"), new DeserializationSchema() {
            @Override
            public TypeInformation getProducedType() {
                return TypeInformation.of(new TypeHint<Tuple5<Timestamp,String,Integer,String,Integer>>(){});
            }

            @Override
            public Tuple5<Timestamp,String,Integer,String,Integer> deserialize(byte[] message) throws IOException {
                //%d,%s,%d,%s,%d
                String[] res = new String(message).split(",");
                Timestamp timestamp = new Timestamp(Long.valueOf(res[0]));
                String catlog = res[1];
                Integer subcat = Integer.valueOf(res[2]);
                String dm = res[3];
                Integer value = Integer.valueOf(res[4]);
                Time eventTime = new Time(System.currentTimeMillis());
                return Tuple5.of(timestamp,catlog,subcat,dm,value);
            }

            @Override
            public boolean isEndOfStream(Object nextElement) {
                return false;
            }
        }, parameterTool.getProperties());
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        DataStream<Tuple5<Timestamp,String,Integer,String,Integer>> oraderStream = env.addSource(consumer010Order).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple5<Timestamp,String,Integer,String,Integer>>() {
            @Override
            public long extractAscendingTimestamp(Tuple5<Timestamp,String,Integer,String,Integer> value) {
                return value.f0.getTime();
            }
        });;
        tableEnv.registerDataStream("orders", oraderStream, "rowtime.rowtime, c1,c2, dm ,v ");
        String sql = "select sum(orders.v),CONCAT(orders.c1,CAST(orders.c2 AS VARCHAR)),TUMBLE_END(rowtime, INTERVAL '10' SECOND)  from orders group by CONCAT(orders.c1,CAST(orders.c2 AS VARCHAR)),TUMBLE(rowtime, INTERVAL '10' SECOND)";
        //sql = "select sum(orders.v),CONCAT(orders.c1,CAST(orders.c2 AS VARCHAR))  from orders group by CONCAT(orders.c1,CAST(orders.c2 AS VARCHAR))";
        Table result1 = tableEnv.sqlQuery(sql);
        result1.printSchema();
        tableEnv.toAppendStream(result1, Row.class).print();
//        tableEnv.toRetractStream(result1, Row.class).print();
        env.execute("sql ");
    }
}

结果

select sum(orders.v), CONCAT(orders.c1,CAST(orders.c2 AS VARCHAR)), TUMBLE_END(rowtime, INTERVAL '10' SECOND) from orders group by CONCAT(orders.c1,CAST(orders.c2 AS VARCHAR)), TUMBLE(rowtime, INTERVAL '10' SECOND)

对应的执行结果

select sum(orders.v), CONCAT(orders.c1,CAST(orders.c2 AS VARCHAR)) from orders group by CONCAT(orders.c1,CAST(orders.c2 AS VARCHAR))

对应的执行结果

写在结尾:

Joins are a common and well-understood operation in batch data processing to connect the rows of two relations. However, the semantics of joins on dynamic tables are much less obvious or even confusing. https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/joins.html

这段话也算是官方吐槽吧,Join两条流的时候,确实用SQL表达会有很大的障碍,目前我是打算啃啃这块骨头的,如果一两周内没更新下篇,就是翻车了,啃的时候门牙崩了,请读者们多做自我检讨吧......

参考文献

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sourceSinks.html#defining-a-streamtablesource

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/joins.html

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/dynamic_tables.html

https://gist.github.com/mustafaakin/457859b8bf703c64029071c1139b593d

https://blog.csdn.net/aa518189/article/details/87816139

学学漫威,文尾贴个彩蛋吧,下面是来自某群里的吐槽,个人观点倒是积极的,俗话说,褒贬是买主,喝彩是闲人。 希望Flink越来越好吧....

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 麒思妙想 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档