Hbase源码系列之regionserver应答数据请求服务设计

一,基本介绍

Hbase源码系列主要是以hbase-1.0.0为例讲解hbase源码。本文主要是将Regionserver服务端RPC的结构及处理流程。希望是帮助大家彻底了解hbase Regionserver的内部结构。

本文会涉及Regionserver端接受客户端链接,处理读事件,交由调度器去执行,然后由Responder将结果返回给客户端整个过程。

建议大家多读读浪尖前面关于JAVA网络IO模型相关文章<JAVA的网络IO模型彻底讲解>和kafka的<Kafka源码系列之Broker的IO服务及业务处理>两篇文章,对大家设计服务端会有很大的帮助。

二,重要类

1,RSRpcServices

Regionserver的rpc服务的实现类

2,RpcServer

Regionserver处理的重要类。包括以下几个角色:

Listener

Responder

Reader

rpcScheduler

3,SimpleRpcSchedulerFactory

SimpleRPCScheduler的工厂类。

4,SimpleRpcScheduler

RegionServer的默认调度类,是可以配置的。包括三个优先级的线程池:general(普通表读写),高优先级(系统表),副本同步。

每个优先级的调度器又有不同的实现:

普通的调度器实现,可以有三种:

A),FastPathBalancedQueueRpcExecutor

B),RWQueueRpcExecutor

C),BalancedQueueRpcExecutor

对于副本同步及高优先级的只有一种实现

FastPathBalancedQueueRPCExecutor

总的等级有以下几种

normal_QOS < QOS_threshold < replication_QOS < replay_QOS < admin_QOS < high_QOS

分配策略是

public boolean dispatch(CallRunner callTask) throws InterruptedException {
//    normal_QOS < QOS_threshold < replication_QOS < replay_QOS < admin_QOS < high_QOS
 RpcServer.Call call = callTask.getCall(); //MultiServerCallable
 int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser());
    if (priorityExecutor != null && level > highPriorityLevel) {
 return priorityExecutor.dispatch(callTask);
 } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
 return replicationExecutor.dispatch(callTask);
 } else {
 return callExecutor.dispatch(callTask);
 }
  }

优先级对应的数值如下

public static final int NORMAL_QOS = 0;
public static final int QOS_THRESHOLD = 10; //highPriorityLevel
public static final int HIGH_QOS = 200;
public static final int REPLICATION_QOS = 5;
public static final int REPLAY_QOS = 6;
public static final int ADMIN_QOS = 100;
public static final int SYSTEMTABLE_QOS = HIGH_QOS;

5,Listener

一个Listener包含了一个固定数目的Reader,在一个固定线程数的ExecutorPool中运行,默认是10。

Listener主要负责监听accept事件,然后轮训获取一个Reader去做读事件。

6,Reader

每个Reader都有一个Selector,负责读取消息,然后封装成call,再将call封装成CallRunner,交给Scheduler去执行。然后Reader继续去遍历处理读事件。

7,Responder

在CallRunner#run 执行call结束之后,会让call将自身加入一个队列里,有Responder获取并应答客户端。

每个Regionserver只会有一个Responder。

三,相关源码

1,调度器初始化过程

通过反射得到了SimpleRPCSchedulerFactory。

Class<?> rpcSchedulerFactoryClass = rs.conf.getClass(
 REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
 SimpleRpcSchedulerFactory.class);
rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());

然后在创建RpcServer的时候,创建了SimpleRPCScheduler。

rpcServer = new RpcServer(rs, name, getServices(),
 bindAddress, // use final bindAddress for this server.
 rs.conf,
 rpcSchedulerFactory.create(rs.conf, this, rs));

Create方法中,重点指定了三种调度器的线程数目

public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
 int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
 HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
  return new SimpleRpcScheduler(
    conf,
 handlerCount,
 conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
 HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
 conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
 HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
 priority,
 server,
 HConstants.QOS_THRESHOLD);
}

2,Listener的初始化过程

在RSRpcServices的start方法中,调用了RpcServer的start,然后启动了listener.start()

具体的初始化是在Listener的构造函数中做的

super(name);
backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);

// Bind the server socket to the binding addrees (can be different from the default interface)
bind(acceptChannel.socket(), bindAddress, backlogLength);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();

readers = new Reader[readThreads];
readPool = Executors.newFixedThreadPool(readThreads,
  new ThreadFactoryBuilder().setNameFormat(
 "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
 ",port=" + port).setDaemon(true)
  .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
for (int i = 0; i < readThreads; ++i) {
  Reader reader = new Reader();
 readers[i] = reader;
 readPool.execute(reader);
}
LOG.info(getName() + ": started " + readThreads + " reader(s) listening on port=" + port);

// Register accepts on the server socket with the selector.
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("RpcServer.listener,port=" + port);
this.setDaemon(true);

当监听到有接收事件之后,轮询取出一个Reader,将Channel注册到该reader的Selector上由该Reader监听读事件。

Reader getReader() {
 currentReader = (currentReader + 1) % readers.length;
  return readers[currentReader];
}

注册

reader.startAdd();
SelectionKey readKey = reader.registerChannel(channel);
c = getConnection(channel, System.currentTimeMillis());
readKey.attach(c);

3,Reader的初始化过程及处理过程

Reader是在Listener构建的时候初始化并加到线程池中执行的。

readers = new Reader[readThreads];
readPool = Executors.newFixedThreadPool(readThreads,
  new ThreadFactoryBuilder().setNameFormat(
 "RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
 ",port=" + port).setDaemon(true)
  .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
for (int i = 0; i < readThreads; ++i) {
  Reader reader = new Reader();
 readers[i] = reader;
 readPool.execute(reader);
}

Reader的数目由,控制。

"hbase.ipc.server.read.threadpool.size", 10

Reader的具体处理,主要是经过Reader处理之后交给了调度器去执行。

实际上最终是在Connection的processRequest方法中交给调度器执行的

if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
 callQueueSize.add(-1 * call.getSize());

 ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
 metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
 setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
 "Call queue is full on " + server.getServerName() +
 ", too many items queued ?");
 responder.doRespond(call);
}

调度器分配的策略

public boolean dispatch(CallRunner callTask) throws InterruptedException {
//    normal_QOS < QOS_threshold < replication_QOS < replay_QOS < admin_QOS < high_QOS
 RpcServer.Call call = callTask.getCall(); //MultiServerCallable
 int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser());
    if (priorityExecutor != null && level > highPriorityLevel) {
 return priorityExecutor.dispatch(callTask);
 } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
 return replicationExecutor.dispatch(callTask);
 } else {
 return callExecutor.dispatch(callTask);
 }
  }

4,Responder应答的过程

在交给调度器执行后,会将call交给Responder,由其最终监听写事件,给客户端答复。

Responder线程是在构建RpcServer的时候初始化,start的时候start

在其run方法中,会循环调用

registerWrites();

然后执行具体写事件

Set<SelectionKey> keys = writeSelector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
  SelectionKey key = iter.next();
 iter.remove();
  try {
 if (key.isValid() && key.isWritable()) {
      doAsyncWrite(key);
 }
  } catch (IOException e) {
 LOG.debug(getName() + ": asyncWrite", e);
 }
}

四,总结

根据源码,我画出了Regionserver的服务端请求处理图,也可以叫Regionserver的Rpc结构图。如下:

从图中我们可以总结出一下几点:

1,这个也是经典的Rector多线程模型(变动是会将应答汇聚到一个线程)。

2,一个线程负责接收事件监听客户端链接请求。

3,多个线程负责处理客户端请求。

4,有具体的业务逻辑执行交由调度器去执行客户端的请求(默认是,普通表,副本请求,系统表三种级别线程池)。

5,一个线程负责应答。

可以对比浪尖前面<Kafka源码系列之Broker的IO服务及业务处理>就可以看出二者的不同。

Kafka的Broker是IO线程和业务线程分离,均是多线程,应答也是交由IO线程组做的。而hbase的regionserver是将IO线程进行了读写分离,读线程是多线程,而写(应答线程)是单线程来做的。

IO请求处理方面来说kafka是很优秀的优的,但是hbase regionserver的调度器实现了按等级分离线程池模型,保证更优先级的操作能执行这个特点也比较不错。会发现某些管理操作阻塞,但是读写正常,这个我遇到过。这就体现了它优势。

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2017-07-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏比原链

Derek解读Bytom源码-P2P网络 地址簿

Gitee地址:https://gitee.com/BytomBlockchain/bytom

1123
来自专栏JackieZheng

Spring实战——Profile

  看到Profile这个关键字,或许你从来没有正眼瞧过他,又或者脑海中有些模糊的印象,比如除了这里Springmvc中的Profile,maven中也有Pro...

3026
来自专栏Kubernetes

Kubernetes如何通过Devi

Device Plugins Device Pulgins在Kubernetes 1.10中是beta特性,开始于Kubernetes 1.8,用来给第三方设备...

4898
来自专栏安恒网络空间安全讲武堂

[HCTF] share write up

从http://share.2018.hctf.io/robots.txt中获取到题目部分源码

952
来自专栏c#开发者

WCF-OracleDB adapter常见错误解决方法

Microsoft.ServiceModel.Channels.Common.MetadataException: Invalid argument: <Bts...

3467
来自专栏黑泽君的专栏

day54_BOS项目_06

第一步:根据提供的 业务受理.pdm 文件生成建表文件 bos_qp.sql 第二步:由于业务受理.pdm 文件中有伪表,所以我们需要修改生成的建表文件,修改如...

932
来自专栏24K纯开源

Mac OS X平台下QuickLook开发教程

一、引言       Quick Look技术是Apple在Mac OS X 10.5中引入的一种用于快速查看文件内容的技术。用户只需要选中文件单击空格键即可快...

2948
来自专栏FreeBuf

腾讯御见捕获Flash 0day漏洞(CVE-2018-5002)野外攻击

腾讯御见威胁情报中心近日监控到一例使用Adobe Flash 0day漏洞(CVE-2018-5002)的APT攻击,攻击者疑通过即时聊天工具和邮箱等把恶意Ex...

1090
来自专栏高性能服务器开发

+从零实现一款12306刷票软件1.4

这里还有个注意细节,就是通过POST请求发送的数据需要对一些符号做URL Encode,这个我在上一篇文章《从零实现一个http服务器》也详细做了介绍,还不清楚...

2212
来自专栏blackpiglet

在 Kubernetes中,fluentd 以 sidecar 模式收集日志,并发送至 ElasticSearch

ElasticSearch 在日志收集和分析领域非常流行,而 fluentd 是一种万用型的日志收集器,当然也支持 ES(ElasticSearch)。不过在 ...

1582

扫码关注云+社区

领取腾讯云代金券