前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Impala be query plan2 - AdmissionController

Impala be query plan2 - AdmissionController

原创
作者头像
jasong
发布2022-09-28 16:50:26
3100
发布2022-09-28 16:50:26
举报
文章被收录于专栏:ClickHouseClickHouse

AdmissionController

AdmissionController 用于根据在一个或多个资源池中配置的可用集群资源限制请求(例如查询、DML)。请求将被允许立即执行、排队等待稍后执行或拒绝(立即或排队后)。资源池可以配置为具有最大并发查询数、最大集群范围内存、最大队列大小、每个查询的最大和最小每主机内存限制,并设置mem_limit查询选项是否会被前面提到的最大/最小每主机限制限制限制。如果执行的查询太多或可用内存不足,查询将排队。一旦队列达到最大队列大小,传入的查询将被拒绝。队列中的请求将在可配置的超时后超时。

代码语言:javascript
复制
void ClientRequestState::FinishExecQueryOrDmlRequest()
    Status LocalAdmissionControlClient::SubmitForAdmission(const AdmissionController::AdmissionRequest& request,RuntimeProfile::EventSequence* query_events,std::unique_ptr<QuerySchedulePB>* schedule_result)
        Status AdmissionController::SubmitForAdmission(const AdmissionRequest& request,Promise<AdmissionOutcome, PromiseMode::MULTIPLE_PRODUCER>* admit_outcome,unique_ptr<QuerySchedulePB>* schedule_result, bool* queued,std::string* request_pool)
            bool AdmissionController::FindGroupToAdmitOrReject(ClusterMembershipMgr::SnapshotPtr membership_snapshot, const TPoolConfig& pool_config,bool admit_from_queue, PoolStats* pool_stats, QueueNode* queue_node,bool& coordinator_resource_limited)
                Status AdmissionController::ComputeGroupScheduleStates(ClusterMembershipMgr::SnapshotPtr membership_snapshot, QueueNode* queue_node)
                    for (const ExecutorGroup* executor_group : executor_groups) {
                        RETURN_IF_ERROR(scheduler_->Schedule(group_config, group_state.get()));
​

Status Scheduler::Schedule(const ExecutorConfig& executor_config, ScheduleState* state) {
  RETURN_IF_ERROR(DebugAction(state->query_options(), "SCHEDULER_SCHEDULE"));
  RETURN_IF_ERROR(ComputeScanRangeAssignment(executor_config, state));
  ComputeFragmentExecParams(executor_config, state);
  ComputeBackendExecParams(executor_config, state);
#ifndef NDEBUG
  state->Validate();
#endif
  state->set_executor_group(executor_config.group.name());
  return Status::OK();
}
​
​
RETURN_IF_ERROR(ComputeScanRangeAssignment(executor_config, state));
  for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) {
    for (const auto& entry : plan_exec_info.per_node_scan_ranges) {
​
​
​
void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config, ScheduleState* state)
    for (const TPlanExecInfo& plan_exec_info : exec_request.plan_exec_info) {
        // set instance_id, host, per_node_scan_ranges
        ComputeFragmentExecParams(executor_config, plan_exec_info,state->GetFragmentScheduleState(plan_exec_info.fragments[0].idx), state);
        //void Scheduler::ComputeFragmentExecParams(const ExecutorConfig& executor_config,const TPlanExecInfo& plan_exec_info, FragmentScheduleState* fragment_state,ScheduleState* state)
            void Scheduler::CreateCollocatedJoinBuildInstances(const ExecutorConfig& executor_config,FragmentScheduleState* fragment_state, ScheduleState* state)
                if (fragment.plan.nodes.size() == 1 && fragment.plan.nodes.at(0).__isset.hdfs_scan_node
                    && fragment.plan.nodes.at(0).hdfs_scan_node.is_colocate_scan) {
                    AddJoinBuildScanInstances(executor_config, fragment_state, state, join_fragment_state);
                    PrintAssgimentScanRange(fragment_state, join_fragment_state);
                }
                
                void Scheduler::AddJoinBuildScanInstances(const ExecutorConfig& executor_config,FragmentScheduleState* fragment_state, ScheduleState* state,FragmentScheduleState* join_fragment_state)
                    vector<TPlanNodeId> scan_node_ids = FindScanNodes(fragment.plan);DCHECK(scan_node_ids.size() == 1);
                    for (const auto& parent_state : join_fragment_state->instance_states) {
                        FInstanceScheduleState& instance_state = fragment_state->instance_states[build_instance_idx++];
                       instance_state.AddScanRanges(scan_node_id, instance_ranges);
                    }
        // Set destinations, per_exch_num_senders, sender_id.
        for (const TPlanFragment& src_fragment : plan_exec_info.fragments)
        {
​
        }
    }
​
void ScheduleState::Validate() 
​
    coord_.reset(new Coordinator(this, *exec_request_, *schedule_.get(), query_events_));
    Status exec_status = coord_->Exec();
        Status Coordinator::Exec() {
        const TQueryExecRequest& request = exec_params_.query_exec_request();
        DCHECK(request.plan_exec_info.size() > 0);
​
        VLOG_QUERY << "Exec() query_id=" << PrintId(query_id())
                    << " stmt=" << request.query_ctx.client_request.stmt;
        stmt_type_ = request.stmt_type;
​
        query_profile_ =
            RuntimeProfile::Create(obj_pool(), "Execution Profile " + PrintId(query_id()));
        finalization_timer_ = PROFILE_FinalizationTimer.Instantiate(query_profile_);
        filter_updates_received_ = PROFILE_FiltersReceived.Instantiate(query_profile_);
​
        host_profiles_ = RuntimeProfile::Create(obj_pool(), "Per Node Profiles");
        query_profile_->AddChild(host_profiles_);
​
        SCOPED_TIMER(query_profile_->total_time_counter());
​
        // initialize progress updater
        const string& str = Substitute("Query $0", PrintId(query_id()));
        progress_.Init(str, exec_params_.query_schedule().num_scan_ranges());
​
        query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(
            query_ctx(), exec_params_.query_schedule().coord_backend_mem_limit());
        filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
            -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));
​
        InitFragmentStats();
        // create BackendStates and per-instance state, including profiles, and install
        // the latter in the FragmentStats' root profile
        InitBackendStates();
        RETURN_IF_ERROR(StartBackendExec());
        RETURN_IF_ERROR(FinishBackendStartup());
        return Status::OK();
        }

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • AdmissionController
相关产品与服务
数据湖计算 DLC
数据湖计算DLC(Data Lake Compute,DLC)提供了敏捷高效的数据湖分析与计算服务。服务采用无服务器架构(Serverless),开箱即用。使用标准SQL语法即可完成数据处理、多源数据联合计算等数据工作,有效降低用户数据分析服务搭建成本及使用成本,提高企业数据敏捷度。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档