导语:hbase作为hadoop生态的一部分,在越来越多的大数据方面得到了应用,但是如何用好它是一个比较考究的过程,本文暂时先研究了hbaserpc部分的代码,rpc作为调用的入口,先从入口处明白原理以及内部实际控制参数进行调整来满足自己的以为需要
HBase 采用了和 Hadoop 相同的 RPC 机制,作为它的主要通信手段.这是一个轻量的,不同于 Java 标准的 RMI 的一种方式,HBase RPC 有明显的客户端和服务端之分。由 HBase Client,Region server, Master server 三者组成了三个信道。最右边的一列是通信两端之间约定的通信接口。客户端调用这个接口,而服务端实现这个接口。所以最基本的工作流程就是:
(1)客户端取得一个服务端通信接口的实例
(2)客户端调用这个实例中的方法
(3)客户端向服务端传输调用请求
(4)服务端接口实现被调用
(5)服务端向客户端传输结果
HbaseRPC 结构如下:
Hbase 的协议文件位于源码的 hbase-protocol 模块下,1.1.3 版本的协议文件如下:
Hbase 服务端 rpc 服务接口为类 RpcServerInterface,其具体实现类为 RpcServer,其服务端 rpc 总体架构如下图
JavaNIO 线程以 reactor 模式负责接受网络请求,其核心源码如下
while (running) {
SelectionKey key = null;
try {
selector.select(); // FindBugs IS2_INCONSISTENT_SYNC
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try{
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key);
}
} catch (IOException ignored) {
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
}
key = null;
}
}
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
try {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
channel.socket().setKeepAlive(tcpKeepAlive);
} catch (IOException ioe) {
channel.close();
throw ioe;
}
Reader reader = getReader();
try {
reader.startAdd();
SelectionKey readKey = reader.registerChannel(channel);
c = getConnection(channel, System._currentTimeMillis_());
readKey.attach(c);
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections ;
}
if (LOG.isDebugEnabled())
LOG.debug(getName() ": connection from " c.toString()
"; # active connections: " numConnections);
} finally {
reader.finishAdd();
}
}
}
从源码中可以看出,其主要职责就是负责接受网络请求并把链接转给 Reader
reader 主要职责是从网络链接里读取数据并把数据封装成 call 交给 rpcschedule 进行调度,
其调用链路如下:
doread 方法会依次调用 Connection 对象的 readAndProcess、process、processOneRpc、processRequest 方法完成协议的解析以及需要调用服务端的那个方法,因为传输协议为 protobuf 因此服务端提供服务的 service 都必须实现 BlockingService 接口,如果是调用 regionserver 那么对应的实现类为 RSRpcServices,在查找到要调用的方法后把服务封装为一个 Call 对象交给 rpcschedule 进行调度。
RpcScheduler 实现了对接受到的 rpc 请求进行调度,hbase 的 rpc 调度器有两个分别是 FifoRpcScheduler 和 SimpleRpcScheduler,区别是 fifo 是先进先出调度,而 SimpleRpcScheduler 是按照优先级进行调度的,hbase 默认使用了 SimpleRpcScheduler 调度器,可以通过参数 hbase.region.server.rpc.scheduler.factory.class 进行修改。
RpcExecutor 真正负责调度执行 call 对象,其调用链路如下图
如上图,hbase 的 RpcExecutor 分为 BalancedQueueRpcExecutor 和 RWQueueRpcExecutor,分别是平衡队列调度和读写因子调度队列,在 rpc 请求加入到 RpcExecutor 后 RpcExecutor 会按照规则进行调度,请求最终会调用 call 即调用业务方法,最后把响应通过 Respoder 返回给客户端。hbase 在最外层分为 3 个 RpcExecutor,每种 Executor 都有一定的线程数来处理队里里的请求,调度器中的 3 个 RpcExecutor 分别是
callExecutor 具体是那种类型取决与参数 hbase.ipc.server.callqueue.handler.factor,默认是 0,如设置该值后为 RWQueueRpcExecutor 否则为 BalancedQueueRpcExecutor,priorityExecutor 和 replicationExecutor 为 BalancedQueueRpcExecutor,如果其封装的请求是基于 meta 表格的操作,将其划分到 priorityExecutor 组里;如果其封装的请求是基于用户表格的操作,将其划分到 callExecutor 组里;如果其封装的是 replication 请求,将其划分到 replicationExecutor 组里。每个调度池分一到多个队列,默认都是一个队列,这样产品组中的所有 Handler 都会去竞争该队列中的资源,为了防止竞争惨烈的情况发生,可将每一个产品组划分成多个产品队列,让每个 Handler 只去抢占指定队列中的资源。在 HRegionServer 中,可通过如下方法来计算 callExecutor 组可以划分成多少个产品队列:
Math.max(1,hbase.regionserver.handler.count*
hbase.ipc.server.callqueue.handler.factor)
其中 hbase.ipc.server.callqueue.handler.factor 属性值默认为 0,即在默认情况下只将该产品组划分成一个产品队列,单个产品队列的容量并不是按需使用无限增长的,HBase 对其长度及空间大小都做了相应的阀值控制,其中:hbase.ipc.server.max.callqueue.length 用于限制产品队列的长度(默认为 handler 数乘以 10),hbase.ipc.server.max.callqueue.size 用于限制产品队列的空间大小(默认为 1G),成功将 CallRunner 产品分配给 Handler 之后,该 Handler 开始对其进行消费处理,消费过程主要是通过调用 RpcServer 的 call 方法来执行指定服务的相应方法,并通过 Responder 将方法的执行结果返回给客户端。
如果 hbase.ipc.server.callqueue.handler.factor 为 0,那么 callExecutor 池 BalancedQueueRpcExecutor,如果 hbase.ipc.server.callqueue.handler.factor 大于 0,那么池的 executor 为 RWQueueRpcExecutor,该 executor 中分为 3 个队列:write,read 和 scan,分别通过参数 hbase.ipc.server.callqueue.read.ratio 和参数 hbase.ipc.server.callqueue.scan.ratio 进行控制,其中 hbase.ipc.server.callqueue.handler.factor 用来控制队列个数
默认值 10,Reader 网络 IO 个数,reader 的个数决定了从网络 io 里读取数据的速度也就是网络吞吐量
默认值值 true
默认值 true
regionserver 的 rcp 请求队列处理线程数,默认为 30
regionserver 操作系统表 rpc 请求处理线程数,默认为 20
call 队列最大长度,默认值为 handler 的个数*10
改值决定了 regionserver 的 rpcexecutor 的类型,如果大于 0,那么 rpcexecutor 为读写分别调度,改值为厚点数,该值要配合参数 hbase.ipc.server.callqueue.read.ratio 联合使用(其中还需要设置参数 hbase.ipc.server.callqueue.scan.ratio),含义是 hbase 支持 put、get、scan 分开调度,可以结合自己的业务场景来控制读写
客户端 rpc 调用超时时间,默认为 5 分钟
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。