专栏首页坚毅的PHPHbase 源码分析之 Get 流程及rpc原理

Hbase 源码分析之 Get 流程及rpc原理

分析版本为hbase 0.94

附上趋势团队画的图:

rpc角色表:

HBase通信信道

HBase的通信接口

客户端

服务端

HBase Client

Master Server

HMasterInterface

HBase Client

Region Server

HRegionInterface

Region Server

Master Server

HMasterRegionInterface

客户端发起请求:

htable.get(Get)

public Result get(final Get get) throws IOException {
return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
public Result call() throws IOException {
return server.get(location.getRegionInfo().getRegionName(), get);
}
}.withRetries();
}

调用get方法后,客户端进入睡眠,睡眠时间为pause * HConstants.RETRY_BACKOFF[ntries];  

pause= HBASE_CLIENT_PAUSE(1秒)

RETRY_BACKOFF[] = { 1, 1, 1, 2, 2, 4, 4, 8, 16, 32 };

有结果则中断执行返回rpc结果,否则重试十次(默认DEFAULT_HBASE_CLIENT_RETRIES_NUMBER=10)

第一次进行get时,客户端需要先进行rpc通信,获得root表 meta表信息,确定row对应的location

通过ServerCallable维持HRegionInterface 的server实例 ,server为HConnectionManager的getHRegionConnection方法获取的HBaseRPC的VersionedProtocol代理,其实是

WritableRpcEngine实例,call方法则会调用成员HbaseClient的call方法与regionserver进行远程通信

服务器端:

当regionserver 收到来自客户端的Get请求时,调用接口 

public Result get(byte[] regionName, Get get) 
{ 
... 
HRegion region = getRegion(regionName); 
return region.get(get, getLockFromId(get.getLockId())); 
... 
} 

在HRegion中

Scan scan = new Scan(get); 会先根据设置的columnFamily存放familyMap对  ----  columnFamily:null

public Get addFamily(byte [] family) {
familyMap.remove(family);
familyMap.put(family, null);
return this;
}

如果查询的family不在htableDescriptor中,返回错误

scanner = getScanner(scan);
public RegionScanner getScanner(Scan scan) throws IOException {
return getScanner(scan, null);
}

additionalScanners为null 所以在RegionScannerImpl的构造中只会使用StoreScanner

return instantiateRegionScanner(scan, additionalScanners); return new RegionScannerImpl(scan, additionalScanners);

RegionScannerImpl 是 HRegion中的子类

for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
StoreScanner scanner = store.getScanner(scan, entry.getValue());
scanners.add(scanner);
}

按照familyMap的数量存放对应数量的 StoreScanner

Hregion initialize时会对应每个columnFamily存放一个stores Future<Store> future = completionService.take(); Store store = future.get(); this.stores.put(store.getColumnFamilyName().getBytes(), store);

scanners 添加从Store中获取的scanner

store.getScanners(cacheBlocks, isGet, isCompaction, matcher)

Store 类:

memStoreScanners = this.memstore.getScanners();
List<StoreFileScanner> sfScanners = StoreFileScanner
.getScannersForStoreFiles(storeFiles, cacheBlocks, isGet, isCompaction, matcher);
List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
scanners.addAll(sfScanners);
// Then the memstore scanners
scanners.addAll(memStoreScanners);
return scanners;

memStoreScanners  为Collections.<KeyValueScanner>singletonList( new MemStoreScanner())

Store中为StoreScanner添加了StoreFileScanner和 memStoreScanner

进行scan时

scanner = getScanner(scan); scanner.next(results);

现在分析RegionScannerImpl中的next方法,此时正式进入获取数据流程

@Override
public synchronized boolean next(List<KeyValue> outResults)
throws IOException {
// apply the batching limit by default
return next(outResults, batch);
}

batch默认为-1

   startRegionOperation();

  outResults.addAll(results);

startRegionOperation 会为操作加读锁,lock.readLock().lock();

然后遍历storeHeap,找到对应Row

do {     this.storeHeap.next(results, limit - results.size()); } while (Bytes.equals(currentRow, nextRow = peekRow()));

 this.storeHeap 会不断poll出存储的scanner

因RegionScannerImpl 中 memStoreScanners后添加,所以会先从memStoreScanners中查询,如果没有则从StoreFileScanner中查询

RegionScannerImpl 的 storeHeap为KeyValueHeap,会强制转型scanner为 InternalScanner

InternalScanner currentAsInternal = (InternalScanner)this.current;  

总结下目前流程get request -> regionServer -> region -> storeHeap -> scanner -> find row

但上述流程没有解释reguest是怎么找到regionServer去处理请求的,下边我们在分析下

服务器端服务在HMaster和HRegionServer启动时,中都会生成一个全局的RpcServer  

hmaster的rpc server:

hmaster会使用org.apache.hadoop.hbase.executor.ExecutorService启动多种线程服务 (This is a generic executor service. This component abstracts a threadpool, a queue to which EventHandler.EventTypes can be submitted, and a Runnable that handles the object that is added to the queue. ):

MASTER_OPEN_REGION   (默认5)

MASTER_CLOSE_REGION   (默认5)

MASTER_SERVER_OPERATIONS  (默认3)

MASTER_META_SERVER_OPERATIONS  (默认5)

MASTER_TABLE_OPERATIONS (单线程)

logCleaner (单线程)

infoServer  (master-status 等信息展示)

rpcServer (我们需要用的rpc服务)

RpcServer是个接口,实现类为HBaseServer,启动时会开启responder listener handlers几种类去响应请求,如设置了priorityHandlers的数目,会另外启动priorityHandlers,listener监听端口,提供请求给handlers,handlers则调用RpcEngine,反射出需要的方法并执行,通过responder写结果回去(this.responder.doRespond)。

HMaster的 handlers的个数由hbase.master.handler.count

HRegionServer的 handlers的个数由 hbase.regionserver.handler.count 指定

HRegionServer的启动和HMaster类似,它启动以下线程:

this.service = new ExecutorService(getServerName().toString());
this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_OPEN_ROOT,
conf.getInt("hbase.regionserver.executor.openroot.threads", 1));
this.service.startExecutorService(ExecutorType.RS_OPEN_META,
conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
this.service.startExecutorService(ExecutorType.RS_CLOSE_REGION,
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_CLOSE_ROOT,
conf.getInt("hbase.regionserver.executor.closeroot.threads", 1));
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));

hlogRoller(daemon)

cacheFlusher(daemon)

compactionChecker(daemon)

Leases(它不是线程,会启动后台线程)

splitLogWorker

rpcServer

HBaseClient 和 HMaster关系由HMasterInterface描述:

Clients interact with the HMasterInterface to gain access to meta-level  HBase functionality, like finding an HRegionServer and creating/destroying  tables.

HBaseClient 和 HRegionServer关系由HRegionInterface描述:

Clients interact with HRegionServers using a handle to the HRegionInterface

参考资料:

http://zjushch.iteye.com/blog/1173304

http://www.spnguru.com/2010/07/hbase-%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90-rpc%E6%9C%BA%E5%88%B6-%E5%9F%BA%E7%A1%80/

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • hbase-filter使用

    使用filter需要guava jar包 The Guava project contains several of Google's core librari...

    财主刀刀
  • 简单可行的code review规则

    曾经有一段垃圾代码放在我的面前,我没有拒绝,等我真正开始接手的时候我才后悔莫及,程序员最痛苦的事莫过于此!

    财主刀刀
  • mc参数备忘&java-json备忘

    mc参数(摘自 http://www.blogjava.net/jzone/articles/302991.html) 查看方法 telnet进去 或 ech...

    财主刀刀
  • Linux 错误修复笔记

    Spaceack
  • python 3.x 爬虫基础---Urllib详解

    前言   爬虫也了解了一段时间了希望在半个月的时间内结束它的学习,开启python的新大陆,今天大致总结一下爬虫基础相关的类库---Urllib。 Urllib...

    kmonkey
  • SAP最佳业务实践:SD–回扣流程: 免费商品(121)-2业务处理

    image.png 一、VA01输入销售订单 在此活动中,创建一个销售订单。 无 如果销售的H11少于1,000PC,系统会出现以下警告: 免费货物的最小数量...

    SAP最佳业务实践
  • python3 django整理(八) Django 创建admin用户,并登陆操作添加博文

    进行登陆。 之后按照下面链接进行app中数据库表的更新 python3 django整理(六)配置数据库(mysql) 上面的输入刚才新建的用户名和...

    学到老
  • Cypress学习4-操作页面元素(Actions行为事件)

    ui自动化操作页面上的元素,常用的方法就那么几个,输入文本,点击元素,清空文本,点击按钮。 还有一些特殊的checkbox,radio,滚动条等。

    上海-悠悠
  • Cypress系列(62)- 改造 PageObject 模式

    PageObject(页面对象)模式是自动化测试中的一个最佳实践,相信很多小伙伴都知道的

    小菠萝测试笔记
  • 分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业失效转移

    当作业节点执行作业异常崩溃时,其所分配的作业分片项在下次重新分片之前不会被重新执行。开启失效转移功能后,这部分作业分片项将被其他作业节点抓取后“执行”。为什么此...

    芋道源码

扫码关注云+社区

领取腾讯云代金券