前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Hive sql窗口函数源码分析

Hive sql窗口函数源码分析

作者头像
数据仓库践行者
发布2020-04-18 00:02:59
1.4K0
发布2020-04-18 00:02:59
举报

在了解了窗口函数实现原理 spark、hive中窗口函数实现原理复盘sparksql比hivesql优化的点(窗口函数)之后,今天又撸了一遍hive sql 中窗口函数的源码实现,写个笔记记录一下。

简单来说,窗口查询有两个步骤:将记录分割成多个分区;然后在各个分区上调用窗口函数。

传统的 UDAF 函数只能为每个分区返回一条记录,而我们需要的是不仅仅输入数据是一张表,输出数据也是一张表(table-in, table-out),因此 Hive 社区引入了分区表函数 Partitioned Table Function (PTF)。

1、代码流转图

PTF 运行在分区之上、能够处理分区中的记录并输出多行结果的函数。

hive会把QueryBlock,翻译为执行操作树OperatorTree,其中每个operator都会有三个重要的方法:

  • initializeOp() --初始化算子
  • process() --执行每一行数据
  • forward() --把处理好的每一行数据发送到下个Operator

当遇到窗口函数时,会生成PTFOperator,PTFOperator 依赖PTFInvocation读取已经排好序的数据,创建相应的输入分区:PTFPartition inputPart;

WindowTableFunction 负责管理窗口帧、调用窗口函数(UDAF)、并将结果写入输出分区: PTFPartition outputPart。

2、其它细节

PTFOperator.process(Object row, int tag)-->PTFInvocation.processRow(row)

void processRow(Object row) throws HiveException {  if ( isStreaming() ) {    handleOutputRows(tabFn.processRow(row));  } else {    inputPart.append(row);     //主要操作就是把数据 append到 ptfpartition中,这里的partition与map-reduce中的分区不同,map-reduce分区是按照key的hash分,而这里是要把相同的key要放在同一个ptfpartition,方便后续的windowfunction操作  }}

真正对数据的操作是当相同的key完全放入同一个ptfpartition之后,时机就是finishPartition:

void finishPartition() throws HiveException {  if ( isStreaming() ) {    handleOutputRows(tabFn.finishPartition());  } else {    if ( tabFn.canIterateOutput() ) {      outputPartRowsItr = inputPart == null ? null :        tabFn.iterator(inputPart.iterator());    } else {      outputPart = inputPart == null ? null : tabFn.execute(inputPart);       //这里TableFunctionEvaluator      outputPartRowsItr = outputPart == null ? null : outputPart.iterator();    }    if ( next != null ) {      if (!next.isStreaming() && !isOutputIterator() ) {        next.inputPart = outputPart;      } else {        if ( outputPartRowsItr != null ) {          while(outputPartRowsItr.hasNext() ) {            next.processRow(outputPartRowsItr.next());          }        }      }    }  }
  if ( next != null ) {    next.finishPartition();  } else {    if (!isStreaming() ) {      if ( outputPartRowsItr != null ) {        while(outputPartRowsItr.hasNext() ) {          forward(outputPartRowsItr.next(), outputObjInspector);        }      }    }  }}

还有一个雷区,PTFPartition append():

public void append(Object o) throws HiveException {
  if ( elems.rowCount() == Integer.MAX_VALUE ) {    //当一个ptfpartition加入的条数等于Integer.MAX_VALUE时会抛异常    throw new HiveException(String.format("Cannot add more than %d elements to a PTFPartition",        Integer.MAX_VALUE));  }
  @SuppressWarnings("unchecked")  List<Object> l = (List<Object>)      ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE);  elems.addRow(l);}

需要把相同key的数据完全放入一个ptfPartition进行操作,这时对加入的的条数做了限制,不能>=Integer.MAX_VALUE(21亿),这块需要注意。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-04-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据仓库践行者 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、代码流转图
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档