无锁CAS整理

所有的锁都是悲观的,他们总是假设每一次的临界区操作会产生冲突,如果有多个线程同时需要访问临界区资源,就宁可牺牲性能让线程进行等待,所以说锁会阻塞线程执行.而无锁是一种乐观的策略,它会假设对资源的访问是没有冲突的,所有的线程都可以在不停顿的状态下继续执行.无锁的策略是使用一种叫做比较交换的技术(CAS)来鉴别线程冲突,一旦检测到冲突产生,就重试当前操作,直到没有冲突为止.

CAS算法包含三个参数(V,E,N),V表示要更新的变量,E表示预期值,N表示新值.仅当V值等于E值时(意思是说其他线程没有更新V值),才会将V值设为N.如果V值和E值不同,则说明已经有其他线程做了更新(V值),则当前线程什么都不做.最后,CAS返回当前V的真实值.

最简单的无锁安全整数:AtomicInteger

public class AtomicIntegerDemo {
    static AtomicInteger i = new AtomicInteger();
    public static class AddThread implements Runnable {
        @Override
        public void run() {
            for (int k = 0;k < 10000;k++) {
                i.incrementAndGet();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] ts = new Thread[10];
        for (int k = 0;k < 10;k++) {
            ts[k] = new Thread(new AddThread());
            ts[k].start();
            ts[k].join();
        }
        System.out.println(i);
    }
}

运行结果:

100000

因为jdk 8的incrementAndGet()已经牵涉到底层Unsafe类,它有大量的native标识,跟C语言挂钩的,这个我们先不说.我们自己来用无锁的对象引用AtomicReference来模拟实现一个这个过程.

public class AtomicReferenceInteger {
    //i即V值(V,E,N)
    static AtomicReference<Integer> i = new AtomicReference<>(0);
    public static class AddThread implements Runnable {
        @Override
        public void run() {
            for (int k = 0;k < 10000;k++) {
                while (true) {
                    //m即E值
                    Integer m = i.get();
                    //++m即N值,比较m跟i的值是否相等,如果相等,就把++m写入i,如果i值被其他线程修改,则继续循环
                    if (i.compareAndSet(m,++m)) {
                        break;
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Thread[] ts = new Thread[10];
        for (int k = 0;k < 10;k++) {
            ts[k] = new Thread(new AddThread());
            ts[k].start();
            ts[k].join();
        }
        System.out.println(i);
    }
}

运行结果:

100000

以上可以看出

while (true) {
    //m即E值
    Integer m = i.get();
    //++m即N值,比较m跟i的值是否相等,如果相等,就把++m写入i,如果i值被其他线程修改,则继续循环
    if (i.compareAndSet(m,++m)) {
        break;
    }
}

即模拟实现了incrementAndGet();

带有时间戳的对象引用:AtomicStampedReference

使用带时间戳的对象引用时,对象值和时间戳都必须满足期望值,写入才会成功.因此,即使对象值被反复读写,写回原值,只要时间戳发生变化,就能防止不恰当的写入.

public class AtomicReferenceAcount {
    public static void main(String[] args) {
//        AtomicReference<Integer> money = new AtomicReference<>();
        AtomicStampedReference<Integer> money = new AtomicStampedReference<>(19,0);
//        money.set(19);
        for (int i = 0;i < 10000;i++) {
            final int timestamp = money.getStamp();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
//                    Integer m = money.get();
                        Integer m = money.getReference();
                        if (m < 20) {
                            //比较money跟m的值相等,才更新money为m+20,如果不等则重新来一遍,这里money的值可能会被其他线程修改
                            //当有其他线程改变了时间戳timestamp的时候,整体无法写入
                            if (money.compareAndSet(m, m + 20, timestamp, timestamp )) {
                                System.out.println("余额小于20元,充值成功,余额:" + money.getReference() + "元");
                                break;
                            }
                        } else {
                            break;
                        }
                    }
                }
            }).start();
        }
        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                for (int i = 0;i < 10000;i++) {
                    while (true) {
                        int timestamp = money.getStamp();
                        Integer m = money.getReference();
                        if (m > 10) {
                            System.out.println("大于10元");
                            if (money.compareAndSet(m,m - 10,timestamp,timestamp)) {
                                System.out.println("成功消费10元,余额:" + money.getReference());
                                break;
                            }
                        }else {
                            System.out.println("没有足够金额");
                            break;
                        }
                    }
//                    try {
//                        Thread.sleep(100);
//                    }catch (InterruptedException e) {
//
//                    }
                }
            }
        };
        new Thread(r2).start();
    }
}

我们先让时间戳永远不变,运行结果(部分选取)

余额小于20元,充值成功,余额:39元 大于10元 成功消费10元,余额:29 大于10元 成功消费10元,余额:19 大于10元 成功消费10元,余额:9 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 大于10元 成功消费10元,余额:19 大于10元 成功消费10元,余额:9 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 .

. 没有足够金额 没有足够金额 大于10元 成功消费10元,余额:19 大于10元 成功消费10元,余额:29 大于10元 成功消费10元,余额:19 大于10元 成功消费10元,余额:9 没有足够金额 没有足够金额 没有足够金额 没有足够金额 大于10元 成功消费10元,余额:19 余额小于20元,充值成功,余额:39元 余额小于20元,充值成功,余额:29元 大于10元 余额小于20元,充值成功,余额:29元 余额小于20元,充值成功,余额:39元 余额小于20元,充值成功,余额:29元 成功消费10元,余额:29 大于10元 成功消费10元,余额:19 大于10元 成功消费10元,余额:9

根据结果我们会看到,两边的线程会不断的读取,写入.

现在我们把时间戳改变+1

public class AtomicReferenceAcount {
    public static void main(String[] args) {
//        AtomicReference<Integer> money = new AtomicReference<>();
        AtomicStampedReference<Integer> money = new AtomicStampedReference<>(19,0);
//        money.set(19);
        for (int i = 0;i < 10000;i++) {
            final int timestamp = money.getStamp();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
//                    Integer m = money.get();
                        Integer m = money.getReference();
                        if (m < 20) {
                            //比较money跟m的值相等,才更新money为m+20,如果不等则重新来一遍,这里money的值可能会被其他线程修改
                            //当有其他线程改变了时间戳timestamp的时候,整体无法写入
                            if (money.compareAndSet(m, m + 20, timestamp, timestamp + 1 )) {
                                System.out.println("余额小于20元,充值成功,余额:" + money.getReference() + "元");
                                break;
                            }
                        } else {
                            break;
                        }
                    }
                }
            }).start();
        }
        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                for (int i = 0;i < 10000;i++) {
                    while (true) {
                        int timestamp = money.getStamp();
                        Integer m = money.getReference();
                        if (m > 10) {
                            System.out.println("大于10元");
                            if (money.compareAndSet(m,m - 10,timestamp,timestamp + 1)) {
                                System.out.println("成功消费10元,余额:" + money.getReference());
                                break;
                            }
                        }else {
                            System.out.println("没有足够金额");
                            break;
                        }
                    }
//                    try {
//                        Thread.sleep(100);
//                    }catch (InterruptedException e) {
//
//                    }
                }
            }
        };
        new Thread(r2).start();
    }
}

运行结果:

余额小于20元,充值成功,余额:39元 大于10元 成功消费10元,余额:29 大于10元 成功消费10元,余额:19 大于10元 成功消费10元,余额:9 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额

.

.

时间戳+1后,无论运行多少次都不会出现重复充值的现象了。

数组的无锁:AtomicIntegerArray

public class AtomicIntegerArrayDemo {
    static AtomicIntegerArray arr = new AtomicIntegerArray(10);
    public static class AddThread implements Runnable {
        @Override
        public void run() {
            //数组内的所有元素各加1000次1
            for (int k = 0;k < 10000;k++) {
                arr.getAndIncrement(k % arr.length());
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] ts = new Thread[10];
        //10个线程来执行
        for (int k = 0;k < 10;k++) {
            ts[k] = new Thread(new AddThread());
            ts[k].start();
            ts[k].join();
        }
        System.out.println(arr);
    }
}

运行结果:

[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]

这里arr.getAndIncrement(int i)就是对第i个下标的元素加1

普通变量享受原子操作:AtomicIntegerFieldUpdater

public class AtomicIntegerFieldUpdateDemo {
    public static class Candidate {
        int id;
        volatile int score;
    }
    public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class,"score");
    public static AtomicInteger allScore = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        final Candidate stu = new Candidate();
        Thread[] t = new Thread[10000];
        for (int i = 0;i < 10000;i++) {
            t[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    if (Math.random() > 0.4) {
                        scoreUpdater.incrementAndGet(stu);
                        allScore.incrementAndGet();
                    }
                }
            });
            t[i].start();
            t[i].join();
        }
        System.out.println("score=" + stu.score);
        System.out.println("allScore=" + allScore);
    }
}

运行结果:

score=5952 allScore=5952

无论运行多少次,我们都可以看到score跟allScore相等,说明普通变量int score进行了原子操作(注意int score必须声明为volatile,多线程可见,且不能为私有类型)。

无锁Vector实现

模仿Vector机制来完成一个无锁线程安全的List集合(源码来自amino)

public class LockFreeVector<E> extends AbstractList<E> {
   private static final boolean debug = false;
   /**
    * Size of the first bucket. sizeof(bucket[i+1])=2*sizeof(bucket[i])
    * 第一个数组大小
    */
   private static final int FIRST_BUCKET_SIZE = 8;

   /**
    * number of buckets. 30 will allow 8*(2^30-1) elements
    * 所有数组的个数
    */
   private static final int N_BUCKET = 30;

   /**
    * We will have at most N_BUCKET number of buckets. And we have
    * sizeof(buckets.get(i))=FIRST_BUCKET_SIZE**(i+1)
    * 存放数据的二维数组,方便动态扩展
    */
   private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets;

   /**
    * @author ganzhi
    * 写入类
    * @param <E>
    */
   static class WriteDescriptor<E> {
      //期望值,即E(V,E,N)
      public E oldV;
      //写入的新值,即N
      public E newV;
      //要修改的原子数组,即V
      public AtomicReferenceArray<E> addr;
      //要修改的数组的索引位置
      public int addr_ind;

      /**
       * Creating a new descriptor.
       * 
       * @param addr Operation address  要写入的数组
       * @param addr_ind Index of address  要写入的数组索引
       * @param oldV old operand  预期值
       * @param newV new operand  新值
       */
      public WriteDescriptor(AtomicReferenceArray<E> addr, int addr_ind,
            E oldV, E newV) {
         this.addr = addr;
         this.addr_ind = addr_ind;
         this.oldV = oldV;
         this.newV = newV;
      }

      /**
       * set newV.
       * 给原子数组进行原子操作赋值
       */
      public void doIt() {
         addr.compareAndSet(addr_ind, oldV, newV);
      }
   }

   /**
    * @author ganzhi
    * 为了更有序的读写数组,使用CAS操作写入新数据
    * 写入器
    * @param <E>
    */
   static class Descriptor<E> {
      //整个Vector长度(非数组长度,是几个数组加起来的长度)
      public int size;
      //写入对象,对所有线程可见
      volatile WriteDescriptor<E> writeop;

      /**
       * Create a new descriptor.
       * 
       * @param size Size of the vector
       * @param writeop Executor write operation
       */
      public Descriptor(int size, WriteDescriptor<E> writeop) {
         this.size = size;
         this.writeop = writeop;
      }

      /**
       * 完成写入,doIt()方法才是真正对原子数组的写入
       */
      public void completeWrite() {
         WriteDescriptor<E> tmpOp = writeop;
         if (tmpOp != null) {
            tmpOp.doIt();
            writeop = null; // this is safe since all write to writeop use
            // null as r_value.
         }
      }
   }

   /**
    * 当前线程写入器的原子引用
    */
   private AtomicReference<Descriptor<E>> descriptor;
   private static final int zeroNumFirst = Integer
         .numberOfLeadingZeros(FIRST_BUCKET_SIZE);;

   /**
    * Constructor.
    */
   public LockFreeVector() {
      //初始化一个可以存放30个原子数组的数组,即二维数组
      buckets = new AtomicReferenceArray<AtomicReferenceArray<E>>(N_BUCKET);
      //给第0位初始化一个8位长的原子数组
      buckets.set(0, new AtomicReferenceArray<E>(FIRST_BUCKET_SIZE));
      //初始化一个原子引用的Descriptor对象,无长度,无内容
      descriptor = new AtomicReference<Descriptor<E>>(new Descriptor<E>(0,
            null));
   }

   /**
    * add e at the end of vector.
    * 最核心功能,将元素压入Vector最后一个位置
    * @param e
    *            element added
    */
   public void push_back(E e) {
      //预期写入器
      Descriptor<E> desc;
      //新值写入器
      Descriptor<E> newd;
      do {
         //获取当前线程写入器给预期写入器
         desc = descriptor.get();
         //如果有其他线程在循环跳出后修改了当前线程写入器,则完成一次写入,预防措施
         desc.completeWrite();
         //判断将数据插入到Vector的哪一个数组中,Vector总共有30个数组
         //数组的长度,第一个是8,第二个是16,第三个是32。。。
         int pos = desc.size + FIRST_BUCKET_SIZE;
         int zeroNumPos = Integer.numberOfLeadingZeros(pos);
         //取得第几个数组
         int bucketInd = zeroNumFirst - zeroNumPos;
         //如果这个数组为空
         if (buckets.get(bucketInd) == null) {
            //取得上一个数组的长度*2
            int newLen = 2 * buckets.get(bucketInd - 1).length();
            if (debug)
               System.out.println("New Length is:" + newLen);
            //原子性增加新数组,如果这个数组为空,则创建一个新长度的数组,长度是上一个数组的2倍
            //如果不为空则等待
            buckets.compareAndSet(bucketInd, null,
                  new AtomicReferenceArray<E>(newLen));
         }
         //取得元素在目标数组中的索引位
         int idx = (0x80000000>>>zeroNumPos) ^ pos;
         //创建一个新的写入对象,包含目标数组buckets.get(bucketInd),待插入索引位idx,目标是否为空,插入对象e
         newd = new Descriptor<E>(desc.size + 1, new WriteDescriptor<E>(
               buckets.get(bucketInd), idx, null, e));
         //如果当前线程写入器与预期写入器不等,则重新循环,如果相等则将新值写入器赋给当前线程写入器
      } while (!descriptor.compareAndSet(desc, newd));
      //获取到新写入器的当前线程写入器完成写入
      descriptor.get().completeWrite();
   }

   /**
    * Remove the last element in the vector.
    *
    * @return element removed
    */
   public E pop_back() {
      Descriptor<E> desc;
      Descriptor<E> newd;
      E elem;
      do {
         desc = descriptor.get();
         desc.completeWrite();

         int pos = desc.size + FIRST_BUCKET_SIZE - 1;
         int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
               - Integer.numberOfLeadingZeros(pos);
         int idx = Integer.highestOneBit(pos) ^ pos;
         elem = buckets.get(bucketInd).get(idx);
         newd = new Descriptor<E>(desc.size - 1, null);
      } while (!descriptor.compareAndSet(desc, newd));

      return elem;
   }

   /**
    * Get element with the index.
    *
    * @param index
    *            index
    * @return element with the index
    */
   @Override
   public E get(int index) {
      int pos = index + FIRST_BUCKET_SIZE;
      int zeroNumPos = Integer.numberOfLeadingZeros(pos);
      //获取第几个数组
      int bucketInd = zeroNumFirst - zeroNumPos;
      //获取该数组的索引位
      int idx = (0x80000000>>>zeroNumPos) ^ pos;
      return buckets.get(bucketInd).get(idx);
   }

   /**
    * Set the element with index to e.
    *
    * @param index
    *            index of element to be reset
    * @param e
    *            element to set
    */
   /**
     * {@inheritDoc}
     */
   public E set(int index, E e) {
      int pos = index + FIRST_BUCKET_SIZE;
      int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
            - Integer.numberOfLeadingZeros(pos);
      int idx = Integer.highestOneBit(pos) ^ pos;
      AtomicReferenceArray<E> bucket = buckets.get(bucketInd);
      while (true) {
         E oldV = bucket.get(idx);
         if (bucket.compareAndSet(idx, oldV, e))
            return oldV;
      }
   }

   /**
    * reserve more space.
    *
    * @param newSize
    *            new size be reserved
    */
   public void reserve(int newSize) {
      int size = descriptor.get().size;
      int pos = size + FIRST_BUCKET_SIZE - 1;
      int i = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
            - Integer.numberOfLeadingZeros(pos);
      if (i < 1)
         i = 1;

      int initialSize = buckets.get(i - 1).length();
      while (i < Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
            - Integer.numberOfLeadingZeros(newSize + FIRST_BUCKET_SIZE - 1)) {
         i++;
         initialSize *= FIRST_BUCKET_SIZE;
         buckets.compareAndSet(i, null, new AtomicReferenceArray<E>(
               initialSize));
      }
   }

   /**
    * size of vector.
    *
    * @return size of vector
    */
   public int size() {
      return descriptor.get().size;
   }

   /**
     * {@inheritDoc}
     */
   @Override
   public boolean add(E object) {
      push_back(object);
      return true;
   }
}

这里我们重点对push_back(E e)方法(将对象添加到Vector的最末尾),get(int index)方法(取出第几个)进行了中文标注,其间Vector是一个二维数组,当第一个数组存满后,扩展到第二个数组,每个数组的长度都是乘2扩展的。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 浅析类装载 顶

    [Loaded com.guanjian.Parent from file:/E:/classload/out/production/classload/] ...

    算法之名
  • Spring Security详解 顶

    2020-01-05 01:57:16.482 INFO 3932 --- [ main] .s.s.UserDetailsService...

    算法之名
  • 归并排序算法 顶

    合并排序算法就是把多个有序数据表合并成一个有序数据表。如果参与合并的只有两个有序表,则称为二路合并。

    算法之名
  • WPF 如何在绑定失败异常

    在开发 WPF 程序,虽然 xaml 很好用,但是经常会出现小伙伴把绑定写错了。因为默认的 VisualStudio 是没有自动提示,这时很容易复制粘贴写出一个...

    林德熙
  • Codeforces Round #519 C. Smallest Word(思维)(1043C)

    题目链接:http://codeforces.com/contest/1043/problem/C

    Ch_Zaqdt
  • Java单体应用 - 架构模式 - 03.设计模式-23.策略模式

    原文地址:http://www.work100.net/training/monolithic-architecture-design-patterns-str...

    光束云
  • Android 带你撸一个好玩的 DoodleView(涂鸦)

    可以看到这个这个自定义 View 的功能还是很丰富的,无论是设置画笔的形状、颜色、粗细,还是进行重置和保存,该有的 API,基本都已经实现了。有需要的读者直接 ...

    developerHaoz
  • 教你从头写游戏服务器框架(3)

    使用异步非阻塞编程,确实能获得很好的性能。但是在代码上,确非常不直观。因为任何一个可能阻塞的操作,都必须要要通过“回调”函数来链接。比如一个玩家登录,你需要先读...

    韩伟
  • 进程调度(一)——FIFO算法

    这是最早出现的置换算法。该算法总是淘汰最先进入内存的页面,即选择在内存中驻留时间最久的页面予以淘汰。该算法实现简单,只需把一个进程已调入内存的页面,按先后次序链...

    AI那点小事
  • C++11基础学习系列一

    ---- 概述 C++11标准越来越趋于稳定和成熟,国外c++11如火如荼而国内却依然处于观望期。每当提到C++很多程序员都很抵触,特别是学术界的呼声更高一些。...

    BrianLv

扫码关注云+社区

领取腾讯云代金券