前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >14-Flink-Table-&-SQL实战

14-Flink-Table-&-SQL实战

原创
作者头像
王知无-import_bigdata
修改2019-03-15 17:34:10
1.2K0
修改2019-03-15 17:34:10
举报

戳更多文章:

1-Flink入门

2-本地环境搭建&构建第一个Flink应用

3-DataSet API

4-DataSteam API

5-集群部署

6-分布式缓存

7-重启策略

8-Flink中的窗口

9-Flink中的Time

简介

Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。您可以轻松地在基于API构建的所有API和库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后使用Table API分析模式,或者可以在预处理上运行Gelly图算法之前使用SQL查询扫描,过滤和聚合批处理表数据。

Flink SQL的编程模型

创建一个TableEnvironment

TableEnvironment是Table API和SQL集成的核心概念,它主要负责:

  1、在内部目录中注册一个Table

  2、注册一个外部目录

  3、执行SQL查询

  4、注册一个用户自定义函数(标量、表及聚合)

  5、将DataStream或者DataSet转换成Table

  6、持有ExecutionEnvironment或者StreamExecutionEnvironment的引用

一个Table总是会绑定到一个指定的TableEnvironment中,相同的查询不同的TableEnvironment是无法通过join、union合并在一起。

TableEnvironment有一个在内部通过表名组织起来的表目录,Table API或者SQL查询可以访问注册在目录中的表,并通过名称来引用它们。

在目录中注册表

TableEnvironment允许通过各种源来注册一个表:

  1、一个已存在的Table对象,通常是Table API或者SQL查询的结果

代码语言:txt
复制
     Table projTable = tableEnv.scan("X").select(...);

  2、TableSource,可以访问外部数据如文件、数据库或者消息系统

代码语言:txt
复制
     TableSource csvSource = new CsvTableSource("/path/to/file", ...);

  3、DataStream或者DataSet程序中的DataStream或者DataSet

代码语言:txt
复制
     //将DataSet转换为Table
代码语言:txt
复制
     Table table= tableEnv.fromDataSet(tableset);

注册TableSink

注册TableSink可用于将 Table API或SQL查询的结果发送到外部存储系统,例如数据库,键值存储,消息队列或文件系统(在不同的编码中,例如,CSV,Apache Parquet ,Avro,ORC],......):

代码语言:txt
复制
TableSink csvSink = new CsvTableSink("/path/to/file", ...); 
  
代码语言:txt
复制
  2、 String[] fieldNames = {"a", "b", "c"}; 
                TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG}; 
                tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);

实战案例一

基于Flink SQL的WordCount:

代码语言:txt
复制
public class WordCountSQL {

    public static void main(String[] args) throws Exception{

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        List list  =  new ArrayList();
        String wordsStr = "Hello Flink Hello TOM";
        String[] words = wordsStr.split("\\W+");
        for(String word : words){
            WC wc = new WC(word, 1);
            list.add(wc);
        }
        DataSet<WC> input = env.fromCollection(list);
        tEnv.registerDataSet("WordCount", input, "word, frequency");
        Table table = tEnv.sqlQuery(
                "SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
        DataSet<WC> result = tEnv.toDataSet(table, WC.class);
        result.print();
    }//main

    public static class WC {
        public String word;//hello
        public long frequency;//1

        // public constructor to make it a Flink POJO
        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }

}

输出如下:

代码语言:txt
复制
WC TOM 1
WC Hello 2
WC Flink 1

实战案例二

本例稍微复杂,首先读取一个文件中的内容进行统计,并写入到另外一个文件中:

代码语言:txt
复制
public class SQLTest {

	public static void main(String[] args) throws Exception{

		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
		env.setParallelism(1);

		DataSource<String> input = env.readTextFile("test.txt");
		input.print();
		//转换成dataset
		DataSet<Orders> topInput = input.map(new MapFunction<String, Orders>() {
			@Override
			public Orders map(String s) throws Exception {
				String[] splits = s.split(" ");
				return new Orders(Integer.valueOf(splits[0]), String.valueOf(splits[1]),String.valueOf(splits[2]), Double.valueOf(splits[3]));
			}
		});
		//将DataSet转换为Table
		Table order = tableEnv.fromDataSet(topInput);
		//orders表名
		tableEnv.registerTable("Orders",order);

		Table tapiResult = tableEnv.scan("Orders").select("name");
		tapiResult.printSchema();

		Table sqlQuery = tableEnv.sqlQuery("select name, sum(price) as total from Orders group by name order by total desc");

		//转换回dataset
		DataSet<Result> result = tableEnv.toDataSet(sqlQuery, Result.class);

		//将dataset map成tuple输出
		/*result.map(new MapFunction<Result, Tuple2<String,Double>>() {
			@Override
			public Tuple2<String, Double> map(Result result) throws Exception {
				String name = result.name;
				Double total = result.total;
				return Tuple2.of(name,total);
			}
		}).print();*/


		TableSink sink = new CsvTableSink("SQLTEST.txt", "|");
		//writeToSink

		/*sqlQuery.writeToSink(sink);
		env.execute();*/

		String[] fieldNames = {"name", "total"};
		TypeInformation[] fieldTypes = {Types.STRING, Types.DOUBLE};
		tableEnv.registerTableSink("SQLTEST", fieldNames, fieldTypes, sink);
		sqlQuery.insertInto("SQLTEST");
		env.execute();
	}

	/**
	 * 源数据的映射类
	 */
	public static class Orders {
		/**
		 * 序号,姓名,书名,价格
		 */
		public Integer id;
		public String name;
		public String book;
		public Double price;

		public Orders() {
			super();
		}
		public Orders(Integer id, String name, String book, Double price) {
			this.id = id;
			this.name = name;
			this.book = book;
			this.price = price;
		}
	}
	/**
	 * 统计结果对应的类
	 */
	public static class Result {
		public String name;
		public Double total;

		public Result() {}
	}
	}//

以上所有代码,大家在公众号回复Flink即可下载,可以直接本地运行,方便大家调试

所有代码,我放在了我的公众号,回复Flink可以下载

  • 海量【java和大数据的面试题+视频资料】整理在公众号,关注后可以下载~
  • 更多大数据技术欢迎和作者一起探讨~

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 戳更多文章:
  • 简介
  • Flink SQL的编程模型
    • 创建一个TableEnvironment
      • 在目录中注册表
        • 注册TableSink
        • 实战案例一
        • 实战案例二
        相关产品与服务
        数据库
        云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档