上一篇我们介绍了如何通过synchronized 来加锁保护资源。但是,不当的加锁方式可能就会导致死锁。
最典型的就是哲学家问题, 场景:5个哲学家,5跟筷子,5盘意大利面,大家围绕桌子而坐,进行思考与进食活动。
在这里插入图片描述 哲学家的活动描述: 哲学家除了吃面、还要思考、所以要么放下左右手筷子进行思考、要么拿起两个筷子(自己两侧的)开始吃面。 哲学家从不交谈,这就很危险了,很可能会发生死锁,假设每个人都是先拿到左边的筷子,然后去拿右边的筷子,那么就可能会出现如下情况。
在这里插入图片描述 通过代码模拟:
public class DeadLockTest2 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
int sum = 5;
Chopsticks[] chopsticks = new Chopsticks[sum];
for (int i = 0; i < sum; i++) {
chopsticks[i] = new Chopsticks();
}
for (int i = 0; i < sum; i++) {
executorService.execute(new Philosopher(chopsticks[i], chopsticks[(i + 1) % sum]));
}
}
// 筷子
static class Chopsticks {
}
//哲学家
static class Philosopher implements Runnable {
private Chopsticks left;
private Chopsticks right;
public Philosopher(Chopsticks left, Chopsticks right) {
this.left = left;
this.right = right;
}
@Override
public void run() {
try {
//思考一段时间
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (left) {
try {
//拿到左边的筷子之后等待一段时间
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (right) {
try {
System.out.println("********开始吃饭");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
如上程序:定义了一个哲学家类,该类的主要任务要么是思考,要么是吃饭,吃饭的话,首先拿到其左边的筷子,等待一段时间后再去拿其右边的筷子。在此处因为每个哲学家都是占用自己左边的筷子等待拿右边的筷子。所以,就会出现循环等待,导致死锁。下面我们就来查看下:
我们可以通过java命令很方便的查看是否有死锁发生。首先通过jps
命令查看当前程序所占的进程如下:
在这里插入图片描述
找到对应的进程之后,接着通过jstack
命令查看程序运行情况。如下:
在这里插入图片描述 通过上述分析我们发现死锁发生的条件是如下四个(必须同时满足):
前面我们说了,死锁的发生条件是必须同时满足上述四个条件。那么避免死锁的方式就是破坏掉其中的一个条件就可以了。
对于占用且等待的情况,我们只需要一次性申请所有的资源,只有申请到了才会往下面走。对于这种情况,我们需要一个调度者,由它来统一申请资源。调度者必须是单例的,由他给哲学家分配筷子。
public class Allocator {
private List<Object> applyList = new ArrayList<Object>();
private final static Allocator allocator = new Allocator();
private Allocator() {
}
/**
* 只能由一个人完成,所以是单例模式
* @return
*/
public static Allocator getAllocator() {
return allocator;
}
/**
* 申请资源
*/
synchronized boolean applyResource(Object from, Object to) {
if (applyList.contains(from) ||
applyList.contains(to)) {
return false;
}
applyList.add(from);
applyList.add(to);
return true;
}
/**
* 释放资源
*/
synchronized void free(Object from, Object to) {
applyList.remove(from);
applyList.remove(to);
}
调度者会一直申请拿到两个资源,如果能拿到这执行后续流程,拿不到的话则一直循环申请。
public void eat(Account2 target) {
//没有申请到锁就一直循环下去,直到成功
while (!Allocator.getAllocator().applyResource(this, target)) {
return;
}
try {
//左边
synchronized (this) {
//右边
synchronized (target) {
}
}
} finally {
//释放已经申请的资源
Allocator.getAllocator().free(this, target);
}
}
对于不可抢占资源,占有部分资源的线程进一步申请其他资源,如果申请不到则主动释放它占用的资源。在后面我们会运用lock来实现。给锁设定超时时间。如果在超时未获得需要的资源,则释放其所占资源。
class Philosopher extends Thread{
private ReentrantLock left,right;
public Philosopher(ReentrantLock left, ReentrantLock right) {
super();
this.left = left;
this.right = right;
}
public void run(){
try {
while(true){
Thread.sleep(1000);//思考一段时间
left.lock();
try{
if(right.tryLock(1000,TimeUnit.MILLISECONDS)){
try{
Thread.sleep(1000);//进餐一段时间
}finally {
right.unlock();
}
}else{
//没有获取到右手的筷子,放弃并继续思考
}
}finally {
left.unlock();
}
}
} catch (InterruptedException e) {
}
}
}
如上程序,我们将right锁的超时时间设置为1秒,如果不能获取到,右手的筷子,则放弃吃面并继续思考。
对于循环等待,我们可以按序申请资源来预防,所谓的按序申请,是指资源是有线性顺序的,申请的时候可以先申请序号小的,再申请序号大的。 我们在Chopsticks中添加一个id 字段,作为资源的序号。然后在申请资源时按照序号从小到大开始申请。
static class Chopsticks {
private int id;
public Chopsticks(int id) {
this.id = id;
}
public int getId() {
return id;
}
}
public Philosopher(Chopsticks left, Chopsticks right) {
if (left.getId() > right.getId()) {
this.left = right;
this.right = left;
} else {
this.left = left;
this.right = right;
}
}
当检测到死锁时,一个可行的做法是释放所有锁,回退,并且等待一段随机时间后重试。这个和简单的加锁超时类似,不一样的是只有死锁已经发生了才回退 。而不会是因为加锁的请求超时了,虽然有回退和等待,但是如果有大量线程竞争同一批锁,它们还是会重复地死锁。
一个更好的方案是给这些线程设置优先级,让一个(或几个)线程回退,剩下的线程就像没发生死锁一样继续保持着它们需要的锁。可以在死锁发生的时候设置随机的优先级。
本文通过一个经典的哲学家就餐的问题,引入了死锁发生的场景及发生的条件。然后,针对这些条件介绍了避免死锁的三种方式。
https://github.com/XWxiaowei/ConcurrencyDemo
https://www.jianshu.com/p/99f10708b1e1