From One SQL Query to a Full Review of Hive Predicate Pushdown, with Source Code Analysis (Part 2)

This article is fairly long, so here is the outline:

3. Hive predicate pushdown: source code analysis

3.1 Optimization during logical plan generation

Hive SQL compilation goes through six stages:

lexical/syntax parsing -> semantic analysis -> logical plan generation -> logical plan optimization -> physical plan generation -> physical plan optimization

The first predicate pushdown optimization happens while the logical plan is being generated. It only targets predicates inside the ON clause of join, left join, right join, and full join; predicates that sit above the join (in the WHERE clause) are not handled at this stage.

We use the following SQL as the running example:

select 
  t1.*,
  t2.* 
from tmp.test1 t1 
left join tmp.test2 t2 on t1.id=t2.id and t1.openid='pear' and t2.openid='apple'

The AST generated for this SQL:

nil
   TOK_QUERY
      TOK_FROM
         TOK_LEFTOUTERJOIN
            TOK_TABREF
               TOK_TABNAME
                  test1
               t1
            TOK_TABREF
               TOK_TABNAME
                  test2
               t2
            and
               and
                  =
                     .
                        TOK_TABLE_OR_COL
                           t1
                        id
                     .
                        TOK_TABLE_OR_COL
                           t2
                        id
                  =
                     .
                        TOK_TABLE_OR_COL
                           t1
                        openid
                     'pear'
               =
                  .
                     TOK_TABLE_OR_COL
                        t2
                     openid
                  'apple'
      TOK_INSERT
         TOK_DESTINATION
            TOK_DIR
               TOK_TMP_FILE
         TOK_SELECT
            TOK_SELEXPR
               TOK_ALLCOLREF
                  TOK_TABNAME
                     t1
            TOK_SELEXPR
               TOK_ALLCOLREF
                  TOK_TABNAME
                     t2
   <EOF>

We focus on how the children of TOK_LEFTOUTERJOIN are handled while the logical plan is generated.

3.1.1 Semantic analysis

Semantic analysis decomposes the AST and stores it into a QB (query block).

TOK_LEFTOUTERJOIN has three children: two TOK_TABREF nodes (the left and right tables) and an and node (the join condition). Semantic analysis only processes the two TOK_TABREF nodes, storing each table's name and alias into the QB; the entire TOK_LEFTOUTERJOIN subtree is then stashed in QB.QBParseInfo.joinExpr for later use when the logical plan is generated.

TOK_TABREF
   TOK_TABNAME   -->  QB.QBParseInfo.aliasToSrc: map(t1,TOK_TABNAME)
      test1      -->  QB.aliasToTabs: map(t1,test1)
   t1            -->  QB.aliases
TOK_TABREF
   TOK_TABNAME   -->  QB.QBParseInfo.aliasToSrc: map(t2,TOK_TABNAME)
      test2      -->  QB.aliasToTabs: map(t2,test2)
   t2            -->  QB.aliases
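
As a mental model only, here is a tiny sketch of that phase-1 bookkeeping. MiniQB and its fields are illustrative stand-ins for Hive's QB/QBParseInfo, not the real classes:

import java.util.*;

class MiniQB {
    final Map<String, String> aliasToTabs = new LinkedHashMap<>(); // t1 -> test1
    final Set<String> aliases = new LinkedHashSet<>();
    Object joinExpr; // would hold the whole TOK_LEFTOUTERJOIN subtree

    void addTabRef(String tableName, String alias) {
        aliasToTabs.put(alias, tableName);
        aliases.add(alias);
    }

    public static void main(String[] args) {
        MiniQB qb = new MiniQB();
        qb.addTabRef("test1", "t1");            // first TOK_TABREF
        qb.addTabRef("test2", "t2");            // second TOK_TABREF
        qb.joinExpr = "TOK_LEFTOUTERJOIN(...)"; // stashed for plan generation
        System.out.println(qb.aliasToTabs);     // {t1=test1, t2=test2}
    }
}
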
3.1.2 Generating the logical plan
....
} else {
        //handle the join's left and right tables and the join conditions, and work out which filters can be pushed down
        QBJoinTree joinTree = genJoinTree(qb, joinExpr, aliasToOpInfo);
        qb.setQbJoinTree(joinTree);
        /*
         * if there is only one destination in Query try to push where predicates
         * as Join conditions
         */
        Set<String> dests = qb.getParseInfo().getClauseNames();
        if ( dests.size() == 1 && joinTree.getNoOuterJoin()) {
          String dest = dests.iterator().next();
          ASTNode whereClause = qb.getParseInfo().getWhrForClause(dest);
          if ( whereClause != null ) {
            extractJoinCondsFromWhereClause(joinTree, qb, dest,
                (ASTNode) whereClause.getChild(0),
                aliasToOpInfo );
          }
        }

        if (!disableJoinMerge) {
          mergeJoinTree(qb);
        }
      }

      // turn the pushable predicates from the join conditions into FIL operators placed right after the TS operators
      pushJoinFilters(qb, qb.getQbJoinTree(), aliasToOpInfo); 
      srcOpInfo = genJoinPlan(qb, aliasToOpInfo); // generate the logical plan for the join
} else {
...

The code above is the join-handling part of logical plan generation.

Let's peel apart QBJoinTree joinTree = genJoinTree(qb, joinExpr, aliasToOpInfo):

Before the and node is handled, the two TOK_TABREF nodes are processed; then applyEqualityPredicateToQBJoinTree, shown below the annotated tree, classifies each condition:

TOK_TABREF
   TOK_TABNAME              -->  QB.QBParseInfo.aliasToSrc: map(t1,TOK_TABNAME)
      test1           -->  QB.aliasToTabs: map(t1,test1)
   t1            -->  QBJoinTree.leftAlias,QBJoinTree.leftAliases,QBJoinTree.baseSrc,QB.aliases
TOK_TABREF
   TOK_TABNAME        -->  QB.QBParseInfo.aliasToSrc: map(t2,TOK_TABNAME)
      test2          -->  QB.aliasToTabs: map(t2,test2)
   t2            -->  QBJoinTree.rightAliases,QBJoinTree.baseSrc,QB.aliases
and
   and
      =
         .
            TOK_TABLE_OR_COL   --> QBJoinTree.expressions[0]
               t1                --> leftCondAl1
            id
         .
            TOK_TABLE_OR_COL   --> QBJoinTree.expressions[1]
               t2                --> rightCondAl2
            id
      =
         .
            TOK_TABLE_OR_COL   --> QBJoinTree.filters[0]
               t1         --> leftCondAl1
            openid
         'pear'
   =
      .
         TOK_TABLE_OR_COL     --> QBJoinTree.filtersForPushing[1]
            t2           --> leftCondAl2
         openid
      'apple'
void applyEqualityPredicateToQBJoinTree(QBJoinTree joinTree,
      JoinType type,
      List<String> leftSrc,
      ASTNode joinCond,
      ASTNode leftCondn,
      ASTNode rightCondn,
      List<String> leftCondAl1,
      List<String> leftCondAl2,
      List<String> rightCondAl1,
      List<String> rightCondAl2) throws SemanticException {
    if (leftCondAl1.size() != 0) {
      if ((rightCondAl1.size() != 0)
          || ((rightCondAl1.size() == 0) && (rightCondAl2.size() == 0))) {
        if (type.equals(JoinType.LEFTOUTER) ||
            type.equals(JoinType.FULLOUTER)) {
          if (conf.getBoolVar(HiveConf.ConfVars.HIVEOUTERJOINSUPPORTSFILTERS)) {
             //t1.openid='pear': a left outer join with a filter on the left table
            joinTree.getFilters().get(0).add(joinCond);
          } else {
            LOG.warn(ErrorMsg.OUTERJOIN_USES_FILTERS.getErrorCodedMsg());
            joinTree.getFiltersForPushing().get(0).add(joinCond);
          }
        } else {
  .....
      else if (leftCondAl2.size() != 0) {
      if ((rightCondAl2.size() != 0)
          || ((rightCondAl1.size() == 0) && (rightCondAl2.size() == 0))) {
        if (type.equals(JoinType.RIGHTOUTER)
            || type.equals(JoinType.FULLOUTER)) {
          if (conf.getBoolVar(HiveConf.ConfVars.HIVEOUTERJOINSUPPORTSFILTERS)) {
            joinTree.getFilters().get(1).add(joinCond);
          } else {
            LOG.warn(ErrorMsg.OUTERJOIN_USES_FILTERS.getErrorCodedMsg());
            joinTree.getFiltersForPushing().get(1).add(joinCond);
          }
        } else {
           // t2.openid='apple' 
          joinTree.getFiltersForPushing().get(1).add(joinCond);
        }
 ....

QBJoinTree.filters stores the predicates that cannot be pushed down, while QBJoinTree.filtersForPushing stores those that can; index [0] refers to the left table and [1] to the right table.

Via applyEqualityPredicateToQBJoinTree, the pushable predicate t2.openid='apple' is placed into QBJoinTree.filtersForPushing, in preparation for the FilterOperator generated later.
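
To visualize the two structures, here is a minimal sketch under assumed names (MiniJoinTree is not Hive's QBJoinTree) showing where our two predicates end up:

import java.util.*;

class MiniJoinTree {
    // predicates that must stay attached to the join (cannot be pushed)
    final List<List<String>> filters =
        Arrays.asList(new ArrayList<>(), new ArrayList<>());
    // predicates that will become FilterOperators below the join
    final List<List<String>> filtersForPushing =
        Arrays.asList(new ArrayList<>(), new ArrayList<>());

    public static void main(String[] args) {
        MiniJoinTree jt = new MiniJoinTree();
        // left outer join: the left-table condition stays on the join ...
        jt.filters.get(0).add("t1.openid = 'pear'");
        // ... while the right-table condition can run before the join
        jt.filtersForPushing.get(1).add("t2.openid = 'apple'");
        System.out.println("kept on join: " + jt.filters);
        System.out.println("pushed down:  " + jt.filtersForPushing);
    }
}

pushJoinFilters then walks filtersForPushing and generates the FilterOperators: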

private void pushJoinFilters(QB qb, QBJoinTree joinTree,
      Map<String, Operator> map,
      boolean recursively) throws SemanticException {
    if ( recursively ) {
      if (joinTree.getJoinSrc() != null) {
        pushJoinFilters(qb, joinTree.getJoinSrc(), map);
      }
    }
    ArrayList<ArrayList<ASTNode>> filters = joinTree.getFiltersForPushing();
    int pos = 0;
    for (String src : joinTree.getBaseSrc()) {
      if (src != null) {
        Operator srcOp = map.get(src);
        ArrayList<ASTNode> filter = filters.get(pos);
        for (ASTNode cond : filter) {
          //generate the FilterOperator
          srcOp = genFilterPlan(qb, cond, srcOp, false);
        }
        map.put(src, srcOp);
      }
      pos++;
    }
  }

At this point the FilterOperator has been generated and placed right after the TS operator.
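
Structurally, "place the FilterOperator after TS" is just a splice into the operator tree. A minimal sketch of the idea (MiniOp is an illustrative stand-in; Hive's operators track both parent and child lists):

import java.util.*;

class MiniOp {
    final String name;
    final List<MiniOp> children = new ArrayList<>();
    MiniOp(String name) { this.name = name; }

    // insert newChild between this node and its current children
    MiniOp spliceChild(MiniOp newChild) {
        newChild.children.addAll(children);
        children.clear();
        children.add(newChild);
        return newChild;
    }

    void dump(String indent) {
        System.out.println(indent + name);
        for (MiniOp c : children) c.dump(indent + "  ");
    }

    public static void main(String[] args) {
        MiniOp ts = new MiniOp("TS[t2]");
        ts.spliceChild(new MiniOp("RS"));                     // original child
        ts.spliceChild(new MiniOp("FIL(t2.openid='apple')")); // pushed filter
        ts.dump(""); // TS[t2] -> FIL(t2.openid='apple') -> RS
    }
}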

3.2 The PredicatePushDown optimizer

We now know that join-clause predicates are already optimized during logical plan generation, so to analyze the PredicatePushDown optimizer we switch to a SQL whose predicates live in the WHERE clause:

select 
  t1.*,
  t2.* 
from tmp.test1 t1 
left join tmp.test2 t2 on t1.id=t2.id where t1.openid='pear' and t2.openid='apple'

The AST generated for this SQL:

ASTNode:
nil
   TOK_QUERY
      TOK_FROM
         TOK_LEFTOUTERJOIN
            TOK_TABREF
               TOK_TABNAME
                  tmp
                  test1
               t1
            TOK_TABREF
               TOK_TABNAME
                  tmp
                  test2
               t2
            =
               .
                  TOK_TABLE_OR_COL
                     t1
                  id
               .
                  TOK_TABLE_OR_COL
                     t2
                  id
      TOK_INSERT
         TOK_DESTINATION
            TOK_DIR
               TOK_TMP_FILE
         TOK_SELECT
            TOK_SELEXPR
               TOK_ALLCOLREF
                  TOK_TABNAME
                     t1
            TOK_SELEXPR
               TOK_ALLCOLREF
                  TOK_TABNAME
                     t2
         TOK_WHERE
            and
               =
                  .
                     TOK_TABLE_OR_COL
                        t1
                     openid
                  'pear'
               =
                  .
                     TOK_TABLE_OR_COL
                        t2
                     openid
                  'apple'
   <EOF>

The logical plan before predicate pushdown optimization:

 TS[0]  TS[1]
 
 RS[2]  RS[3]
  
  JOIN[4]
  
  FIL[5]  --> t1.openid='pear' and t2.openid='apple'
  
  SEL[6]
  
  FS[7]
3.2.1 Hive's graph walker and dispatcher

PredicatePushDown pre-registers 10 rules:

OpWalkerInfo opWalkerInfo = new OpWalkerInfo(pGraphContext);

    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
    opRules.put(new RuleRegExp("R1",
      FilterOperator.getOperatorName() + "%"),
      OpProcFactory.getFilterProc());  // rule for the FIL (Filter) operator
    opRules.put(new RuleRegExp("R2",
      PTFOperator.getOperatorName() + "%"),
      OpProcFactory.getPTFProc());
    opRules.put(new RuleRegExp("R3",
      CommonJoinOperator.getOperatorName() + "%"),
      OpProcFactory.getJoinProc());   // rule for the JOIN operator
    opRules.put(new RuleRegExp("R4",
      TableScanOperator.getOperatorName() + "%"),
      OpProcFactory.getTSProc());     // rule for the TS (TableScan) operator
    opRules.put(new RuleRegExp("R5",
      ScriptOperator.getOperatorName() + "%"),
      OpProcFactory.getSCRProc());
    opRules.put(new RuleRegExp("R6",
      LimitOperator.getOperatorName() + "%"),
      OpProcFactory.getLIMProc());
    opRules.put(new RuleRegExp("R7",
      UDTFOperator.getOperatorName() + "%"),
      OpProcFactory.getUDTFProc());
    opRules.put(new RuleRegExp("R8",
      LateralViewForwardOperator.getOperatorName() + "%"),
      OpProcFactory.getLVFProc());
    opRules.put(new RuleRegExp("R9",
      LateralViewJoinOperator.getOperatorName() + "%"),
      OpProcFactory.getLVJProc());
    opRules.put(new RuleRegExp("R10",
        ReduceSinkOperator.getOperatorName() + "%"),
        OpProcFactory.getRSProc());     // rule for the RS (ReduceSink) operator

    // The dispatcher fires the processor corresponding to the closest matching
    // rule and passes the context along
    Dispatcher disp = new DefaultRuleDispatcher(OpProcFactory.getDefaultProc(),
        opRules, opWalkerInfo);
    GraphWalker ogw = new DefaultGraphWalker(disp);

    // Create a list of topop nodes
    ArrayList<Node> topNodes = new ArrayList<Node>();
    topNodes.addAll(pGraphContext.getTopOps().values());
    ogw.startWalking(topNodes, null);   // walk the plan and fire the rules

DefaultGraphWalker is Hive's default depth-first traversal implementation:

public void startWalking(Collection<Node> startNodes,
      HashMap<Node, Object> nodeOutput) throws SemanticException {
    toWalk.addAll(startNodes);
    while (toWalk.size() > 0) {
      Node nd = toWalk.remove(0);
      walk(nd);
      // Some walkers extending DefaultGraphWalker e.g. ForwardWalker
      // do not use opQueue and rely uniquely in the toWalk structure,
      // thus we store the results produced by the dispatcher here
      // TODO: rewriting the logic of those walkers to use opQueue
      if (nodeOutput != null && getDispatchedList().contains(nd)) {
        nodeOutput.put(nd, retMap.get(nd));
      }
    }

    // Store the results produced by the dispatcher
    while (!opQueue.isEmpty()) {
      Node node = opQueue.poll();
      if (nodeOutput != null && getDispatchedList().contains(node)) {
        nodeOutput.put(node, retMap.get(node));
      }
    }
  }

walk() first pushes the current node onto the pending stack opStack, then pops nodes off it. A node is dispatched only when it has no children or all of its children have already been processed; otherwise its children are pushed onto the top of the stack and traversal continues from there.
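
Here is a minimal, self-contained sketch (assumed types, not Hive's) of that traversal rule, with a linear TS -> RS -> JOIN -> FIL chain as input:

import java.util.*;

class MiniWalker {
    static class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        Node(String name) { this.name = name; }
    }

    final Set<Node> dispatched = new HashSet<>();

    void walk(Node start) {
        Deque<Node> opStack = new ArrayDeque<>();
        opStack.push(start);
        while (!opStack.isEmpty()) {
            Node nd = opStack.peek();
            List<Node> pending = new ArrayList<>();
            for (Node c : nd.children)
                if (!dispatched.contains(c)) pending.add(c);
            if (pending.isEmpty()) {            // all children done: dispatch
                opStack.pop();
                if (dispatched.add(nd)) System.out.println("dispatch " + nd.name);
            } else {
                pending.forEach(opStack::push); // children go on top first
            }
        }
    }

    public static void main(String[] args) {
        Node ts = new Node("TS[0]"), rs = new Node("RS[2]"),
             join = new Node("JOIN[4]"), fil = new Node("FIL[5]");
        ts.children.add(rs); rs.children.add(join); join.children.add(fil);
        new MiniWalker().walk(ts); // FIL[5], JOIN[4], RS[2], TS[0]
    }
}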

So during traversal, how does each kind of node get its own treatment?

Before the walk begins, processors are registered for different node patterns (the 10 rules PredicatePushDown sets up), and during the walk the Dispatcher picks the best-matching processor for each node.

The dispatcher used here is DefaultRuleDispatcher, whose selection logic is:

  @Override
  public Object dispatch(Node nd, Stack<Node> ndStack, Object... nodeOutputs)
      throws SemanticException {

    // find the firing rule
    // find the rule from the stack specified
    Rule rule = null;
    int minCost = Integer.MAX_VALUE;
    for (Rule r : procRules.keySet()) {
      int cost = r.cost(ndStack);
      if ((cost >= 0) && (cost <= minCost)) {
        minCost = cost;
        rule = r;
      }
    }

    NodeProcessor proc;

    if (rule == null) {
      proc = defaultProc;
    } else {
      proc = procRules.get(rule);
    }

    // Do nothing in case proc is null
    if (proc != null) {
      // Call the process function
      return proc.process(nd, ndStack, procCtx, nodeOutputs);
    } else {
      return null;
    }
  }

It iterates over all rules, calls each rule's cost method against the current node stack, and fires the processor of the lowest-cost matching rule; if no rule matches, the default processor is used, and if no default processor is set, nothing happens.
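
The selection loop can be sketched as follows; the endsWith cost is a deliberate simplification of Hive's RuleRegExp, which matches a regex against the %-joined operator names on the walker's stack:

import java.util.*;

class MiniRuleDispatcher {
    interface Processor { void process(String node); }

    // cost: pattern length if the stack string ends with the pattern, -1 otherwise
    static int cost(String pattern, String stackStr) {
        return stackStr.endsWith(pattern) ? pattern.length() : -1;
    }

    static void dispatch(String node, List<String> stack,
                         Map<String, Processor> rules, Processor defaultProc) {
        String stackStr = String.join("%", stack);
        Processor proc = defaultProc;
        int minCost = Integer.MAX_VALUE;
        for (Map.Entry<String, Processor> e : rules.entrySet()) {
            int c = cost(e.getKey(), stackStr);
            if (c >= 0 && c <= minCost) { minCost = c; proc = e.getValue(); }
        }
        if (proc != null) proc.process(node); // no rule, no default: do nothing
    }

    public static void main(String[] args) {
        Map<String, Processor> rules = new LinkedHashMap<>();
        rules.put("FIL", n -> System.out.println("FilterPPD fires on " + n));
        rules.put("JOIN", n -> System.out.println("JoinPPD fires on " + n));
        // the walker's current path ends at the FIL operator
        dispatch("FIL[5]", List.of("TS", "RS", "JOIN", "FIL"), rules, null);
    }
}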

3.2.2 The optimization pass

From the analysis above, optimization starts at the bottom of the plan: FS[7] is processed first, then SEL[6], then FIL[5], and so on.

Of the 10 pre-registered rules, this example goes through four of them, in dispatch order: FilterPPD, JoinPPD, ReduceSinkPPD, and TableScanPPD.

FilterPPD

Every PPD processor implements a process method:

public static class FilterPPD extends DefaultPPD implements NodeProcessor {

    @Override
    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
        Object... nodeOutputs) throws SemanticException {
      return process(nd, stack, procCtx, false, nodeOutputs);
    }

    Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
            boolean onlySyntheticJoinPredicate, Object... nodeOutputs) throws SemanticException {
      LOG.info("Processing for " + nd.getName() + "("
          + ((Operator) nd).getIdentifier() + ")");

      OpWalkerInfo owi = (OpWalkerInfo) procCtx;
      Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;

      // if this filter is generated one, predicates need not to be extracted
      ExprWalkerInfo ewi = owi.getPrunedPreds(op);
      // Don't push a sampling predicate since createFilter() always creates filter
      // with isSamplePred = false. Also, the filterop with sampling pred is always
      // a child of TableScan, so there is no need to push this predicate.
      if (ewi == null && !((FilterOperator)op).getConf().getIsSamplingPred()
              && (!onlySyntheticJoinPredicate
                      || ((FilterOperator)op).getConf().isSyntheticJoinPredicate())) {
        // get pushdown predicates for this operator's predicate
        ExprNodeDesc predicate = (((FilterOperator) nd).getConf()).getPredicate();
        ewi = ExprWalkerProcFactory.extractPushdownPreds(owi, op, predicate);
        if (!ewi.isDeterministic()) {
          /* predicate is not deterministic */
          if (op.getChildren() != null && op.getChildren().size() == 1) {
            createFilter(op, owi
                .getPrunedPreds((Operator<? extends OperatorDesc>) (op
                .getChildren().get(0))), owi);
          }
          return null;
        }
        logExpr(nd, ewi);
        owi.putPrunedPreds((Operator<? extends OperatorDesc>) nd, ewi);
        if (HiveConf.getBoolVar(owi.getParseContext().getConf(),
            HiveConf.ConfVars.HIVEPPDREMOVEDUPLICATEFILTERS)) {
          // add this filter for deletion, if it does not have non-final candidates
          owi.addCandidateFilterOp((FilterOperator)op);
          Map<String, List<ExprNodeDesc>> residual = ewi.getResidualPredicates(true);
          createFilter(op, residual, owi);
        }
      ....

FilterPPD extracts the predicates that may be pushed down and stores them as candidates in OpWalkerInfo.opToPushdownPredMap.pushdownPreds.
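
Conceptually the extraction works like this minimal sketch: split the filter predicate into AND-ed conjuncts and bucket each conjunct by the single alias it references. The string parsing is a toy stand-in; Hive really walks ExprNodeDesc trees via ExprWalkerProcFactory:

import java.util.*;

class MiniExtract {
    public static void main(String[] args) {
        String predicate = "t1.openid = 'pear' and t2.openid = 'apple'";
        Map<String, List<String>> pushdownPreds = new LinkedHashMap<>();
        for (String conjunct : predicate.split("\\s+and\\s+")) {
            // key each conjunct by the table alias it refers to
            String alias = conjunct.substring(0, conjunct.indexOf('.'));
            pushdownPreds.computeIfAbsent(alias, k -> new ArrayList<>()).add(conjunct);
        }
        // {t1=[t1.openid = 'pear'], t2=[t2.openid = 'apple']}
        System.out.println(pushdownPreds);
    }
}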

JoinPPD

JoinPPD computes which tables can receive pushed-down predicates:

private Set<String> getQualifiedAliases(JoinOperator op, RowSchema rs) {
      Set<String> aliases = new HashSet<String>();
      JoinCondDesc[] conds = op.getConf().getConds();
      Map<Integer, Set<String>> posToAliasMap = op.getPosToAliasMap(); // alias sets of all the join inputs, keyed by position
      int i;
      for (i=conds.length-1; i>=0; i--){
        if (conds[i].getType() == JoinDesc.INNER_JOIN) { // inner join: both sides can take pushdown
          aliases.addAll(posToAliasMap.get(i+1));
        } else if (conds[i].getType() == JoinDesc.FULL_OUTER_JOIN) { // full outer join: stop, nothing can be pushed
          break;
        } else if (conds[i].getType() == JoinDesc.RIGHT_OUTER_JOIN) { // right outer join: take the right table, then stop
          aliases.addAll(posToAliasMap.get(i+1));
          break;
        } else if (conds[i].getType() == JoinDesc.LEFT_OUTER_JOIN) {
          continue; // left outer join: keep going, so i can end up below 0
        }
      }
      if(i == -1){
        aliases.addAll(posToAliasMap.get(0)); // the left-most table can take pushdown
      }
      Set<String> aliases2 = rs.getTableNames();
      aliases.retainAll(aliases2);
      return aliases;
    }
  }

From this computation we know that, for our SQL, only table test1 qualifies for pushdown.
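
To make the control flow of getQualifiedAliases concrete, here is a standalone re-implementation sketch; the enum and the position map are assumptions standing in for Hive's JoinCondDesc and posToAliasMap:

import java.util.*;

class MiniQualifiedAliases {
    enum JoinType { INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER }

    static Set<String> qualifiedAliases(JoinType[] conds,
                                        Map<Integer, Set<String>> posToAlias) {
        Set<String> aliases = new HashSet<>();
        int i;
        for (i = conds.length - 1; i >= 0; i--) {
            if (conds[i] == JoinType.INNER) {
                aliases.addAll(posToAlias.get(i + 1));  // both sides are safe
            } else if (conds[i] == JoinType.FULL_OUTER) {
                break;                                  // nothing is safe
            } else if (conds[i] == JoinType.RIGHT_OUTER) {
                aliases.addAll(posToAlias.get(i + 1));  // only the right side
                break;
            }
            // LEFT_OUTER: fall through, so i can reach -1
        }
        if (i == -1) aliases.addAll(posToAlias.get(0)); // the left-most table
        return aliases;
    }

    public static void main(String[] args) {
        // test1 left outer join test2: only the left table (t1) qualifies
        Map<Integer, Set<String>> pos = Map.of(0, Set.of("t1"), 1, Set.of("t2"));
        System.out.println(
            qualifiedAliases(new JoinType[]{JoinType.LEFT_OUTER}, pos)); // [t1]
    }
}

With the qualified aliases in hand, JoinerPPD separates the pushable predicates from the non-pushable ones: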

public static class JoinerPPD extends DefaultPPD implements NodeProcessor {
    @Override
    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
        Object... nodeOutputs) throws SemanticException {
      LOG.info("Processing for " + nd.getName() + "("
          + ((Operator) nd).getIdentifier() + ")");
      OpWalkerInfo owi = (OpWalkerInfo) procCtx;
      Set<String> aliases = getAliases(nd); // tables whose predicates may be pushed down
      // we pass null for aliases here because mergeWithChildrenPred filters
      // aliases in the children node context and we need to filter them in
      // the current JoinOperator's context
      mergeWithChildrenPred(nd, owi, null, null);
      ExprWalkerInfo prunePreds =
          owi.getPrunedPreds((Operator<? extends OperatorDesc>) nd);
      if (prunePreds != null) {
        Set<String> toRemove = new HashSet<String>();
        // we don't push down any expressions that refer to aliases that can't
        // be pushed down per getQualifiedAliases
        for (Entry<String, List<ExprNodeDesc>> entry : prunePreds.getFinalCandidates().entrySet()) {
          String key = entry.getKey();
          List<ExprNodeDesc> value = entry.getValue();
          if (key == null && ExprNodeDescUtils.isAllConstants(value)) {
            continue;   // propagate constants
          }
          if (!aliases.contains(key)) {
            toRemove.add(key); 
          }
        }
        for (String alias : toRemove) {
          for (ExprNodeDesc expr :
            prunePreds.getFinalCandidates().get(alias)) {
            // add expr to the list of predicates rejected from further pushing
            // so that we know to add it in createFilter()
            ExprInfo exprInfo;
            if (alias != null) {
              exprInfo = prunePreds.addOrGetExprInfo(expr);
              exprInfo.alias = alias;
            } else {
              exprInfo = prunePreds.getExprInfo(expr);
            }
            prunePreds.addNonFinalCandidate(exprInfo != null ? exprInfo.alias : null, expr);
          }
          prunePreds.getFinalCandidates().remove(alias); // drop aliases that cannot take pushdown
        }
        return handlePredicates(nd, prunePreds, owi); // handle the predicates remaining above the join
      }
      return null;
    }

prunePreds.addNonFinalCandidate records the predicate that cannot be pushed down: t2.openid='apple'.

prunePreds.getFinalCandidates() keeps the predicate that can be pushed down: t1.openid='pear'.

Now look at the implementation of handlePredicates(nd, prunePreds, owi):

 protected Object handlePredicates(Node nd, ExprWalkerInfo prunePreds, OpWalkerInfo owi)
        throws SemanticException {
      if (HiveConf.getBoolVar(owi.getParseContext().getConf(),
          HiveConf.ConfVars.HIVEPPDREMOVEDUPLICATEFILTERS)) {
        return createFilter((Operator)nd, prunePreds.getResidualPredicates(true), owi);
      }
      return null;
    }
  }

prunePreds.getResidualPredicates(true) returns the predicates that cannot be pushed down; a new FilterOperator, FIL[8], is generated for them and placed after JOIN[4]. Here is the generation code:

ExprNodeDesc condn = ExprNodeDescUtils.mergePredicates(preds); // merge the non-pushable predicates: t2.openid='apple'
...

    // add new filter op
    List<Operator<? extends OperatorDesc>> originalChilren = op
        .getChildOperators();
    op.setChildOperators(null);
    Operator<FilterDesc> output = OperatorFactory.getAndMakeChild(
        new FilterDesc(condn, false), new RowSchema(inputRS.getSignature()), op); // create the new FilterOperator as a child of op (JOIN[4])
    output.setChildOperators(originalChilren); // splice the new FIL[8] in front of JOIN[4]'s original children
    for (Operator<? extends OperatorDesc> ch : originalChilren) {
      List<Operator<? extends OperatorDesc>> parentOperators = ch
          .getParentOperators();
      int pos = parentOperators.indexOf(op);
      assert pos != -1;
      parentOperators.remove(pos);
      parentOperators.add(pos, output); // add the new op as the old
    }

    if (HiveConf.getBoolVar(owi.getParseContext().getConf(),
        HiveConf.ConfVars.HIVEPPDREMOVEDUPLICATEFILTERS)) {
      // remove the candidate filter ops
      removeCandidates(op, owi);
    }

    // push down current ppd context to newly added filter
    ExprWalkerInfo walkerInfo = owi.getPrunedPreds(op);
    if (walkerInfo != null) {
      walkerInfo.getNonFinalCandidates().clear();
      owi.putPrunedPreds(output, walkerInfo);
    }
    return output;
  }

In summary, JoinPPD separates pushable from non-pushable predicates, regenerates a FilterOperator for the non-pushable ones, and cleans up the previous FilterOperator.

ReduceSinkPPD

If the SQL contains a predicate such as t.col=1 and the equijoin conditions are t.col=s.col and t.col=u.col, ReduceSinkPPD adds the derived filters s.col=1 and u.col=1:

public static class ReduceSinkPPD extends DefaultPPD implements NodeProcessor {
    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
                          Object... nodeOutputs) throws SemanticException {
      super.process(nd, stack, procCtx, nodeOutputs);
      Operator<?> operator = (Operator<?>) nd;
      OpWalkerInfo owi = (OpWalkerInfo) procCtx;
      if (operator.getNumChild() == 1 &&
          operator.getChildOperators().get(0) instanceof JoinOperator) {
        if (HiveConf.getBoolVar(owi.getParseContext().getConf(),
            HiveConf.ConfVars.HIVEPPDRECOGNIZETRANSITIVITY)) {
          JoinOperator child = (JoinOperator) operator.getChildOperators().get(0);
          int targetPos = child.getParentOperators().indexOf(operator);
          applyFilterTransitivity(child, targetPos, owi);
        }
      }
      return null;
    }

    /**
     * Adds additional pushdown predicates for a join operator by replicating
     * filters transitively over all the equijoin conditions.
     *
     * If we have a predicate "t.col=1" and the equijoin conditions
     * "t.col=s.col" and "t.col=u.col", we add the filters "s.col=1" and
     * "u.col=1". Note that this does not depend on the types of joins (ie.
     * inner, left/right/full outer) between the tables s, t and u because if
     * a predicate, eg. "t.col=1" is present in getFinalCandidates() at this
     * point, we have already verified that it can be pushed down, so any rows
     * emitted must satisfy s.col=t.col=u.col=1 and replicating the filters
     * like this is ok.
     */
    private void applyFilterTransitivity(JoinOperator join, int targetPos, OpWalkerInfo owi)
        throws SemanticException {

      ExprWalkerInfo joinPreds = owi.getPrunedPreds(join);
      if (joinPreds == null || !joinPreds.hasAnyCandidates()) {
        return;
      }
      Map<String, List<ExprNodeDesc>> oldFilters = joinPreds.getFinalCandidates();
      Map<String, List<ExprNodeDesc>> newFilters = new HashMap<String, List<ExprNodeDesc>>();

      List<Operator<? extends OperatorDesc>> parentOperators = join.getParentOperators();

      ReduceSinkOperator target = (ReduceSinkOperator) parentOperators.get(targetPos);
      List<ExprNodeDesc> targetKeys = target.getConf().getKeyCols();

      ExprWalkerInfo rsPreds = owi.getPrunedPreds(target);
      for (int sourcePos = 0; sourcePos < parentOperators.size(); sourcePos++) {
        ReduceSinkOperator source = (ReduceSinkOperator) parentOperators.get(sourcePos);
        List<ExprNodeDesc> sourceKeys = source.getConf().getKeyCols();
        Set<String> sourceAliases = new HashSet<String>(Arrays.asList(source.getInputAliases()));
        // iterate over the pushable predicates (oldFilters)
        for (Map.Entry<String, List<ExprNodeDesc>> entry : oldFilters.entrySet()) {
          if (entry.getKey() == null && ExprNodeDescUtils.isAllConstants(entry.getValue())) {
            // propagate constants
            for (String targetAlias : target.getInputAliases()) {
              // propagate constant-only predicates to every input alias of the target RS
              rsPreds.addPushDowns(targetAlias, entry.getValue());
            }
            continue;
          }
          if (!sourceAliases.contains(entry.getKey())) {
            continue;
          }
          for (ExprNodeDesc predicate : entry.getValue()) {
            ExprNodeDesc backtrack = ExprNodeDescUtils.backtrack(predicate, join, source);
            if (backtrack == null) {
              continue;
            }
            ExprNodeDesc replaced = ExprNodeDescUtils.replace(backtrack, sourceKeys, targetKeys);
            if (replaced == null) {
              continue;
            }
            for (String targetAlias : target.getInputAliases()) {
              rsPreds.addFinalCandidate(targetAlias, replaced);
            }
          }
        }
      }
    }
  }

Our example SQL contains no scenario that ReduceSinkPPD can optimize.
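
To see the transitivity rule in isolation, here is a minimal sketch on toy data structures; Hive actually backtracks ExprNodeDesc expressions through the ReduceSink key columns (ExprNodeDescUtils.backtrack/replace) rather than rewriting strings:

import java.util.*;

class MiniTransitivity {
    public static void main(String[] args) {
        // equijoin: t.col = s.col and t.col = u.col (one key per join input)
        Map<String, String> joinKeyByAlias =
            Map.of("t", "t.col", "s", "s.col", "u", "u.col");
        // a pushable predicate on t's join key
        String srcAlias = "t", srcPredicate = "t.col = 1";

        List<String> derived = new ArrayList<>();
        for (Map.Entry<String, String> e : joinKeyByAlias.entrySet()) {
            if (e.getKey().equals(srcAlias)) continue;
            // rewrite the predicate onto the other side's join key
            derived.add(srcPredicate.replace(joinKeyByAlias.get(srcAlias),
                                             e.getValue()));
        }
        System.out.println(derived); // s.col = 1 and u.col = 1 (order may vary)
    }
}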

TableScanPPD
  @Override
    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
        Object... nodeOutputs) throws SemanticException {
      LOG.info("Processing for " + nd.getName() + "("
          + ((Operator) nd).getIdentifier() + ")");
      OpWalkerInfo owi = (OpWalkerInfo) procCtx;
      TableScanOperator tsOp = (TableScanOperator) nd;
      mergeWithChildrenPred(tsOp, owi, null, null); // merge the children's pushdown predicates into the TS operator, recorded in procCtx's opToPushdownPredMap
      if (HiveConf.getBoolVar(owi.getParseContext().getConf(),
          HiveConf.ConfVars.HIVEPPDREMOVEDUPLICATEFILTERS)) {
        // remove all the candidate filter operators
        // when we get to the TS
        removeAllCandidates(owi);
      }
      ExprWalkerInfo pushDownPreds = owi.getPrunedPreds(tsOp); // fetch the predicates to be pushed down
      // nonFinalCandidates predicates should be empty
      assert pushDownPreds == null || !pushDownPreds.hasNonFinalCandidates();
      return createFilter(tsOp, pushDownPreds, owi); // create the FIL operator
    }

  }
3.2.3 Summary

The optimized logical plan:

TS[0]    TS[1]

FIL[9]    --> t1.openid='pear' 

RS[2]    RS[3]
 
  JOIN[4]

  FIL[8]  --> t2.openid='apple'

  SEL[6]

  FS[7]  

FilterPPD extracts the candidate predicates into OpWalkerInfo.opToPushdownPredMap.pushdownPreds.

JoinPPD separates the pushable from the non-pushable predicates and regenerates a FilterOperator, FIL[8], for the non-pushable ones.

TableScanPPD turns the pushable predicates into FIL[9], placed right after TS[0].

3.3 CBO

3.3.1 A quick walk through CBO

The entry point for CBO is CalcitePlanner.java:

 default: {
        SemanticAnalyzer semAnalyzer = HiveConf
            .getBoolVar(queryState.getConf(), HiveConf.ConfVars.HIVE_CBO_ENABLED) ?
                new CalcitePlanner(queryState) : new SemanticAnalyzer(queryState);
        return semAnalyzer;
      }

Hive decides whether to use CBO by checking HiveConf.ConfVars.HIVE_CBO_ENABLED: if it is true, compilation enters CalcitePlanner.java; otherwise it goes through SemanticAnalyzer.java.

The controlling parameter is hive.cbo.enable, which defaults to true.

public class CalcitePlanner extends SemanticAnalyzer {

  private final AtomicInteger noColsMissingStats = new AtomicInteger(0);
  private SemanticException semanticException;
  private boolean runCBO = true;
  private boolean disableSemJoinReordering = true;
  private EnumSet<ExtendedCBOProfile> profilesCBO;

  public CalcitePlanner(QueryState queryState) throws SemanticException {
    super(queryState);
    if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_ENABLED)) {
      runCBO = false;
      disableSemJoinReordering = false;
    }
  }

  public void resetCalciteConfiguration() {
    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CBO_ENABLED)) {
      runCBO = true;
      disableSemJoinReordering = true;
    }
  }

  @Override
  @SuppressWarnings("nls")
  public void analyzeInternal(ASTNode ast) throws SemanticException {
    if (runCBO) {
      PreCboCtx cboCtx = new PreCboCtx();
      super.analyzeInternal(ast, cboCtx);
    } else {
      super.analyzeInternal(ast);
    }
  }

CalcitePlanner extends SemanticAnalyzer, so from here on SQL compilation weaves back and forth between these two classes.

CalcitePlanner first calls SemanticAnalyzer's analyzeInternal(ast, cboCtx) for semantic analysis, then calls its own overridden genOPTree to generate the logical plan:

  @SuppressWarnings("rawtypes")
  Operator genOPTree(ASTNode ast, PlannerContext plannerCtx) throws SemanticException {
    Operator sinkOp = null;
    ...
            // 1. Gen Optimized AST
            ASTNode newAST = getOptimizedAST(); // generate the optimized AST
            System.out.println("newAST:"+newAST.dump());

            // 1.1. Fix up the query for insert/ctas
            newAST = fixUpCtasAndInsertAfterCbo(ast, newAST, cboCtx);

            // 2. Regen OP plan from optimized AST
            init(false);
            if (cboCtx.type == PreCboCtx.Type.CTAS) {
              // Redo create-table analysis, because it's not part of doPhase1.
              setAST(newAST);
              newAST = reAnalyzeCtasAfterCbo(newAST);
            }
            Phase1Ctx ctx_1 = initPhase1Ctx();
            if (!doPhase1(newAST, getQB(), ctx_1, null)) { // re-run phase-1 semantic analysis on the new AST
              throw new RuntimeException("Couldn't do phase1 on CBO optimized query plan");
            }
            // unfortunately making prunedPartitions immutable is not possible
            // here with SemiJoins not all tables are costed in CBO, so their
            // PartitionList is not evaluated until the run phase.
            getMetaData(getQB());

            disableJoinMerge = false;
            sinkOp = genPlan(getQB()); // generate the logical plan
            LOG.info("CBO Succeeded; optimized logical plan.");
      ...

As the code shows, CBO first optimizes the ANTLR-generated AST into newAST, then re-runs semantic analysis on newAST, and finally generates the logical plan from the resulting QB.

After the logical plan is generated, SemanticAnalyzer.analyzeInternal(ast, cboCtx) continues with logical plan optimization:

  if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD) &&
            !pctx.getContext().isCboSucceeded()) {
      // CBO is off (or did not succeed)
      transformations.add(new PredicateTransitivePropagate());
      if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
        transformations.add(new ConstantPropagate());
      }
      transformations.add(new SyntheticJoinPredicate());
      transformations.add(new PredicatePushDown());
    } else if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD) &&
            pctx.getContext().isCboSucceeded()) {
      // both CBO and the predicate pushdown switch are on
      transformations.add(new SyntheticJoinPredicate());
      transformations.add(new SimplePredicatePushDown());
      transformations.add(new RedundantDynamicPruningConditionsRemoval());
    }

If we turn off the pushdown switch with set hive.optimize.ppd=false, no predicate pushdown optimization runs. With the switch on (and CBO succeeded), pushdown goes through SimplePredicatePushDown.java.

To summarize the trail CBO takes to produce the logical plan: CalcitePlanner.analyzeInternal -> SemanticAnalyzer.analyzeInternal -> SemanticAnalyzer.genResolvedParseTree -> CalcitePlanner.genOPTree -> CalcitePlanner.getOptimizedAST -> SemanticAnalyzer.genResolvedParseTree -> SemanticAnalyzer.genPlan.

Once the plan is generated, SimplePredicatePushDown.java performs the predicate pushdown optimization.

3.3.2 The optimization pass

Again with the same SQL as in section 3.2:

select 
  t1.*,
  t2.* 
from tmp.test1 t1 
left join tmp.test2 t2 on t1.id=t2.id where t1.openid='pear' and t2.openid='apple'

The AST it generates is identical to the one shown in section 3.2 above.

The call ASTNode newAST = getOptimizedAST() optimizes this AST.

The optimized AST:

newAST:
TOK_QUERY
   TOK_FROM
      TOK_JOIN
         TOK_SUBQUERY
            TOK_QUERY
               TOK_FROM
                  TOK_TABREF
                     TOK_TABNAME
                        tmp
                        test1
                     t1
               TOK_INSERT
                  TOK_DESTINATION
                     TOK_DIR
                        TOK_TMP_FILE
                  TOK_SELECT
                     TOK_SELEXPR
                        .
                           TOK_TABLE_OR_COL
                              t1
                           id
                        id
                     TOK_SELEXPR
                        TOK_FUNCTION
                           TOK_STRING
                              2147483647
                           'pear'
                        openid
                     TOK_SELEXPR
                        .
                           TOK_TABLE_OR_COL
                              t1
                           day
                        day
                  TOK_WHERE
                     and
                        =
                           .
                              TOK_TABLE_OR_COL
                                 t1
                              openid
                           'pear'
                        TOK_FUNCTION
                           TOK_ISNOTNULL
                           .
                              TOK_TABLE_OR_COL
                                 t1
                              id
            $hdt$_0
         TOK_SUBQUERY
            TOK_QUERY
               TOK_FROM
                  TOK_TABREF
                     TOK_TABNAME
                        tmp
                        test2
                     t2
               TOK_INSERT
                  TOK_DESTINATION
                     TOK_DIR
                        TOK_TMP_FILE
                  TOK_SELECT
                     TOK_SELEXPR
                        .
                           TOK_TABLE_OR_COL
                              t2
                           id
                        id
                     TOK_SELEXPR
                        TOK_FUNCTION
                           TOK_STRING
                              2147483647
                           'apple'
                        openid
                     TOK_SELEXPR
                        .
                           TOK_TABLE_OR_COL
                              t2
                           day
                        day
                  TOK_WHERE
                     and
                        =
                           .
                              TOK_TABLE_OR_COL
                                 t2
                              openid
                           'apple'
                        TOK_FUNCTION
                           TOK_ISNOTNULL
                           .
                              TOK_TABLE_OR_COL
                                 t2
                              id
            $hdt$_1
         =
            .
               TOK_TABLE_OR_COL
                  $hdt$_0
               id
            .
               TOK_TABLE_OR_COL
                  $hdt$_1
               id
   TOK_INSERT
      TOK_DESTINATION
         TOK_DIR
            TOK_TMP_FILE
      TOK_SELECT
         TOK_SELEXPR
            .
               TOK_TABLE_OR_COL
                  $hdt$_0
               id
            t1.id
         TOK_SELEXPR
            TOK_FUNCTION
               TOK_STRING
                  2147483647
               'pear'
            t1.openid
         TOK_SELEXPR
            .
               TOK_TABLE_OR_COL
                  $hdt$_0
               day
            t1.day
         TOK_SELEXPR
            .
               TOK_TABLE_OR_COL
                  $hdt$_1
               id
            t2.id
         TOK_SELEXPR
            TOK_FUNCTION
               TOK_STRING
                  2147483647
               'apple'
            t2.openid
         TOK_SELEXPR
            .
               TOK_TABLE_OR_COL
                  $hdt$_1
               day
            t2.day

In newAST, the two joined tables have been turned into two subqueries ($hdt$_0 and $hdt$_1), and inside each subquery the pushable predicate has become a TOK_WHERE clause, so predicate pushdown is already expressed in the tree itself. Roughly, newAST reads as: select ... from (select id, 'pear' as openid, day from tmp.test1 t1 where t1.openid='pear' and t1.id is not null) $hdt$_0 join (select id, 'apple' as openid, day from tmp.test2 t2 where t2.openid='apple' and t2.id is not null) $hdt$_1 on $hdt$_0.id=$hdt$_1.id. Note also that TOK_LEFTOUTERJOIN has become a plain TOK_JOIN: the WHERE filter on t2.openid can never be satisfied by the null-extended rows of a left join, so the outer join is rewritten as an inner join, and IS NOT NULL filters on the join keys are added on both sides.

CBO's optimization is built on the Apache Calcite framework. The logical plan generated from newAST:

TS[0]    TS[3]

FIL[1]    FIL[4]  --> both predicates now sit right after the TS operators

SEL[2]    SEL[5]

RS[6]    RS[7]

  JOIN[8]

  SEL[9]

  FS[10]

So with the CBO switch on, predicate pushdown has already been done at the AST-optimization stage.

What happens if the predicate pushdown switch, set hive.optimize.ppd=true, is also on?

TS[0]    TS[3]

FIL[11]    FIL[12]

SEL[2]    SEL[5]

RS[6]    RS[7]

  JOIN[8]

  SEL[9]

  FS[10]

With both switches on, the plan still goes through the pass (the filters are regenerated as FIL[11] and FIL[12]), but the positions of the FIL operators do not change.

3.3.3 Summary

CBO's predicate pushdown is the most thorough. Since Hive 2, a great deal of work has gone into the CBO module, and it is well worth studying closely.
