Impala natively provides a profile for every SQL statement it executes. The profile contains a large number of counters that we can consult to troubleshoot problems encountered during SQL execution. Since the official documentation does not explain these counters one by one, this article walks through the source code to explain some of the counters we have run into in practice, in the hope that it will be useful. The first counters to cover are:
PerReadThreadRawHdfsThroughput
TotalReadThroughput
RowsReturnedRate
From the names alone you can usually guess roughly what each counter means, but some of the similar-sounding ones remain ambiguous. Here we explain them with the help of the code and a real production example. Below is the profile of the HDFS SCAN stage of a SQL statement captured from a production cluster; as you can see, it contains a great many counters:
HDFS_SCAN_NODE (id=0):(Total: 859.326ms, non-child: 859.326ms, % non-child: 100.00%)
Hdfs split stats (<volume id>:<# splits>/<split lengths>): 7:2/109.50 MB 4:1/256.00 MB 6:3/321.95 MB 8:2/133.57 MB 13:3/145.23 MB 17:4/134.13 MB 11:3/73.06 MB 9:3/82.40 MB 12:4/40.55 MB 14:6/419.93 MB 3:4/46.20 MB 5:4/57.32 MB 16:4/297.92 MB 2:4/126.17 MB 19:3/87.91 MB 15:1/1.85 MB 10:2/819.10 KB 1:2/144.55 MB 18:3/62.38 MB
ExecOption: PARQUET Codegen Enabled, Codegen enabled: 58 out of 58
Runtime filters: Not all filters arrived (arrived: [], missing [0, 2]), waited for 802ms
Hdfs Read Thread Concurrency Bucket: 0:93.33% 1:0% 2:0% 3:0% 4:6.667% 5:0% 6:0% 7:0% 8:0% 9:0% 10:0% 11:0% 12:0% 13:0% 14:0% 15:0% 16:0% 17:0% 18:0% 19:0% 20:0% 21:0% 22:0% 23:0% 24:0%
File Formats: PARQUET/NONE:58
BytesRead(500.000ms): 0, 0, 262.69 MB, 509.71 MB, 509.71 MB, 509.71 MB, 509.71 MB, 509.71 MB, 509.71 MB, 509.71 MB, 509.71 MB, 509.71 MB, 509.71 MB, 509.71 MB, 509.71 MB, 509.71 MB
- FooterProcessingTime: (Avg: 632.093us ; Min: 159.464us ; Max: 8.987ms ; Number of samples: 58)
- AverageHdfsReadThreadConcurrency: 0.27
- AverageScannerThreadConcurrency: 1.53
- BytesRead: 509.71 MB (534465620)
- BytesReadDataNodeCache: 0
- BytesReadLocal: 499.15 MB (523393997)
- BytesReadRemoteUnexpected: 0
- BytesReadShortCircuit: 499.15 MB (523393997)
- CachedFileHandlesHitCount: 298 (298)
- CachedFileHandlesMissCount: 4 (4)
- CollectionItemsRead: 0 (0)
- DecompressionTime: 0.000ns
- MaxCompressedTextFileLength: 0
- NumColumns: 6 (6)
- NumDictFilteredRowGroups: 42 (42)
- NumDisksAccessed: 20 (20)
- NumRowGroups: 64 (64)
- NumScannerThreadsStarted: 11 (11)
- NumScannersWithNoReads: 1 (1)
- NumStatsFilteredRowGroups: 0 (0)
- PeakMemoryUsage: 565.39 MB (592850582)
- PerReadThreadRawHdfsThroughput: 1.29 GB/sec
- RemoteScanRanges: 9 (9)
- RowBatchQueueGetWaitTime: 50.119ms
- RowBatchQueuePutWaitTime: 5s774ms
- RowsRead: 6.81M (6809291)
- RowsReturned: 887.09K (887085)
- RowsReturnedRate: 1.03 M/sec
- ScanRangesComplete: 58 (58)
- ScannerThreadsInvoluntaryContextSwitches: 12 (12)
- ScannerThreadsTotalWallClockTime: 8s369ms
- MaterializeTupleTime(*): 1s271ms
- ScannerThreadsSysTime: 340.000ms
- ScannerThreadsUserTime: 1s468ms
- ScannerThreadsVoluntaryContextSwitches: 2.53K (2532)
- TotalRawHdfsOpenFileTime(*): 11.923ms
- TotalRawHdfsReadTime(*): 386.655ms
- TotalReadThroughput: 63.71 MB/sec
Based on the code and the example above, let's look at how these counters are computed and what they mean. The first counter is BytesRead. As the name suggests, it is simply the number of bytes read by this scan node, with unit BYTES. The relevant code is shown below:
// For readability, code that actually lives in several different files is
// collected here; the same applies to the later snippets
const string ScanNode::BYTES_READ_COUNTER = "BytesRead";
RuntimeProfile::Counter* bytes_read_counter_; // # bytes read from the scanner
bytes_read_counter_ =
ADD_COUNTER(runtime_profile(), BYTES_READ_COUNTER, TUnit::BYTES);
As the code shows, this is a simple Counter that records the total number of bytes scanned by this scan node; in the example its value is 509.71 MB. Note that for a KUDU table, BytesRead is 0. With BytesRead covered, we can move on to PerReadThreadRawHdfsThroughput and TotalReadThroughput. The relevant code is shown below:
// Code related to the PerReadThreadRawHdfsThroughput counter
RuntimeProfile::Counter* read_timer_; // total read time
const string ScanNode::TOTAL_HDFS_READ_TIMER = "TotalRawHdfsReadTime(*)";
read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_READ_TIMER);
const string ScanNode::PER_READ_THREAD_THROUGHPUT_COUNTER =
"PerReadThreadRawHdfsThroughput";
per_read_thread_throughput_counter_ = runtime_profile()->AddDerivedCounter(
PER_READ_THREAD_THROUGHPUT_COUNTER, TUnit::BYTES_PER_SECOND,
bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_read_counter_, read_timer_));
int64_t RuntimeProfile::UnitsPerSecond(
const RuntimeProfile::Counter* total_counter,
const RuntimeProfile::Counter* timer) {
DCHECK(total_counter->unit() == TUnit::BYTES ||
total_counter->unit() == TUnit::UNIT);
DCHECK(timer->unit() == TUnit::TIME_NS);
if (timer->value() == 0) return 0;
double secs = static_cast<double>(timer->value()) / 1000.0 / 1000.0 / 1000.0;
return total_counter->value() / secs;
}
This involves TotalRawHdfsReadTime(*), a time-typed Counter that records the time spent reading from HDFS; in the example its value is 386.655ms. It only counts time spent actually reading HDFS data: if a read hits a bad file handle and has to retry, the time spent on that retry bookkeeping is not recorded. See the Read method in ScanRange.cc for the implementation details. There is also a TotalRawHdfsOpenFileTime(*) counter, which records the time spent obtaining file handles. PerReadThreadRawHdfsThroughput is a DerivedCounter: the code uses boost's bind to pass bytes_read_counter_ and read_timer_ as arguments to the UnitsPerSecond function. The final result is therefore BytesRead/TotalRawHdfsReadTime(*) = 509.71MB/386.655ms = 1.287 GB/sec, matching the value in the profile. In short, PerReadThreadRawHdfsThroughput can be understood as the average raw HDFS read throughput per read thread on this scan node (total bytes read divided by total read-thread time). Next, let's look at TotalReadThroughput and compare it with PerReadThreadRawHdfsThroughput to see how they differ:
// Code related to the TotalReadThroughput counter
const string ScanNode::TOTAL_THROUGHPUT_COUNTER = "TotalReadThroughput";
total_throughput_counter_ = runtime_profile()->AddRateCounter(
TOTAL_THROUGHPUT_COUNTER, bytes_read_counter_);
DEFINE_int32(periodic_counter_update_period_ms, 500, "Period to update rate counters and"
" sampling counters in ms");
void PeriodicCounterUpdater::UpdateLoop() {
while (true) {
system_time before_time = get_system_time();
SleepForMs(FLAGS_periodic_counter_update_period_ms);
posix_time::time_duration elapsed = get_system_time() - before_time;
int elapsed_ms = elapsed.total_milliseconds();
{
lock_guard<SpinLock> ratelock(instance_->rate_lock_);
for (RateCounterMap::iterator it = rate_counters_.begin();
it != rate_counters_.end(); ++it) {
it->second.elapsed_ms += elapsed_ms;
int64_t value;
if (it->second.src_counter != NULL) {
value = it->second.src_counter->value();
} else {
DCHECK(it->second.sample_fn != NULL);
value = it->second.sample_fn();
}
int64_t rate = value * 1000 / (it->second.elapsed_ms);
it->first->Set(rate);
}
}
    // ... remaining loop body omitted ...
  }
}
void ScanNode::Close(RuntimeState* state) {
if (is_closed()) return;
// Close filter
for (auto& filter_ctx : filter_ctxs_) {
if (filter_ctx.expr_eval != nullptr) filter_ctx.expr_eval->Close(state);
}
ScalarExpr::Close(filter_exprs_);
// ScanNode::Prepare() started periodic counters including 'total_throughput_counter_'
// and 'bytes_read_timeseries_counter_'. Subclasses may also have started counters.
runtime_profile_->StopPeriodicCounters();
ExecNode::Close(state);
}
TotalReadThroughput is a RateCounter, i.e. a counter that is recomputed at a fixed interval, configured by the periodic_counter_update_period_ms flag (default 500ms). Every 500ms the current throughput of the scan node is recomputed; the elapsed time is accumulated before each computation, so the formula is BytesRead/(500ms*n). When the scan node is closed, these RateCounters are stopped as well. Note that Impala starts a dedicated thread to periodically update the RateCounters (besides RateCounters, it also updates several other counter types, which we will not expand on here), as shown below:
/// Thread performing asynchronous updates.
boost::scoped_ptr<boost::thread> update_thread_;
instance_->update_thread_.reset(
new thread(&PeriodicCounterUpdater::UpdateLoop, instance_));
Next, let's look at the RowsReturnedRate counter. The relevant code is shown below:
RuntimeProfile::Counter* rows_returned_counter_;
rows_returned_counter_ = ADD_COUNTER(runtime_profile_, "RowsReturned", TUnit::UNIT);
const string RuntimeProfile::TOTAL_TIME_COUNTER_NAME = "TotalTime";
Counter* total_time_counter() { return counter_map_[TOTAL_TIME_COUNTER_NAME]; }
const string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsReturnedRate";
RuntimeProfile::Counter* rows_returned_rate_;
rows_returned_rate_ = runtime_profile()->AddDerivedCounter(
ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
bind<int64_t>(&RuntimeProfile::UnitsPerSecond, rows_returned_counter_,
runtime_profile()->total_time_counter()));
RowsReturned is the number of rows returned by the scan, 887085 in the example. There is also RowsRead, 6809291 in the example; what is the difference? Simply put, RowsRead is the total number of rows scanned, while RowsReturned is the number of rows actually returned, i.e. RowsReturned is what remains of RowsRead after filtering. Impala pushes WHERE predicates down to the scan nodes; for predicates on partition columns, non-matching partitions are skipped entirely during the scan, and rows in those skipped partitions are not counted in RowsRead either. Another counter involved here is TotalTime, the total time spent in this scan node stage, 859.326ms in the example; it includes the HDFS read time and file-handle open time discussed above, plus everything else spent in the scan node stage. RowsReturnedRate is computed as RowsReturned/TotalTime = 887085/859.326ms = 1032 K/sec, matching the value in the example. It represents the average number of rows returned per second by this scan node, and is one of the more commonly used metrics.
This article covered several of the more common counters in the scan stage. There are many other counters the author is not yet clear about; updates will follow if time permits, and hopefully this is useful. The interpretations above are based on the author's reading of the code; corrections are welcome if anything is wrong.