HBase行锁与MVCC分析

四个部分分析:

  • 案例场景
  • 流程解析
  • 0.94-0.96实现方案分析
  • 模拟试验及分析

一、案例场景

转发微博

抱歉,此微博已被作者删除。查看帮助:http://t.cn/zWSudZc

| 转发| 收藏| 评论

本来是不同的短链码,结果删除后,会只在同一个token上操作,也就是 被=zWSudZc  

引发几个操作:

delete zWSudZc mid

decr zWSudZc  shareCount

引起的问题是发现写操作堵死在 zWSudZc这个rowKey上

微博feed如果采用HBase,以mid为rowKey,热门微博的操作也会面临这种问题。分析这个问题前我们先要了解HBase 如何保证写一致性:

冲突预防:避免分布式的并发写操作,将对特定数据项的所有写操作路由到单个节点上(可以是全局主节点或者分区主节点)。为了避免冲突,数据库必须牺牲网络隔离情况下的可用性。这种方法常用于许多提供强一致性保证的系统(例如大多数关系数据库,HBase,MongoDB)。

可以做如下猜想,单节点更新时:

  • 写操作会lock住读锁
  • 写操作集中执行,排队等待耗时。

二、流程解析

checkAndPut append increment operation in HRegion (HBase 0.94.X)

  • startRegionOperation (lock.readLock().lock());
  • rowLock lock
  • updatesLock.readLock().lock()
  • mvcc begion
  • mvcc finish
  • updatesLock.unLock
  • closeRegionOperation
  • get scan
  • startRegionOperation
  • MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);
  • closeRegionOperation

三种锁区别

region lock updatesLock 都是ReentrantReadWriteLock。ReentrantReadWriteLock 可多读,有写锁被占则阻塞其他所有操作。updatesLock 只在region flush时写锁被占用,region lock 没有出现writelock被占用情况,怀疑无用。rowlock  为MultiVersionConsistencyControl 中 ConcurrentHashMap<HashedBytes, CountDownLatch> 类型,变量名lockedRows  闭锁

MVCC  MultiVersionConsistencyControl

  • 管理memstore的读/写一致性。Use MVCC to make this set of increments/appends atomic to reads
  • 0.94  0.94.2 中是待实现。TODO  in  increment append checkAnd (少一次MVCC,后续流程会看到)
  • 0.96  realized 
  • put operation,目前项目用的比较多的操作
  • 0.94:  HRegion internalPut

三、0.94-0.96实现方案分析

0.94中

  • increment  append  checkAndPut都使用了行锁和mvcc,但put调用的internalPut没有使用行锁,只使用了mvcc
  • 流程:
  • startRegionOperation (lock.readLock().lock());
  • rowLock lock
  • updatesLock.readLock().lock()
  • mvcc begion
  • mvcc finish
  • updatesLock.unLock
  • closeRegionOperation

0.96:

流程:

    (1)  Acquire RowLock

    (1a) BeginMVCC + Finish MVCC

    (2)  Begin MVCC

    (3)  Do work

    (4)  Release RowLock

    (5)  Append to WAL

    (6)  Finish MVCC

wait for all prior MVCC transactions to finish - while we hold the row lock (so that we are guaranteed to see the latest state)

如果版本升级到0.96  由于MVCC的介入  increment操作可能更慢

0.96预计做的改进:

commiter也认为两次mvcc没必要 ,改进流程  https://issues.apache.org/jira/browse/HBASE-7263

(1)  Acquire RowLock

(1a) Grab+Release RowWriteLock (instead of BeginMVCC + Finish MVCC)

(1b) Grab RowReadLock (new step!)

(2)  Begin MVCC

(3)  Do work

(4)  Release RowLock

(5)  Append to WAL

(6)  Finish MVCC

(7)  Release RowReadLock (new step!)

另外也去掉了client端无用的分配lockid方法

四、模拟试验及分析

  • 构造模拟代码

HBaseInsertTest1类,  TestKeyValueSkipListSet为提取 HBase的KeyValueSkipListSet作为公有类,存储数据使用

package com.daodao.hbase;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Created with IntelliJ IDEA.
 *
 * @author guanpu
 *         Date: 13-1-9
 *         Time: 下午5:53
 *         分析0.94 insert操作性能
 */
public class HBaseInsertTest1 {
    volatile TestKeyValueSkipListSet kvset;
    final ReentrantReadWriteLock lock =
            new ReentrantReadWriteLock();
    final ReentrantReadWriteLock updatesLock =
            new ReentrantReadWriteLock();
    private final MultiVersionConsistencyControl mvcc =
            new MultiVersionConsistencyControl();
    private static AtomicInteger finishedCount;
    private static AtomicLong mvccTime = new AtomicLong(0l);
    private static AtomicLong rowlockTime = new AtomicLong(0l);
    private static AtomicLong lockTime = new AtomicLong(0l);
    private static AtomicLong updateLockTime = new AtomicLong(0l);
    private static AtomicLong insertTime = new AtomicLong(0l);
    private static AtomicLong releaseTime = new AtomicLong(0l);

    private final ConcurrentHashMap<String, CountDownLatch> lockedRows =
            new ConcurrentHashMap<String, CountDownLatch>();

    public HBaseInsertTest1() {
        kvset = new TestKeyValueSkipListSet(new KeyValue.KVComparator());
        finishedCount = new AtomicInteger(0);
    }

    class HBaseInsertTask implements Runnable {

        public void run() {
            for (int i = 0; i < 100000; i++) {
                String key = "key" + i;
                long time = System.nanoTime();
                MultiVersionConsistencyControl.WriteEntry localizedWriteEntry = null;
                try {


                    lock.readLock().lock();   // like startRegionOperation do
                    lockTime.set(lockTime.get() + (System.nanoTime() - time));

                    time = System.nanoTime();
                    Integer lid = getLock(key);     //get rowKey lock
                    lockTime.set(System.nanoTime() - time);

                    time = System.nanoTime();
                    updatesLock.readLock().lock();
                    updateLockTime.set(updateLockTime.get() + (System.nanoTime() - time));

                    time = System.nanoTime();
                    localizedWriteEntry = mvcc.beginMemstoreInsert();
                    mvccTime.set(mvccTime.get() + (System.nanoTime() - time));

                    time = System.nanoTime();
                    kvset.add(new KeyValue(Bytes.toBytes(key), Bytes.toBytes("f"), Bytes.toBytes("column"),
                            1l, Bytes.toBytes(1l)));
                    insertTime.set(insertTime.get() + (System.nanoTime() - time));

                    time = System.nanoTime();
                    mvcc.completeMemstoreInsert(localizedWriteEntry);
                    mvccTime.set(mvccTime.get() + (System.nanoTime() - time));
                } catch (Exception e) {
                    System.out.println(e);
                } finally {
                    time = System.nanoTime();
                    updatesLock.readLock().unlock();

                    CountDownLatch rowLatch = lockedRows.remove(key);
                    rowLatch.countDown();

                    lock.readLock().unlock();
                    releaseTime.set(releaseTime.get() + (System.nanoTime() - time));

                }
            }
            finishedCount.set(finishedCount.get() + 1);
            return;
        }

        private Integer getLock(String key) {
            CountDownLatch rowLatch = new CountDownLatch(1);

            // loop until we acquire the row lock (unless !waitForLock)
            while (true) {

                CountDownLatch existingLatch = lockedRows.putIfAbsent(key, rowLatch);
                if (existingLatch == null) {
                    break;
                } else {
                    try {
                        if (!existingLatch.await(30000,
                                TimeUnit.MILLISECONDS)) {
                            System.out.println("some thing wrong in waiting");
                            return null;
                        }
                    } catch (InterruptedException ie) {
                        // Empty
                    }
                }
            }
            return 1;
        }
    }

    private class DaodaoTestWatcher implements Runnable {

        @Override
        public void run() {
            long time = System.nanoTime();
            while (finishedCount.get() != 50) {

            }
            System.out.println("cost time:" + (System.nanoTime() - time) / 1000000000.0);
            System.out.println("cost time:  mvcc" + mvccTime.get() / 1000000000.0 / 50);
            System.out.println("cost time:  lock" + lockTime.get() / 1000000000.0 / 50);
            System.out.println("cost time:  update" + updateLockTime.get() / 1000000000.0 / 50);
            System.out.println("cost time:  rowlock" + rowlockTime.get() / 1000000000.0 / 50);
            System.out.println("cost time:  release" + releaseTime.get() / 1000000000.0 / 50);
        }
    }

    public void test() {
        ExecutorService executorService = Executors.newFixedThreadPool(200);
        for (int i = 0; i < 50; i++)
            executorService.execute(new HBaseInsertTask());
        executorService.execute(new DaodaoTestWatcher());


    }

    public static void main(String[] args) {
        new HBaseInsertTest1().test();
    }


}

耗时:

cost time:24.727145
cost time: mvcc22.98698292
cost time: lock0.0
cost time: update0.009690879999999999
cost time: rowlock0.0
cost time: release0.05001874

去掉mvcc

cost time:5.190751
cost time:  mvcc0.0073236
cost time:  lock0.0
cost time:  update0.017533220000000002
cost time:  rowlock0.0
cost time:  release1.3753079

0.96代码,在 updatesLock.readLock().lock(); 之后 增加:

                     time = System.nanoTime();
                    // wait for all prior MVCC transactions to finish - while we hold the row lock
                    // (so that we are guaranteed to see the latest state)
                    mvcc.completeMemstoreInsert(mvcc.beginMemstoreInsert());
                    mvccTime.set(mvccTime.get() + (System.nanoTime() - time));

耗时:

cost time:43.04134
cost time:  mvcc40.70520202
cost time:  lock0.0
cost time:  update0.00937416
cost time:  rowlock0.0
cost time:  release0.05023072

0.94中 increment  append  checkAndPut都使用了行锁和mvcc,但put调用的internalPut没有使用行锁,只使用了mvcc

优化方案:对于单版本服务,可以都更改为加行锁,去掉mvcc,写性能会获得进一步提升。

如果rowkey改为固定单个rowkey

0.94版本 耗时 (各个均为总耗时):

cost time:27.660935 cost time: mvcc3.888678 cost time: lock0.0 cost time: insert9.319777 cost time: update0.964697 cost time: rowlock0.0 cost time: release16.997803

但实际跑HBase插入时key变化耗时比不变key 快4倍,

跑standalone单机测试,两者速度基本相同。性能消耗应该在寻找region或网络传输,需要进一步验证。

总结:

  • region更新耗时主要集中在MVCC
  • 单版本的数据库,我认为可以去掉各种更新操作的MVCC,在修改操作中预先获取rowkey的写锁即可,避免全Region范围的MVCC
  • 从客户端到HBase的单rowkey 整体流程瓶颈 还需要进一步探索真实分布式环境下的状况。

----------------------------------------扩展----------------------------------

MySQL MVCC  by @曾经的阿飞(军伟)

MySQL5.6对与read-trasanction的优化,http://t.cn/zjnPhdq,将trx_list拆分成ro_trx_list和rw_trx_list,创建read-view只需对rw_trx_list进行snapshot,因此读事务不会影响到read-view的snapshot从而不会制约trx_sys_t::mutex,可以提高性能。@yangwm @慢半拍de刀刀 @启盼cobain @jolestar @蔚1984

mvcc 原理

1、与行级锁对应

行级锁 悲观锁

   R  W

R  y  n

W  n  n

MVCC

保存version

更新10 v,读取9 v

扩展知识:乐观锁

select -》  update  =》 再select看 是否有改动,如果有则rollback; 适用于冲突比较少的情况。

redis服务器端 是否也 实现了乐观锁。  ---- 待确认  单线程串行方式是否需要加锁?

2、 innodb mvcc

每行纪录有tx_id  rollback_point  两个字段去做控制,从而实现。

table  : row  c1 c2  tx_id  rollback_point 

rollback_point 指向上一个版本的纪录。

mysql 隔离级别  四种:read onCommit(读到没有提交的事务) 、read Committed(只能读到已提交的数据,从当前active transaction list中判断,从指针回溯)、 repeatable read(可重复读)、Serializable(串行化,所有语句加 select for update,后台加锁)

Read View 小于 active transaction 则正常读。  Read View有间隙 ,读到中间版本也时正确的。

 非Serializable 时,需要手动调用

@蔚1984 的 http://boneylw.sinaapp.com/?p=16  MVCC分析也可以对比阅读一下。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏技术专栏

慕课网高并发实战(七)- J.U.C之AQS

3.不断重新尝试获取锁(当前结点为head的直接后继才会 尝试),如果获取失败,则会阻塞自己,直到被唤醒

17420
来自专栏友弟技术工作室

ElasticSearch入门实战1

12030
来自专栏web编程技术分享

【手把手】JavaWeb 入门级项目实战 -- 文章发布系统 (第五节)

484110
来自专栏我是攻城师

ElasticSearch入门之彼行我释(四)

35650
来自专栏沃趣科技

MySQL的一个表最多可以有多少个字段

问题由来 引用我们客户的原话: *创建如下表,提示我:* ? *如果我将下面表中的varchar(200),修改成text(或blob):报错变为另一个:* ?...

89290
来自专栏信安之路

Mysql注入导图-学习篇

接触SQLi(SQL injection)已有大半年,不得不说,这是个庞大的领域。每次以为自己都弄懂了之后都会有新的东西冒出来,需要再次学习,一路走来效率不高,...

18300
来自专栏tkokof 的技术,小趣及杂念

小话游戏脚本(三)

在此就heSript实现过程中的一些解决方案和自己的想法陈列一番,由于自己编程水平实在拙劣,又没什么实际经验,所以导致相关的代码非常糟糕,所以竭诚欢迎大家批评...

8110
来自专栏扎心了老铁

spark-streaming集成Kafka处理实时数据

在这篇文章里,我们模拟了一个场景,实时分析订单数据,统计实时收益。 场景模拟 我试图覆盖工程上最为常用的一个场景: 1)首先,向Kafka里实时的写入订单数据,...

83650
来自专栏挖掘大数据

整合Kafka到spark-streaming实例

在这篇文章里,我们模拟了一个场景,实时分析订单数据,统计实时收益。

2.4K90
来自专栏决胜机器学习

设计模式专题(二十) ——职责链模式

设计模式专题(二十)——职责链模式 (原创内容,转载请注明来源,谢谢) 一、概述 职责链模式(Chainof Responsibility),是使多个对象都有...

35690

扫码关注云+社区

领取腾讯云代金券