前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >无锁CAS整理

无锁CAS整理

作者头像
算法之名
发布2019-08-20 16:09:25
4990
发布2019-08-20 16:09:25
举报
文章被收录于专栏:算法之名算法之名

所有的锁都是悲观的,他们总是假设每一次的临界区操作会产生冲突,如果有多个线程同时需要访问临界区资源,就宁可牺牲性能让线程进行等待,所以说锁会阻塞线程执行.而无锁是一种乐观的策略,它会假设对资源的访问是没有冲突的,所有的线程都可以在不停顿的状态下继续执行.无锁的策略是使用一种叫做比较交换的技术(CAS)来鉴别线程冲突,一旦检测到冲突产生,就重试当前操作,直到没有冲突为止.

CAS算法包含三个参数(V,E,N),V表示要更新的变量,E表示预期值,N表示新值.仅当V值等于E值时(意思是说其他线程没有更新V值),才会将V值设为N.如果V值和E值不同,则说明已经有其他线程做了更新(V值),则当前线程什么都不做.最后,CAS返回当前V的真实值.

最简单的无锁安全整数:AtomicInteger

代码语言:javascript
复制
public class AtomicIntegerDemo {
    static AtomicInteger i = new AtomicInteger();
    public static class AddThread implements Runnable {
        @Override
        public void run() {
            for (int k = 0;k < 10000;k++) {
                i.incrementAndGet();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] ts = new Thread[10];
        for (int k = 0;k < 10;k++) {
            ts[k] = new Thread(new AddThread());
            ts[k].start();
            ts[k].join();
        }
        System.out.println(i);
    }
}

运行结果:

100000

因为jdk 8的incrementAndGet()已经牵涉到底层Unsafe类,它有大量的native标识,跟C语言挂钩的,这个我们先不说.我们自己来用无锁的对象引用AtomicReference来模拟实现一个这个过程.

代码语言:javascript
复制
public class AtomicReferenceInteger {
    //i即V值(V,E,N)
    static AtomicReference<Integer> i = new AtomicReference<>(0);
    public static class AddThread implements Runnable {
        @Override
        public void run() {
            for (int k = 0;k < 10000;k++) {
                while (true) {
                    //m即E值
                    Integer m = i.get();
                    //++m即N值,比较m跟i的值是否相等,如果相等,就把++m写入i,如果i值被其他线程修改,则继续循环
                    if (i.compareAndSet(m,++m)) {
                        break;
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Thread[] ts = new Thread[10];
        for (int k = 0;k < 10;k++) {
            ts[k] = new Thread(new AddThread());
            ts[k].start();
            ts[k].join();
        }
        System.out.println(i);
    }
}

运行结果:

100000

以上可以看出

代码语言:javascript
复制
while (true) {
    //m即E值
    Integer m = i.get();
    //++m即N值,比较m跟i的值是否相等,如果相等,就把++m写入i,如果i值被其他线程修改,则继续循环
    if (i.compareAndSet(m,++m)) {
        break;
    }
}

即模拟实现了incrementAndGet();

带有时间戳的对象引用:AtomicStampedReference

使用带时间戳的对象引用时,对象值和时间戳都必须满足期望值,写入才会成功.因此,即使对象值被反复读写,写回原值,只要时间戳发生变化,就能防止不恰当的写入.

代码语言:javascript
复制
public class AtomicReferenceAcount {
    public static void main(String[] args) {
//        AtomicReference<Integer> money = new AtomicReference<>();
        AtomicStampedReference<Integer> money = new AtomicStampedReference<>(19,0);
//        money.set(19);
        for (int i = 0;i < 10000;i++) {
            final int timestamp = money.getStamp();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
//                    Integer m = money.get();
                        Integer m = money.getReference();
                        if (m < 20) {
                            //比较money跟m的值相等,才更新money为m+20,如果不等则重新来一遍,这里money的值可能会被其他线程修改
                            //当有其他线程改变了时间戳timestamp的时候,整体无法写入
                            if (money.compareAndSet(m, m + 20, timestamp, timestamp )) {
                                System.out.println("余额小于20元,充值成功,余额:" + money.getReference() + "元");
                                break;
                            }
                        } else {
                            break;
                        }
                    }
                }
            }).start();
        }
        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                for (int i = 0;i < 10000;i++) {
                    while (true) {
                        int timestamp = money.getStamp();
                        Integer m = money.getReference();
                        if (m > 10) {
                            System.out.println("大于10元");
                            if (money.compareAndSet(m,m - 10,timestamp,timestamp)) {
                                System.out.println("成功消费10元,余额:" + money.getReference());
                                break;
                            }
                        }else {
                            System.out.println("没有足够金额");
                            break;
                        }
                    }
//                    try {
//                        Thread.sleep(100);
//                    }catch (InterruptedException e) {
//
//                    }
                }
            }
        };
        new Thread(r2).start();
    }
}

我们先让时间戳永远不变,运行结果(部分选取)

余额小于20元,充值成功,余额:39元 大于10元 成功消费10元,余额:29 大于10元 成功消费10元,余额:19 大于10元 成功消费10元,余额:9 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 大于10元 成功消费10元,余额:19 大于10元 成功消费10元,余额:9 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 .

. 没有足够金额 没有足够金额 大于10元 成功消费10元,余额:19 大于10元 成功消费10元,余额:29 大于10元 成功消费10元,余额:19 大于10元 成功消费10元,余额:9 没有足够金额 没有足够金额 没有足够金额 没有足够金额 大于10元 成功消费10元,余额:19 余额小于20元,充值成功,余额:39元 余额小于20元,充值成功,余额:29元 大于10元 余额小于20元,充值成功,余额:29元 余额小于20元,充值成功,余额:39元 余额小于20元,充值成功,余额:29元 成功消费10元,余额:29 大于10元 成功消费10元,余额:19 大于10元 成功消费10元,余额:9

根据结果我们会看到,两边的线程会不断的读取,写入.

现在我们把时间戳改变+1

代码语言:javascript
复制
public class AtomicReferenceAcount {
    public static void main(String[] args) {
//        AtomicReference<Integer> money = new AtomicReference<>();
        AtomicStampedReference<Integer> money = new AtomicStampedReference<>(19,0);
//        money.set(19);
        for (int i = 0;i < 10000;i++) {
            final int timestamp = money.getStamp();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
//                    Integer m = money.get();
                        Integer m = money.getReference();
                        if (m < 20) {
                            //比较money跟m的值相等,才更新money为m+20,如果不等则重新来一遍,这里money的值可能会被其他线程修改
                            //当有其他线程改变了时间戳timestamp的时候,整体无法写入
                            if (money.compareAndSet(m, m + 20, timestamp, timestamp + 1 )) {
                                System.out.println("余额小于20元,充值成功,余额:" + money.getReference() + "元");
                                break;
                            }
                        } else {
                            break;
                        }
                    }
                }
            }).start();
        }
        Runnable r2 = new Runnable() {
            @Override
            public void run() {
                for (int i = 0;i < 10000;i++) {
                    while (true) {
                        int timestamp = money.getStamp();
                        Integer m = money.getReference();
                        if (m > 10) {
                            System.out.println("大于10元");
                            if (money.compareAndSet(m,m - 10,timestamp,timestamp + 1)) {
                                System.out.println("成功消费10元,余额:" + money.getReference());
                                break;
                            }
                        }else {
                            System.out.println("没有足够金额");
                            break;
                        }
                    }
//                    try {
//                        Thread.sleep(100);
//                    }catch (InterruptedException e) {
//
//                    }
                }
            }
        };
        new Thread(r2).start();
    }
}

运行结果:

余额小于20元,充值成功,余额:39元 大于10元 成功消费10元,余额:29 大于10元 成功消费10元,余额:19 大于10元 成功消费10元,余额:9 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额 没有足够金额

.

.

时间戳+1后,无论运行多少次都不会出现重复充值的现象了。

数组的无锁:AtomicIntegerArray

代码语言:javascript
复制
public class AtomicIntegerArrayDemo {
    static AtomicIntegerArray arr = new AtomicIntegerArray(10);
    public static class AddThread implements Runnable {
        @Override
        public void run() {
            //数组内的所有元素各加1000次1
            for (int k = 0;k < 10000;k++) {
                arr.getAndIncrement(k % arr.length());
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] ts = new Thread[10];
        //10个线程来执行
        for (int k = 0;k < 10;k++) {
            ts[k] = new Thread(new AddThread());
            ts[k].start();
            ts[k].join();
        }
        System.out.println(arr);
    }
}

运行结果:

[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]

这里arr.getAndIncrement(int i)就是对第i个下标的元素加1

普通变量享受原子操作:AtomicIntegerFieldUpdater

代码语言:javascript
复制
public class AtomicIntegerFieldUpdateDemo {
    public static class Candidate {
        int id;
        volatile int score;
    }
    public final static AtomicIntegerFieldUpdater<Candidate> scoreUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class,"score");
    public static AtomicInteger allScore = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        final Candidate stu = new Candidate();
        Thread[] t = new Thread[10000];
        for (int i = 0;i < 10000;i++) {
            t[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    if (Math.random() > 0.4) {
                        scoreUpdater.incrementAndGet(stu);
                        allScore.incrementAndGet();
                    }
                }
            });
            t[i].start();
            t[i].join();
        }
        System.out.println("score=" + stu.score);
        System.out.println("allScore=" + allScore);
    }
}

运行结果:

score=5952 allScore=5952

无论运行多少次,我们都可以看到score跟allScore相等,说明普通变量int score进行了原子操作(注意int score必须声明为volatile,多线程可见,且不能为私有类型)。

无锁Vector实现

模仿Vector机制来完成一个无锁线程安全的List集合(源码来自amino)

代码语言:javascript
复制
public class LockFreeVector<E> extends AbstractList<E> {
   private static final boolean debug = false;
   /**
    * Size of the first bucket. sizeof(bucket[i+1])=2*sizeof(bucket[i])
    * 第一个数组大小
    */
   private static final int FIRST_BUCKET_SIZE = 8;

   /**
    * number of buckets. 30 will allow 8*(2^30-1) elements
    * 所有数组的个数
    */
   private static final int N_BUCKET = 30;

   /**
    * We will have at most N_BUCKET number of buckets. And we have
    * sizeof(buckets.get(i))=FIRST_BUCKET_SIZE**(i+1)
    * 存放数据的二维数组,方便动态扩展
    */
   private final AtomicReferenceArray<AtomicReferenceArray<E>> buckets;

   /**
    * @author ganzhi
    * 写入类
    * @param <E>
    */
   static class WriteDescriptor<E> {
      //期望值,即E(V,E,N)
      public E oldV;
      //写入的新值,即N
      public E newV;
      //要修改的原子数组,即V
      public AtomicReferenceArray<E> addr;
      //要修改的数组的索引位置
      public int addr_ind;

      /**
       * Creating a new descriptor.
       * 
       * @param addr Operation address  要写入的数组
       * @param addr_ind Index of address  要写入的数组索引
       * @param oldV old operand  预期值
       * @param newV new operand  新值
       */
      public WriteDescriptor(AtomicReferenceArray<E> addr, int addr_ind,
            E oldV, E newV) {
         this.addr = addr;
         this.addr_ind = addr_ind;
         this.oldV = oldV;
         this.newV = newV;
      }

      /**
       * set newV.
       * 给原子数组进行原子操作赋值
       */
      public void doIt() {
         addr.compareAndSet(addr_ind, oldV, newV);
      }
   }

   /**
    * @author ganzhi
    * 为了更有序的读写数组,使用CAS操作写入新数据
    * 写入器
    * @param <E>
    */
   static class Descriptor<E> {
      //整个Vector长度(非数组长度,是几个数组加起来的长度)
      public int size;
      //写入对象,对所有线程可见
      volatile WriteDescriptor<E> writeop;

      /**
       * Create a new descriptor.
       * 
       * @param size Size of the vector
       * @param writeop Executor write operation
       */
      public Descriptor(int size, WriteDescriptor<E> writeop) {
         this.size = size;
         this.writeop = writeop;
      }

      /**
       * 完成写入,doIt()方法才是真正对原子数组的写入
       */
      public void completeWrite() {
         WriteDescriptor<E> tmpOp = writeop;
         if (tmpOp != null) {
            tmpOp.doIt();
            writeop = null; // this is safe since all write to writeop use
            // null as r_value.
         }
      }
   }

   /**
    * 当前线程写入器的原子引用
    */
   private AtomicReference<Descriptor<E>> descriptor;
   private static final int zeroNumFirst = Integer
         .numberOfLeadingZeros(FIRST_BUCKET_SIZE);;

   /**
    * Constructor.
    */
   public LockFreeVector() {
      //初始化一个可以存放30个原子数组的数组,即二维数组
      buckets = new AtomicReferenceArray<AtomicReferenceArray<E>>(N_BUCKET);
      //给第0位初始化一个8位长的原子数组
      buckets.set(0, new AtomicReferenceArray<E>(FIRST_BUCKET_SIZE));
      //初始化一个原子引用的Descriptor对象,无长度,无内容
      descriptor = new AtomicReference<Descriptor<E>>(new Descriptor<E>(0,
            null));
   }

   /**
    * add e at the end of vector.
    * 最核心功能,将元素压入Vector最后一个位置
    * @param e
    *            element added
    */
   public void push_back(E e) {
      //预期写入器
      Descriptor<E> desc;
      //新值写入器
      Descriptor<E> newd;
      do {
         //获取当前线程写入器给预期写入器
         desc = descriptor.get();
         //如果有其他线程在循环跳出后修改了当前线程写入器,则完成一次写入,预防措施
         desc.completeWrite();
         //判断将数据插入到Vector的哪一个数组中,Vector总共有30个数组
         //数组的长度,第一个是8,第二个是16,第三个是32。。。
         int pos = desc.size + FIRST_BUCKET_SIZE;
         int zeroNumPos = Integer.numberOfLeadingZeros(pos);
         //取得第几个数组
         int bucketInd = zeroNumFirst - zeroNumPos;
         //如果这个数组为空
         if (buckets.get(bucketInd) == null) {
            //取得上一个数组的长度*2
            int newLen = 2 * buckets.get(bucketInd - 1).length();
            if (debug)
               System.out.println("New Length is:" + newLen);
            //原子性增加新数组,如果这个数组为空,则创建一个新长度的数组,长度是上一个数组的2倍
            //如果不为空则等待
            buckets.compareAndSet(bucketInd, null,
                  new AtomicReferenceArray<E>(newLen));
         }
         //取得元素在目标数组中的索引位
         int idx = (0x80000000>>>zeroNumPos) ^ pos;
         //创建一个新的写入对象,包含目标数组buckets.get(bucketInd),待插入索引位idx,目标是否为空,插入对象e
         newd = new Descriptor<E>(desc.size + 1, new WriteDescriptor<E>(
               buckets.get(bucketInd), idx, null, e));
         //如果当前线程写入器与预期写入器不等,则重新循环,如果相等则将新值写入器赋给当前线程写入器
      } while (!descriptor.compareAndSet(desc, newd));
      //获取到新写入器的当前线程写入器完成写入
      descriptor.get().completeWrite();
   }

   /**
    * Remove the last element in the vector.
    *
    * @return element removed
    */
   public E pop_back() {
      Descriptor<E> desc;
      Descriptor<E> newd;
      E elem;
      do {
         desc = descriptor.get();
         desc.completeWrite();

         int pos = desc.size + FIRST_BUCKET_SIZE - 1;
         int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
               - Integer.numberOfLeadingZeros(pos);
         int idx = Integer.highestOneBit(pos) ^ pos;
         elem = buckets.get(bucketInd).get(idx);
         newd = new Descriptor<E>(desc.size - 1, null);
      } while (!descriptor.compareAndSet(desc, newd));

      return elem;
   }

   /**
    * Get element with the index.
    *
    * @param index
    *            index
    * @return element with the index
    */
   @Override
   public E get(int index) {
      int pos = index + FIRST_BUCKET_SIZE;
      int zeroNumPos = Integer.numberOfLeadingZeros(pos);
      //获取第几个数组
      int bucketInd = zeroNumFirst - zeroNumPos;
      //获取该数组的索引位
      int idx = (0x80000000>>>zeroNumPos) ^ pos;
      return buckets.get(bucketInd).get(idx);
   }

   /**
    * Set the element with index to e.
    *
    * @param index
    *            index of element to be reset
    * @param e
    *            element to set
    */
   /**
     * {@inheritDoc}
     */
   public E set(int index, E e) {
      int pos = index + FIRST_BUCKET_SIZE;
      int bucketInd = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
            - Integer.numberOfLeadingZeros(pos);
      int idx = Integer.highestOneBit(pos) ^ pos;
      AtomicReferenceArray<E> bucket = buckets.get(bucketInd);
      while (true) {
         E oldV = bucket.get(idx);
         if (bucket.compareAndSet(idx, oldV, e))
            return oldV;
      }
   }

   /**
    * reserve more space.
    *
    * @param newSize
    *            new size be reserved
    */
   public void reserve(int newSize) {
      int size = descriptor.get().size;
      int pos = size + FIRST_BUCKET_SIZE - 1;
      int i = Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
            - Integer.numberOfLeadingZeros(pos);
      if (i < 1)
         i = 1;

      int initialSize = buckets.get(i - 1).length();
      while (i < Integer.numberOfLeadingZeros(FIRST_BUCKET_SIZE)
            - Integer.numberOfLeadingZeros(newSize + FIRST_BUCKET_SIZE - 1)) {
         i++;
         initialSize *= FIRST_BUCKET_SIZE;
         buckets.compareAndSet(i, null, new AtomicReferenceArray<E>(
               initialSize));
      }
   }

   /**
    * size of vector.
    *
    * @return size of vector
    */
   public int size() {
      return descriptor.get().size;
   }

   /**
     * {@inheritDoc}
     */
   @Override
   public boolean add(E object) {
      push_back(object);
      return true;
   }
}

这里我们重点对push_back(E e)方法(将对象添加到Vector的最末尾),get(int index)方法(取出第几个)进行了中文标注,其间Vector是一个二维数组,当第一个数组存满后,扩展到第二个数组,每个数组的长度都是乘2扩展的。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档