Hbase源码系列之scan源码解析及调优

一,hbase的scan基本使用问题介绍

Hbase的Scan方法是基于Rowkey进行数据扫描的,过程中client会将我们的请求,转化为向服务端的RPC请求。那么这个时候我们可以考虑的优化,那么主要有一下三点:

A,减少带宽(通过过滤器减少无用数据的 传输);

B,减少RPC请求的次数;

C,加缓存。

具体的转化为scan相关的操作如下:

1,scan可以设置过滤器

过滤器可以减少数据网络传输的数据量。

过滤器可以用来扫描ROWkey不连续的数据。

2,scan可以设置每批次的扫描行数

Scan.setCaching(20);设置一个批次应该请求几行数据。几个Result。

3,scan可以设置每批次扫描的列数。

Scan.setBatch(1);设置每一行请求几列的数据。一个Result几个cell。

通过2、3可以减少RPC请求的次数,这样可以提升扫描性能,但是也会带来GC的风险。

重要的计算公式:

Rpc次数=(行数×每行的列数)/Min(每行的列数,批量大小)/扫描器缓存

合理设置2,3可以降低RPC请求次数,提升性能。

4,对于一次扫描,频繁使用的数据呢可以设置缓存。

Scan.setCacheBlocks(false);不建议使用。

5,scan占用内存

Scan的并发数*cache数*单个Result大小。合理使用2,3.

往往,为了不产生热点问题,会将Rowkey离散化,然后造成扫描不连续,这个时候每次扫描就会转化为n多个scan并发进行。此时就要考虑内存压力,防止GC,导致不正常的程序退出。

二,源码相关的重要类

1,HTable

重点是Hbase类初始化的时候提交的请求的任务的线程池的初始化

HConnectionImplementation的getThreadPool方法。

getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
 conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);

具体内容如下:

private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint,
 BlockingQueue<Runnable> passedWorkQueue) {
 // shared HTable thread executor not yet initialized
 if (maxThreads == 0) {
    maxThreads = Runtime.getRuntime().availableProcessors() * 8;
 }
 if (coreThreads == 0) {
    coreThreads = Runtime.getRuntime().availableProcessors() * 8;
 }
 long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
 //设置阻塞任务队列
 BlockingQueue<Runnable> workQueue = passedWorkQueue;
  if (workQueue == null) {
    workQueue =
 new LinkedBlockingQueue<Runnable>(maxThreads *
 conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
 HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
 }
  ThreadPoolExecutor tpe = new ThreadPoolExecutor(
      coreThreads, //核心线程数
 maxThreads,//最大线程数
 keepAliveTime,//线程池中超过corePoolSize数目的空闲线程最大存活时间;
 TimeUnit.SECONDS,//keepAliveTime时间单位
 workQueue,//阻塞任务队列
 Threads.newDaemonThreadFactory(toString() + nameHint));//新建线程工厂
 tpe.allowCoreThreadTimeOut(true);//allowCoreThreadTimeOut为true该值为true,则线程池数量最后销毁到0个。
  //allowCoreThreadTimeOut为false销毁机制:超过核心线程数时,而且(超过最大值或者timeout过),就会销毁。
 return tpe;
}

2,ClientSimpleScanner

继承关系如下:

class ClientSimpleScanner extends ClientScanner

abstract class ClientScanner extends AbstractClientScanner

abstract class AbstractClientScanner implements ResultScanner

主要是实现了scan的动作

3,HRegionServer

为客户端提供HRegions,并向master注册自身,一个集群中会有很多个HRegionServer.

在此篇文章中主要关注点

createClusterConnection方法中,注册了,客户端RPC的时候服务端的实现类对应关系。

ConnectionUtils.createShortCircuitConnection(conf, null, userProvider.getCurrent(),
 serverName, rpcServices, rpcServices);

客户端使用的是BlockingStub,服务端使用的是RSRpcServices。共同的父类是ClientService.BlockingInterFace。

4,RSRpcServices

实现了regionserver的RPC服务。

本文只关注scan的方法实现。

5,RegionScannerImpl

负责合并多个store的scanner。内部维护的有KeyValueHeap,维护了一个优先队列存放的是StoreScanner。

6,StoreScanner

可以扫描memstore和store中的数据。

重要的内部成员KeyValueHeap,内部的优先队列,维护的是MemStoreScanner和StoreFileScanner

7,KeyValueHeap

内部维护了一个PriorityQueue<KeyValueScanner>队列,存储的就是InternalScanner,StoreScanner是InternalScanner的子类。

三,scan的源码实现

Scan的源码实现过程,主要是帮助大家更好的阅读源码。

主要分成两个大节:

A,客户端scan的过程

B),服务端scan的过程

由于源码内容比较多,本文只会贴出讲解重点环节的源码。

1,hbase scan过程客户端的实现

入口是HTable的getScan方法

t.getScanner(s)

实际上是构建了一个

ClientSimpleScanner

数据读的入口是ClientSimpleScanner的父类ClientScanner的next方法。

//cache读取结束的时候直接进入rpc
if (cache.size() == 0) {
  loadCache();
}
//假如cache有的话直接读取,
if (cache.size() > 0) {
 return cache.poll();
}

在loadCache方法里面实际上直接调用的是,自身的call方法获取结果

values = call(callable, caller, scannerTimeout, true);

经过一层封装,调用的是ScannerCallableWithReplicas的call

提交并执行call

// submit call for the primary replica.
addCallsForCurrentReplica(cs, rl); //提交call

执行的caller是ScannerCallable,被封装成了QueueingFuture,扔到线程池里执行

public void submit(RetryingCallable<V> task, int callTimeout, int id) {
  QueueingFuture<V> newFuture = new QueueingFuture<V>(task, callTimeout, id);
 executor.execute(Trace.wrap(newFuture));
 tasks[id] = newFuture;
}

进入ScannerCallable的call方法之前实际上是会先调用其prepare方法(QueueingFuture),主要做一些准备工作

RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
 id, getConnection(), getTableName(), getRow());
location = id < rl.size() ? rl.getRegionLocation(id) : null;
if (location == null || location.getServerName() == null) {
 // With this exception, there will be a retry. The location can be null for a replica
  //  when the table is created or after a split.
 throw new HBaseIOException("There is no location for replica id #" + id);
}
ServerName dest = location.getServerName();
setStub(super.getConnection().getClient(dest));
if (!instantiated || reload) {
  checkIfRegionServerIsRemote();
 instantiated = true;
}

然后正式进入call

ScanResponse response;
if (this.scannerId == -1L) {
 //第一次
 response = openScanner(); //
} else {
//第一次后
 response = next();//进行下一次rpc
}

openScanner和next方法中,在构建不同的Request之后其它处理都是一样的

ScanResponse response = getStub().scan(controller, request);//blockingstub

然后就进入了server端并获取ScanResponse。

2,hbase scan服务端的实现

Hbase scan的客户端发送Rpc请求之后,进入服务端RSRpcServices对应的scan方法

ScanResponse scan(final RpcController controller, final ScanRequest request)

首先,会判断是否已经构建了scannerid也即是否是第一次请求

if (request.hasScannerId()) {
//不是第一次,直解获取
  rsh = getRegionScanner(request);
} else {
//第一次进行构建
  rsh = newRegionScanner(request, builder);
}

然后进入真正的scan

scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
 results, builder, lastBlock, context);

接着在scan方法中,也对行数是否超过请求行数做了限制

// Collect values to be returned here
moreRows = scanner.nextRaw(values, scannerContext);
checkLimitOfRows(numOfCompleteRows, limitOfRows, moreRows, scannerContext,  builder);

此处的scanner是RegionScannerImpl,然后进入其nextInternal方法,初始化或者清空处理函数,这个是记录状态,比如当前batch大小,结果的size等。

if (scannerContext.getKeepProgress()) {
 // Progress should be kept. Reset to initial values seen at start of method invocation.
 scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
 initialTimeProgress);
} else {
  scannerContext.clearProgress();
}

然后重点的方法是populateResult方法

populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);

具体重点内容

scannerContext.setKeepProgress(true); heap.next(results, scannerContext); scannerContext.setKeepProgress(tmpKeepProgress);

进入KeyValueHeap的next方法,

InternalScanner currentAsInternal = (InternalScanner)this.current; boolean moreCells = currentAsInternal.next(result, scannerContext);

进入StoreScanner的next方法

当前rowkey获取去的列数加1,然后判断取出的当前列簇的总列数是否超过设置的大小列。

this.countPerRow++;
if (storeLimit > -1 &&
 this.countPerRow > (storeLimit + storeOffset)) {

在正常范围内,将结果cell加入返回的列表

if (this.countPerRow > storeOffset) {
  outResult.add(cell);

更新已经获取的当前Rowkey的batch大小和结果的size

// Update the progress of the scanner context    scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
//            更新当前获取的batch
 scannerContext.incrementBatchProgress(1);

判断batch和size是否超限,超限的话切入下一个Rowkey。Batch参数起作用的地方。

if (scannerContext.checkBatchLimit(LimitScope.BETWEEN_CELLS)) {
 break LOOP;
}
if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_CELLS)) {
 break LOOP;
}

我们这个scan就讲解到这个地方。

其实,应该关注点比较多,贴源码比较累赘,我这是大致骨架都有了,大家可以根据这个骨架结合源码去看,节省时间。

四,总结

1,对Scanner嵌套关系的总结

A,RegionScannerImpl包含了内部维护的有KeyValueHeap,维护了一个优先队列存放的是StoreScanner。

B,StoreScanner的重要的内部成员KeyValueHeap,内部的优先队列,维护的是MemStoreScanner和StoreFileScanner

C,获取数据首先是从RegionScannerImpl的队列中取出,StoreScanner。然后从StoreScanner中取出一个MemStoreScanner和StoreFileScanner,然后调用其next方法,将结果放入返回的list中。

2,对于filter的使用,请大家先参考hbase权威指南,后面浪尖再接个各个filter和源码讲解。

3,对于Rpc数据请求次数调节

Scan.setCaching(20);//控制一次rpc返回几行,即几个Result

Scan.setBatch(1);//控制一次rpc返回几列,即几个cell

Rpc次数=(行数×每行的列数)/Min(每行的列数,批量大小)/扫描器缓存

4,对于缓存。

Scan.setCacheBlocks(false);不建议使用。

5,对于客户端的内存占用

Scan的并发数*cache数(Result数)*单个Result大小。

注意,要结合3和5进行调节,既要避免频繁的RPC请求,又要避免客户端GC。要求是要了解每条数据的大致大小。

本文中设计到另一个重点就是RPC,RPC是想在spark前夕讲解,这里希望读者假如不了解的话可以去网上先了解一下。重点要记住的几个点是客户端的方法和服务端的方法实现,及是如何对应的。

对于hbase 1.0.0,列举以下几种,方便大家自己去阅读相关源码。无论是管理,还是数据请求,最终的regionserver相关的RPC实现都是RSRpcServices类里面对应的具体方法(参数要一致哦)。

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2017-07-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏熊二哥

快速入门系列--WebAPI--04在老版本MVC4下的调整

WebAPI是建立在MVC和WCF的基础上的,原来微软老是喜欢封装的很多,这次终于愿意将http编程模型的相关细节暴露给我们了。在之前的介绍中,基本上都基于.N...

2396
来自专栏Java 源码分析

Exectors框架 源码分析

Exectors框架 源码分析 1. 在阅读源码时做了大量的注释,并且做了一些测试分析源码内的执行流程,由于博客篇幅有限,并且代码阅读起来没有 IDE 方便,...

2706
来自专栏Linux驱动

第1阶段——uboot分析之启动函数bootm命令 (9)

本节主要学习: 详细分析UBOOT中"bootcmd=nand read.jffs2 0x30007FC0 kernel;bootm 0x30007FC0...

1865
来自专栏分布式系统进阶

Influxdb 数据写入流程

因此对写入请求的处理就在函数 func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Reque...

1883
来自专栏jeremy的技术点滴

sed命令工作原理及命令备忘

3459
来自专栏腾讯云API

腾讯云API:无服务器函数

无服务器函数是一个很好玩的东西,可以通过这个程序跑一些脚本,在一定程度上,是很方便的。但是作为新鲜事物,一般很难被大家接受,所以,我今天在这里,就做一个小例子,...

8845
来自专栏岑玉海

hbase源码系列(六)HMaster启动过程

  这一章是server端开始的第一章,有兴趣的朋友先去看一下hbase的架构图,我专门从网上弄下来的。   按照HMaster的run方法的注释,我们可以了解...

5699
来自专栏有趣的django

37.Django1.11.6文档

第一步 入门 检查版本 python -m django --version 创建第一个项目 django-admin startproject mysite ...

4848
来自专栏分布式系统进阶

ReplicaManager源码解析1-消息同步线程管理

基本上就是作三件事: 构造FetchRequest, 同步发送FetchRequest并接收FetchResponse, 处理FetchResponse, 这三...

1582
来自专栏圣杰的专栏

ASP.NET Core 中断请求了解一下(翻译)

假设有一个耗时的Action,在浏览器发出请求返回响应之前,如果刷新了页面,对于浏览器(客户端)来说前一个请求就会被终止。而对于服务端来说,又是怎样呢?前一个请...

1333

扫码关注云+社区

领取腾讯云代金券