关于Java并发
从创建起,Java已经支持核心的并发概念,如线程和锁。本指南帮助Java开发人员使用多线程程序来了解核心并发概念以及如何应用它们。本指南涵盖的主题包括内建的Java语言功能,如Thread,synchronized和volatile,以及JavaSE 5中添加的新构造,如Locks,Atomics,并发集合,线程协调抽象和Executors。使用这些构建块,开发人员可以构建高度并发和线程安全的Java应用程序。
概念
本节介绍经常使用的关键Java并发概念.
表 1: Java并发概念
保护共享数据
编写线程安全的Java程序需要开发人员在修改共享数据时使用适当的锁。锁可保证Java内存模型所需的顺序,并保证对其他线程的更改的可见性。
热心提示:数据更改外部同步在Java内存模型下没有指定的语义! JVM可以自由地重新排序指令,并以可能对开发人员感到惊讶的方式限制可见性。
同步
每个对象实例都有一个监视器,一次可以被一个线程锁定。可以以方法或块形式指定synchronized关键字来锁定监视器。同时在对象上同步时修改字段可以保证在同一对象上同步的任何其他线程的后续读取将会看到更新的值。重要的是要注意,写入外部同步或同步在与读取不同的对象上不一定对其他线程是可见的。 可以在特定对象实例上以方法或块形式指定synchronized关键字。如果在非静态方法中指定,则将该引用用作实例。在一个同步静态方法中,定义该方法的类被用作实例。
锁
java.util.concurrent.locks包具有标准的锁接口。 ReentrantLock实现复制了synchronized关键字的功能,但也提供了其他功能,例如获取有关锁的状态,非阻塞tryLock()和可中断锁的信息。
使用显式ReentrantLock实例的示例:
ReadWriteLock
Java.util.concurrent.locks包还包含一个ReadWriteLock接口(和ReentrantReadWriteLock实现),它由一对用于读取和写入的锁定义,通常允许多个并发读,但只有一个写。使用一个显式的ReentrantReadWriteLock允许多个并发读的例子:
volatile
volatile可用于标记字段,并指示除了同步之外,其他线程的所有后续读取都必须看到该字段的更改。因此,易volatile提供可见性,就像同步,但仅限于对字段的每次读取或写入。在Java SE 5之前,volatile的实现在JVM实现和体系结构之间是不一致的,不能被依赖。 Java内存模型现在明确地定义了volatile的行为。
使用volatile作为信号标志的一个例子:
热心提示:将数组标记为volatile不会使数组中的item变成volatile!在这种情况下,volatile仅适用于数组引用本身。而是使用像AtomicIntegerArray这样的类来创建一个具有volatile类型条目的数组。
原子类
volatile的一个缺点是,当它提供可见性保证时,您不能同时检查和更新单个原子调用中的volatile字段。 java.util.concurrent.atomic包包含一组类,它们以类似于volatile的无锁方式支持单个值上的原子化操作.
incrementAndGet方法只是Atomic类上可用的复合操作的一个示例。为布尔,整数,长整型和对象引用以及整数,长整型和对象引用的数组提供了原子类。
ThreadLocal
在线程中包含数据并使锁定不必要的一种方法是使用ThreadLocal存储。在概念上,ThreadLocal就像在每个Thread中都有自己的版本的变量一样。 ThreadLocals通常用于堆叠每个线程值,如“当前事务”或其他资源。此外,它们用于维护每线程计数器,统计信息或ID生成器。
并发集合
正确保护共享数据的关键技术是将同步机制与保存数据的类进行封装。这种技术使得不可能不正确地访问数据,因为所有的使用必须符合同步协议。 java.util.concurrent包包含许多旨在并发使用的数据结构。通常,使用这些数据结构比使用非同步集合周围的同步包有更好的性能。
并发列表和集合
java.util.concurrent包包含三个并发的List和Set实现,
如表2所示。
类 | 描述 |
---|---|
CopyOnWriteArraySet | CopyOnWriteArraySet 提供写时复制语义,其中数据结构的每个修改导致数据的新内部副本(因此写入非常昂贵)。数据结构上的迭代器总是看到创建迭代器时数据的快照。 |
CopyOnWriteArrayList | 与CopyOnWriteArraySet类似, CopyOnWriteArrayList 使用写时复制语义来实现List接口 |
ConcurrentSkipListSet | ConcurrentSkipListSet (在Java SE 6中添加)提供与TreeSet类似的排序集功能的并发访问。由于基于SkipList的实现,只要不修改集合的相同部分,多个线程通常可以在集合内读取和写入。 |
并发map
java.util.concurrent包包含一个名为ConcurrentMap的Map接口的扩展,该接口提供了表3中描述的一些额外的方法。所有这些方法都在单个原子操作的范围内执行一组操作。由于在map上进行多个(非原子)调用,因此在map外部执行这一组动作会引起竞争条件。
表3:ConcurrentMap方法
方法 | 描述 |
---|---|
putIfAbsent(K key, V value) : V | 如果键不在map中,那么把键/值对,否则什么也不做。返回旧值,如果以前没有,则返回null |
remove(Object key, Object value) : boolean | 如果map包含key,并将其映射到值,则删除条目,否则不执行任何操作。 |
replace(K key, V value) : V | 如果map包含key,则用newValue替换,否则不执行任何操作。 |
replace(K key, V oldValue, V newValue) : boolean | 如果map包含key,并将其映射到oldValue,则用newValue替换,否则不执行任何操作。 |
有两个ConcurrentMap实现可用,如表4所示。
表4:ConcurrentMap实现
类 | 描述 |
---|---|
ConcurrentHashMap | ConcurrentHashMap 提供两级内部hash。第一级选择一个内部段,第二级别将哈希值分配到所选区段中的桶中。第一级通过允许并行地在每个段上安全地发生读取和写入来提供并发性 |
ConcurrentSkipListMap | ConcurrentSkipListMap (在Java SE 6中添加)提供并发访问以及类似于TreeMap的排序映射功能。性能界限类似于TreeMap,尽管多线程通常可以从map读取和写入,而无需争用,只要它们不修改map的相同部分即可。 |
队列
队列作为“生产者”和“消费者”之间的管道。物品放在管道的一端,以相同的“先进先出”(FIFO)顺序从管道的另一端出来。
Queue接口被添加到Java SE 5中的java.util中,而它可以在单线程方案中使用,它主要用于多个生产者或一个或多个消费者,所有这些都是从同一个队列中进行写入和读取。
BlockingQueue接口位于java.util.concurrent中,并扩展了Queue,以提供如何处理队列可能已满的场景(生产者添加项目时)或为空(消费者读取或删除项目时)的其他选项。
在这些情况下,BlockingQueue提供了在指定时间段内永久阻塞或阻塞的方法,等待条件由于另一个线程的动作而改变。表5演示了关键操作和处理这些特殊条件的策略的Queue和BlockingQueue方法。
表5:Queue和BlockingQueue方法
方法 | 策略 | 插入 | 删除 | 检查 |
---|---|---|---|---|
Queue | 抛出异常 | add | remove | 元素 |
返回特殊值 | offer | poll | peek | |
Blocking Queue | 永远阻塞 | put | take | n/a |
定时阻塞 | offer | poll | n/a |
下面几个队列实现由JDK提供,其关系在表6中进行了说明。
表6:队列实现
类 | 描述 |
---|---|
PriorityQueue | PriorityQueue是唯一的非并发队列实现,可以由单个线程用于收集项目并按排序顺序处理它们。 |
ConcurrentLinkedQueue | 无限链接的列表队列实现和唯一的并发实现不支持BlockingQueue。 |
ArrayBlockingQueue | 由数组支持的有界阻塞队列。 |
LinkedBlockingQueue | 由链表支持的可选边界的阻塞队列。这可能是最常用的Queue实现。 |
PriorityBlockingQueue | 堆栈支持的无界阻塞队列。项目按照与队列关联的比较器(而不是FIFO顺序)的顺序从队列中删除。 |
DelayQueue | 一个无阻塞的元素队列,每个都有一个延迟值。元素只能在其延迟通过后按照最旧的过期项目的顺序删除。 |
SynchronousQueue | 生产者和消费者阻止直到另一个到达的0长度队列。当两个线程到达时,该值直接从生产者转移到消费者。在线程之间传输数据时很有用 |
在Java SE 6中添加了一个双端队列或Deque(发音为“deck”).Deques不仅支持从一端添加,而是从另一端添加,并从两端添加和删除项。与BlockingQueue类似,有一个BlockingDeque接口,在特殊情况下提供阻塞和超时的方法。表7显示了Deque和BlockingDeque方法。因为Deque扩展Queue和BlockingDeque扩展了BlockingQueue,所有这些方法也可以使用。
表7:Deque和BlockingDeque方法
接口 | 首或尾 | 策略 | 插入 | 删除 | 检查 |
---|---|---|---|---|---|
Queue | 头 | 抛出异常 | addFirst | removeFirst | getFirst |
返回特定值 | offerFirst | pollFirst | peekFirst | ||
尾 | 抛出异常 | addLast | removeLast | getLast | |
返回特定值 | offerLast | pollLast | peekLast | ||
Blocking Queue | 头 | 永远阻塞 | putFirst | takeFirst | n/a |
定时阻塞 | offerFirst | pollFirst | n/a | ||
尾 | 永远阻塞 | putLast | takeLast | n/a | |
定时阻塞 | offerLast | pollLast | n/a |
Deque的一个特殊用例是添加,删除和检查操作时,只能在管道的一端进行。这种特殊情况只是一个堆栈(先进先出的检索顺序)。 Deque界面实际上提供了使用堆栈术语的方法:push(),pop()和peek()。这些方法映射到Deque接口中的addFirst(),removeFirst()和peekFirst()方法,并允许您将任何Deque实现用作堆栈。表8描述了JDK中的Deque和BlockingDeque实现。请注意,Deque扩展Queue和BlockingDeque扩展了BlockingQueue
表8:Deques
类 | 描述 |
---|---|
LinkedList | 这种长期的数据结构已在Java SE 6中进行了改进,以支持Deque界面。您现在可以使用标准的Deque方法从列表的任一端添加或删除(许多这些方法已经存在),还可以将其用作非同步堆栈来代替完全同步的Stack类。 |
ArrayDeque | 此实现不是并发的,并支持无限队列长度(根据需要动态调整)。 |
LinkedBlockingDeque | 唯一的并发deque实现,这是一个由链表实现的阻塞可选的有限的deque。 |
线程
在Java中,java.lang.Thread类用于表示应用程序或JVM线程。代码总是在一些Thread类的上下文中执行(使用Thread.currentThread()来获取你自己的Thread)。
线程通信
在线程之间通信的最明显的方法是一个线程直接调用另一个Thread对象的方法。表9显示了可用于跨线程直接交互的Thread上的方法。
表9:线程协调方法
启动一个Thread实例并执行其run()方法。阻塞直到其他线程退出
线程方法 | 描述 |
---|---|
start | |
join | |
interrupt | 中断另一个线程。如果线程在响应中断的方法中被阻塞,则InterruptedException将抛出到另一个线程,否则中断状态被置位。 |
stop, suspend, resume, destroy | 这些方法都被弃用,不应该使用。它们根据所讨论的线程的状态执行危险操作。相反,使用interrupt()或volatile标志来向线程指出应该做什么。 |
未捕获异常处理程序
线程可以指定一个UncaughtExceptionHandler,它将接收任何导致线程突然终止的未捕获异常的通知。
死锁
当存在多个线程,每个线程等待另一个线程持有的资源时,会发生死锁,从而形成资源和获取线程的循环。最明显的资源类型是对象监视器,但任何导致阻塞(如wait / notify)的资源都可以被限制。
线程协调
等待和通知的规范使用模式如下:
有关此代码的一些重要事项:
始终在同步锁中调用wait,notify和notifyAll,否则将抛出IllegalMonitorStateException异常。 总是在一个循环中等待检查正在等待的状态 - 如果另一个线程满足等待开始之前的条件,这将解决时序问题。此外,它可以保护您的代码免受可能发生的(和)发生的虚假唤醒。 始终确保您在调用notify或notifyAll之前满足等待状态。如果没有这样做会导致通知,但没有线程将永远不能逃脱其等待循环。
Condition
在Java SE 5中,添加了一个新的java.util.concurrent.locks.Condition类。Condition实现API中的等待/通知语义,但具有若干附加功能,例如创建多个Condition每个锁,可中断等待,访问统计信息等功能。Condition从Lock实例获取如下:
协调类
java.util.concurrent包包含几个为多线程通信形式预先构建的类。这些协调课程涵盖了大多数常见的情况,其中等待/通知和Condition可能被使用,并且由于其安全性和易用性被强烈推测。
CyclicBarrier
CyclicBarrier由参与者计数初始化。参与者调用await()并阻塞直到达到计数,此时最后到达的线程执行可选的屏障任务,并释放所有线程。障碍可以无限期地重复使用。用于协调线程组的启动和停止。
CountDownLatch
CountDownLatch用计数初始化。线程可能会调用await()等待计数达到0.其他线程(或相同)可能会调用countDown()来减少计数。一旦计数达到0,则不可重用。一旦发生一些操作,用于触发一组未知的线程。
Semaphore
信号量管理一组“许可”,可以使用capture()检出,这将阻止,直到可用。线程调用release()返回许可证。具有一个许可证的信号量等效于互斥块。
Exchanger
一个交换器等待线程在exchange()方法中进行交互,并以原子方式交换值。这与使用SynchronousQueue类似,但数据值在两个方向都通过。
任务执行
许多并发Java程序需要一个从队列执行任务的工作池。 java.util.concurrent包为这种工作管理风格提供了坚实的基础
ExecutorService
Executor和更广泛的ExecutorService接口为可以执行任务的组件定义协议。这些接口的用户可以在通用接口背后获得各种实现行为。
最通用的Executor接口仅以Runnables的形式接受作业:
void execute(Runnable command)
ExecutorService扩展Executor以添加采用Runnable和Callable任务以及任务集合的方法:
Future<?> submit(Runnable task)
Future<T> submit(Callable<T> task)
Future<T> submit(Runnable task, T result)
List<Future<T> invokeAll(Collection<? extends Callable<T> tasks)
List<Future<T> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
T invokeAny(Collection<? extends Callable<T>> tasks)
T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
Callable and Future
Callable就像熟悉的Runnable,但可以返回结果并抛出异常:
V call() throws Exception;
在Executor框架中通常提交Callable并接收Future。Future是一个标记,代表将来某个时刻可用的结果。Future的方法允许您轮询或阻塞等待结果准备就绪。您还可以在通过Future下的方法执行任务之前或之后取消任务。
如果您需要仅支持Runnables(如Executor)的Future的功能,则可以使用FutureTask作为桥梁。 FutureTask实现了Future和Runnable,以便您可以将该任务作为Runnable提交,并将该任务本身用作调用者中的Future。
ExecutorService实现
ExecutorService的主要实现是ThreadPoolExecutor。该实现类提供了各种可配置的功能:
Thread pool-指定“核心”线程计数(可选预先设置)和最大线程数
Thread Factory-生成具有自定义特征(例如自定义名称)的线程
Work Queue-指定队列实现,其必须是阻塞,但可以是有界的或无界的
Rejected Tasks-为完整的输入队列或不可用的工作者指定不能接受的任务的策略
Life hooks-被覆盖以扩展到覆盖生命周期中的关键点,如执行任务之前或之后
Shutdown-停止传入任务并等待执行任务完成
ScheduledThreadPoolExecutor是ThreadPoolExecutor的扩展,它提供了调度任务完成而不是使用FIFO语义的能力。对于java.util.Timer不够复杂的情况,ScheduledThreadPoolExecutor通常提供足够的灵活性。
Executors类包含许多静态方法(参见表10),用于创建预包装的ExecutorService和ScheduledExecutorService实例,这些实例将涵盖各种常见的用例 表10:Executors Factory方法
返回具有不同大小的线程池的ExecutorService
方法 | 描述 |
---|---|
newSingleThreadExecutor | 返回一个只有一个线程的ExecutorService。 |
newFixedThreadPool | 返回具有固定线程数的ExecutorService。 |
newCachedThreadPool | |
newSingleThreadScheduledExecutor | 返回带有单个线程的ScheduledExecutorService。 |
newScheduledThreadPool | 返回具有核心线程集的ScheduledExecutorService。 |
以下示例创建一个固定线程池并提交一个长时间运行的任务:
在此示例中,将任务提交给执行程序的调用将不会阻塞,而是立即返回。最后一行将阻塞get()调用,直到结果可用。
ExecutorService几乎包括您先前创建Thread对象或线程池的所有情况。任何时候你的代码直接构建一个线程,考虑你是否可以用Executor工厂方法之一生成的ExecutorService完成相同的目标;这往往会更简单和更灵活。
除了工作池和输入队列的常见模式之外,每个任务都需要生成一个必须累积以进行进一步处理的结果。 CompletionService界面允许用户提交Callable和Runnable任务,但也可以对结果队列中的结果进行轮询:
Future<V> take() – 如果可用就获取
Future<V> poll() – 阻塞到可用
Future<V> poll(long timeout, TimeUnit unit) – 阻塞直到超时结束
ExecutorCompletionService是CompletionService的标准实现。它由一个提供输入队列和工作线程池的Executor构造。
热心提示:在调整线程池大小时,将大小设置为运行应用程序的机器中的逻辑核数通常很有用。在Java中,您可以通过调用Runtime.getRuntime()
availableProcessors()获取该值。可用处理器的数量可能会在JVM的生存期内发生更改。