并发编程之ConcurrentSkipListMap

在并发包下面,还有一种数据结构,这就是跳表(SkipList),第一次听说跳表是在redis里面,但当时只是大概理解意思。这次学习并发包,遇到了ConcurrentSkipListMap,那就一次从跳表特性,put和get操作,以及Doug Lea通过怎样一种结构,保证了它的线程安全性。

What is ConcurrentSkipListMap

首先说说什么是跳表,常见数据结构有线性表和树以及图,当然这几种就不多说,跳表是一种不同于这三种的结构。

先看一个例子。

下面是一个简单的链表即线性表:

此时增加节点为O(1),删除和查找为O(n)。

假设此时,又给出一个索引链表,如果需要查找数据,得先去找索引,然后所以告诉你待查元素是不是归它管。例如增加索引后的结构为:

这样以来,比如你要找35号节点,本来如果通过链表,你需要从头找到尾,有了上一层的索引,

你只需要分别查找1,7,11,23,27,35就可以了。

是不是对跳表有了更深的一层理解?

总结下SkipList的特性:

1. 由很多层结构组成。

2. 最底层的链表包含所有元素。

3. 以空间换时间。

4. 每个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素。

5. 如果一个元素出现在Level n 的链表中,则它在Level n 以下的链表也都会出现。

下面结合ConcurrentSkipListMap的put,get,size操作分析,来进一步学习其结构。

ConcurrentSkipListMap分析

首先来看ConcurrentSkipListMap特性:

iterators和spliterators 都是弱一致性的

线程安全性

不允许null键以及null值

containsKey和get,put,remove方法等操作的时间复杂度平均为log(n)

put操作

先看put操作代码:

public V put(K key, V value) {

if (value == null)

throw new NullPointerException();

return doPut(key, value, false);

}

简单的判断value是否为null,然后调用doPut方法,讲doPut方法前,先说说private Node findPredecessor(Object key, Comparator cmp) 方法,它的作用是 找到合适与key的前一个位置。

private Node findPredecessor(Object key, Comparator cmp) {

if (key == null) //判断为null

throw new NullPointerException();

for (;;) {

for (Index q = head, r = q.right, d;;) { //head为最顶上一层的头节点。

if (r != null) { //r为head下一个节点。

Node n = r.node; //获得r的node

K k = n.key; //获得n的key。

if (n.value == null) { //r为无效节点

if (!q.unlink(r))

break; // restart 取消q绑定的r,失败就重新来。

r = q.right; // reread r 成功的话,就重新r,此时r为q.right.right,也就是删除了原来的r。

continue;

}

if (cpr(cmp, key, k) > 0) { //如果key>k,因为skiplist是有顺序的,所以可以这样比较。

q = r; //q为r,找下一个节点。

r = r.right;

continue;

}

}

/**

* 前面方法有两个用,

* 1 是找无效节点解除绑定

* 2 是比较key和k,如果key>k,则直接会跳过下面的往后找,直到key>k

*/

//当前面方法过了后,就看是down是不是null,

if ((d = q.down) == null) //此时q是最底层节点,那么就是这个node。

return q.node; //找到了这个节点,down==null。

q = d; //往下找一层。

r = d.right;

}

}

}

findPredecessor的思路主要是从上层的第一个节点开始找,如果找到大于(或小于),就往下,因为上层节点一定有down指针指向下一层。最终找到的节点只能是最下面那一层,因为由这个返回条件可以知晓:

if ((d = q.down) == null) //此时q是最底层节点,那么就是这个node。

return q.node; //找到了这个节点,down==null。

接下来看doPut方法:

private V doPut(K key, V value, boolean onlyIfAbsent) {

Node z; //待添加的node。

if (key == null) //key不允许为null。

throw new NullPointerException();

Comparator cmp = comparator; //获得比较器。

/**

* 以下为在最底层合适位置插入一个节点key,value。

* 或者替换最底层一个节点。

*/

outer: for (;;) {

for (Node b = findPredecessor(key, cmp), n = b.next;;) {

//找到对应与key的前一个节点。

if (n != null) { //当b.next不为null时候,就是把key插入到n和b之间。

Object v; int c;

Node f = n.next;

if (n != b.next) // 不一致读。

break;

if ((v = n.value) == null) { // n被删除了。

n.helpDelete(b, f); //就是CAS方法删除。

break;

}

if (b.value == null || v == n) // b is deleted 再检测一次是不是被删除了。

break;

if ((c = cpr(cmp, key, n.key)) > 0) {

//如果key>n.key,那么就往下一个找。

//这一步为了再次检测一部,防止并发操作,别的线程先插入,

b = n;

n = f;

continue;

}

if (c == 0) { //相等就替换。

if (onlyIfAbsent || n.casValue(v, value)) {

//此处能够执行的条件为onlyIfAbsent == true,也就是不存在才替换,

//存在直接返回,或者cas成功。

@SuppressWarnings("unchecked") V vv = (V)v;

return vv;

}

break; // restart if lost race to replace value

}

// else c

}

//当,n为null。

//或者前面检测都通过了,

//则直接插入到n后面

z = new Node(key, value, n);

if (!b.casNext(n, z)) //将z插入到b和n之间。

break; // 插入失败重新开始。

break outer;

}

}

/**

* 以下为随机选择一个数,从刚刚插入的z节点,做一条单独的由

* down节点连接而成的链。

*

* 然后,在通过splice循环,把这条链水平方向连起来。从而形成网状结构。

*/

//因为是跳表,所以要往它插入层的下层继续插入。

int rnd = ThreadLocalRandom.nextSecondarySeed(); //获取一个当前线程的随机数。

if ((rnd & 0x80000001) == 0) { // test highest and lowest bits 测试最高和最低位。

int level = 1, max; //最高层和最低层

while (((rnd >>>= 1) & 1) != 0) // // 抛硬币决定层次,

++level;

Index idx = null;

HeadIndex h = head;

if (level

for (int i = 1; i

idx = new Index(z, idx, null); //在每一层都添加。

}

else { //选择的这个level>max,那么就要增加一层。

level = max + 1; // 只增加一层。

@SuppressWarnings("unchecked")Index[] idxs =

(Index[])new Index[level+1];

for (int i = 1; i

idxs[i] = idx = new Index(z, idx, null); //首先从第一层到level层都连接上节点。

for (;;) {

h = head;

int oldLevel = h.level;

if (level

break;

HeadIndex newh = h;

Node oldbase = h.node;

for (int j = oldLevel+1; j

newh = new HeadIndex(oldbase, newh, idxs[j], j);

if (casHead(h, newh)) { //替换head节点。

h = newh;

idx = idxs[level = oldLevel];

break;

}

}

}

// 找到插入的点,并且把index插入。

splice: for (int insertionLevel = level;;) { //从插入的level层次开始。

int j = h.level;

for (Index q = h, r = q.right, t = idx;;) {

if (q == null || t == null) //退出

break splice;

if (r != null) { //r不为null,

Node n = r.node;

// 在删除前比较,防止需要检查两次。

int c = cpr(cmp, key, n.key);

if (n.value == null) { //已经被删除了,就不一致读。

if (!q.unlink(r))

break;

r = q.right;

continue;

}

if (c > 0) { //右移

q = r;

r = r.right;

continue;

}

}

//找到位置了。就开始插入。

if (j == insertionLevel) {

if (!q.link(r, t)) //连接

break; // restart

if (t.node.value == null) {

findNode(key); //如果此时当前节点被删除,删除已经删除的节点。并且退出循环。

break splice;

}

// 标志的插入层自减 ,如果== 0 ,表示已经到底了,插入完毕,退出循环

if (--insertionLevel == 0)

break splice;

}

// 上面节点已经插入完毕了,插入下一个节点

if (--j >= insertionLevel && j

t = t.down;

q = q.down;

r = q.right;

}

}

}

return null;

}

代码很长,具体意思已经写道代码注释里面。其实开始我看的时候,那张图看得很明白,但是具体实现流程却总是没有想通。感觉精髓在于它的实现过程。

doPut操作,主要步骤分为下面三步:

以下为在最底层合适位置插入一个节点key,value。 或者替换最底层一个节点。

随机选择一个数n,如果这个数小于maxLevel,那么直接从1~n上增加节点,否则就从1~maxLevel+1上增加,并且新增加一链,从刚刚插入的z节点,做一条单独的由 down连接而成的链。这是纵向的。

然后,再通过splice循环,把这条链水平方向连起来。从而形成网状结构。

这样一来,整个插入过程就明了了。

下面再看一张完整的skiplist存储结构图:

get操作

接下来看看get操作相关的doGet方法:

private V doGet(Object key) {

if (key == null) //判断为null

throw new NullPointerException();

Comparator cmp = comparator; //获取comparator。

outer: for (;;) {

for (Node b = findPredecessor(key, cmp), n = b.next;;) { //找到predecessor。

Object v; int c;

if (n == null) //如果b.next为null,即b为最后一个。

break outer;

Node f = n.next;

if (n != b.next) // inconsistent read //不一致读。

break;

if ((v = n.value) == null) { // n is deleted n被删除了。

n.helpDelete(b, f);

break;

}

if (b.value == null || v == n) // b is deleted b被删除了。

break;

if ((c = cpr(cmp, key, n.key)) == 0) { // key就是这个key,所以直接返回

@SuppressWarnings("unchecked") V vv = (V)v;

return vv;

}

if (c

break outer;

b = n;

n = f;

}

}

return null;

}

由上图代码,代码主要有以下几步:

1. 通过findPredecessor来获得离得最近的前趋节点。

2. 获得这个前趋节点的值从而判断前趋节点状态,比如是否被删除,是否有别的线程已经更改过了。

3. 和前趋节点比较,根据比较结果来判定是否成功get到,例如,如果判断比后者小(说明判断逻辑不对),就直接退出。当比较结果相等,则直接返回。

remove操作

接下来看remove操作:

public V remove(Object key) {

return doRemove(key, null);

}

直接调用doRemove方法:

final V doRemove(Object key, Object value) {

if (key == null) //判空。

throw new NullPointerException();

Comparator cmp = comparator;

outer: for (;;) {

for (Node b = findPredecessor(key, cmp), n = b.next;;) { //找到predecessor节点。

Object v; int c;

if (n == null) //说明后面没有了,已经删掉了。

break outer;

Node f = n.next;

if (n != b.next) // 不一致读,break内层循环

break;

if ((v = n.value) == null) { // n已经被删掉了。但是没删完全,就帮他一起删

n.helpDelete(b, f);

break; //break内层循环

}

if (b.value == null || v == n) // 重新找一遍predecessor。

break;

if ((c = cpr(cmp, key, n.key))

break outer;

if (c > 0) { // 右移

b = n;

n = f;

continue;

}

// value != null 表示需要同时校验key-value值

if (value != null && !value.equals(v))

break outer;

if (!n.casValue(v, null)) // CAS替换value为null。

break;

if (!n.appendMarker(f) || !b.casNext(n, f)) //尝试取消链接。

findNode(key); // 在这个方法里面会调用helpDelete方法,从而把value为null的值删除。

else {

findPredecessor(key, cmp); // 清除索引连接,就是消除为null的。

// head.right == null表示该层已经没有节点,删掉该层

if (head.right == null)

tryReduceLevel();

}

@SuppressWarnings("unchecked") V vv = (V)v;

return vv; //走到这里,说明删除成功了。

}

}

return null;

}

具体意思已经在代码注释中写出,那么,

删除是一种什么逻辑呢?简单理一下思路:

首先假设要删除图下的11节点:

由findPredecessor找到一个节点,就是最下面一层的9。

假设正常下把11删除了,然后继续操作。

接着会依次找到第二层和第一层的7,并依次将11置为null,并调用findNode或者findPredecessor去helpDelete。

size操作

和其他并发集合一样,size操作也是不准确的,它只能提供一个某一个时刻的值:

public int size() {

long count = 0;

for (Node n = findFirst(); n != null; n = n.next) {

if (n.getValidValue() != null)

++count;

}

return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;

}

弱一致性

开始看了多篇Concurrent包下面的集合类,虽然读到了weak-reference,但是都没有注意,所以这里必须等注意下了。

比起其他的集合例如ArrayList,HashMap,TreeMap等,记得是fail-fast的,也就是如果同时有多个线程对其结构发起了改动操作,会使得modCount改变从而抛出错误。

而线程安全集合例如ConcurrentHashMap,ConcurrentSkipListMap等,基于CAS实现的,就算有多个线程同时修改结构,也不会抛出错误,同时,也不能保证读取的准确性,比如一个线程先读,当获得值后,一定能保证集合里面有这个值吗?说不定在读取后没返回之前就被另一个集合删除了。

这就是并发集合的弱一致性。

心得

ConcurrentSkipListMap整体实现思路不难理解,但是对于skiplist链表操作还是有难度的。

Doug Lea大佬,就单单用CAS操作,就能实现如此众多的并发工具类,当然,其中包含了许多的逻辑判断,以及并发条件下判断。

如果只有一个线程,则正常进行,如果操作过程中,获取的值变了,或者程序上下文变量变了,说明被人抢先一步,那么就需要continue从头开始操作。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190130G0222K00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券

玩转腾讯云 有奖征文活动