
2021 Big Data Flink (32): Table and SQL Example Preparation: API

Lansonli
Published 2021-10-11 14:55:38
This article is part of the column: Lansonli Tech Blog

API

Getting the Environment

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#create-a-tableenvironment

Code language: Java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Four alternative ways to obtain a TableEnvironment in Flink 1.12; a real job picks one.
// Running all four together needs both the legacy and the Blink planner on the classpath.
public class CreateTableEnvironments {
    public static void main(String[] args) {
        // **********************
        // FLINK STREAMING QUERY (legacy planner)
        // **********************
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
        // or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

        // ******************
        // FLINK BATCH QUERY (legacy planner, DataSet API)
        // ******************
        ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

        // **********************
        // BLINK STREAMING QUERY
        // **********************
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

        // ******************
        // BLINK BATCH QUERY
        // ******************
        EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
    }
}
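The four snippets above boil down to choosing a planner (legacy or Blink) and an execution mode (streaming or batch), plus deciding whether the job must bridge to the DataStream or DataSet API. The plain-Java sketch below encodes that decision as we read the Flink 1.12 snippets. The `Planner`/`Mode` enums and the `EnvChooser` class are hypothetical illustrations, not Flink API, and the rule that Blink batch mode only offers the unified `TableEnvironment` reflects Flink 1.12 and may not hold in other versions.

```java
// Hypothetical helper (not Flink API): which TableEnvironment the Flink 1.12
// snippets use for each planner/mode combination.
enum Planner { LEGACY, BLINK }

enum Mode { STREAMING, BATCH }

final class EnvChooser {
    private EnvChooser() {}

    /** Returns the simple class name of the TableEnvironment to create. */
    static String tableEnvironmentFor(Planner planner, Mode mode, boolean needsBridge) {
        if (mode == Mode.STREAMING) {
            // Both planners: StreamTableEnvironment bridges to DataStream,
            // the unified TableEnvironment suffices for pure Table/SQL jobs.
            return needsBridge ? "StreamTableEnvironment" : "TableEnvironment";
        }
        if (planner == Planner.LEGACY) {
            // Legacy batch queries run on the DataSet API.
            return "BatchTableEnvironment";
        }
        // Blink batch mode: unified TableEnvironment only, no DataStream/DataSet bridge (1.12).
        if (needsBridge) {
            throw new IllegalArgumentException(
                    "Blink batch mode offers no DataStream/DataSet bridge in Flink 1.12");
        }
        return "TableEnvironment";
    }

    public static void main(String[] args) {
        for (Planner p : Planner.values()) {
            for (Mode m : Mode.values()) {
                System.out.println(p + " + " + m + " -> " + tableEnvironmentFor(p, m, false));
            }
        }
    }
}
```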

Creating Tables

Register a Table object as a temporary view:

Code language: Java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);

// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);

Register a table backed by an external system through a connector descriptor:

Code language: Java
tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable");

Querying Tables

Table API

Code language: Java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// scan registered Orders table
Table orders = tableEnv.from("Orders");

// compute revenue for all customers from France
Table revenue = orders
  .filter($("cCountry").isEqual("FRANCE"))
  .groupBy($("cID"), $("cName"))
  .select($("cID"), $("cName"), $("revenue").sum().as("revSum"));

// emit or convert Table
// execute query
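To make the semantics of the query concrete, here is the same filter / group-by / sum pipeline evaluated over an in-memory list in plain Java. The `Order` class, the `long` type of `revenue`, and the sample rows are assumptions made for illustration; Flink itself evaluates the real query over the registered `Orders` table.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for one row of the "Orders" table.
final class Order {
    final long cID;
    final String cName;
    final String cCountry;
    final long revenue; // assumption: revenue is an integral amount

    Order(long cID, String cName, String cCountry, long revenue) {
        this.cID = cID;
        this.cName = cName;
        this.cCountry = cCountry;
        this.revenue = revenue;
    }
}

final class RevenueQuery {
    private RevenueQuery() {}

    /** WHERE cCountry = 'FRANCE' GROUP BY cID, cName; returns "cID,cName" -> SUM(revenue). */
    static Map<String, Long> revenueByCustomer(List<Order> orders) {
        Map<String, Long> revSum = new TreeMap<>(); // sorted only for deterministic printing
        for (Order o : orders) {
            if (!"FRANCE".equals(o.cCountry)) {
                continue; // filter($("cCountry").isEqual("FRANCE"))
            }
            String key = o.cID + "," + o.cName;      // groupBy($("cID"), $("cName"))
            revSum.merge(key, o.revenue, Long::sum); // $("revenue").sum()
        }
        return revSum;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
                new Order(1, "Alice", "FRANCE", 100),
                new Order(2, "Bob", "GERMANY", 50),
                new Order(1, "Alice", "FRANCE", 30),
                new Order(3, "Chloe", "FRANCE", 70));
        System.out.println(revenueByCustomer(orders)); // {1,Alice=130, 3,Chloe=70}
    }
}
```

Keying the map by `"cID,cName"` mirrors `groupBy($("cID"), $("cName"))`; the `TreeMap` is only there to make the printed result deterministic.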

SQL

Code language: Java
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// emit or convert Table
// execute query
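The query string above is built from concatenated literals, and each fragment must end with a space; dropping one silently fuses tokens (for example `revSumFROM`). One way to rule that out is to join the clauses with an explicit separator. This is a small plain-Java sketch; the `SqlText` helper is hypothetical and simply produces the string you would pass to `tableEnv.sqlQuery(...)`.

```java
// Hypothetical helper: assembles a SQL statement from clauses joined by single spaces,
// so no clause boundary can lose its separator.
final class SqlText {
    private SqlText() {}

    static String join(String... clauses) {
        return String.join(" ", clauses);
    }

    public static void main(String[] args) {
        String query = join(
                "SELECT cID, cName, SUM(revenue) AS revSum",
                "FROM Orders",
                "WHERE cCountry = 'FRANCE'",
                "GROUP BY cID, cName");
        // This is the string that would be handed to tableEnv.sqlQuery(query).
        System.out.println(query);
    }
}
```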

Alternatively, executeSql() with an INSERT INTO statement writes the result directly into a registered sink table:

Code language: Java
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register "Orders" table
// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

Emitting Tables

Code language: Java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// create an output Table
final Schema schema = new Schema()
    .field("a", DataTypes.INT())
    .field("b", DataTypes.STRING())
    .field("c", DataTypes.BIGINT());

tableEnv.connect(new FileSystem().path("/path/to/file"))
    .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("CsvSinkTable");

// compute a result Table using Table API operators and/or SQL queries
Table result = ...

// emit the result Table to the registered TableSink
result.executeInsert("CsvSinkTable");
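For orientation, the sketch below shows what one line of `CsvSinkTable` is expected to look like with `fieldDelimiter('|')` and the schema `(a INT, b STRING, c BIGINT)`. It is a plain-Java approximation built around a hypothetical `DelimitedRow` helper: unlike Flink's CSV format it does no quoting or escaping, and it simply rejects fields that contain the delimiter.

```java
import java.util.StringJoiner;

// Hypothetical helper: formats one row as a delimited line (no quoting/escaping).
final class DelimitedRow {
    private DelimitedRow() {}

    /** Joins the row's fields with the delimiter; fields must not contain the delimiter. */
    static String format(char delimiter, Object... fields) {
        StringJoiner line = new StringJoiner(String.valueOf(delimiter));
        for (Object f : fields) {
            String s = String.valueOf(f);
            if (s.indexOf(delimiter) >= 0) {
                throw new IllegalArgumentException("field contains delimiter: " + s);
            }
            line.add(s);
        }
        return line.toString();
    }

    public static void main(String[] args) {
        // One row matching the schema (a INT, b STRING, c BIGINT).
        System.out.println(format('|', 1, "hello", 42L)); // 1|hello|42
    }
}
```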

Integration with DataSet/DataStream

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api

  • Create a View from a DataStream or DataSet
Code language: Java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

DataStream<Tuple2<Long, String>> stream = ...

// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream);

// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));

Convert a DataStream or DataSet into a Table

Convert a Table into a DataStream or DataSet

Convert a Table into a DataStream

Append Mode: This mode can only be used if the dynamic table is modified only by INSERT changes, i.e., it is append-only and previously emitted results are never updated.


Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.

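Why an aggregation needs retract mode: a grouped `SUM` updates rows it has already emitted. The plain-Java model below follows the encoding described above, emitting `(false, oldRow)` to retract the previous result and `(true, newRow)` to insert the new one. The `RetractSum`/`Change` names and the string-encoded rows are hypothetical stand-ins for `Tuple2<Boolean, Row>`; Flink's actual operators may order or batch messages differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a retract-encoded changelog for SUM(value) GROUP BY key.
final class RetractSum {
    private RetractSum() {}

    /** One retract-stream message: insert == true is INSERT, false is DELETE. */
    static final class Change {
        final boolean insert;
        final String row;

        Change(boolean insert, String row) {
            this.insert = insert;
            this.row = row;
        }

        @Override
        public String toString() {
            return "(" + insert + "," + row + ")";
        }
    }

    /** Processes (key, value) inputs in order and returns the emitted changelog. */
    static List<Change> run(String[] keys, long[] values) {
        Map<String, Long> sums = new HashMap<>();
        List<Change> out = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            Long old = sums.get(keys[i]);
            long updated = (old == null ? 0L : old) + values[i];
            if (old != null) {
                out.add(new Change(false, keys[i] + "," + old)); // retract the previous result row
            }
            out.add(new Change(true, keys[i] + "," + updated));  // insert the new result row
            sums.put(keys[i], updated);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(new String[] {"Alice", "Bob", "Alice"}, new long[] {100, 50, 30}));
        // [(true,Alice,100), (true,Bob,50), (false,Alice,100), (true,Alice,130)]
    }
}
```

Because the row `Alice,100` is later deleted, this changelog is not append-only, which is why a table produced by such a query cannot be converted with `toAppendStream`.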

Code language: Java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// get StreamTableEnvironment.
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

// convert the Table into an append DataStream of Tuple2<String, Integer>
// via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING,
  Types.INT);
DataStream<Tuple2<String, Integer>> dsTuple =
  tableEnv.toAppendStream(table, tupleType);

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
//   The boolean field indicates the type of the change.
//   True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
  tableEnv.toRetractStream(table, Row.class);
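On the consuming side, a retract stream such as `retractStream` can be folded back into the current contents of the table: a `true` message adds one copy of the row and a `false` message removes one. The sketch below models that in plain Java, with `Map.Entry<Boolean, String>` standing in for `Tuple2<Boolean, Row>`; the `RetractConsumer` class and the sample messages are hypothetical. A multiset (row to count) is used rather than a set because a table may contain duplicate rows.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical consumer of a retract stream; Map.Entry<Boolean, String> stands in for Tuple2<Boolean, Row>.
final class RetractConsumer {
    private RetractConsumer() {}

    /** Builds one retract message: insert == true adds the row, false retracts it. */
    static Map.Entry<Boolean, String> msg(boolean insert, String row) {
        return new SimpleEntry<>(insert, row);
    }

    /** Folds retract messages into a multiset (row -> count) holding the table's current contents. */
    static Map<String, Integer> apply(List<Map.Entry<Boolean, String>> messages) {
        Map<String, Integer> table = new TreeMap<>(); // sorted only for deterministic printing
        for (Map.Entry<Boolean, String> m : messages) {
            String row = m.getValue();
            if (m.getKey()) {
                table.merge(row, 1, Integer::sum); // INSERT: one more copy of the row
            } else {
                Integer count = table.get(row);    // DELETE: one copy fewer
                if (count == null) {
                    throw new IllegalStateException("retraction of a row that is not present: " + row);
                }
                if (count == 1) {
                    table.remove(row);
                } else {
                    table.put(row, count - 1);
                }
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<Boolean, String>> messages = Arrays.asList(
                msg(true, "Alice,100"),
                msg(true, "Bob,50"),
                msg(false, "Alice,100"),
                msg(true, "Alice,130"));
        System.out.println(apply(messages)); // {Alice,130=1, Bob,50=1}
    }
}
```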

Convert a Table into a DataSet

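Code language: Java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

// get BatchTableEnvironment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);

// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING,
  Types.INT);
DataSet<Tuple2<String, Integer>> dsTuple =
  tableEnv.toDataSet(table, tupleType);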

Table API

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html

SQL API

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/

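This article is shared from the author's personal site/blog as part of the Tencent Cloud self-media sync program.
Originally published: 2021-05-01. In case of infringement, contact cloudcommunity@tencent.com to have it removed.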
