前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >HBASE BufferedMutator 批量写入使用举例与源码解析

HBASE BufferedMutator 批量写入使用举例与源码解析

原创
作者头像
大鹅
发布2021-06-04 15:55:24
2K0
发布2021-06-04 15:55:24
举报

toc

1. 基本介绍

BufferedMutator主要用来异步批量的将数据写入一个hbase表,就像Htable一样。通过Connection获取一个实例。

Map/reduce 任务是BufferedMutator的好的使用案例。Map/Reduce任务获益于batch操作,但是没有留出flush接口。BufferedMutator从Map/Reduce任务接受数据,会依据一些先验性的经验批量提交数据,比如puts堆积的数量,由于批量提交时异步的,所以M/R逻辑不会因为数据的batch提交而阻塞。Map/Reduce 批处理任务每个线程会有一个BufferedMutator。单个BufferedMutator也能够很高效用于大数据量的在线系统,来成批的写puts入hbase表。

2. BufferedMutator使用举例

这里分为以下两个批量写入场景

2.1 单次一张表批量写入
代码语言:txt
复制
Configuration conf =  HBaseConfiguration.create();
 conf.set("hbase.zookeeper.quorum", "zookeeperHost");
final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
@Override
public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
 for (int i = 0; i < e.getNumExceptions(); i++) {
        LOG.info("Failed to sent put " + e.getRow(i) + ".");              }
        }
        };
 BufferedMutatorParams params = new BufferedMutatorParams(TABLE)
        .listener(listener);
 params.writeBufferSize(123123L);
        try {
        Connection conn = ConnectionFactory.createConnection(conf);
 BufferedMutator mutator = conn.getBufferedMutator(params);
 Put p = new Put(Bytes.toBytes("someRow"));
 p.addColumn(FAMILY, Bytes.toBytes("someQualifier"), Bytes.toBytes("some value"));
 mutator.mutate(p);
 mutator.close();
 conn.close();
 } catch (IOException e1) {
 // TODO Auto-generated catch block
 e1.printStackTrace();
 }

        }
多次多张表批量写入

可以使用一个Map保存多个Table的连接,这里使用的是线程安全的ConcurrentHashMap,如果是单线程的场景可以换成HashMap以提高效率。

代码语言:txt
复制
private static Map<String, BufferedMutator> tableConnectionMgr = new ConcurrentHashMap<>();
private BufferedMutator getTableConnection(String tableName) throws IOException {
    if (tableConnectionMgr.get(tableName) != null) {
        return tableConnectionMgr.get(tableName);
    }
    Connection connection = ConnectionFactory.createConnection(config);
    BufferedMutator table = connection.getBufferedMutator(TableName.valueOf(tableName));
    tableConnectionMgr.put(tableName, table);
    log.info("hbase table: {} connect established!", tableName);
    return tableConnectionMgr.get(tableName);
}

3 源码介绍

3.1 主要类介绍

BufferedMutatorParams

实例化一个BufferedMutator所需要的参数。

主要参数TableName(表名),writeBufferSize(写缓存大小),maxKeyValueSize(最大key-value大小),ExecutorService(执行线程池),ExceptionListener(监听BufferedMutator的异常)。

BufferedMutatorImpl

用来和hbase表交互,类似于Htable,但是意味着批量,异步的puts。通过HConnectionImplementation获得实例,具体方法如下:

代码语言:txt
复制
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
 if (params.getTableName() == null) {
 throw new IllegalArgumentException("TableName cannot be null.");
 }
 if (params.getPool() == null) {
    params.pool(HTable.getDefaultExecutor(getConfiguration()));
 }
 if (params.getWriteBufferSize() == BufferedMutatorParams.UNSET) {
    params.writeBufferSize(connectionConfig.getWriteBufferSize());
 }
 if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
    params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
 }
 return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
}

AsyncProcess

AsyncProcess内部维护的有一个线程池,我们的操作会被封装成runnable,然后扔到线程池里执行。这个过程是异步的,直到任务数达到最大值。

HConnectionImplementation

一个集群的链接。通过它可以找到master,定位到regions的分布,保持locations的缓存,并指导如何校准localtions信息。

3.2 源码过程
3.2.1 BufferedMutator构建的过程
  1. 首先是要构建一个HBaseConfiguration
代码语言:txt
复制
Configuration conf =  HBaseConfiguration.create();  conf.set("hbase.zookeeper.quorum", "zookeeperHost");
  1. 接着是构建BufferedMutatorParams
代码语言:txt
复制
```java
代码语言:txt
复制
final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
代码语言:txt
复制
 @Override
代码语言:txt
复制
 public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
代码语言:txt
复制
 for (int i = 0; i < e.getNumExceptions(); i++) {
代码语言:txt
复制
 LOG.info("Failed to sent put " + e.getRow(i) + ".");
代码语言:txt
复制
 }
代码语言:txt
复制
  }
代码语言:txt
复制
};
代码语言:txt
复制
BufferedMutatorParams params = new BufferedMutatorParams(TABLE)
代码语言:txt
复制
    .listener(listener);
代码语言:txt
复制
params.writeBufferSize(123);
代码语言:txt
复制
```
  1. 最后构建HConnection
代码语言:txt
复制
Connection conn = ConnectionFactory.*createConnection*(getConf())
  1. 最后构建BufferMutator
代码语言:txt
复制
BufferedMutator mutator = conn.getBufferedMutator(params)
3.2.2 数据发送的过程
  1. 构建put或者Listput
  2. 调用BufferedMutator.mutate方法
  3. 刷写到hbase
代码语言:txt
复制
> 刷写到hbase三种方法:
>
代码语言:txt
复制
> 一,显式调用BufferedMutator.flush
>
代码语言:txt
复制
> 二,发送结束的时候调用BufferedMutator.close
>
代码语言:txt
复制
> 三,它根据当前缓存大于了设置的写缓存大小
代码语言:txt
复制
while (undealtMutationCount.get() != 0  && currentWriteBufferSize.get() > writeBufferSize) {   backgroundFlushCommits(false); }
代码语言:txt
复制
最终都是调用的backgroundFlushCommits方法。
  1. rpc的过程
代码语言:txt
复制
入口是backgroundFlushCommits方法。Ap是AsyncProcess的实例。
代码语言:txt
复制
ap.submit(tableName, taker, true, null, false);
代码语言:txt
复制
首先是构建了一个HashMap,可以通过server找到该server上我们需要的region
代码语言:txt
复制
```java
代码语言:txt
复制
//可以根据我们的server找到要发送到该server的actions
代码语言:txt
复制
Map<ServerName, MultiAction<Row>> actionsByServer =
代码语言:txt
复制
 new HashMap<ServerName, MultiAction<Row>>();
代码语言:txt
复制
```
代码语言:txt
复制
获取所有的region信息,所有region的副本都被包括在内
代码语言:txt
复制
```java
代码语言:txt
复制
RegionLocations locs = connection.locateRegion(
代码语言:txt
复制
    tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
代码语言:txt
复制
```
代码语言:txt
复制
获取默认的region信息此时一个region只会返回一个默认id指定的位置。
代码语言:txt
复制
```java
代码语言:txt
复制
loc = locs.getDefaultRegionLocation();
代码语言:txt
复制
```
代码语言:txt
复制
将row操作转变为action,并加入actionsByServer 
代码语言:txt
复制
```java
代码语言:txt
复制
//可以操作将row操作变为Action
代码语言:txt
复制
Action<Row> action = new Action<Row>(r, ++posInList);
代码语言:txt
复制
setNonce(ng, r, action);
代码语言:txt
复制
retainedActions.add(action);
代码语言:txt
复制
// TODO: replica-get is not supported on this path
代码语言:txt
复制
byte[] regionName = loc.getRegionInfo().getRegionName();
代码语言:txt
复制
addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
代码语言:txt
复制
it.remove();
代码语言:txt
复制
```
代码语言:txt
复制
接着是
代码语言:txt
复制
AsyncProcess.submitMultiActions
代码语言:txt
复制
AsyncRequestFutureImpl<CResult>
代码语言:txt
复制
.sendMultiAction(actionsByServer, 1, null, false);
代码语言:txt
复制
内部主要是根据server,获取MultiAction,然后构建Runnable
代码语言:txt
复制
```java
代码语言:txt
复制
for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
代码语言:txt
复制
  ServerName server = e.getKey();
代码语言:txt
复制
 MultiAction<Row> multiAction = e.getValue();
代码语言:txt
复制
 Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
代码语言:txt
复制
 numAttempt);
代码语言:txt
复制
 // make sure we correctly count the number of runnables before we try to reuse the send
代码语言:txt
复制
  // thread, in case we had to split the request into different runnables because of backoff
代码语言:txt
复制
 if (runnables.size() > actionsRemaining) {
代码语言:txt
复制
    actionsRemaining = runnables.size();
代码语言:txt
复制
 }
代码语言:txt
复制
```
代码语言:txt
复制
然后,遍历执行Runnable
代码语言:txt
复制
```java
代码语言:txt
复制
for (Runnable runnable : runnables) {
代码语言:txt
复制
 if ((--actionsRemaining == 0) && reuseThread
代码语言:txt
复制
      && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0) {
代码语言:txt
复制
    runnable.run();
代码语言:txt
复制
 } else {
代码语言:txt
复制
 try {
代码语言:txt
复制
 pool.submit(runnable);
代码语言:txt
复制
```
  1. Runnable的构建及Run方法
代码语言:txt
复制
主要是进入getNewMultiActionRunnable
代码语言:txt
复制
```java
代码语言:txt
复制
List<Runnable> toReturn = new ArrayList<Runnable>(actions.size());
代码语言:txt
复制
for (DelayingRunner runner : actions.values()) {
代码语言:txt
复制
  incTaskCounters(runner.getActions().getRegions(), server);
代码语言:txt
复制
 String traceText = "AsyncProcess.sendMultiAction";
代码语言:txt
复制
 Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress);
代码语言:txt
复制
 // use a delay runner only if we need to sleep for some time
代码语言:txt
复制
 if (runner.getSleepTime() > 0) {
代码语言:txt
复制
    runner.setRunner(runnable);
代码语言:txt
复制
 traceText = "AsyncProcess.clientBackoff.sendMultiAction";
代码语言:txt
复制
 runnable = runner;
代码语言:txt
复制
    if (connection.getConnectionMetrics() != null) {
代码语言:txt
复制
 connection.getConnectionMetrics().incrDelayRunners();
代码语言:txt
复制
 connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
代码语言:txt
复制
 }
代码语言:txt
复制
  } else {
代码语言:txt
复制
 if (connection.getConnectionMetrics() != null) {
代码语言:txt
复制
 connection.getConnectionMetrics().incrNormalRunners();
代码语言:txt
复制
 }
代码语言:txt
复制
  }
代码语言:txt
复制
  runnable = Trace.wrap(traceText, runnable);
代码语言:txt
复制
 toReturn.add(runnable);
代码语言:txt
复制
```
代码语言:txt
复制
进入SingleServerRequestRunnable,分析其Run方法
代码语言:txt
复制
```java
代码语言:txt
复制
// setup the callable based on the actions, if we don't have one already from the request
代码语言:txt
复制
if (callable == null) {
代码语言:txt
复制
  callable = createCallable(server, tableName, multiAction);
代码语言:txt
复制
}
代码语言:txt
复制
RpcRetryingCaller<MultiResponse> caller = createCaller(callable, rpcTimeout);
代码语言:txt
复制
try {
代码语言:txt
复制
 if (callsInProgress != null) {
代码语言:txt
复制
 callsInProgress.add(callable);
代码语言:txt
复制
 }
代码语言:txt
复制
  res = caller.callWithoutRetries(callable, operationTimeout);
代码语言:txt
复制
```
代码语言:txt
复制
然后是RpcRetryingCaller中调用了MultiServerCallable的call方法,主要是构建请求,调用RPC。这就进入了服务端也即RSRpcServices的mutil方法。
代码语言:txt
复制
```java
代码语言:txt
复制
responseProto = getStub().multi(controller, requestProto);
代码语言:txt
复制
```
3.2.3 HRegionserver端处理

RSRpcServices是服务端,本文对应的服务端实现是RSRpcServices.mutli。

代码语言:txt
复制
if (request.hasCondition()) {
  Condition condition = request.getCondition();
  byte[] row = condition.getRow().toByteArray();
  byte[] family = condition.getFamily().toByteArray();
  byte[] qualifier = condition.getQualifier().toByteArray();
 CompareOp compareOp = CompareOp.valueOf(condition.getCompareType().name());
 ByteArrayComparable comparator =
      ProtobufUtil.toComparator(condition.getComparator());
 processed = checkAndRowMutate(region, regionAction.getActionList(),
 cellScanner, row, family, qualifier, compareOp,
 comparator, regionActionResultBuilder);
} else {
  mutateRows(region, regionAction.getActionList(), cellScanner,
 regionActionResultBuilder);
 processed = Boolean.TRUE;
}

根据条件进入checkAndRowMutate或者mutateRows。

根据类型做不同的操作,然后正式进入执行操作

代码语言:txt
复制
MutationType type = action.getMutation().getMutateType();
  if (rm == null) {
    rm = new RowMutations(action.getMutation().getRow().toByteArray());
 }
 switch (type) {
 case PUT:
      rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
      break;
    case DELETE:
      rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
      break;
    default:
 throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
 }
 // To unify the response format with doNonAtomicRegionMutation and read through client's
  // AsyncProcess we have to add an empty result instance per operation
 resultOrExceptionOrBuilder.clear();
 resultOrExceptionOrBuilder.setIndex(i++);
 builder.addResultOrException(
      resultOrExceptionOrBuilder.build());
}
region.mutateRow(rm);

HRegion.mutateRow方法

HRegion.mutateRowsWithLocks

代码语言:txt
复制
public void mutateRowsWithLocks(Collection<Mutation> mutations,
 Collection<byte[]> rowsToLock) throws IOException {
  mutateRowsWithLocks(mutations, rowsToLock, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
public void mutateRowsWithLocks(Collection<Mutation> mutations,
 Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
  MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
 processRowsWithLocks(proc, -1, nonceGroup, nonce);
}

具体处理的过程,可以自行去看了,源码注释条例很清晰。

4. 总结

Hbase的JAVA API客户端,写操作有三种实现:

  • HTablePool
代码语言:txt
复制
源码请看hbase权威指南。
代码语言:txt
复制
![](https://img-blog.csdnimg.cn/20190914124230128.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2FzZDEzNjkxMg==,size_16,color_FFFFFF,t_70)
  • HConnection
代码语言:txt
复制
这种方式要自己实现一个线程池。
代码语言:txt
复制
```java
代码语言:txt
复制
Connection conn = ConnectionFactory.createConnection(conf);
代码语言:txt
复制
TableName tabName=  TableName.valueOf("tableName");
代码语言:txt
复制
Table table=conn.getTable(tabName);
代码语言:txt
复制
```
  • BufferedMutator
代码语言:txt
复制
建议put操作采用这种方式。
代码语言:txt
复制
批量,异步puts操作。

5. Ref

  1. https://cloud.tencent.com/developer/article/1032502
  2. hbase权威指南

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 基本介绍
  • 2. BufferedMutator使用举例
    • 2.1 单次一张表批量写入
      • 多次多张表批量写入
      • 3 源码介绍
        • 3.1 主要类介绍
          • 3.2 源码过程
            • 3.2.1 BufferedMutator构建的过程
            • 3.2.2 数据发送的过程
            • 3.2.3 HRegionserver端处理
        • 4. 总结
        • 5. Ref
        相关产品与服务
        云服务器
        云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档