专栏首页算法之名ConcurrentSkipListMap跳表原理解析

ConcurrentSkipListMap跳表原理解析

我们首先来看一下ConcurrentSkipListMap的继承结构图。

内部结构如下(图片来源于网络),这里面Node其实就是HeadIndex中的level1,level2,level3中的一个个绿点。

ConcurrentSkipListMap的整体数据结构是一种多层链表结构,在Java中,我们都知道链表有LinkedList,但LinkedList是一个双向链表.由插入时候的以下代码就可以看出来。

void linkBefore(E e, Node<E> succ) {
    // assert succ != null;
    final Node<E> pred = succ.prev;
    final Node<E> newNode = new Node<>(pred, e, succ);
    succ.prev = newNode;
    if (pred == null)
        first = newNode;
    else
        pred.next = newNode;
    size++;
    modCount++;
}

而ConcurrentSkipListMap中的各层链表为单向链表,并且key值有序排列。这里有其Node节点的代码可以看出,它只有一个next的后续节点。

static final class Node<K,V> {
    final K key;
    volatile Object value;
    volatile Node<K,V> next;

以及它的两个构造器

Node(K key, Object value, Node<K,V> next) {
    this.key = key;
    this.value = value;
    this.next = next;
}
Node(Node<K,V> next) {
    this.key = null;
    this.value = this;
    this.next = next;
}

我们先来看一下,当我们new一个ConcurrentSkipListMap的时候会发生什么。

final Comparator<? super K> comparator; //函数式接口——比较器
public ConcurrentSkipListMap() {
    this.comparator = null;
    initialize();
}

private transient volatile HeadIndex<K,V> head; //HeadIndex继承于Index,这里为原始头索引(当整个数据结构中还没有分层,没有链表的时候)

private static final Object BASE_HEADER = new Object();
private void initialize() {
    keySet = null;
    entrySet = null;
    values = null;
    descendingMap = null;
    //这里head会被初始化,他的属性中,Node的Key为null,value为Object对象,下一个节点为null;
    //head的分层索引down为null,链表的后续索引right为null,层级level为第一层。
    head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
                              null, null, 1);
}
static final class HeadIndex<K,V> extends Index<K,V> {
    final int level; //层数
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}
static class Index<K,V> {
    final Node<K,V> node; //节点
    final Index<K,V> down; //分层索引,分层索引跟上层索引的Node的key,value相同,next不同
    volatile Index<K,V> right; //链表后续索引

索引的构造器

Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
    this.node = node;
    this.down = down;
    this.right = right;
}

做为一个Map,我们来看一下它的put方法。

public V put(K key, V value) {
    //写入时不允许写入null值
    if (value == null)
        throw new NullPointerException();
    return doPut(key, value, false);
}
private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;             // added node
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        //在最底层链表中获取目标的前置节点
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            //如果该前置节点的后续节点不为null
            if (n != null) {
                Object v; int c;
                //获取后续节点的后续节点
                Node<K,V> f = n.next;
                //如果n不为前置节点的后续节点,重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
                //因为可能有其他线程已经插入了其他节点在b的后续节点
                if (n != b.next)               // inconsistent read
                    break;
                //如果后续节点n的值为null,删除该节点n,用n的后续节点顶替n,重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
                if ((v = n.value) == null) {   // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                //如果b为null或者v等于自己的value,说明b已经被其他线程删除了
                //重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
                if (b.value == null || v == n) // b is deleted
                    break;
                //如果我们查找的key大于后续节点的key,向后续节点推进
                if ((c = cpr(cmp, key, n.key)) > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                //如果我们查找的key等于后续节点的key
                if (c == 0) {
                    //通过无锁竞争,将value替换后续节点的值,并返回后续节点的原值
                    //竞争失败的,重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
                    //这里我们可以看到修改已存在的key的时候,是不会进行层数变化的
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        return vv;
                    }
                    break; // restart if lost race to replace value
                }
                // else c < 0; fall through
            }
            //如果后续节点n为null,初始化一个节点,放入我们要存储的key,value,该节点的后续节点为n
            z = new Node<K,V>(key, value, n);
            //通过无锁竞争,将该新节点替换掉b的后续节点n,此时只有一个线程可以竞争成功,并替换
            //竞争失败的线程,重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
            if (!b.casNext(n, z))
                break;         // restart if lost race to append to b
            //竞争成功的,退出'for (;;)'循环
            break outer;
        }
    }
    //到此时表示已经节点已经put成功了,但对于跳表来说,来要根据随机数的值来表示是否向上增加层数与上层节点
    //获取一个伪随机的种子
    int rnd = ThreadLocalRandom.nextSecondarySeed();
    //如果该种子的二进制与10000000000000000000000000000001进行与运算为0
    //即该种子的二进制最高位与最末尾必须为0,其他位无所谓
    //如果该种子的二进制最高位与最末位不为0,不增加新节点的层数
    if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
        //定义层level,从1开始
        int level = 1, max;
        //判断该种子值的二进制从第二位开始向左有多少个连续的1,层数加多少个1
        //这里由于是随机值,所以层数level是不确定的
        while (((rnd >>>= 1) & 1) != 0)
            ++level;
        Index<K,V> idx = null;
        //获取头索引head
        HeadIndex<K,V> h = head;
        //如果level小于等于头索引的层数
        if (level <= (max = h.level)) {
            //根据层数level不断创建新增节点的下层索引
            //注意此时只是新增了新节点的索引,并没有关联到跳表的真实体中
            for (int i = 1; i <= level; ++i)
                idx = new Index<K,V>(z, idx, null);
        }
        //如果层数level大于头索引的层数
        else { // try to grow by one level
            //将层数level变更为头索引的层数加1
            level = max + 1; // hold in array and later pick the one to use
            //创建一个数量为level+1的索引数组
            @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                (Index<K,V>[])new Index<?,?>[level+1];
            //根据层数level不断创建新增节点的下层索引,并放入数组中
            //此时只是新增了新节点的索引,并没有关联到跳表的真实体中
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K,V>(z, idx, null);
            for (;;) {
                //获取头索引
                h = head;
                //获取头索引的层数
                int oldLevel = h.level;
                //如果level小于等于oldLevel,说明已经有其他线程修改了头索引的层数,退出循环
                if (level <= oldLevel) // lost race to add level
                    break;
                //定义一个新的头索引,取值h
                HeadIndex<K,V> newh = h;
                //获取头索引的节点
                Node<K,V> oldbase = h.node;
                //该段代码的意思其实只是新增一层新链表,这一层新链表以原头索引为下层索引,新增节点索引为链表后续索引,所以这
                //一层新链表只有头索引和新增节点的索引两个索引,但由于有多个线程的参与,该循环体可能会不断执行
                for (int j = oldLevel+1; j <= level; ++j)
                    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                //通过无锁竞争,使用newh来替代h(此时h不等于newh,只是跳表的头索引位置被替换掉了)
                //竞争成功的线程,将h变更为newh,idx变更为oldlevel层的新增节点索引,并退出循环'for (;;)'
                //因为此时新链表层已经确定,仅为头索引和新节点索引,而之前所有的层的链表均要插入新节点索引,所以会作此变更
                //竞争失败的重新进入循环'for (;;)'
                if (casHead(h, newh)) {
                    h = newh;
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }
        // find insertion points and splice in
        //获取level,该level为原头节点的层数,即原跳表的层数,不包括新层
        splice: for (int insertionLevel = level;;) {
            //获取头索引的层数
            int j = h.level;
            for (Index<K,V> q = h, r = q.right, t = idx;;) {
                //如果头索引为null或者新增节点索引为null,退出'for (int insertionLevel = level;;)'
                //此处表示有其他线程删除了头索引或者新增节点的索引
                if (q == null || t == null)
                    break splice;
                //如果头索引的链表后续索引存在,如果是新层则为新节点索引,如果是老层,则为原索引
                if (r != null) {
                    //获取r的节点
                    Node<K,V> n = r.node;
                    // compare before deletion check avoids needing recheck
                    //获取我们插入的key和n的key的比较值
                    int c = cpr(cmp, key, n.key);
                    //删除空值索引,具体解释见doGet中
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    //如果我们插入的key大于n的key,继续向后续推进
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
                //如果j为跳表原层数      
                if (j == insertionLevel) {
                    //此时q为我们需要在第j层插入新增节点的前置索引
                    //将新节点索引通过无锁竞争插入q与r之间,竞争失败的会重新进入'for (Index<K,V> q = h, r = q.right, t = idx;;)'循环
                    if (!q.link(r, t))
                        break; // restart
                    //如果新增节点的值为null,表示该节点已经被其他线程删除
                    if (t.node.value == null) {
                        findNode(key);
                        break splice;
                    }
                    if (--insertionLevel == 0)
                        break splice;
                }

                if (--j >= insertionLevel && j < level)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}
boolean casValue(Object cmp, Object val) {
    //无锁竞争,如果cmp等于valueOffset,将val替换cmp
    return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
}
private boolean casHead(HeadIndex<K,V> cmp, HeadIndex<K,V> val) {
    return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
}
final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
    //获取调用索引对象的节点
    Node<K,V> n = node;
    //将新索引的链表后续索引(newSucc)设为老索引(succ)
    newSucc.right = succ;
    //如果调用索引对象的值不为null,通过无锁竞争,将新索引替换老索引
    return n.value != null && casRight(succ, newSucc);
}
private Node<K,V> findNode(Object key) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            if (n == null)
                break outer;
            Node<K,V> f = n.next;
            if (n != b.next)                // inconsistent read
                break;
            if ((v = n.value) == null) {    // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (b.value == null || v == n)  // b is deleted
                break;
            if ((c = cpr(cmp, key, n.key)) == 0)
                return n;
            if (c < 0)
                break outer;
            b = n;
            n = f;
        }
    }
    return null;
}

关于层数是如何来增加的,这个就依靠于

然后我们来看一下它的get方法。

public V get(Object key) {
    return doGet(key);
}
private V doGet(Object key) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        //在最底层链表中获取目标的前置节点
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            //如果找到的前置节点没有后续节点,直接跳出循环'for (;;)',然后返回null
            if (n == null)
                break outer;
            //如果后续节点不为null,获取后续节点到后续节点
            Node<K,V> f = n.next;
            //如果n已经不为前置节点到后续节点了,重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
            if (n != b.next)                // inconsistent read
                break;
            //如果后续节点n的值为null
            if ((v = n.value) == null) {    // n is deleted
                //通过无锁竞争删除该节点n,因为只允许有一个线程可以删除成功,并重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
                n.helpDelete(b, f);
                break;
            }
            //如果前置节点的值为null或者后续节点的值为null重新进入'for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;)'循环
            if (b.value == null || v == n)  // b is deleted
                break;
            //如果查找的键与后续节点的键相同,返回后续节点的值
            if ((c = cpr(cmp, key, n.key)) == 0) {
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
            //如果查找的键小于后续节点的键,直接跳出循环'for (;;)',然后返回null
            if (c < 0)
                break outer;
            //如果查找的键大于后续节点的键,继续向链表的后端推进
            b = n;
            n = f;
        }
    }
    return null;
}

以下这段findPredecessor方法整体的意思为:从最上层的头索引开始向右查找(链表的后续索引),如果后续索引的节点的Key大于我们要查找的Key.则头索引移到下层链表,在下层链表查找,以此反复,一直查找到没有下层的分层索引为止,返回该索引的节点。如果后续索引的节点的Key小于我们要查找的Key,则在该层链表中向后查找。由于查找的Key永远小于索引节点的Key,所以只能找到目标的前置索引节点。其中会有空值索引的存在,这里是通过CAS来进行处理的。这里需要注意的是我们要找的值最终都是在最底层链表中找到的,它不会在高层和中层链表中去找最终值(就算高层、中层链表中有这个值)。

比方说我们要在这个图中找55,它是不会直接在L3中直接找到的,而是经过箭头的方向在L1找55的前置节点。

private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    for (;;) {
        //获取原始头索引以及该头索引的链表后续索引,当ConcurrentSkipListMap刚初始化的时候,r为null
        //但是我们查找肯定不会在空的跳表中查找,所以我们认定头索引的后续索引r不为null.
        for (Index<K,V> q = head, r = q.right, d;;) {
            //如果链表后续索引不为null
            if (r != null) {
                //获取链表后续索引的节点
                Node<K,V> n = r.node;
                K k = n.key;
                //如果该节点的值value为null
                if (n.value == null) {
                    //删除空值索引,即把r的后续索引顶替掉r
                    //如果无锁竞争失败,即多个线程都在删空索引,只有一个线程能删成功,从头开始进入循环'for (Index<K,V> q = head, r = q.right, d;;)'
                    if (!q.unlink(r))
                        break;           // restart
                    //无锁竞争成功,空值索引被成功删除,重新获取r值,并跳过后续代码,进入'for (Index<K,V> q = head, r = q.right, d;;)'下一轮循环
                    //这里我们需要注意,这里的q和r并不代表哪个固定的索引,他们都是不断在具体的索引中不断变化的
                    r = q.right;         // reread r
                    continue;
                }
                //如果该节点的值value不为null,且查找的key值大于该节点的key值
                //向后续索引推进
                if (cpr(cmp, key, k) > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }
            //如果q的分层索引为null,返回q的节点
            if ((d = q.down) == null)
                return q.node;
            //如果q的分层索引不为null,将q赋值为自身的分层索引
            q = d;
            //r赋值为分层索引的链表后续索引
            r = d.right;
        }
    }
}
final boolean unlink(Index<K,V> succ) {
    //前索引的节点的value是否为null,以及用后索引替换前索引是否成功
    return node.value != null && casRight(succ, succ.right);
}
final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
    //无锁竞争,如果前索引cmp等于rightOffset的时候,使用后索引来替换前索引,并返回true;否则返回false
    return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
}
static final int cpr(Comparator c, Object x, Object y) {
    //比较x,y是否相等,如果返回0则相等,为正数说明x大于y
    return (c != null) ? c.compare(x, y) : ((Comparable)x).compareTo(y);
}

关于比较器,可以参考本人博客Java函数式编程整理

void helpDelete(Node<K,V> b, Node<K,V> f) {
    /*
     * 如果本节点的后续节点为f,且本身为b的后续节点
     */
    if (f == next && this == b.next) {
        //如果后续节点为null或者后续节点值不为后续节点
        if (f == null || f.value != f) // not already marked
            //通过无锁竞争生成一个key为null,value和next都相同的后续节点(value和next也可能为null)
            casNext(f, new Node<K,V>(f));
        else
            //如果后续节点不为空,通过无锁竞争,将f的后续节点替换掉本身节点,即删除本身节点。
            b.casNext(this, f.next);
    }
}
boolean casNext(Node<K,V> cmp, Node<K,V> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • ConcurrentHashMap 1.8原理解析

    JDK 1.8中,Hash家族有这么一些存在,HashMap,HashTable,LinkedHashMap,ConcurrentHashMap。这里面支持线程...

    算法之名
  • ReentrantLock的可重入特性 顶

    在自旋分布式锁实现 中我们已经分析了ReentrantLock的自旋特性,现在我们来分析一下它的可重入特性。

    算法之名
  • Mybatis初始化的builder建造者模式

    在Mybatis的初始化的主要工作是加载并解析mybatis-config.xml的配置文件、映射配置文件以及相关的注解信息。因为使用了建造者模式,BashBu...

    算法之名
  • Kafka源码系列之源码分析zookeeper在kafka的作用

    浪尖的kafka源码系列以kafka0.8.2.2源码为例给大家进行讲解的。纯属个人爱好,希望大家对不足之处批评指正。 一,zookeeper在分布式集群的作...

    Spark学习技巧
  • Kafka源码系列之源码分析zookeeper在kafka的作用

    Spark学习技巧
  • ConcurrentHashMap源码解析(JDK1.8)

    package java.util.concurrent; import java.io.ObjectStreamField; import java.io....

    武培轩
  • MySQL|索引背后

    01 索引 以MySQL中的索引为例子总结。 数据库查询是数据库的最主要功能之一,实现高效的查询速度一定是MySQL非常关心的事情。 索引(Index)正是帮...

    double
  • python基础(一)

    用户2398817
  • springmvc 项目完整示例04 整合mybatis mybatis所需要的jar包 mybatis配置文件 sql语句 mybatis应用

    MyBatis 本是apache的一个开源项目iBatis, 2010年这个项目由apache software foundation 迁移到了google c...

    noteless
  • mybatis的if test 字符串的坑

    mybatis是使用的OGNL表达式来进行解析的,在OGNL的表达式中,'y'会被解析成字符,因为java是强类型的,char 和 一个string 会导致不等...

    分母为零

扫码关注云+社区

领取腾讯云代金券

玩转腾讯云 有奖征文活动