Presto
Presto是专为大数据实时查询计算而设计开发的产品,拥有如下特点:
– 多数据源:通过自定义Connector能支持Mysql,Hive,Kafka等多种数据源
– 支持SQL:完全支持ANSI SQL
– 扩展性:支持自定义开发Connector和UDF
– 混合计算:可以根据需要将开源于不同数据源的多个Catalog进行混合join计算
– 高性能:10倍于Hive的查询性能
– 流水线:基于Pipeline设计,在数据处理过程当中不用等到所有数据都处理完成后再查看结果
基本概念
服务进程
- Coordinator
- 主要作用:接受查询请求、解析查询语句、生成查询执行计划、任务调度、Worker管理
- 部署于服务器的一个单独节点上,是整个集群的Master节点
- 与Worker通信获得最新的状态信息,同时接受Client的查询请求
- 所有信息交互通过StatmentResource类提供的RESTful接口完成
- Worker
- 工作节点,完成数据的处理和Task的执行
- 定时向Coordinator上报心跳
Presto模型
- Connector
连接器,可以理解为Presto访问不同数据源的驱动程序;每个Connector都实现了Presto的标准SPI接口,因此只要自己实现SPI接口,就可以实现适合自己需求的Connector。Presto Connector Manager根据对应Connector的配置文件中
connector.name
属性来决定访问数据源时使用的Connector - Catalog
类似于Mysql中的数据库实例,配置Connector配置文件时的文件名就是对应数据源的Catalog名。
- Schema
类似于Mysql中的Database,一个Catalog+一个Schema就唯一确定了一系列可查询的表集合。
- Table
就是传统数据库中表的概念。一份表的全称组合是 Catalog.Schema.Table
查询执行模型
Presto在执行SQL语句时,将其解析成相应的查询,并分配给Worker执行这些Task
– Statement
即输入的SQL语句;Presto支持符合ANSI标准的SQL语句,由字句,表达式和断言组成
– Query
即查询执行,当Presto接受SQL并执行时,会解析SQL并转变成一个查询执行和相关的查询计划。查询由运行在多个Worker上且相互关联的Stage组成的
> Query和Statement的区别
Statement指的是用户输入的SQL文本;Query指的是分布到所有Worker之间执行的实际查询操作,是为了完成SQL所表述的查询而实例化的配置信息,组件,查询计划,优化信息等;由Stage、Task、Driver、Split、Operator、DataSource组成。
- Stage
查询执行阶段,一个Query由多个有层次关系的Stage组成。一个Stage代表查询执行计划的一部分。通常情况下,每个Query都有个Root Stage,用于聚集其他Stage的输出数据并反馈给用户。Stage并不会在集群中实际执行,只是Coordinator对于查询计划进行管理和建模的逻辑概念。Presto中有4种Stage:
- Coordinator_Only:用于执行DDL或者DML语句中最终的表结构创建和更改
- Single:没有下游Stage,结果直接输出给Coordinator,用于聚合其他Stage的输出数据,并最终返回给终端用户
- Source:没有上游Stage,从Coordinator获取数据,用于直接连接数据源,获取原始数据。也会根据查询计划的优化相关完成断言下发和条件过滤等
- Fixed:用于接收其子Stage产生的数据并进行分布式聚合和分组运算
- Exchange
交换。Stage间通过Exchange来连接另一个Stage。Presto中有两种Exchange:
如果当前Stage是Source类型的Stage,那么该Stage是直接通过相应的Connector读取数据的,通过Source Operator与Connector进行交互。
- Output Buffer:产生数据的Stage通过这个Exchange将数据传送给下游Stage
- Exchange Client:消费数据的Stage通过这个Exchange从上游Stage获取数据
- Task
Stage逻辑上被分为一系列Task,Task是实际运行在集群上的任务。一个Stage被拆分为一系列Task;每个Task被拆分为一或多个Split;这样Stage和Task都可以并行执行。
- Split
分片。一个分片是一个大数据集中的一个子集。当执行查询时,首先从Coordinator得到一个表的所有Split,然后根据查询执行计划选取合适的Worker执行Task处理Split
- Driver
一个Task包含一或多个Driver,是作用于一个Split的一系列Operator集合。一个Driver用于处理一个Split产生相应输出,输出由Task收集并传递给下游Stage中的Task。
- Operator
一个Operator代表对一个Split的一种操作,如过滤,转换等。一个Operator依次读取Split中的数据,将Operator所代表的计算和操作应用在此数据上并产生输出。Operator通过最小单位Page分别读取和输出数据。每次只读取、写入一个Page对象
- Page
Presto中的最小数据单元,包含多个Block对象,每个Block对象是一个字节数组,存储一个字段的若干行。多个Block横切就是一行真实数据。Page最大为1MB,最多16×1024行数据
Presto执行查询的模型关系
整体架构
- 硬件方面
硬件必须满足大内存,万兆网络和高计算性能特点。集群为Master-Slave的拓扑架构。
- 软件方面
- Client通过HTTP发送SQL查询语句给Coordinator
- Coordinator解析查询语句,生成查询执行计划。Coordinator会根据数据本地性生成对应的HttpRemoteTask
- Coordinator分发Task到对应Work,通过HttpClient发送给节点上TaskResource提供的RESTful接口;Worker启动一个SqlTaskExecution对象或者更新对应对象需要处理的Split
- 执行处于上游Source Stage中的Task,这些Task通过Connector从数据源中读取数据
- 处于下游Stage的Task读取上游输出结果,并在内存中进行计算和处理
- Coordinator会不断从Single Stage中的Task处获取结果,并缓存到Buffer中,直到所有计算结束
- Client不停从Coordinator中获取本次查询结果,直到获取了所有结果
RESTful框架解析
Presto几乎所有操作都依赖AirLift框架构建的RESTful服务来完成(数据传输,节点通信,心跳感应,计算调度,计算分布等)。包括4类RESTful接口,包括Statement,Query,Stage,Task。
Statement接口
与SQL相关的请求由该接口处理,包括提交SQL语句,获取查询执行结果,取消查询等。
/v1/statement
– POST
传入SQL并获取查询结果,调用query.getNextResults
分批返回查询结果/v1/statement/{queryId}/{token}
– GET
根据Token返回queryId对应的查询的部分执行结果,Token用于保证结果顺序/v1/statement/{queryId}/{token}
– DELETE
用于取消一个query。Token参数无实际用途,仅用于将此方法和QueryResource中的cancellQuery方法区别开。同时两个方法的区别还在于这里的cancelQuery还包含了特殊状态处理和exchangeClient的关闭和清理query.getNextResults
获得查询执行结果,只等待maxWaitTime
指定的时间,超时仅返回该时间内处理完成的数据,会调用createNectResultsUri
生成请求下一批结果的地址一起返回给客户端
Query服务接口
处理与查询执行相关的请求,包括查询状态的查询,生成和取消查询,或者取消一个Stage
/v1/query/{queryId}
– GET
查询一个query的执行状态/v1/query
– POST
生成一个新的查询,传入SQL新建查询/v1/query/{queryId}
– DELETE
取消一个查询/v1/query/stage/{stageId}
– DELETE
取消一个Stage
Stage服务接口
处理Stage相关的请求,只有一个功能,即取消或者结束一个指定的Stage
/v1/stage/{stageId}
– DELETE
提前结束一个Stage
Task服务接口
与Task相关的请求,如Task的创建,更新,状态查询和结果查询等。
/v1/task/{taskId}
– POST
创建一个新的Task或者更新Task状态,如果存在taskID对应的Task,就而根据taskUpdateRequest中的内容更新Task,否则创建一个新的Task/v1/task/{taskId}
– GET
获取Task相关的信息/v1/task/{taskId}
– DELETE
删除或提前结束对应的Task/v1/task/{taskId}/results/{outputId}/{token}
– GET
用于获得TaskId指定的Task生成的用于输出给下游Task(由outputId标识)的数据/v1/task/{taskId}/results/{outputId}/{token}
– DELETE
用于删除TaskId指定的Task生成的用于输出给下游Task(由outputId标识)的数据
提交查询
主要流程
- 从指定文件,命令行参数或者Cli中获取SQL语句
- 将得到的SQL语句组装成RESTful请求,并发送给Coordinator,并处理返回的Response
- Cli分批读取结果显示在屏幕,直到查询全部完成
源码解析
- 启动PrestoCli(无–help或–version)处理SQL
- 根据参数选择直接提交SQL或者启动Cli终端
- 查询分为两部分:初始执行和循环发送请求获取查询结果
- 初始执行方法为
cli.QueryRunner.startQuery(query)
这个方法组装Uri地址将SQL语句发送给Coordinator的/v1/statement
来调用createQuery
执行SQL语句。随后调用cli.Query.renderOutput(...)
来获取查询结果 - 根据传入的interactive标识来决定是否实时更新结果
- 如果动态显示则间隔打印结果
- 分别调用
client.advance()
来请求Coordinator来获得当前已处理完的部分数据 - 根据返回值中
NextUri
是否为null类修改valid
的值来空值是否继续循环调用client.advance()
- 如果不动态显示就等待到所有结果返回
生成查询执行计划(上)
基本概念
- Node
语法解析后生成AST(抽象语法树),其中的每一个节点都是一个Node(抽象类),包含的子类如下:
- Approximate:近似查询
- ExplainOption:表示Explain中的可选参数
- Expression:SQL中出现的表达式
- FrameBound:用于窗口函数中的滑动窗口参数
- Relation:抽象类,包含多个节点或者多个节点的关系,如Union,Join
- Select:表示查询的Select部分
- SelectItem:表示Select中的列(AllColumns表示*)
- SortItem:表示排序列和其类型
- Statement:表示presto中所有可用的SQL语句
- TableElement:表示建表语句中描述表的每一列
- Window:表示一个窗口函数
- WindowFrame:表示窗口函数中的滑动窗口函数
- With:表示查询中所有的With语句
- WithQuery:表示一个With语句
- MetadataAPI
提供了对元数据进行操作的接口,将不同Connector对其元数据的操作抽象为统一接口,不同的Connector都实现了ConnectorNetadata接口,包含Metadata API中元数据的操作接口。
词法和语法分析
通过sqlParser.createStatement(query)
分析语法并创建Statement
- 规则
Presto使用ANTLR4编写SQL语法。
- 词法分析
采用Visitor的模式进行语法分析,通过递归遍历整棵树,根据不同的Node调用不同的visit***
方法,返回对应的对象,最终返回一颗抽象语法树,即Statement对象
获取QueryExecution
QueryExecution表示一次查询执行,用于启动,停止和管理一个查询,以及这个查询的相关信息。包含如下三个实现类
– DataDefinitionExecution
– SqlQueryExecution
– FailedQueryExecution
- 获取QueryExecutionFactory
根据不同的Statement类型,从
executionFactories
Map中获取对应Statement类型与QueryExecutionFactory实现类的关系。该Map通过Guice注入传递到SqlQueryManager中
- DataDefinitionExecutionFactory:负责Create table等DDL操作的SQL语句
- SqlQueryExecutionFactory:负责除了DDL操作的其他SQL语句
- 创建QueryExecution
- 当上文词法语法解析错误时,或遇到未知Statement时,会返回FailedQueryExecution
- 调用
QueryExecutionFactory.createQueryExecution()
,返回对应的QueryExecution(当返回DataDefinitionExecution时,会将对应的DataDefinitionTask绑定到这个QueryExecution上)
- 启动QueryExecution
将QueryExecution与配置的队列规则进行匹配,如果满足条件且队列未满,就加入队列。队列查询按FIFO规则调度查询
- 启动DataDefinitionExecution
- 启动SqlQueryExecution
- 调用analyzeQuery生成查询计划
- 在集群上调度运行查询计划
- 直接调用与之绑定的DataDefinitionTask
语义分析
StatementAnalyzer对Statement进行语义分析,针对不同的Statement类型使用不同方法进行分析