首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >impala be query plan 3 prepare->open->close

impala be query plan 3 prepare->open->close

原创
作者头像
jasong
发布2022-09-28 16:54:47
5390
发布2022-09-28 16:54:47
举报
文章被收录于专栏:ClickHouseClickHouse

知识点 ControlService

QueryState

QueryState 是为某个特定查询创建的所有后端执行状态(例如各个片段实例的 FragmentInstanceState)的中心类。此类包含(或可用于访问)在片段实例之间共享的状态;相对地,片段实例特有的状态收集在 FragmentInstanceState 中。QueryState 的生存期由引用计数决定:任何代表查询执行并访问其状态的线程,都必须先获取对相应 QueryState 的引用,并至少在访问期间持有该引用。引用通过 QueryExecMgr::Get-/ReleaseQueryState() 或 QueryState::ScopedRef(后者用于仅限于单个函数或块作用域的引用)获取和释放。只要引用计数大于 0,该查询的所有控制结构(包含在该类中或可通过该类访问,如 FragmentInstanceState)都保证存活。

FragmentInstanceState

FragmentInstanceState处理单个计划片段实例执行的所有方面,包括成功和错误情况下的设置和终结。Close()在Exec()结束时自动发生,释放为此片段实例分配的所有内存,并关闭所有数据流。

堆栈

ControlService::ControlService(MetricGroup* metric_group)
  this->ExecQueryFInstances(static_cast<const ExecQueryFInstancesRequestPB*>(req),
      void ControlService::ExecQueryFInstances(const ExecQueryFInstancesRequestPB* request, ExecQueryFInstancesResponsePB* response, RpcContext* rpc_context)
          const Status& fragment_info_sidecar_status = GetSidecar(request->plan_fragment_info_sidecar_idx(), rpc_context, &fragment_info);
          Status resp_status = ExecEnv::GetInstance()->query_exec_mgr()->StartQuery(request, query_ctx, fragment_info);
              QueryState* qs = GetOrCreateQueryState(query_ctx, request->per_backend_mem_limit(), &dummy);
                        Status status = qs->Init(request, fragment_info);
                            这里主要为初始化
                            TExecPlanFragmentInfo by fragment_info
                        unique_ptr<Thread> t;
                        status = Thread::Create("query-exec-mgr",Substitute("query-state-$0", PrintId(query_id)), &QueryExecMgr::ExecuteQueryHelper, this, qs, &t, true);
                            bool QueryState::StartFInstances() 
​
​
​
bool QueryState::StartFInstances();
    start_finstances_status = FragmentState::CreateFragmentStateMap(fragment_info_, exec_rpc_params_, this, fragment_state_map_)
        for(fragment_size)
            //根据instance_size 创建分布式fragment, 即单个查询拆分为多个fragment
            FragmentState* fragment_state = state->obj_pool()->Add(new FragmentState(state, frag, frag_ctx));
        for(fragment_size)
            fragment_state->init();
                Status PlanNode::CreateTree(FragmentState* state, const TPlan& plan, PlanNode** root)
                    Status status = CreateTreeHelper(state, plan.nodes, NULL, &node_idx, root); (递归创建)
                    Status PlanNode::CreateTreeHelper(FragmentState* state, const std::vector<TPlanNode>& tnodes, PlanNode* parent, int* node_idx, PlanNode** root)
                        const TPlanNode& tnode = tnodes[*node_idx];
                        int num_children = tnode.num_children;
                        RETURN_IF_ERROR(CreatePlanNode(state->obj_pool(), tnode, &node));(创建PlanNode)
                            *node = pool->Add(new ScanPlanNode());/PartitionedHashJoinPlanNode/
                        for(num_children)
                            CreateTreeHelper(state, tnodes, node, node_idx, nullptr)); 递归
                        RETURN_IF_ERROR(node->Init(tnode, state));
              Status  = HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* state)(这里可以了解下 impala SharedState 的概念,将同一个 be 节点上不同的 scan node 放到了一个队列来处理)
​
    for (auto& fragment : fragment_state_map_) {
        FragmentState* fragment_state = fragment.second;
        for (int i = 0; i < fragment_state->instance_ctxs().size(); ++i)
            //创建 FragmentInstanceState
            FragmentInstanceState* fis = obj_pool_.Add(new FragmentInstanceState(this, fragment_state, *instance_ctx, *instance_ctx_pb));
            fis_map_.emplace(fis->instance_id(), fis);
            unique_ptr<Thread> t;
            //执行单个 FragmentInstanceState ExecFInstance 
            Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,[this, fis]() { this->ExecFInstance(fis); }, &t, true);
            void QueryState::ExecFInstance(FragmentInstanceState* fis)
                Status status = fis->Exec();(Status FragmentInstanceState::Exec())
                    Status status = Prepare();
                    status = Open();
                    Close();
​
​
Status FragmentInstanceState::Prepare()
    runtime_state_ = obj_pool()->Add(new RuntimeState(query_state_, fragment_,instance_ctx_, fragment_ctx_, instance_ctx_pb_, ExecEnv::GetInstance()));
        Init();
            resource_pool_ = ExecEnv::GetInstance()->thread_mgr()->CreatePool();
            instance_mem_tracker_ = obj_pool()->Add(new MemTracker(runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker()));
    runtime_state_->resource_pool()->AcquireThreadToken();(获取一个线程资源)
    const PlanNode* plan_tree = fragment_state_->plan_tree();
    //ExecNode 执行节点,根据 PlanNode 创建ExecNode 
    RETURN_IF_ERROR(ExecNode::CreateTree(runtime_state_, *plan_tree, query_state_->desc_tbl(), &exec_tree_));
        RETURN_IF_ERROR(plan_node.CreateExecNode(state, root));(这里举例一个PartitionedHashJoinNode)
            ObjectPool* pool = state->obj_pool();
            *node = pool->Add(new PartitionedHashJoinNode(state, *this, state->desc_tbl()));(Status HdfsScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) ScanNode 也是同理)
        for (auto& child : plan_node.children_) { 递归创建子节点的ExecNode 
            ExecNode* child_node;
            RETURN_IF_ERROR(CreateTree(state, *child, descs, &child_node));
            DCHECK(child_node != nullptr);
            (*root)->children_.push_back(child_node);
        }
    //当前 Fragment Instance State 的 ExecNode 创建完成
    //1 ExchangeNode
    // set #senders of exchange nodes before calling Prepare()
    vector<ExecNode*> exch_nodes;
    exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
    //2  scanNode 
    vector<ExecNode*> scan_nodes;
    ScanRangesPB no_scan_ranges;
    exec_tree_->CollectScanNodes(&scan_nodes);
        static_cast<ScanNode*>(scan_node)->SetScanRanges(scan_ranges.scan_ranges());
    
      
    //3
    RETURN_IF_ERROR(exec_tree_->Prepare(runtime_state_)); //Status ExecNode::Prepare(RuntimeState* state) 
        mem_tracker_.reset(new MemTracker(runtime_profile_, -1, runtime_profile_->name(),
        for (int i = 0; i < children_.size(); ++i) {
            RETURN_IF_ERROR(children_[i]->Prepare(state));
        }
            //Status HdfsScanNodeMt::Prepare(RuntimeState* state) 
        
    //4 prepare sink_
    const DataSinkConfig* sink_config = fragment_state_->sink_config();
    DCHECK(sink_config != nullptr);
    sink_ = sink_config->CreateSink(runtime_state_);
    RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker()));
​
    //5 row batch 数据
    row_batch_.reset(new RowBatch(exec_tree_->row_desc(), runtime_state_->batch_size(),runtime_state_->instance_mem_tracker()));
​
​
Status FragmentInstanceState::Open() 
    RETURN_IF_ERROR(exec_tree_->Open(runtime_state_));
    return sink_->Open(runtime_state_);
​
​
void FragmentInstanceState::Close()
    for (int i = 0; i < children_.size(); ++i) {
        children_[i]->Close(state);
    }

认识一下FragmentInstanceState

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 知识点 ControlService
    • QueryState
      • FragmentInstanceState
      • 堆栈
      • 认识一下FragmentInstanceState
      相关产品与服务
      数据库
      云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档