我们只从字面上看,Guarded Suspension 是受保护暂停的意思。
在实际的并发编程中,Guarded Suspension 模式适用于某个线程需要满足特定的条件(Predicate)才能执行某项任务(访问受保护对象)。条件未满足时,则挂起线程,让线程一直处于 WAITING 状态,直到条件满足后该线程才可以执行任务。有点类似于 Java 的 wait() 和 notify() 方法。
Guarded Suspension 模式通过让线程的等待,来保证受保护对象的安全。
Guarded Suspension.png
Guarded Suspension 模式是等待唤醒机制的一种规范实现,又被称为 Guarded Wait 模式、Spin Lock 模式、多线程版本的 if。
Guarded Suspension 模式是多线程编程基础的设计模式,适用于很多应用场景,也可以和其他的多线程模式组合使用。
下面列举两个场景。
在工业自动化场景下,某个自动化流水线上使用工业相机通过图像算法识别上料台是否有物品,当算法识别到上料台有物品时,机械臂才会去抓取物品,否则机械臂一直处于等待状态。图像算法的调用和机械臂的控制,分别处于不同的线程。对于这样的多线程协作,正好可以使用 Guarded Suspension 模式。
Dubbo 的调用是异步的,却可以得到同步的返回结果。这也是经典的异步转同步的方法。 翻阅 Dubbo 的 DefaultFeture 类,我们可以看到它的源码也使用了 Guarded Suspension 模式。
基于上述的说明,我们创建一个通用的受保护对象 GuardedObject,这里用到了 Lock、Condition 来实现该对象。当然,用 Java 的 wait() 和 notify()/notifyAll() 方法也是可以的。
public class GuardedObject<K,V> {
private static final int TIMEOUT = 10;
private static final Map<Object, GuardedObject> goMap = new ConcurrentHashMap<>();
// 受保护对象
private V obj;
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
public static <K,V> GuardedObject create(K key) {
GuardedObject go = new GuardedObject<K,V>();
goMap.put(key, go);
return go;
}
// 唤醒等待的线程
public static <K, V> void fireEvent(K key, V obj) {
GuardedObject go = goMap.remove(key);
if (go != null) {
go.onChange(obj);
}
}
// 获取受保护对象
public V get(Predicate<V> p) throws WatcherException{
lock.lock();
try {
while (!p.test(obj)) {
done.await(TIMEOUT, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
throw new WatcherException(e);
} finally {
lock.unlock();
}
return obj;
}
// 事件通知方法
public void onChange(V obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}
在 GuardedObject 对象中, goMap 是静态变量,所以这个 Map 存储了所有的 GuardedObject 对象。
另外,WatcherException 是我们业务异常,在这里可以替换成 RuntimeException。
我们用代码来模拟 Guarded Suspension 模式的使用过程。
class SomeObj{
}
public class Test {
public static void main(String[] args) {
try {
GuardedObject<Integer,SomeObj> guardedObject = GuardedObject.create(1);
Thread t1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "线程开始访问受保护对象");
guardedObject.get(e -> {
return e!=null;
});
System.out.println(Thread.currentThread().getName() + "线程访问到了受保护对象");
});
t1.setName("t1");
t1.start();
Thread t2 = new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "线程开始做准备的工作");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
GuardedObject.fireEvent(1,new SomeObj());
System.out.println(Thread.currentThread().getName() + "线程准备工作完成,条件满足,唤醒等待的线程");
});
t2.setName("t2");
t2.start();
t1.join();
t2.join();
} catch (Exception e) {
e.printStackTrace();
}
}
}
执行结果:
t1线程开始访问受保护对象
t2线程开始做准备的工作
t2线程准备工作完成,条件满足,唤醒等待的线程
t1线程访问到了受保护对象
对于将异步调用转化成同步调用的操作时,肯定是需要超时机制的。因此,将上述代码修改一下,增加了超时机制。
public class GuardedObject<K,V> {
private static final int TIMEOUT = 10;
private static final Map<Object, GuardedObject> goMap = new ConcurrentHashMap<>();
// 受保护对象
private V obj;
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
public static <K,V> GuardedObject create(K key) {
GuardedObject go = new GuardedObject<K,V>();
goMap.put(key, go);
return go;
}
/**
* 判断条件
* @return
*/
private boolean isArrived() {
return obj != null;
}
/**
* 唤醒等待的线程
* @param key
* @param obj
*/
public static <K, V> void fireEvent(K key, V obj) {
GuardedObject go = goMap.remove(key);
if (go != null) {
go.onChange(obj);
}
}
/**
* 获取受保护对象
* @return
* @throws WatcherException
*/
public V get() throws WatcherException{
return get(TIMEOUT);
}
/**
* 获取受保护对象
* @return
* @throws WatcherException
*/
public V get(long timeout) throws WatcherException{
lock.lock();
try {
while (!isArrived()) {
//等待,增加超时机制
try {
done.await(timeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new WatcherException(e);
}
if (!isArrived()) {
throw new WatcherException("timeout");
}
}
} finally {
lock.unlock();
}
return obj;
}
/**
* 事件通知方法
* @param obj
*/
private void onChange(V obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
}
如果超时的话,GuardedObject#get() 会抛出异常,在调用时捕获异常就可以了。
笔者正好使用该模式,将某个串口调用的第三方库 (https://github.com/NeuronRobotics/nrjavaserial) 从原先只支持异步调用,改成了也可以支持同步调用,增加了超时的机制,并应用于生产环境中。
对于多线程的协作,当然还有其他方式。比如 A 线程轮询等待 B 线程结束后,再去执行 A 线程的任务。对于这种情况,肯定是使用 Guarded Suspension 模式更佳。或者通过 eventbus 这样的事件总线来实现多线程的协作。