前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink SQL vs Spark SQL

Flink SQL vs Spark SQL

作者头像
麒思妙想
发布2020-07-10 13:16:11
3.6K0
发布2020-07-10 13:16:11
举报
文章被收录于专栏:麒思妙想麒思妙想

Spark已经在大数据分析领域确立了事实得霸主地位,而Flink则得到了阿里系的亲赖前途一片光明。我们今天会SparkSQL和FlinkSQL的执行流程进行一个梳理。并提供2个简单的例子,以供参考。

对比分析

Spark SQL 的核心是Catalyst优化器,首先将SQL处理成未优化过的逻辑计划(Unresolved Logical Plan),其只包括数据结构,不包含任何数据信息。然后通过解析,形成解析后的逻辑计划([Analyzed] Logical Plan),这里节点上就会绑定各种信息。通过优化规则,形成优化后的逻辑计划(Optimized Logical Plan),这里会对一些低效的逻辑计划进行转换。逻辑计划之后,会进行物理执行就计划,物理计划阶段会将逻辑计划生成的子树进行进一步转化生成物理算子树,物理算子树上的节点会直接生成RDD或对RDD进行转化(transformation/execute)操作。

一段SQL为例,Select* from topScore where club = ‘AC米兰’ 生成的逻辑计划树中有Relation、Filter、Project三个子节点对应数据表、过滤逻辑(club=‘AC米兰’)和列剪裁逻辑(只涉及2列[name,club])。接下来物理计划和逻辑计划一一映射,Relation逻辑节点转化成FileSourceScanExec执行节点,Filter逻辑节点转换成FilterExec执行节点,Project逻辑节点转化成ProjectExec执行节点。生成的物理执行计划与直接执行RDD程序相似。

Flink SQL 是Fllink提供的SQL的SDK API。SQL是比Table更高阶的API,集成在Table library中提供,在流和批上都可以用此API开发业务。

其完全依靠calcite(sql parser)去做语法解析,validate后生成calcite logical plan. 而Table API先自己生成table API的logical plan,再通过calcite relbuilder translation成calcite logical plan。

使用calcite cost-based optimizor 进行优化。也就是说和spark不同, flink 的SQL Parsing, Analysing, Optimizing都是托管给calcite(flink会加入一些optimze rules). Calcite 会基于优化规则来优化这些 Logical Plan,根据运行环境的不同会应用不同的优化规则(Flink提供了批的优化规则,和流的优化规则)。Calcite提供的内置优化规则(如条件下推,剪枝等),再基于flink定制的一些优化rules(根据是streaming还是batch选择rulue)去优化logical Plan。生成phsyical plan,基于flink里头的rules生成了DataStream Plan(Physical Plan)。

逻辑和spark类似,只不过calcite做了catalyst的事(sql parsing,analysis和optimizing)

代码案例

首先构建数据源,这里我用了'18-'19赛季意甲联赛的射手榜数据

代码语言:javascript
复制
rank,player,club,matches,red_card,total_score,total_score_home,total_score_visit,pass,shot
代码语言:javascript
复制
1,C-罗纳尔多,尤文图斯,26,0,19,5,7,111,61
2,夸利亚雷拉,桑普多利亚,26,0,19,5,5,76,42
3,萨帕塔,亚特兰大,26,0,16,1,4,53,31
4,米利克,那不勒斯,26,0,14,0,1,61,34
5,皮亚特克,热那亚,19,0,13,2,0,56,31
6,因莫比莱,拉齐奥,24,0,12,3,3,65,35
7,卡普托,恩波利,26,0,12,2,4,47,28
8,帕沃莱蒂,卡利亚里,23,0,10,0,1,44,22
9,佩塔尼亚,斯帕尔,25,0,10,2,0,44,29
10,热尔维尼奥,帕尔马,21,0,9,0,0,21,15
11,伊卡尔迪,国际米兰,23,0,9,3,2,44,23

数据列代表,排名、球员、所属俱乐部、比赛、红牌、总进球数、主场进球数、客场进球数、传球数、射门数

Spark SQL

代码语言:javascript
复制
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
public class SparkSQLTest {
    public static final String PATH = "E:\\devlop\\workspace\\streaming1\\src\\main\\resources\\testdata.csv";
    public static void main(String[] args) throws Exception {
        SparkSession ss = SparkSession.builder().appName("local").master("local").getOrCreate();
        ss.read().option("header", "true").csv(PATH)
                .registerTempTable("topScore");
        Dataset ds = ss.sql("select * from topScore").toDF();
        ds.collectAsList().forEach(it->System.out.println(it));
    }
}

spark的程序非常简单,就可以实现对csv进行查询,

代码语言:javascript
复制
option("header", "true")

设置了第一行作为列头,并将csv文件注册为表“topScore”。接下来直接通过SQL进行查询就好了。

输出结果:

[1,C-罗纳尔多,尤文图斯,26,0,19,5,7,111,61] [2,夸利亚雷拉,桑普多利亚,26,0,19,5,5,76,42] [3,萨帕塔,亚特兰大,26,0,16,1,4,53,31] [4,米利克,那不勒斯,26,0,14,0,1,61,34] [5,皮亚特克,热那亚,19,0,13,2,0,56,31] [6,因莫比莱,拉齐奥,24,0,12,3,3,65,35] [7,卡普托,恩波利,26,0,12,2,4,47,28] [8,帕沃莱蒂,卡利亚里,23,0,10,0,1,44,22] [9,佩塔尼亚,斯帕尔,25,0,10,2,0,44,29] [10,热尔维尼奥,帕尔马,21,0,9,0,0,21,15] [11,伊卡尔迪,国际米兰,23,0,9,3,2,44,23]

Flink SQL

代码语言:javascript
复制
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple10;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
public class FlinkSQLTest {
    public static final String PATH = "E:\\devlop\\workspace\\streaming1\\src\\main\\resources\\testdata.csv";
    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env =  ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
        DataSet<Tuple10<Integer,String,String,Integer,Integer,Integer,Integer,Integer,Integer,Integer>> csvInput = env.readCsvFile(PATH)
                                    .ignoreFirstLine()
                                    .types(Integer.class,String.class,String.class,Integer.class,Integer.class,Integer.class,Integer.class,Integer.class,Integer.class,Integer.class);
        Table topScore = tableEnv.fromDataSet(csvInput);
        tableEnv.registerTable("topScore",topScore);
        Table t = tableEnv.sqlQuery("select * from topScore");
        TypeInformation<Tuple10<Integer,String,String,Integer,Integer,Integer,Integer,Integer,Integer,Integer>> info = TypeInformation.of(new TypeHint<Tuple10<Integer,String,String,Integer,Integer,Integer,Integer,Integer,Integer,Integer>>(){});
        DataSet<Tuple10<Integer,String,String,Integer,Integer,Integer,Integer,Integer,Integer,Integer>> dst10 = tableEnv.toDataSet(t,info);
        dst10.collect().forEach(it->System.out.println(it));
    }
}

出于常年做通用型BI产品的习惯,还是不太喜欢直接使用POJO,使用了元组,但是这样其实不是个好习惯,无形中增加了编程的复杂度。

代码语言:javascript
复制
 DataSet<Tuple10<Integer,String,String,Integer,Integer,Integer,Integer,Integer,Integer,Integer>> csvInput = env.readCsvFile(PATH)
                                    .ignoreFirstLine()
                                    .types(Integer.class,String.class,String.class,Integer.class,Integer.class,Intege
代码语言:javascript
复制
TypeInformation<Tuple10<Integer,String,String,Integer,Integer,Integer,Integer,Integer,Integer,Integer>> info = TypeInformation.of(new TypeHint<Tuple10<Integer,String,String,Integer,Integer,Integer,Integer,Integer,Integer,Integer>>(){});
        DataSet<Tuple10<Integer,String,String,Integer,Integer,Integer,Integer,Integer,Integer,Integer>> dst10 = table

输出结果:

(1,C-罗纳尔多,尤文图斯,26,0,19,5,7,111,61) (2,夸利亚雷拉,桑普多利亚,26,0,19,5,5,76,42) (3,萨帕塔,亚特兰大,26,0,16,1,4,53,31) (4,米利克,那不勒斯,26,0,14,0,1,61,34) (5,皮亚特克,热那亚,19,0,13,2,0,56,31) (6,因莫比莱,拉齐奥,24,0,12,3,3,65,35) (7,卡普托,恩波利,26,0,12,2,4,47,28) (8,帕沃莱蒂,卡利亚里,23,0,10,0,1,44,22) (9,佩塔尼亚,斯帕尔,25,0,10,2,0,44,29) (10,热尔维尼奥,帕尔马,21,0,9,0,0,21,15) (11,伊卡尔迪,国际米兰,23,0,9,3,2,44,23)

好了,两个案例还是非常简单的,都只是进行了数据的简单查询,和打印。下次我们提升一点难度。

参考文档:

  • https://databricks.com/session/bi-style-analytics-on-spark-without-shark-using-sparksql-schemardd
  • https://subscription.packtpub.com/book/big_data_and_business_intelligence/9781785889271/8/ch08lvl1sec58/the-spark-sql-architecture
  • https://blog.csdn.net/lmalds/article/details/60959055
  • https://cloud.tencent.com/developer/article/1055500
  • https://www.jianshu.com/p/3191b5b91d38
  • https://blog.csdn.net/rlnLo2pNEfx9c/article/details/82847514
  • https://blog.csdn.net/UUfFO/article/details/80456866
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-03-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 麒思妙想 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
腾讯云 BI
腾讯云 BI(Business Intelligence,BI)提供从数据源接入、数据建模到数据可视化分析全流程的BI能力,帮助经营者快速获取决策数据依据。系统采用敏捷自助式设计,使用者仅需通过简单拖拽即可完成原本复杂的报表开发过程,并支持报表的分享、推送等企业协作场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档