前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >从一道面试题进入Java并发新机制---J.U.C

从一道面试题进入Java并发新机制---J.U.C

作者头像
行百里er
发布2020-12-02 14:43:13
2750
发布2020-12-02 14:43:13
举报
文章被收录于专栏:JavaJourney

“什么是J.U.C? 它是Java java.util.concurrent包的缩写,从包的名称就可以看出,它应该主要提供一些线程同步的类。 这个包下面的类提供了多种实现线程同步的方式,还有诸如Executor、Callable、Future、ExecutorService等耳熟能详的接口。 本着天天向上的原则,非常有必要学习它。

一道面试题

实现一个容器,提供两个方法add,size,实现两个线程:

线程1,向容器中添加10个元素到容器中;

线程2,实时监控容器元素个数,当元素个数到5个时,线程2给出提示并结束。

看到这道题,我首先想到的是synchronized + wait/notify,具体实现为:

代码语言:javascript
复制
public class NiubilityContainer<T> {
    private List<T> list = new ArrayList<>();

    /**
     * add方法,借用list的add方法向容器添加元素
     * @param t 待添加的元素
     */
    public void add(T t) {
        list.add(t);
    }

    /**
     * 借助list的size方法返回当前容器的元素个数
     * @return int 容器元素个数
     */
    public int size(){
        return list.size();
    }

    public static void main(String[] args) {
        NiubilityContainer c = new NiubilityContainer();
        //定义一个需要上锁的对象,线程持有该对象的锁才能执行
        final Object lock = new Object();

        //启动一个监控线程
        new Thread(() -> {
            System.out.println("监控线程启动...");
            synchronized (lock) {
                //只要元素个数不为5,就调用wait方法让出CPU
                if (c.size() != 5) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //通知添加元素的线程继续执行
                lock.notify();
            }
            System.out.println("容器元素个数为5,监控线程退出!");
        }, "MonitorThread").start();

        //启动一个添加元素的线程
        new Thread(() -> {
            System.out.println("添加元素线程启动...");
            synchronized (lock) {
                for (int i = 0; i < 10; i++) {
                    c.add(i);
                    System.out.println("添加元素线程 add " + i);

                    if (c.size() == 5) {
                        //先唤醒当前线程
                        lock.notify();
                        try {
                            //释放锁,使得监控线程得以执行
                            lock.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }, "AddThread").start();
    }
}

运行结果:

代码语言:javascript
复制
监控线程启动...
添加元素线程启动...
添加元素线程 add 0
添加元素线程 add 1
添加元素线程 add 2
添加元素线程 add 3
添加元素线程 add 4
容器元素个数为5,监控线程退出!
添加元素线程 add 5
添加元素线程 add 6
添加元素线程 add 7
添加元素线程 add 8
添加元素线程 add 9

这种方法要注意waitnotify的顺序,而且他们都必须放在synchronized内。

CyclicBarrier

CyclicBarrier的官网描述:

“A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released. A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue.

转译总结一下,大概就是这个意思:

它允许一组线程互相等待,直到到达某个公共屏障点 (Common Barrier Point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 Barrier 在释放等待线程后可以重用,所以称它为循环( Cyclic ) 的屏障( Barrier ) 。

构造函数

代码语言:javascript
复制
/**
 * 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
 *
 * @param parties 拦截线程的总数量
 * @param barrierAction 为 CyclicBarrier 接收的 Runnable 命令,用于在线程到达屏障时,优先执行
 * @throws IllegalArgumentException 当拦截线程数量小于1时抛出异常
 */
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

/**
 * 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
 * @param parties 表示拦截线程的总数量。
 * @throws IllegalArgumentException 当拦截线程数量小于1时抛出异常
 */
public CyclicBarrier(int parties) {
    this(parties, null);
}

CyclicBarrier 中最重要的方法莫过于 await() 方法,在所有参与者parties都已经在此 barrier 上调用 await 方法之前,将一直等待。

我举个栗子,一个小伙子(线程1)骑自行车到了一个红绿灯路口,红灯(Barrier)亮了,他需要停下来等待(await);一个白富美(线程2)开着兰博基尼也路过此路口,恰好此时绿灯亮了(拦截的最后一个线程到达barrier)。小伙子和白富美确认过眼神后,过绿灯可以去干别的事(barrierAction)了。

基于以上对CyclicBarrier的了解,我们要强行使用CyclicBarrier来解决开始的那道题(==只需要实现效果即可==),怎么办?

我的思路是,我只定义一个需要拦截的线程,让它去做添加元素的操作,当元素添加到5个之后,await,执行一个Runnable,也就是barrierAction,用来提示已经有5个元素了。(这不就是模板方法设计模式吗?)

具体实现:

代码语言:javascript
复制
public class TestCyclicBarrier {

    static CyclicBarrier barrier ;
    static List lists = new LinkedList();


    static void add(Object o) {
        lists.add(o);
    }

    static int size() {
        return lists.size();
    }

    static class ReactThread implements Runnable {
        @Override
        public void run() {
            System.out.println("============== 元素个数已到达5,监控退出!=============");
        }
    }

    static class AddThread extends Thread {
        @Override
        public void run() {
            for (int i = 1; i < 11; i++) {
                add(new Object());
                System.out.println("添加元素线程 add 第" + i + "个元素");

                if (size() == 5) {
                    try {
                        barrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        barrier = new CyclicBarrier(1, new ReactThread());
        new AddThread().start();
    }
}

执行结果:

CountDownLatch

CountDownLatch的API描述:

“A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. 一种同步机制,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成为止。 A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier. 用给定的计数初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await() 方法会一直受阻塞。之后,会释放所有等待的线程,await() 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier 。

CountDownLatch 是通过一个计数器来实现的,当我们在 new 一个 CountDownLatch 对象的时候,需要带入该计数器值,该值就表示了线程的数量。

每当一个线程完成自己的任务后,计数器的值就会减 1 。当计数器的值变为0时,就表示所有的线程均已经完成了任务,然后就可以恢复等待的线程继续执行了。

再次强行使用CountDownLatch来解决一下开始的那道题,尝试代码如下:

代码语言:javascript
复制
public class TestCountDownLatch {

    static CountDownLatch latch = new CountDownLatch(1);
    static List lists = new LinkedList();


    static void add(Object o) {
        lists.add(o);
    }

    static int size() {
        return lists.size();
    }

    static class ReactThread extends Thread {
        @Override
        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("============== 元素个数已到达5,监控退出!=============");
        }
    }

    static class AddThread extends Thread {
        @Override
        public void run() {
            for (int i = 1; i < 11; i++) {
                add(new Object());
                System.out.println("添加元素线程 add 第" + i + "个元素");

                if (size() == 5) {
                    latch.countDown();
                }

                //(监控线程已经准备打印退出了,添加元素的线程还在继续添加)
                //加个睡眠时间,方便观察,因为打印的动作也需要耗时
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        new ReactThread().start();
        new AddThread().start();
    }
}

运行结果也能达到预期:

代码语言:javascript
复制
添加元素线程 add 第1个元素
添加元素线程 add 第2个元素
添加元素线程 add 第3个元素
添加元素线程 add 第4个元素
添加元素线程 add 第5个元素
============== 元素个数已到达5,监控退出!=============
添加元素线程 add 第6个元素
添加元素线程 add 第7个元素
添加元素线程 add 第8个元素
添加元素线程 add 第9个元素
添加元素线程 add 第10个元素

CountDownLatch的关键类图:

CountDownLatch简单类图

通过这个图,我们试着看一下它的实现源码。

  • 构造方法:
代码语言:javascript
复制
/**
 * 构造一个用给定计数初始化的 CountDownLatch 
 *
 * @param count the number of times {@link #countDown} must be invoked
 *        before threads can pass through {@link #await}
 * @throws IllegalArgumentException if {@code count} is negative
 */
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}
  • Sync

Sync是CountDownLatch的一个内部类,它实现了AbstractQueuedSynchronizer(传说中的AQS)。

代码语言:javascript
复制
/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    //获取同步状态
    int getCount() {
        return getState();
    }

    //获取同步状态
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    //释放同步状态
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}

由此可见,CountDownLatch内部是采用共享锁来实现的。

  • await方法
代码语言:javascript
复制
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly这个方法在其父类AQS里,来看一下:

代码语言:javascript
复制
// java.util.concurrent.locks.AbstractQueuedSynchronizer.java
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

//...

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

当进入到doAcquireSharedInterruptibly方法后,它会自旋for (;;),一直尝试去获取同步状态。

  • countDown
代码语言:javascript
复制
public void countDown() {
    sync.releaseShared(1);
}

这里调用的releaseShared也是父类AQS的方法:

代码语言:javascript
复制
// AQS
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
代码语言:javascript
复制
// CountDownLatch内部类Sync重写tryReleaseShared方法
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        //获取锁状态
        int c = getState();
        //c == 0 直接返回,释放锁成功
        if (c == 0)
            return false;
        int nextc = c-1;
        //比较并替换CAS,更新锁状态(计数器)
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

Semaphore

Semaphore也是并发工具类之一,按惯例,我们来看一下它的API描述:

“计数信号量。从概念上讲,信号量维护了一个许可集。 如有必要,在许可可用前会阻塞每一个 acquire,然后再获取该许可。每个 release 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。

信号量通常用于限制线程数量,使其无法访问某些(物理或逻辑)资源,例如API上举了一个例子,通过Semaphore来控制资源池中数据的访问:

代码语言:javascript
复制
class Pool {
    private static final int MAX_AVAILABLE = 100;
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
    
    public Object getItem() throws InterruptedException {
     available.acquire();
     return getNextAvailableItem();
    }
    
    public void putItem(Object x) {
     if (markAsUnused(x))
       available.release();
    }
    
    // Not a particularly efficient data structure; just for demo
    
    protected Object[] items = ... whatever kinds of items being managed
    protected boolean[] used = new boolean[MAX_AVAILABLE];
    
    protected synchronized Object getNextAvailableItem() {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (!used[i]) {
          used[i] = true;
          return items[i];
       }
     }
     return null; // not reached
    }
    
    protected synchronized boolean markAsUnused(Object item) {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
       }
     }
     return false;
    }
}

从中我们可以领会到,Semaphore的关键方法就是acquire()release()

  • acquire

“获取信号量的方法

  • release

“释放信号量的方法

再再次强行使用Semaphore来完成我们之前那道题,上代码:

代码语言:javascript
复制
public class TestSemaphore {
    static List lists = new LinkedList();

    static void add(Object o) {
        lists.add(o);
    }

    static int size() {
        return lists.size();
    }

    static Thread t1 = null, t2 = null;

    public static void main(String[] args) {
        //定义一个只能有1个线程能获得许可的信号量
        Semaphore semaphore = new Semaphore(1);

        t1 = new Thread(() -> {
            try {
                //尝试获得许可
                semaphore.acquire();
                //添加5个元素之后,释放锁
                for (int i = 0; i < 5; i++) {
                    add(new Object());
                    System.out.println("线程t1 已经 add " + size() + " 个元素");
                }
                //释放锁,等待t2打印退出
                semaphore.release();

                //需要让t2执行
                t2.start();
                t2.join();

                //t2退出后,继续获得许可,添加元素
                semaphore.acquire();
                for (int i = 0; i < 5; i++) {
                    add(new Object());
                    System.out.println("线程t1 已经 add " + size() + " 个元素");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        t2 = new Thread(() -> {
            try {
                //首先获得许可
                semaphore.acquire();
                //打印 退出
                System.out.println("------- 线程t2已知容器中有5个元素了,t2退出。-------");
                //释放锁,等t1接着添加剩余的元素
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        t1.start();
    }
}

执行结果:

代码语言:javascript
复制
线程t1 已经 add 1 个元素
线程t1 已经 add 2 个元素
线程t1 已经 add 3 个元素
线程t1 已经 add 4 个元素
线程t1 已经 add 5 个元素
------- 线程t2已知容器中有5个元素了,t2退出。-------
线程t1 已经 add 6 个元素
线程t1 已经 add 7 个元素
线程t1 已经 add 8 个元素
线程t1 已经 add 9 个元素
线程t1 已经 add 10 个元素

小结

这次从一道线程同步的题目,切入Java并发与线程同步新机制,使用了CyclicBarrireCountDownLatchSemaphore实现了预期的效果。

其实还有像ExchangerPhaser等等并发工具类没有介绍到,他们的底层实现都是继承了AQS这个强大的类。

AQS文章蓄力准备中...

欢迎阅读

synchronized底层实现知多少?synchronized加锁还会升级?

“看完点赞,养成习惯。 举手之劳,赞有余香。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-08-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 行百里er 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一道面试题
  • CyclicBarrier
  • CountDownLatch
  • Semaphore
  • 小结
  • 欢迎阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档