
Impala Plan Schedule

Original article by jasong · Column: Impala · Last edited 2024-09-23

一 Background

After a database generates a logical query plan, it still has to be converted into a physical query plan ("to physical plan"). This article focuses on how the Scan operator is initialized and distributed; other operators follow the same pattern.

In Impala, the logical query plan is produced by the FE, carried in the plan request, and handed to the BE's admission control to build the physical query plan. This happens in the following steps:

1 During plan generation in the FE, operators such as Hash Join are split into a join-probe and a join-build side, so a single plan can produce multiple plan fragments.

2 On the BE, because Impala is an MPP engine, each plan fragment has to be split sensibly across executors (called backends in this article). This is done mainly in Scheduler::Schedule via ComputeScanRangeAssignment, which binds scan ranges to backends.

3 Also on the BE, because execution uses a multi-threaded concurrency model, the work assigned to a single backend is parallelized: each fragment is instantiated as one or more fragment instances. These FragmentInstances are initialized mainly in Scheduler::Schedule via ComputeFragmentExecParams.

4 The initialized instances are then grouped per backend into BackendScheduleState (see Scheduler::ComputeBackendExecParams).

5 Each executor runs a ControlService (KRPC) that receives the work distributed by the Coordinator (ExecQueryFInstancesRequestPB).

....

This article only covers these steps; earlier articles give the background and later articles will continue the series. As a rough orientation, the sketch below shows the order in which the steps run.
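A minimal C++-style sketch: the method names (Schedule, ComputeScanRangeAssignment, ComputeFragmentExecParams, ComputeBackendExecParams) exist in the Impala Scheduler, but the signatures and bodies here are simplified for illustration and are not the verbatim source.

Code (C++):

// Simplified sketch of Scheduler::Schedule(); error handling and most
// arguments are omitted.
Status Scheduler::Schedule(const ExecutorConfig& executor_config, ScheduleState* state) {
  // Step 2: bind scan ranges to backends, per fragment and per scan node.
  RETURN_IF_ERROR(ComputeScanRangeAssignment(executor_config, state));
  // Step 3: create the FInstanceScheduleStates (parallel instances) of each fragment.
  ComputeFragmentExecParams(executor_config, state);
  // Step 4: fold the instances into per-backend BackendExecParamsPB.
  ComputeBackendExecParams(executor_config, state);
  // Step 5 happens later: the coordinator sends an ExecQueryFInstancesRequestPB
  // to each backend's ControlService over KRPC.
  return Status::OK();
}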

二 Common

Code (protobuf):
syntax="proto2";
 
package impala;
​
// 128-bit unique id, reused as QueryId, FragmentInstanceId, BackendId,
// SessionId, RegistrationId, ...
message UniqueIdPB {
  required fixed64 hi = 1;
  required fixed64 lo = 2;
}
​
message ColumnValuePB {
  optional bool bool_val = 1;
  optional int32 byte_val = 6;
  optional int32 short_val = 7;
  optional int32 int_val = 2;
  optional int64 long_val = 3;
  optional double double_val = 4;
  optional string string_val = 5;
  optional string binary_val = 8;
  optional string timestamp_val = 9;
  optional bytes decimal_val = 10;
  optional int32 date_val = 11;
}
​
message NetworkAddressPB {
  required string hostname = 1;  // hostname or IP address
  required int32 port = 2;
}

// Plain integer ids used below:
//   PlanNodeId  = int32
//   InstanceId  = int32
//   FragmentIdx = int32
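The same UniqueIdPB serves as the QueryId, FragmentInstanceId, BackendId, and so on, and fragment instance ids are derived from the query id (the ScheduleState::Init() excerpt later in this article calls CreateInstanceId for this). Below is a hedged sketch of such a helper; the exact arithmetic is an assumption, not the verbatim Impala implementation.

Code (C++):

// Derive a fragment instance id from the query id by offsetting the low
// 64 bits. The coordinator instance uses offset 0; generated instance ids
// start at 1 (see ScheduleState::Init() later in this article).
impala::UniqueIdPB CreateInstanceId(const impala::UniqueIdPB& query_id, int instance_idx) {
  impala::UniqueIdPB result;
  result.set_hi(query_id.hi());
  result.set_lo(query_id.lo() + instance_idx);
  return result;
}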

三 Level0 ScanRange

BE

Code (protobuf):
// Specification of an individual data range which is held in its entirety by a storage
// server. Corresponds to TScanRange and should be kept in sync with it.
message ScanRangePB {
  // One of these must be set for every ScanRangePB.
  optional HdfsFileSplitPB hdfs_file_split = 1;
  optional HBaseKeyRangePB hbase_key_range = 2;
  optional KuduTabletDescPB kudu_tablet_desc = 3;
  optional ExternalTableSplitPB external_table_split_desc = 4;
  optional bytes file_metadata = 5;
}
​
​
// A scan range plus the parameters needed to execute that scan.
message ScanRangeParamsPB {
  optional ScanRangePB scan_range = 1;
  optional int32 volume_id = 2 [default = -1];
  optional bool try_hdfs_cache = 3 [default = false];
  optional bool is_remote = 4;
}
         
// List of ScanRangeParamsPB. This is needed so that per_node_scan_ranges in
// PlanFragmentInstanceCtxPB can be a map since protobuf doesn't support repeated map
// values.
message ScanRangesPB {                                                                                                                     
  repeated ScanRangeParamsPB scan_ranges = 1;
}
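To make the nesting concrete, here is a hedged example of assembling one HDFS split into these messages with the generated C++ protobuf API. All file names and numeric values are made up, and required HdfsFileSplitPB fields such as file_length and mtime are omitted for brevity.

Code (C++):

// One scan range (an HDFS split) wrapped in its execution parameters.
impala::ScanRangesPB ranges;
impala::ScanRangeParamsPB* params = ranges.add_scan_ranges();
params->set_volume_id(0);
params->set_try_hdfs_cache(false);
params->set_is_remote(false);

impala::HdfsFileSplitPB* split = params->mutable_scan_range()->mutable_hdfs_file_split();
split->set_relative_path("part-00000.parq");   // illustrative
split->set_offset(0);
split->set_length(128LL * 1024 * 1024);        // one 128 MB split
split->set_partition_id(42);

// per_node_scan_ranges (see PlanFragmentInstanceCtxPB later in this article)
// maps a plan node id to the ScanRangesPB assigned to it; here scan node 0
// gets this one range.
impala::PlanFragmentInstanceCtxPB instance_ctx;
(*instance_ctx.mutable_per_node_scan_ranges())[0] = ranges;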

FE

Code (Thrift):
// Specification of a subsection of a single HDFS file. Corresponds to HdfsFileSpiltPB and
// should be kept in sync with it.
struct THdfsFileSplit {
  // File name (not the full path).  The path is assumed to be relative to the
  // 'location' of the THdfsPartition referenced by partition_id.
  1: required string relative_path
        
  // starting offset
  2: required i64 offset
        
  // length of split
  3: required i64 length
        
  // ID of partition within the THdfsTable associated with this scan node.
  4: required i64 partition_id
        
  // total size of the hdfs file
  5: required i64 file_length
        
  // compression type of the hdfs file
  6: required CatalogObjects.THdfsCompression file_compression
        
  // last modified time of the file
  7: required i64 mtime
        
  // Hash of the partition's path. This must be hashed with a hash algorithm that is
  // consistent across different processes and machines. This is currently using
  // Java's String.hashCode(), which is consistent. For testing purposes, this can use
  // any consistent hash.
  9: required i32 partition_path_hash
        
  // The absolute path of the file, it's used only when data files are outside of
  // the Iceberg table location (IMPALA-11507).
  10: optional string absolute_path
        
  // Whether the HDFS file is stored with erasure coding. 
  11: optional bool is_erasure_coded
}
​
​
struct TScanRange {
  // one of these must be set for every TScanRange
  1: optional THdfsFileSplit hdfs_file_split                  
  //4: optional TExternalTableDesc external_table_desc
  5: optional binary file_metadata
} 
​
​
​
// location information for a single scan range
struct TScanRangeLocation {
  // Index into TQueryExecRequest.host_list.
  1: required i32 host_idx;
            
  // disk volume identifier of a particular scan range at 'server';
  // -1 indicates an unknown volume id; 
  // only set for TScanRange.hdfs_file_split
  2: optional i32 volume_id = -1
            
  // If true, this block is cached on this server.
  3: optional bool is_cached = false
}                                                                                                                                  
 
// A single scan range plus the hosts that serve it
struct TScanRangeLocationList {
  1: required PlanNodes.TScanRange scan_range
  // non-empty list     
  2: list<TScanRangeLocation> locations
}
​
// A specification for scan ranges. Scan ranges can be
// concrete or specs, which are used to generate concrete ranges.
// Each type is stored in a separate list.
struct TScanRangeSpec { 
   1: optional list<TScanRangeLocationList> concrete_ranges
   2: optional list<PlanNodes.TFileSplitGeneratorSpec> split_specs
} 

FE->BE

Code (C++):
void TScanRangeToScanRangePB(const TScanRange& tscan_range, ScanRangePB* scan_range_pb) {
  if (tscan_range.__isset.hdfs_file_split) {
    //scan_range_pb->mutable_hdfs_file_split()
    HdfsFileSplitPB* hdfs_file_split = scan_range_pb->mutable_hdfs_file_split();
    hdfs_file_split->set_relative_path(tscan_range.hdfs_file_split.relative_path);
    hdfs_file_split->set_offset(tscan_range.hdfs_file_split.offset);
    hdfs_file_split->set_length(tscan_range.hdfs_file_split.length);
    hdfs_file_split->set_partition_id(tscan_range.hdfs_file_split.partition_id);
    hdfs_file_split->set_file_length(tscan_range.hdfs_file_split.file_length);
    hdfs_file_split->set_file_compression(
        THdfsCompressionToProto(tscan_range.hdfs_file_split.file_compression));
    hdfs_file_split->set_mtime(tscan_range.hdfs_file_split.mtime);
    hdfs_file_split->set_partition_path_hash(
        tscan_range.hdfs_file_split.partition_path_hash);
    hdfs_file_split->set_is_erasure_coded(tscan_range.hdfs_file_split.is_erasure_coded);
    if (tscan_range.hdfs_file_split.__isset.absolute_path) {
      hdfs_file_split->set_absolute_path(
          tscan_range.hdfs_file_split.absolute_path);
    }
    
    if (tscan_range.hdfs_file_split.__isset.delete_delta_files) {
      for (auto & dd_file : tscan_range.hdfs_file_split.delete_delta_files) {
        auto dd_file_pb  = hdfs_file_split->mutable_delete_delta_files()->Add();
        dd_file_pb->set_relative_path(dd_file.relative_path);
        dd_file_pb->set_file_length(dd_file.file_length);
        dd_file_pb->set_mtime(dd_file.mtime);
      }
    }
  }
​
  if (tscan_range.__isset.external_table_desc) {
    ExternalTableSplitPB* external_table_split_pb =
        scan_range_pb->mutable_external_table_split_desc();
    external_table_split_pb->set_sql(tscan_range.external_table_desc.sql);
    external_table_split_pb->set_host(tscan_range.external_table_desc.host);
    external_table_split_pb->set_port(tscan_range.external_table_desc.port);
  }
​
  if (tscan_range.__isset.file_metadata) {
    scan_range_pb->set_file_metadata(tscan_range.file_metadata);
  }
}
​
// Called from Scheduler::AssignmentCtx::RecordScanRangeAssignment().

四 Level1 FragmentInstanceScheduleState

This is also the unit that Admission Control transmits: one fragment instance request per instance.

Code (protobuf / C++):
// Information about the input fragment instance of a join node.
message JoinBuildInputPB {
  // The join node id that will consume this join build.
  optional int32 join_node_id = 1;
 
  // Fragment instance id of the input fragment instance.
  optional UniqueIdPB input_finstance_id = 2;
}
​
message FInstanceExecParamsPB {
  // The fragment instance id.
  optional UniqueIdPB instance_id = 1;
                                             
  // Ordinal number of the corresponding fragment in the query, i.e. TPlanFragment.idx.
  optional int32 fragment_idx = 2;           
                                                
  // Map from plan node id to a list of scan ranges.
  map<int32, ScanRangesPB> per_node_scan_ranges = 5;                                                                                       
                                             
  // 0-based ordinal number of this particular instance. This is within its fragment, not
  // query-wide, so eg. there will be one instance '0' for each fragment.
  optional int32 per_fragment_instance_idx = 6;
                                    
  // In its role as a data sender, a fragment instance is assigned a "sender id" to
  // uniquely identify it to a receiver. -1 = invalid.
  optional int32 sender_id = 7 [default = -1];
                                             
  // List of input join build finstances for joins in this finstance.
  // (Author's note: in practice a FInstanceExecParamsPB has at most one join build input.)
  repeated JoinBuildInputPB join_build_inputs = 8;
                                             
  // If this is a join build fragment, the number of fragment instances that consume the
  // join build. -1 = invalid.               
  optional int32 num_join_build_outputs = 9 [default = -1];
}
​
​
/// Execution parameters for a single fragment instance. Contains both intermediate
/// info needed by the scheduler and info that will be sent back to the coordinator.
///
/// FInstanceScheduleStates are created as children of FragmentScheduleStates (in
/// 'instance_states') and then the calculated execution parameters, 'exec_params', are
/// transferred to the corresponding BackendExecParamsPB in
/// Scheduler::ComputeBackendExecParams().
struct FInstanceScheduleState {
  NetworkAddressPB address;
  /// Contains any info that needs to be sent back to the coordinator. Computed during
  /// Scheduler::ComputeFragmentExecParams() then transferred to the corresponding
  /// BackendExecParamsPB in Scheduler::ComputeBackendExecParams(), after which it
  /// is no longer valid to access.
  FInstanceExecParamsPB exec_params;  // 1:1 with this fragment instance

  // Constructor and helpers for managing the FInstanceExecParamsPB:
  FInstanceScheduleState(const UniqueIdPB& instance_id, const NetworkAddressPB& host,
      const NetworkAddressPB& krpc_host, int per_fragment_instance_idx,
      const FragmentScheduleState& fragment_state);

  /// Adds the ranges in 'scan_ranges' to the scan at 'scan_idx' in 'exec_params'.
  void AddScanRanges(int scan_idx, const std::vector<ScanRangeParamsPB>& scan_ranges);
  // Called from Scheduler::ComputeFragmentExecParams:
  /* at FragmentScheduleState
   if (!fragment_state->scan_range_assignment.empty()) {
      DCHECK_EQ(fragment_state->scan_range_assignment.size(), 1);
      auto first_entry = fragment_state->scan_range_assignment.begin();
      for (const PerNodeScanRanges::value_type& entry : first_entry->second) {
        instance_state.AddScanRanges(entry.first, entry.second);
      }
    }
  */
}
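A hedged sketch of what AddScanRanges does: copy the assigned ranges into this instance's per_node_scan_ranges entry for the given scan node. This is an approximation of the behaviour described in the comments above, not the verbatim source.

Code (C++):

void FInstanceScheduleState::AddScanRanges(
    int scan_idx, const std::vector<ScanRangeParamsPB>& scan_ranges) {
  // Look up (or create) the entry for this scan node in the instance's
  // per_node_scan_ranges map, then append every assigned range to it.
  ScanRangesPB& entry = (*exec_params.mutable_per_node_scan_ranges())[scan_idx];
  for (const ScanRangeParamsPB& range : scan_ranges) {
    *entry.add_scan_ranges() = range;
  }
}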
Code (protobuf):
// Specification of one output destination of a plan fragment
message PlanFragmentDestinationPB {                                                                                                                                 
  // The globally unique fragment instance id.
  optional UniqueIdPB fragment_instance_id = 1;
         
  // ip + port of the KRPC backend service on the destination.
  optional NetworkAddressPB address = 2;
}  
​
// Context to collect information that is shared among all instances of a particular plan
// fragment. Corresponds to a TPlanFragment with the same idx in the
// TExecPlanFragmentInfo.
message PlanFragmentCtxPB {                                                                                                                                         
  // Ordinal number of corresponding fragment in the query.
  optional int32 fragment_idx = 1;
         
  // Output destinations, one per output partition. The partitioning of the output is
  // specified by TPlanFragment.output_sink.output_partition in the corresponding
  // TPlanFragment. The number of output partitions is destinations.size().
  repeated PlanFragmentDestinationPB destinations = 2;
}        
​
​
// Protobuf portion of the execution parameters of a single fragment instance. Every
// fragment instance will also have a corresponding TPlanFragmentInstanceCtx with the same
// fragment_idx.
message PlanFragmentInstanceCtxPB {
  // Ordinal number of corresponding fragment in the query.
  optional int32 fragment_idx = 1;
    
  // Map from plan node id to initial scan ranges for each scan node in
  // TPlanFragment.plan_tree
  map<int32, ScanRangesPB> per_node_scan_ranges = 2;
    
  // List of input join build finstances for joins in this finstance.
  repeated JoinBuildInputPB join_build_inputs = 3;
}
​
// ExecQueryFInstances
message ExecQueryFInstancesRequestPB {
  // This backend's index into Coordinator::backend_states_, needed for subsequent rpcs to
  // the coordinator.
  optional int32 coord_state_idx = 1;
 
  // Sidecar index of the TQueryCtx.
  optional int32 query_ctx_sidecar_idx = 2;                                                                                                                                                          
  // Sidecar index of the TExecPlanFragmentInfo.
  optional int32 plan_fragment_info_sidecar_idx = 3;
 
  // The minimum query-wide memory reservation (in bytes) required for the backend
  // executing the instances in fragment_instance_ctxs. This is the peak minimum
  // reservation that may be required by the concurrently-executing operators at any
  // point in query execution. It may be less than the initial reservation total claims
  // (below) if execution of some operators never overlaps, which allows reuse of
  // reservations.
  optional int64 min_mem_reservation_bytes = 4;
 
  // Total of the initial buffer reservations that we expect to be claimed on this
  // backend for all fragment instances in fragment_instance_ctxs. I.e. the sum over all
  // operators in all fragment instances that execute on this backend. This is used for
  // an optimization in InitialReservation. Measured in bytes.
  optional int64 initial_mem_reservation_total_claims = 5;
 
  // The backend memory limit (in bytes) as set by the admission controller. Used by the
  // query mem tracker to enforce the memory limit.
  optional int64 per_backend_mem_limit = 6;
 
  // General execution parameters for different fragments. Corresponds to 'fragments' in
  // the TExecPlanFragmentInfo sidecar.
  repeated PlanFragmentCtxPB fragment_ctxs = 7;
 
  // Execution parameters for specific fragment instances. Corresponds to
  // 'fragment_instance_ctxs' in the TExecPlanFragmentInfo sidecar.
  repeated PlanFragmentInstanceCtxPB fragment_instance_ctxs = 8;
}
 
message ExecQueryFInstancesResponsePB {
  // Success or failure of the operation.
  optional StatusPB status = 1;
}
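For orientation, here is a hedged sketch of how the coordinator side might fill in this request before issuing the ExecQueryFInstances RPC. All values are illustrative; in the real coordinator the TQueryCtx and TExecPlanFragmentInfo are attached as KRPC sidecars and their indices recorded in the *_sidecar_idx fields.

Code (C++):

impala::ExecQueryFInstancesRequestPB request;
request.set_coord_state_idx(0);  // this backend's slot in Coordinator::backend_states_
request.set_min_mem_reservation_bytes(32LL * 1024 * 1024);
request.set_initial_mem_reservation_total_claims(64LL * 1024 * 1024);
request.set_per_backend_mem_limit(2LL * 1024 * 1024 * 1024);

// One PlanFragmentCtxPB per fragment scheduled on this backend ...
impala::PlanFragmentCtxPB* fragment_ctx = request.add_fragment_ctxs();
fragment_ctx->set_fragment_idx(1);

// ... and one PlanFragmentInstanceCtxPB per instance of that fragment.
impala::PlanFragmentInstanceCtxPB* instance_ctx = request.add_fragment_instance_ctxs();
instance_ctx->set_fragment_idx(1);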

五 Level2-1 FragmentScheduleState

1:1 FragmentScheduleState : TPlanFragment

1:n FragmentScheduleState : FragmentInstance

Code (C++):
/// map from scan node id to a list of scan ranges
typedef std::map<TPlanNodeId, std::vector<ScanRangeParamsPB>> PerNodeScanRanges;
​
/// map from an impalad host address to the per-node assigned scan ranges;
/// records scan range assignment for a single fragment
typedef std::unordered_map<NetworkAddressPB, PerNodeScanRanges>
    FragmentScanRangeAssignment;
​
​
/// Execution parameters shared between fragment instances. This struct is a container for
/// any intermediate data needed for scheduling that will not be sent back to the
/// coordinator as part of the QuerySchedulePB along with a pointer to the corresponding
/// FragmentExecParamsPB in the QuerySchedulePB.
struct FragmentScheduleState {
  /// Only needed as intermediate state during exec parameter computation.
  /// For scheduling, refer to FInstanceExecParamsPB.per_node_scan_ranges
  FragmentScanRangeAssignment scan_range_assignment;  // Only used to seed scan nodes; FInstanceScheduleState::AddScanRanges copies from it.
​
  bool is_coord_fragment;
  const TPlanFragment& fragment;
​
  /// Fragments that are inputs to an ExchangeNode of this fragment.
  std::vector<FragmentIdx> exchange_input_fragments;
​
  /// Instances of this fragment. Instances on a backend are clustered together - i.e. all
  /// instances for a given backend will be consecutive entries in the vector. These have
  /// their protobuf params Swap()-ed to the BackendExecParamsPB during
  /// Scheduler::ComputeBackendExecParams() and are no longer valid after that.
  std::vector<FInstanceScheduleState> instance_states;
​
  /// Pointer to the corresponding FragmentExecParamsPB in the parent ScheduleState's
  /// 'query_schedule_pb_'
  FragmentExecParamsPB* exec_params;  // Initialized in ScheduleState::Init().

  FragmentScheduleState(const TPlanFragment& fragment, FragmentExecParamsPB* exec_params);

  // Initialized by ScheduleState::Init():
  /*
  const TPlanFragment& root_fragment = request_.plan_exec_info[0].fragments[0];
  if (RequiresCoordinatorFragment()) {
    fragment_schedule_states_[root_fragment.idx].is_coord_fragment = true;
    // the coordinator instance gets index 0, generated instance ids start at 1
    next_instance_id_ = CreateInstanceId(next_instance_id_, 1);
  }
  */
};
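The scan_range_assignment map above is what Scheduler::AssignmentCtx::RecordScanRangeAssignment (mentioned in section 三) fills in. Below is a hedged sketch of its effect; the helper name and signature are illustrative, and the real function also maintains assignment statistics.

Code (C++):

// Record that the range in 'params' for scan node 'node_id' will be read by
// the executor at 'executor_addr'. 'assignment' is the fragment's
// FragmentScanRangeAssignment (see the typedefs above).
void RecordAssignment(const NetworkAddressPB& executor_addr, TPlanNodeId node_id,
    const ScanRangeParamsPB& params, FragmentScanRangeAssignment* assignment) {
  PerNodeScanRanges& per_node = (*assignment)[executor_addr];
  per_node[node_id].push_back(params);
}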

六 Level2-2 BackendScheduleState

Code (protobuf / C++):
// Execution parameters for a single backend. Used to construct the
// Coordinator::BackendStates.
message BackendExecParamsPB {
  // The id of this backend.
  optional UniqueIdPB backend_id = 1;                                                                                                     
  
  // The hostname + port of the KRPC backend service on this backend.
  optional NetworkAddressPB address = 8;
            
  // The IP address + port of the KRPC backend service on this backend.
  optional NetworkAddressPB krpc_address = 9;
            
  // The fragment instance params assigned to this backend. All instances of a
  // particular fragment are contiguous in this list. This can be empty only for the
  // coordinator backend, that is, if 'is_coord_backend' is true.
  repeated FInstanceExecParamsPB instance_params = 2;
            
  // The minimum query-wide buffer reservation size (in bytes) required for this backend.
  // This is the peak minimum reservation that may be required by the
  // concurrently-executing operators at any point in query execution. It may be less
  // than the initial reservation total claims (below) if execution of some operators
  // never overlaps, which allows reuse of reservations.
  optional int64 min_mem_reservation_bytes = 3;
            
  // Total of the initial buffer reservations that we expect to be claimed on this
  // backend for all fragment instances in instance_params. I.e. the sum over all
  // operators in all fragment instances that execute on this backend. This is used for
  // an optimization in InitialReservation. Measured in bytes.
  optional int64 initial_mem_reservation_total_claims = 4;
            
  // Total thread reservation for fragment instances scheduled on this backend. This is
  // the peak number of required threads that may be required by the
  // concurrently-executing fragment instances at any point in query execution.
  optional int64 thread_reservation = 5;
            
  // Number of slots that this query should count for in admission control.
  // This is calculated as the maximum # of instances of any fragment on this backend.
  // I.e. 1 if mt_dop is not used and at most the mt_dop value if mt_dop is specified
  // (but less if the query is not actually running with mt_dop instances on this node).
  optional int32 slots_to_use = 6;
            
  // Indicates whether this backend is the coordinator.
  optional bool is_coord_backend = 7;
}
​
/// Execution parameters for a single backend. This gets created for every backend that
/// participates in query execution, which includes every backend that has fragments
/// scheduled on it and the coordinator backend.
///
/// Created by ScheduleState::GetOrCreateBackendScheduleState() and initialized in
/// Scheduler::ComputeBackendExecParams(). Used as an input to the
/// AdmissionController and Coordinator::BackendState.
struct BackendScheduleState {
  BackendDescriptorPB be_desc;
​
  /// Pointer to the corresponding protobuf struct containing any parameters for this
  /// backend that will need to be sent back to the coordinator. Owned by
  /// ScheduleState::query_schedule_pb_.
  BackendExecParamsPB* exec_params;
​
  explicit BackendScheduleState(BackendExecParamsPB* exec_params)
    : exec_params(exec_params) {}
};
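The slots_to_use comment above ("the maximum # of instances of any fragment on this backend") implies a simple computation. Below is a hedged sketch; the helper name and standalone form are illustrative, not the Impala implementation.

Code (C++):

#include <algorithm>
#include <cstdint>
#include <unordered_map>

// Admission-control slots for one backend: the maximum number of instances
// that any single fragment has scheduled on this backend.
int32_t ComputeSlotsToUse(const impala::BackendExecParamsPB& be_params) {
  std::unordered_map<int32_t, int32_t> instances_per_fragment;
  int32_t max_instances = 0;
  for (const auto& inst : be_params.instance_params()) {
    int32_t count = ++instances_per_fragment[inst.fragment_idx()];
    max_instances = std::max(max_instances, count);
  }
  return max_instances;
}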

七 Level3 ScheduleState

1:1 ScheduleState : Query / Scheduler / Coordinator

1:n ScheduleState : Fragment / Backend

Scheduler step 3: Scheduler::ComputeBackendExecParams (shown at the end of this section)

Code (protobuf / C++):
​
// Contains the output from scheduling and admission control that is used by the
// coordinator to start query execution.
message QuerySchedulePB {
  optional UniqueIdPB query_id = 1;
 
  // The per-fragment execution parameters for this schedule.
  repeated FragmentExecParamsPB fragment_exec_params = 2;
 
  // The per-backend execution parameters for this schedule.
  repeated BackendExecParamsPB backend_exec_params = 3; //Backend/Executor
 
  // Total number of scan ranges of this query.
  optional int64 num_scan_ranges = 4;
 
  // The memory limit per executor that will be imposed on the query.
  // Set by the admission controller with a value that is only valid if it was admitted
  // successfully. -1 means no limit.
  optional int64 per_backend_mem_limit = 5;
 
  // The per executor memory used for admission accounting.
  // Set by the admission controller with a value that is only valid if it was admitted
  // successfully. Can be zero if the query is only scheduled to run on the coordinator.
  optional int64 per_backend_mem_to_admit = 6;
 
  // The memory limit for the coordinator that will be imposed on the query. Used only if
  // the query has a coordinator fragment.
  // Set by the admission controller with a value that is only valid if it was admitted
  // successfully. -1 means no limit.
  optional int64 coord_backend_mem_limit = 7;
 
  // The coordinator memory used for admission accounting.
  // Set by the admission controller with a value that is only valid if it was admitted                                                                             
  // successfully.
  optional int64 coord_backend_mem_to_admit = 8;
}
​
/*
void ScheduleState::Init() {
  *query_schedule_pb_->mutable_query_id() = query_id_;
  // extract TPlanFragments and order by fragment idx
  for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) {
    for (const TPlanFragment& fragment: plan_exec_info.fragments) {
      fragments_.emplace(fragment.idx, fragment);
    }
  }
​
  // this must only be called once
  DCHECK_EQ(fragment_schedule_states_.size(), 0);
  for (int i = 0; i < fragments_.size(); ++i) {
    auto it = fragments_.find(i);
    DCHECK(it != fragments_.end());
    fragment_schedule_states_.emplace_back(
        it->second, query_schedule_pb_->add_fragment_exec_params());
        // 1. adds a FragmentExecParamsPB to query_schedule_pb_
        // 2. adds the corresponding FragmentScheduleState
  }*/
​
​
/// Map from an impalad backend address to the state for that backend.
typedef std::unordered_map<NetworkAddressPB, BackendScheduleState>
    PerBackendScheduleStates;
​
​
/// ScheduleState is a container class for scheduling data used by Scheduler and
/// AdmissionController, which perform the scheduling logic itself, and it is only
/// intended to be accessed by them. The information needed for the coordinator to begin
/// execution is stored in 'query_schedule_pb_', which is returned from the
/// AdmissionController on successful admission. Everything else is intermediate data
/// needed to calculate the schedule but is discarded after a scheduling decision is made.
///
/// The general usage pattern is:
/// - FragmentScheduleStates are created for each fragment in the plan. They are given
///   pointers to corresponding FragmentExecParamsPBs created in the QuerySchedulePB.
/// - FInstanceScheduleStates are created as children of the FragmentScheduleStates for
///   each finstance and assigned to hosts. The FInstanceScheduleStates each have a
///   corresponding FInstanceExecParamsPB that they initially own.
/// - The scheduler computes the BackendScheduleState for each backend that was assigned a
///   fragment instance (and the coordinator backend). They are given pointers to
///   corresponding BackendExecParamsPBs created in the QuerySchedulePB and the
///   FInstanceExecParamsPB are Swap()-ed into them.
/// - The ScheduleState is passed to the admission controller, which keeps updating the
///   memory requirements by calling UpdateMemoryRequirements() every time it tries to
///   admit the query and sets the final values once the query gets admitted successfully.
/// - On successful admission, the QuerySchedulePB is returned to the coordinator and
///   everything else is discarded.
class ScheduleState {
 public:
  /// For testing only: specify 'is_test=true' to build a ScheduleState object without
  /// running Init() and to seed the random number generator for deterministic results.
  ScheduleState(const UniqueIdPB& query_id, const TQueryExecRequest& request,
      const TQueryOptions& query_options, RuntimeProfile* summary_profile, bool is_test);
​
 private:
  /// These references are valid for the lifetime of this query schedule because they
  /// are all owned by the enclosing QueryExecState.
  const UniqueIdPB& query_id_;
  const TQueryExecRequest& request_;
​
  /// The query options from the TClientRequest
  const TQueryOptions& query_options_;
​
  /// Contains the results of scheduling that will be sent back to the coordinator.
  /// Ownership is transferred to the coordinator after scheduling has completed.
  std::unique_ptr<QuerySchedulePB> query_schedule_pb_;
​
  /// TODO: move these into QueryState
  RuntimeProfile* summary_profile_;
​
  /// Maps from plan node id to its fragment idx. Filled in c'tor.
  std::vector<int32_t> plan_node_to_fragment_idx_;
​
  /// Maps from plan node id to its index in plan.nodes. Filled in c'tor.
  std::vector<int32_t> plan_node_to_plan_node_idx_;
​
  /// Populated in Init(), then calculated in Scheduler::ComputeFragmentExecParams().
  /// Indexed by fragment idx (TPlanFragment.idx).
  std::vector<FragmentScheduleState> fragment_schedule_states_;  // Populated in ScheduleState::Init().
​
  /// Map from backend address to corresponding BackendScheduleState. Created in
  /// GetOrCreateBackendScheduleState().
  PerBackendScheduleStates per_backend_schedule_states_;  // Filled during Scheduler::ComputeBackendExecParams().
  
  
​
  /// Used to generate consecutive fragment instance ids.
  UniqueIdPB next_instance_id_;
​
  /// The largest min memory reservation across all executors. Set in
  /// Scheduler::Schedule().
  int64_t largest_min_reservation_ = 0;
​
  /// The coordinator's backend memory reservation. Set in Scheduler::Schedule().
  int64_t coord_min_reservation_ = 0;
​
  /// The name of the executor group that this schedule was computed for. Set by the
  /// Scheduler and only valid after scheduling completes successfully.
  std::string executor_group_;
​
  /// Random number generated used for any randomized decisions during scheduling.
  std::mt19937 rng_;
​
  /// Map from fragment idx to references into the 'request_'.
  std::unordered_map<int32_t, const TPlanFragment&> fragments_;
​
  /// Populate fragments_ and fragment_schedule_states_ from request_.plan_exec_info.
  /// Sets is_coord_fragment and exchange_input_fragments.
  /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
  void Init();
​
  /// Returns true if a coordinator fragment is required based on the query stmt type.
  bool RequiresCoordinatorFragment() const {
    return request_.stmt_type == TStmtType::QUERY;
  }
};
​
​
​
// Creates the BackendScheduleState for a backend on first use and adds the
// corresponding BackendExecParamsPB to the QuerySchedulePB.
BackendScheduleState& ScheduleState::GetOrCreateBackendScheduleState(
    const NetworkAddressPB& address) {
  auto it = per_backend_schedule_states_.find(address);
  if (it == per_backend_schedule_states_.end()) {
​
    
    // Add a new BackendExecParamsPB to query_schedule_pb_ and register the
    // backend in per_backend_schedule_states_; the coordinator later uses
    // backend_exec_params to initialize its BackendStates.
    BackendExecParamsPB* be_params = query_schedule_pb_->add_backend_exec_params();
    it = per_backend_schedule_states_.emplace(address, BackendScheduleState(be_params))
             .first;
  }
  return it->second;
}
​
​
void Scheduler::ComputeBackendExecParams(
    const ExecutorConfig& executor_config, ScheduleState* state) {
  for (FragmentScheduleState& f : state->fragment_schedule_states()) {
    const NetworkAddressPB* prev_host = nullptr;
    int num_hosts = 0;
    for (FInstanceScheduleState& i : f.instance_states) {
      // Get or create the schedule state for this instance's backend.
      BackendScheduleState& be_state = state->GetOrCreateBackendScheduleState(i.host);
