前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Impala HDFS_SCAN_NODE之IO threads模型

Impala HDFS_SCAN_NODE之IO threads模型

作者头像
skyyws
发布2022-05-20 08:36:14
5740
发布2022-05-20 08:36:14
举报
文章被收录于专栏:skyyws的技术专栏

本文主要从代码出发,跟大家一起分享下Impala HDFS_SCAN_NODE中的IO threads模型。首先,在Impala中,有几个io threads相关的配置,通过对这几个参数进行配置,我们就可以增加处理io的线程数,相关的几个配置如下所示:

1
1

以我们最常见的hdfs存储引擎为例,如果impalad节点与datanode节点在一台机器上,对于impala来说,就是可以通过本地的disk直接读取数据;如果impalad节点与datanode在不同的机器上,那么就是remote的读取。在我们内部的生产环境,大部分都是这样的情况:有一个公共的HDFS集群,业务所有的离线数据都存储在上面,我们需要单独部署一个Impala集群,对于HDFS集群上的某些数据进行Ad-hoc类的多维分析,此时impala就是通过remote来读取hdfs的数据,那么将num_remote_hdfs_io_threads配置项调整的大一些,就可以适当地加快hdfs scan的速度。 在正式开启介绍之前,我们需要知道Impala的scan node模型分为两层:1)IO threads,这层主要就是通过IO读取远端的hdfs数据,并且返回,通过配置num_remote_hdfs_io_threads参数,就可以调整读取的线程数,值得一提的是,一些谓词可以下推到远端的hdfs,减少扫描返回的数据量;2)Scanner,当数据从远端的HDFS返回之后,会由专门的scanner线程进行处理,可能的操作包括:数据解码、cast计算等。本文我们主要讲的就是第一层IO threads,其他更多的介绍可以参考:Why Impala Scan Node is very slow中Tim Armstrong的回答,这篇CSDN的博客也有介绍:Impala高性能探秘之HDFS数据访问。 下面,我们就结合代码来简单看下这个参数是如何起作用的。在Impala的BE代码中,有一个类专门用来管理IO相关的操作,用于访问本地磁盘或者远端的文件系统,即DiskIoMgr。在这个类中,有一个disk_queues_成员,这是一个集合,每个成员都代表一个disk对应的队列,或者是一种远端文件系统,例如HDFS/S3等,如下所示:

代码语言:javascript
复制
// disk-io-mrg.h
  /// Per disk queues. This is static and created once at Init() time.  One queue is
  /// allocated for each local disk on the system and for each remote filesystem type.
  /// It is indexed by disk id.
  std::vector<DiskQueue*> disk_queues_;

首先会在构造函数中,对这个变量进行resize操作,如下所示:

代码语言:javascript
复制
// disk-io-mrg.cc
disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);

这里的num_local_disks指的就是本地磁盘的个数,而REMOTE_NUM_DISKS就是一个enum变量,用来控制远端访问的偏移:

代码语言:javascript
复制
// disk-io-mrg.h
  /// "Disk" queue offsets for remote accesses.  Offset 0 corresponds to
  /// disk ID (i.e. disk_queue_ index) of num_local_disks().
  enum {
    REMOTE_DFS_DISK_OFFSET = 0,
    REMOTE_S3_DISK_OFFSET,
    REMOTE_ADLS_DISK_OFFSET,
    REMOTE_ABFS_DISK_OFFSET,
    REMOTE_OZONE_DISK_OFFSET,
    REMOTE_NUM_DISKS
  };

所以,impala将每一种远端的文件系统访问,也当成了一个disk,按照上述的enum顺序,放到disk_queues_中,作为一个成员变量。接着在Init函数中,会循环对这个disk_queues_变量进行初始化:

代码语言:javascript
复制
// disk-io-mrg.cc
  for (int i = 0; i < disk_queues_.size(); ++i) {
    disk_queues_[i] = new DiskQueue(i);
    int num_threads_per_disk;
    string device_name;
    if (i == RemoteDfsDiskId()) {
      num_threads_per_disk = FLAGS_num_remote_hdfs_io_threads;
      device_name = "HDFS remote";

在整个for循环中,会根据id来判断是需要对哪一个队列进行操作,这里以HDFS为例,id就是本地磁盘的数量+HDFS在enum中的offset:

代码语言:javascript
复制
// disk-io-mrg.cc
  /// The disk ID (and therefore disk_queues_ index) used for DFS accesses.
  int RemoteDfsDiskId() const { return num_local_disks() + REMOTE_DFS_DISK_OFFSET; }

如果是要访问远端的HDFS,那么对应的线程数量,即num_threads_per_disk,就是我们通过配置文件指定的num_remote_hdfs_io_threads的值,默认是8。表示会启动8个线程用于处理远端的HDFS访问操作。接着,impala就会循环创建对应数量的线程:

代码语言:javascript
复制
// disk-io-mrg.cc
    for (int j = 0; j < num_threads_per_disk; ++j) {
      stringstream ss;
      ss << "work-loop(Disk: " << device_name << ", Thread: " << j << ")";
      std::unique_ptr<Thread> t;
      RETURN_IF_ERROR(Thread::Create("disk-io-mgr", ss.str(), &DiskQueue::DiskThreadLoop,
          disk_queues_[i], this, &t));
      disk_thread_group_.AddThread(move(t));
    }

在进行线程创建的时候,将函数DiskQueue::DiskThreadLoop绑定到了该线程上,该函数就是通过一个while循环来不断的进行处理,相关的函数调用如下所示:

代码语言:javascript
复制
DiskThreadLoop(disk-io-mrg.cc)
-GetNextRequestRange(disk-io-mrg.cc)
--GetNextRequestRange(request-context.cc)
-DoRead(scan-range.cc)/Write(disk-io-mgr.cc)

GetNextRequestRange函数就是用来获取当前这个DiskQueue(例如远端的HDFS访问queue)的下一个RequestRange,来进行具体的io操作。RequestRange代表一个文件中的连续字节序列,主要分为:ScanRange和WriteRange。每个disk线程一次只能处理一个RequestRange。这里impala采用了一个两层的设计,在GetNextRequestRange中,首先会需要获取一个RequestContext对象,RequestContext可以理解为一个查询的某个instance下的所有IO请求集合,可以简单理解为某个表的RequestRange集合都被封装在一个RequestContext对象中。获取RequestContext的代码如下所示:

代码语言:javascript
复制
*request_context = request_contexts_.front();
request_contexts_.pop_front();
DCHECK(*request_context != nullptr);

request_contexts_是一个RequestContext类型的list,每一个DiskQueue都包含了这样一个队列,表示该DiskQueue上的所有的待处理的RequestContext列表。这里我们可以简单的理解为每个表的扫描请求,都在这个队列中等待处理。首先会从队列的头部取出一个RequestContext,然后将该对象弹出。该DiskQueue的其他线程就可以继续处理后续的RequestContext对象,这样就不会因为当前的RequestContext对象处理时间过长,而阻塞了其他的RequestContext对象处理。 关于request_contexts_队列成员更新,不是本文介绍的重点,只要知道:当提交查询的时候,impalad会自动进行解析,然后进行封装,最后添加到该队列中即可。 在获取到RequestContext对象之后,我们就可以通过该RequestContext的GetNextRequestRange方法获取具体的RequestRange对象进行实际的扫描操作了。 上面的描述可能不太容易理解,我们将上述的各个成员之间的包含关系以及操作流程进行了整理成了一张图,如下所示:

2
2

最终获取到了一个RequestRange之后,会进行判断,是READ还是WRITE,进行相应地处理。这里我们以READ为例,相关函数调用如下所示:

代码语言:javascript
复制
DiskThreadLoop(disk-io-mrg.cc)
-GetNextRequestRange(disk-io-mrg.cc)
--GetNextRequestRange(request-context.cc)
-DoRead(scan-range.cc)
-ReadDone(request-context.cc)

从上面的相关代码,我们可以知道,如果我们将num_remote_hdfs_io_threads参数配置的更大一些,那么就会有更多的线程并发的通过DiskThreadLoop获取到RequestRange进行处理,从而可以在一定程度上提到SCAN的速度,进而加快整个查询进程。 在Impala的profile中,对于HDFS的IO theads的指标,即AverageHdfsReadThreadConcurrency,相关介绍如下所示:

3
3

可以简单理解为该HDFS_SCAN_NODE有多少个IO线程用于处于读写请求操作。所以说,如果线上查询的这个指标很小,那么就要考虑适当调整num_remote_hdfs_io_threads这个参数了。与这个指标很相似的是AverageScannerThreadConcurrency,这个表示scanner线程的执行数量,与我前面提到的scan node两层模型中的scanner对应,这个之后再详细介绍。除此之外,还有其他的一些指标,例如ScannerIoWaitTime,表示scanner等到IO线程的数据就绪的时间,如果这个时间很长,那么说明IO线程存在瓶颈。还有很多指标,就不再一一展开描述。我们在线上排查慢查询的时候,这些指标都是非常有用的信息。 上面提到了profile中的指标信息。另外,在impala服务启动之后,我们也可以通过web页面上的/threadz页面查看“disk-io-mgr”这个组下面的线程信息,就可以看到用于处理远端HDFS读取的线程:

3
3

上面的User/Kernel CPU和IO-wait的时间,都是直接从机器上读取的:

代码语言:javascript
复制
// os-util.h
/// Populates ThreadStats object for a given thread by reading from
/// /proc/<pid>/task/<tid>/stats. Returns OK unless the file cannot be read or is in an
/// unrecognised format, or if the kernel version is not modern enough.
Status GetThreadStats(int64_t tid, ThreadStats* stats);

对于每个disk queue,impala还绑定了对应的metric信息,如下所示:

4
4

这些metric代表的就是读取延时和大小的统计直方图信息。 到这里,关于HDFS_SCAN_NODE的IO threads就介绍的差不多了,我们通过代码分析,知道了Impala对于disk以及各种远端dfs的处理,这些都是属于IO threads部分,后续有时间再跟大家一起学习scanner模块的相关知识。本文涉及到的代码分析模块,都是笔者自己根据源码分析解读出来,如有错误,欢迎指正。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-03-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档