专栏首页编程坑太多『互联网架构』软件架构-分布式系列并发编程atomic&collections(31)

『互联网架构』软件架构-分布式系列并发编程atomic&collections(31)

在java中提供了一种对于原子操作的类,Atomic的包名为java.util.concurrent.atomic。这个包里面提供了一组原子变量的操作类,这些类可以保证在多线程环境下,当某个线程在执行atomic的方法时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个线程执行。

Atomic

  • CAS

能够弄懂atomic包下这些原子操作类的实现原理,就要先明白什么是CAS操作。

1.CAS指的是现代CPU广泛支持的一种对内存中的共享数据进行操作的一种特殊指令。这个指令会对内存中的共享数据做原子的读写操作。在Java并发应用中通常指CompareAndSwap或CompareAndSet,即比较并交换,是实现并发算法时常用到的一种技术。java.util.concurrent包中借助CAS实现了区别于synchronized同步锁的一种乐观锁。乐观锁就是每次去取数据的时候都乐观的认为数据不会被修改,因此这个过程不会上锁,但是在更新的时候会判断一下在此期间的数据有没有更新

2.CAS思想

CAS有三个参数,当前内存值V、旧的预期值A、即将更新的值B,当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做,并返回false

3.CAS优缺点

系统在硬件层面保证了CAS操作的原子性,不会锁住当前线程,它的效率是很高的。但是在并发越高的条件下,失败的次数会越多,CAS如果长时间不成功,会极大的增加CPU的开销,因此CAS不适合竞争十分频繁的场景 CAS只能保证一个共享变量的原子操作,对多个共享变量操作时,无法保证操作的原子性,这时就可以用锁,或者把多个共享变量合并成一个共享变量来操作。JDK提供了AtomicReference类来保证引用对象的原子性,可以把多个变量放在一个对象里来进行CAS操作

  • ABA

CAS在操作值的时候检查值是否已经变化,没有变化的情况下才会进行更新。但是如果一个值原来是A,变成B,又变成A,那么CAS进行检查时会认为这个值没有变化,但是实际上却变化了。ABA问题的解决方法是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A就变成1A-2B-3A。JDK提供了AtomicStampedReference来解决ABA问题

*Atomic成员

Atomic成员分为四大块

1.原子方式更新基本类型 AtomicBoolean:原子更新布尔类型 AtomicInteger:原子更新整型 AtomicLong:原子更新长整型

2.原子方式更新数组 AtomicIntegerArray:原子更新整型数组里的元素 AtomicLongArray:原子更新长整型数组里的元素 AtomicReferenceArray:原子更新引用类型数组里的元素

3.原子方式更新引用 AtomicReference:原子更新引用类型 AtomicReferenceFieldUpdater:原子更新引用类型里的字段 AtomicMarkableReference:原子更新带有标记位的引用类型

4.原子方式更新字段 AtomicIntegerFieldUpdater:原子更新整型字段的更新器 AtomicLongFieldUpdater:原子更新长整型字段的更新器 AtomicStampedReference:原子更新带有版本号的引用类型

CAS 优缺点 非阻塞算法 、ABA 问题、循环开销大、只保证一个共享变量原子操作。

ConcurrentHashMap

线程安全的 map 有 Hashtable 和 SynchronizedMap以及 concurrentHashMapConcurrentHashMap 所使用的锁分段技术通过细化锁的粒度来降低锁的竞争。

不足:算 size 的结果需要遍历

CopyOnWrite

Copy-On-Write 简称 COW,其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修改这个内容的时候,才会真正把内容 Copy 出去形成一个新的内容然后再改,这是一种延时懒惰策略。 从 JDK1.5 开始 Java 并发包里提供了两个使用 CopyOnWrite 机制实现的并发容器,它们是CopyOnWriteArrayList 和 CopyOnWriteArraySet。 CopyOnWrite 容器介绍: CopyOnWrite 容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我 们可以对 CopyOnWrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。 所以 CopyOnWrite 容器也是一种读写分离的思想,读和写不同的容器。

  • 场景

黑白名单,读多写少(很少改动)场景,商品 sku 商品类目。

*优缺点

占内存(写时复制 new 两个对象),不能保证数据实时一致性。

BlockingQueue

BlockingQueue 的队列长度受限,用以保证生产者与消费者的速度不会相差太远,避免内存耗尽。队列长度设定后不可改变。当入队时队列已满,或出队时队列已空,不同函数的效果。 场景:生产与消费

可能报异常

返回布尔值

可能阻塞等待

可设定等待时间

入队

add(e)

offer(e)

put(e)

offer(e, timeout, unit)

出队

remove()

poll()

take()

poll(timeout, unit)

查看

element()

peek()

  • ArrayBlockingQueue

基于数组实现的有界阻塞队列,创建后不能修改队列的大小;是一个有边界的阻塞队列,它的内部实现是一个数组。有边界的意思是它的容量是有限的,我们必须在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。

  • LinkedBlockingQueue

基于链表实现的无界(可以指定)阻塞队列,默认大小为 Integer.MAX_VALUE,有较好的吞吐量,但可预测性差。

  • PriorityBlockingQueue

具有优先级的无界阻塞队列,不允许插入 null,所有元素都必须可比较(即实现 Comparable 接口)。顺序:非先进先出。

  • SynchronousQueue

只有一个元素的同步队列。若队列中有元素插入操作将被阻塞,直到队列中的元素被其他线程取走。

  • DelayQueue

无界阻塞队列,每个元素都有一个延迟时间,在延迟时间之后才释放元素。阻塞的是其内部元素,DelayQueue 中的元素必须实现 java.util.concurrent.Delayed 接口,这个接口的定义非常简单

  • ConcurrentLinkedQueue

基于链表实现的非阻塞队列。

BlockingDeque

一个线程生产元素并将元素插入到队列的两端。如果当前队列是满的,插入线程将会被阻塞直到一个移除元素的线程从队列中取出一个元素。同样,如果队列当前是空的,移除元素的线程会被阻塞直到一个插入元素的线程向队列中插入了一个元素。

ThreadLocal 线程本地变量

Java 中的 ThreadLocal 类允许我们创建只能被同一个线程读写的变量。因此,如果一段代码含有一个 ThreadLocal 变量的引用,即使两个线程同时执行这段代码,它们也无法访问到对方的 ThreadLocal 变量。

多线程控制的工具类

package com.tl.executor;
import java.util.concurrent.CountDownLatch;import java.util.concurrent.Executors;import java.util.concurrent.ThreadFactory;import java.util.concurrent.atomic.AtomicLong;
public class TljucUtil {
    /**     * 测试耗时     *     * @param nThreads     *            线程数     * @param task     *            执行任务     * @param singleNum     *            单个线程执行个数     * @return     * @throws InterruptedException     */    public static long timeTasks(int nThreads, int singleNum,                                 final Runnable task) {        final CountDownLatch startGate = new CountDownLatch(1);        final CountDownLatch endGate = new CountDownLatch(nThreads);        ThreadFactory tf = Executors.defaultThreadFactory();        final int singleExeNum = singleNum == 0 ? 1 : singleNum;        final AtomicLong sum = new AtomicLong();        final AtomicLong min = new AtomicLong(10000);        final AtomicLong max = new AtomicLong(0);        for (int i = 0; i < nThreads; i++) {            tf.newThread(new Thread() {                @Override                public void run() {                    try {                        startGate.await();                        for (int j = 0; j < singleExeNum; j++) {                            long start = System.nanoTime();                            try {                                task.run();                            } finally {                                long end = System.nanoTime();                                long at = ((end - start) / 1000 / 1000);                                sum.addAndGet(at);                                if (min.get() > at) {                                    min.getAndSet(at);                                }                                if (max.get() < at) {                                    max.getAndSet(at);                                }                            }                        }                        endGate.countDown();                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            }).start();        }        long start = System.nanoTime();        startGate.countDown();        try {            endGate.await();        } catch (InterruptedException e) {            e.printStackTrace();        }        long end = System.nanoTime();        long at = ((end - start) / 1000 / 1000);
        int allCount = singleExeNum * nThreads;
        System.out.println("执行任务数:" + allCount);        System.out.println("------------------------");        System.out.println("所有线程共耗时:" + transStr(sum.get()));        System.out.println("并发执行完耗时:" + transStr(at));        System.out                .println("单任务平均耗时:" + transStr((double) sum.get() / allCount));        System.out.println("单线程最小耗时:" + transStr(min.get()));        System.out.println("单线程最大耗时:" + transStr(max.get()));        return end - start;    }
    public static String transStr(long ms) {        return transStr((double) ms);
    }
    public static String transStr(double ms) {        if (ms < 1000) {            return ms + " ms";        }        double s = ms / 1000;        if (s < 1000) {            return s + " s";        }        double m = s / 60;        if (m < 60) {            return m + " m";        }        double h = m / 60;        if (h < 24) {            return h + " h";        }
        double d = h / 24;        if (d < 30) {            return d + " d";        }        return d + " d";    }}
  • 调用方式演示
package com.tl.executor.threadlocal;

import com.tl.executor.TljucUtil;
import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;
public class ThreadlocalDemo1 implements Runnable {
    static ThreadLocal<SimpleDateFormat> tl=new ThreadLocal<SimpleDateFormat>();    static SimpleDateFormat simpleDateFormat=null;
    public static void main(String[] args) {        TljucUtil.timeTasks(100,1,new ThreadlocalDemo1());    }
    @Override    public void run() {        try {            if(tl.get()==null){                tl.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));            }            Date date=tl.get().parse("2017-11-28 22:45:11");            Thread.sleep(100);            System.out.println(date);        } catch (ParseException e) {            e.printStackTrace();        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

PS:基本本次就讲解这么多吧,内容比较丰富,代码这块可以根据相关的名称百度搜一些大神的源码来看,但是理论一定要理解,一通百通。来不及握手拜了个拜~下次见。

本文分享自微信公众号 - 编程坑太多(idig88)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-03-12

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Java面试 | 002

    由于静态块在类被加载时就会被调用,因此可以在main()方法执行前,利用静态块实现输出“HELLO WORLD”的功能。

    Java猫说
  • Java多线程001——一图读懂线程与进程

    Java猫说
  • 推荐几个牛逼的 IDEA 插件,还带动图!

    一款热部署插件,只要不是修改了项目的配置文件,用它都可以实现热部署。收费的,破解比较麻烦。不过功能确实很强大。算是开发必备神器了。热部署快捷键是control+...

    良月柒
  • 简说Java线程的那几个启动方式

    并发是一件很美妙的事情,线程的调度与使用会让你除了业务代码外,有新的世界观,无论你是否参与但是这对于你未来的成长帮助很大。

    Java猫说
  • Fundebug支持浏览器报警

    Fundebug是专业的应用BUG监控服务,当您的线上应用,比如网页、小程序、Java等发生BUG时,我们会第一时间发送邮件报警,这样可以帮助您及时发现BUG,...

    Fundebug
  • NHibernate总结

    现在的项目中数据访问使用的是NHibernate的一个ORM框架,小弟也是在后期加入项目组,之前对NHibernate就一直没有接触过,所以一直在学习NHibe...

    写代码的猿
  • hbase基础操作 转

    –HBase–HadoopDatabase,是一个高可靠性、高性能、面向列、可伸缩、实时读写的分布式数据库

    双面人
  • Javascript Proxy对象 简介 转

    ES6 中引入Proxies,让你可以自定义Object的基本操作。例如,get就是Object的基础操作方法。

    双面人
  • Java面试 | 001优点有啥?

    2、“一次编译,到处运行”,由于Java为解释型语言,编译器转换java代码后再由Java虚拟机解释执行,所以java语言可以很好的跨平台执行,具备可移植性。

    Java猫说
  • Netty中的Channel之数据冲刷与线程安全(writeAndFlush)

    本文预设读者已经了解了一定的Netty基础知识,并能够自己构建一个Netty的通信服务(包括客户端与服务端)。那么你一定使用到了Channel,这是Netty对...

    Java猫说

扫码关注云+社区

领取腾讯云代金券