自打Hive出现之后,经过几年的发展,SQL on Hadoop相关的系统已经百花齐放,速度越来越快,功能也越来越齐全。本文并不是要去比较所谓“交互式查询哪家强”,而是试图梳理出一个统一的视角,来看看各家系统有哪些技术上相通之处。
考虑到系统使用的广泛程度与成熟度,在具体举例时一般会拿Hive和Impala为例,当然在调研的过程中也会涉及到一些其他系统,如Spark SQL,Presto,TAJO等。
在SQL on Hadoop系统中,有两种架构:
一般来说,对于SQL on Hadoop系统很重要的一个评价指标就是:快。
在Hive逐渐普及之后,就逐渐有了所谓交互式查询的需求,因为无论是BI
系统,还是adhoc
,都不能按照离线那种节奏玩。短期可以靠商业方案或者关系数据库去支撑一下,但是长远的解决方案就是参考过去的MPP数据库架构打造一个专门的系统,于是就有了Impala
,Presto
等等。
从任务执行的角度说,这类引擎的任务执行其实跟DAG
模型是类似的,当时也有Spark
这个DAG模型的计算框架了,但这终究是别人家的孩子,而且往Spark上套sql又是Hive的那种玩法了。于是在Impala问世之后就强调自己计算全部在内存中完成
,性能也是各种碾压当时还只有MR作为计算模型的Hive。那么Hive所代表的基于已有的计算模型
方式是否真的不行?
类MPP模式优势:
stage
一出结果马上推送或者拉到下一个stage处理,比如多表join时前两个表有结果直接给第三个表,不像MR要等两个表完全join完再给第三个表join。MPP模式劣势:
但是,经过不断的发展,Hive也能跑在DAG框架上了,不仅有Tez,还有Spark。上面提到的一些劣势,其实大都也可以在计算模型中解决,只不过考虑到计算模型的通用性和本身的设计目标,不会去专门满足(所以如果从这个角度分类,Impala属于“专用系统”,Spark则属于“通用系统”)。
在最近Cloudera做的benchmark中,虽然Impala仍然一路领先,但是基于Spark的Spark SQL完全不逊色于Presto,基于Tez的Hive也不算很差,至少在多用户并发模式下能超过Presto,足见MPP模式并不是绝对占上风的。所以这种架构上的区别在我看来并不是制胜的关键,至少不是唯一的因素,真正要做到快速查询,各个方面的细节都要有所把握。后面说的都是这些细节。
不管是上面提到的哪种架构,一个SQL on Hadoop系统一般都会有一些通用的核心组件,这些组件根据设计者的考虑放在不同的节点角色中,在物理上节点都按照master
/worker
的方式去做,如果master压力太大,一些本来适合放在master上的组件可以放到一个辅助master上:
HDFS
来说,需要根据I/O Format
把文件转换成K/V
,Serde
再完成K/V到数据行的映射。而对于非HDFS存储来说就需要一些专门的handler
/connector
。从SQL到执行计划,大致分为5步:
AST
。这一步一般都有第三方工具库可以完成,比如Antlr
。TableScanOperator
,聚合会产生GroupByOperator
;aggregate
,join
,还有top n
这几个操作并行化,比如aggregate会分成类似- MR那样的本地aggregate,shuffle和全局aggregate三步。plan fragment
。其他类MPP系统也是类似的概念。物理计划中的一个计算单元(或者说Job),由“输入,处理,输出”三要素组成,而逻辑执行计划中的operator相对粒度更细,一个逻辑操作符一般处于这三要素之一的角色。下面分别举两个例子,直观的认识下sql、逻辑计划、物理计划之间的关系,具体解释各个operator的话会比较细碎,就不展开了。
select count(1) from status_updates where ds = '2009-08-01'
Hive_compile
引用自美团技术团队,其中SubPlan就是物理计划的一个计算单元:
select c1.rank, count(*)
from dim.city c1 join dim.city c2 on c1.id = c2.id
where c1.id > 10 group by c1.rank limit 10;
Presto_compile
LocalExecutionPlan
。
上面的SQL语句生成的逻辑执行计划Plan如上图所示。那么Presto是如何对上面的逻辑执行计划进行拆分,以较高的并行度去执行完这个计划呢,我们来看看物理执行计划。
SubPlan有几个重要的属性planDistribution
、outputPartitioning
、partitionBy
属性。
关于执行计划的优化,虽然不一定是整个编译流程中最难的部分,但却是最有看点的部分,而且目前还在不断发展中。Spark系之所以放弃Shark另起炉灶做Spark SQL,很大一部分原因是想自己做优化策略,避免受Hive的限制,为此还专门独立出优化器组件Catalyst
(当然Spark SQL目前还是非常新,其未来发展给人不少想象空间)。总之这部分工作可以不断的创新,优化器越智能,越傻瓜化,用户就越能解放出来解决业务问题。
早期在Hive中只有一些简单的规则优化,比如谓词下推(把过滤条件尽可能的放在table scan之后就完成),操作合并(连续的filter用and合并成一个operator,连续的projection也可以合并)。后来逐渐增加了一些略复杂的规则,比如相同key的join + group by合并为1个MR,还有star schema join。
在Hive 0.12引入的相关性优化(correlation optimizer)算是规则优化的一个高峰,他能够减少数据的重复扫描,具体来说,如果查询的两个部分用到了相同的数据,并且各自做group by / join的时候用到了相同的key,这个时候由于数据源和shuffle的key是一样的,所以可以把原来需要两个job分别处理的地方合成一个job处理。
比如下面这个sql:
SELECT
sum(l_extendedprice) / 7.0 as avg_yearly
FROM
(SELECT l_partkey, l_quantity, l_extendedprice
FROM lineitem JOIN part ON (p_partkey=l_partkey)
WHERE p_brand='Brand#35' AND p_container = 'MED PKG')touter
JOIN
(SELECT l_partkey as lp, 0.2 * avg(l_quantity) as lq
FROM lineitem GROUP BY l_partkey) tinner
ON (touter.l_partkey = tinnter.lp)
WHERE touter.l_quantity < tinner.lq
这个查询中两次出现lineitem
表,group by
和两处join
用的都是l_partkey
,所以本来两个子查询和一个join用到三个job,现在只需要用到一个job就可以完成。
但是,基于规则的优化(RBO)不能解决所有问题。
在关系数据库中早有另一种优化方式,也就是基于代价的优化CBO。CBO通过收集表的数据信息(比如字段的基数,数据分布直方图等等)来对一些问题作出解答,其中最主要的问题就是确定多表join的顺序。CBO通过搜索join顺序的所有解空间(表太多的情况下可以用有限深度的贪婪算法),并且算出对应的代价,可以找到最好的顺序。这些都已经在关系数据库中得到了实践。
目前Hive已经启动专门的项目,也就是Apache Optiq
(现在改名为Apache Calcite
,Phoenix
也用了这个东西)来做这个事情。
即使有了高效的执行计划,如果在运行过程本身效率较低,那么再好的执行计划也会大打折扣。这里主要关注CPU和IO方面的执行效率。
在具体的计算执行过程中,低效的cpu会导致系统的瓶颈落在CPU上,导致IO无法充分利用。在一项针对Impala和Hive的对比时发现,Hive在某些简单查询上(TPC-H Query 1)也比Impala慢主要是因为Hive运行时完全处于CPU bound的状态中,磁盘IO只有20%,而Impala的IO至少在85%。
在SQL on Hadoop中出现CPU bound的主要原因有以下几种:
a + 2 * b
之类的表达式计算,解释器会构造一个expression tree
,解释的过程就是递归调用子节点做evaluation的过程。又比如以DAG形式的operator/task在执行的过程中,上游节点会层层调用下游节点来获取产生的数据。这些都会产生大量的调用。针对上面的问题,目前大多数系统中已经加入了以下两个解决办法中至少一个:
a + 2 * b
这个表达式就会生成对应的执行语言的代码,而且可以直接用primitive type
,而不是用固定的解释性代码。具体实现来说,JVM系的如Spark SQL,Presto可以用反射,C++系的Impala则使用了llvm生成中间码。对于判断数据类型造成的分支判断,动态代码的效果可以消除这些类型判断,还可以展开循环,可以对比下面这段代码,左边是解释性代码,右边是动态生成代码。另一个方法是vectorization
(向量化)
基本思路是放弃每次处理一行的模式,改用每次处理一小批数据(比如1k行),当然前提条件是使用列存储格式。这样一来,这一小批连续的数据可以放进cache里面,cpu不仅减少了branch instruction,甚至可以用SIMD
加快处理速度。
具体的实现参考下面的代码,对一个long型的字段增加一个常量。通过把数据表示成数组,过滤条件也用selVec装进数组,形成了很紧凑的循环:
add(int vecNum, long[] result, long[] col1, int[] col2, int[] selVec)
{
if (selVec == null)
for (int i = 0; i < vecNum; i++)
result[i] = col1[i] + col2[i];
else
for (int i = 0; i < vecNum; i++)
{
int selIdx = selVec[i];
result[selIdx] = col1[selIdx] + col2[selIdx];
}
}
由于SQL on Hadoop存储数据都是在HDFS上,所以IO层的优化其实大多数都是HDFS的事情,各大查询引擎则提出需求去进行推动。
要做到高效IO:
block replica
。HDFS参数是dfs.client.read.shortcircuit
和dfs.domain.socket.path
。dfs.datanode.hdfs-blocks-metadata.enabled
。对于分析类型的workload来说,最好的存储格式自然是列存储,这已经在关系数据库时代得到了证明。
目前hadoop生态中有两大列存储格式,一个是由Hortonworks和Microsoft开发的ORCFile
,另一个是由Cloudera和Twitter开发的Parquet
。
ORCFile顾名思义,是在RCFile
的基础之上改造的。RCFile虽然号称列存储,但是只是“按列存储”而已,将数据先划分成row group,然后row group内部按照列进行存储。这其中没有列存储的一些关键特性,而这些特性在以前的列式数据库中(比如我以前用过的Infobright)早已用到。好在ORCFile已经弥补了这些特性,包括:
ORCFile的结构如下图:
row group
,也叫strip
。index
,存放每个数据单元(默认10000行)的min/max值用于过滤;stream
,然后再进行snappy
或gz
压缩。Parquet的设计原理跟ORC类似,不过它有两个特点:
Dremel
就在实现层面做出了范例,Parquet则完全仿照了Dremel。对嵌套格式做列存储的难点在于,存储时需要标记某个数据对应于哪一个存储结构,或者说是哪条记录,所以需要用数据清楚的进行标记。
在Dremel中提出用definition level
和repetition level
来进行标记:
比如下图是一个二级嵌套数组。图中的e跟f在都属于第二层的重复记录(同一个level2),所以f的r值为2,而c跟d则是不同的level2,但属于同一个level1,所以d的r值为1。对于顶层而言(新的一个嵌套结构),r值就为0。
但是仅仅这样还不够。上图说明了r值的作用,但是还没有说明d值的作用,因为按照字面解释,d值对于每一个字段都是可以根据schema得到的,那为什么还要从行记录级别标记?这是因为记录中会插入一些null值,这些null值代表着他们“可以存在”但是因为是repeated或者是optional所以没有值的情况,null值是用来占位的(或者说是“想象”出来的),所以他们的值需要单独计算。null的d值就是说这个结构往上追溯到哪一层(不包括平级)就不是null(不是想象)了。在dremel paper中有完整的例子,例子中country的第一个null在code = en所在的结构里面,那么language不是null(不考虑code,他跟country平级),他就是第二层;又比如country的第二个null在url = http://B 所在的结构里面,那么name不是null(不考虑url,因为他跟本来就是null的language平级),所以就是第一层。
通过这种方式,就对一个树状的嵌套格式完成了存储。在读取的时候可以通过构造一个状态机进行遍历。
有意思的是,虽然parquet支持嵌套格式,但是Impala还没有来得及像Hive那样增加array,map,struct等复杂格式,当然这项功能已经被列入roadmap了,相信不久就会出现。
在最近我们做的Impala2.0测试中,顺便测试了存储格式的影响。parquet相比sequencefile在压缩比上达到1:5,查询性能也相差5-10倍,足见列存储一项就给查询引擎带来的提升。
对于一个MR Job,reduce task的数量一直是需要人为估算的一个麻烦事,基于MR的Hive也只是根据数据源大小粗略的做估计,不考虑具体的Job逻辑。但是在之后的框架中考虑到了这个情况,增加了运行时调整资源分配的功能。Tez中引入了vertex manager
,可以根据运行时收集到的数据智能地判断reduce动作需要的task。类似的功能在TAJO
中也有提到,叫progressive query optimization
,而且TAJO不仅能做到动态调整task数量,还能动态调整join顺序。
在Hadoop已经进入2.x的时代,所有想要得到广泛应用的SQL on Hadoop系统势必要能与YARN进行集成。虽然这是一个有利于资源合理利用的好事,但是由于加入了YARN这一层,却给系统的性能带来了一定的障碍,因为启动AppMaster和申请container也会占用不少时间,尤其是前者,而且container的供应如果时断时续,那么会极大的影响时效性。在Tez和Impala中对这些问题给出了相应的解决办法:
long lived app master
,AppMaster启动后长期驻守,而非像是MR那样one AM per Job
。具体实现时,可以给fair scheduler或capacity scheduler配置的每个队列配上一个AM池,有一定量的AM为提交给这个队列的任务服务。到这里为止,已经从上到下顺了一遍各个层面用到的技术,当然SQL on Hadoop本身就相当复杂,涉及到方方面面,时间精力有限不可能一一去琢磨。比如其他一些具有技术复杂度的功能有:
尽管现在相关系统已经很多,也经过了几年的发展,但是目前各家系统仍然在不断的进行完善,比如:
毕竟相比已经比较成熟的关系数据库,分布式环境下需要解决的问题更多,未来一定还会出现很多精彩的技术实践,让我们在海量数据中更快更方便的查到想要的数据。
——END——