前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java 多线程模式 —— Guarded Suspension 模式

Java 多线程模式 —— Guarded Suspension 模式

作者头像
fengzhizi715
发布2022-05-16 08:43:49
4510
发布2022-05-16 08:43:49
举报

Guarded Suspension 模式的介绍

我们只从字面上看,Guarded Suspension 是受保护暂停的意思。

Guarded Suspension 模式

在实际的并发编程中,Guarded Suspension 模式适用于某个线程需要满足特定的条件(Predicate)才能执行某项任务(访问受保护对象)。条件未满足时,则挂起线程,让线程一直处于 WAITING 状态,直到条件满足后该线程才可以执行任务。有点类似于 Java 的 wait() 和 notify() 方法。

Guarded Suspension 模式通过让线程的等待,来保证受保护对象的安全。

Guarded Suspension.png

Guarded Suspension 模式是等待唤醒机制的一种规范实现,又被称为 Guarded Wait 模式、Spin Lock 模式、多线程版本的 if。

应用场景

Guarded Suspension 模式是多线程编程基础的设计模式,适用于很多应用场景,也可以和其他的多线程模式组合使用。

下面列举两个场景。

场景一

在工业自动化场景下,某个自动化流水线上使用工业相机通过图像算法识别上料台是否有物品,当算法识别到上料台有物品时,机械臂才会去抓取物品,否则机械臂一直处于等待状态。图像算法的调用和机械臂的控制,分别处于不同的线程。对于这样的多线程协作,正好可以使用 Guarded Suspension 模式。

场景二

Dubbo 的调用是异步的,却可以得到同步的返回结果。这也是经典的异步转同步的方法。 翻阅 Dubbo 的 DefaultFeture 类,我们可以看到它的源码也使用了 Guarded Suspension 模式。

Guarded Suspension 模式的使用

通用的 Guarded Suspension 模式

基于上述的说明,我们创建一个通用的受保护对象 GuardedObject,这里用到了 Lock、Condition 来实现该对象。当然,用 Java 的 wait() 和 notify()/notifyAll() 方法也是可以的。

代码语言:javascript
复制
public class GuardedObject<K,V> {

    private static final int TIMEOUT = 10;
    private static final Map<Object, GuardedObject> goMap = new ConcurrentHashMap<>();

    // 受保护对象
    private V obj;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();

    public static <K,V> GuardedObject create(K key) {
        GuardedObject go = new GuardedObject<K,V>();
        goMap.put(key, go);
        return go;
    }

    // 唤醒等待的线程
    public static <K, V> void fireEvent(K key, V obj) {
        GuardedObject go = goMap.remove(key);
        if (go != null) {
            go.onChange(obj);
        }
    }

    // 获取受保护对象
    public V get(Predicate<V> p) throws WatcherException{
        lock.lock();
        try {
            while (!p.test(obj)) {
                done.await(TIMEOUT, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            throw new WatcherException(e);
        } finally {
            lock.unlock();
        }
        return obj;
    }

    // 事件通知方法
    public void onChange(V obj) {
        lock.lock();
        try {
            this.obj = obj;
            done.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

在 GuardedObject 对象中, goMap 是静态变量,所以这个 Map 存储了所有的 GuardedObject 对象。

另外,WatcherException 是我们业务异常,在这里可以替换成 RuntimeException。

我们用代码来模拟 Guarded Suspension 模式的使用过程。

代码语言:javascript
复制
class SomeObj{

}

public class Test {

    public static void main(String[] args) {

        try {
            GuardedObject<Integer,SomeObj> guardedObject = GuardedObject.create(1);

            Thread t1 = new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "线程开始访问受保护对象");
                guardedObject.get(e -> {
                    return e!=null;
                });
                System.out.println(Thread.currentThread().getName() + "线程访问到了受保护对象");
            });
            t1.setName("t1");
            t1.start();

            Thread t2 = new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "线程开始做准备的工作");
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                GuardedObject.fireEvent(1,new SomeObj());
                System.out.println(Thread.currentThread().getName() + "线程准备工作完成,条件满足,唤醒等待的线程");
            });
            t2.setName("t2");
            t2.start();

            t1.join();
            t2.join();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

执行结果:

代码语言:javascript
复制
t1线程开始访问受保护对象
t2线程开始做准备的工作
t2线程准备工作完成,条件满足,唤醒等待的线程
t1线程访问到了受保护对象

支持超时机制的 Guarded Suspension 模式

对于将异步调用转化成同步调用的操作时,肯定是需要超时机制的。因此,将上述代码修改一下,增加了超时机制。

代码语言:javascript
复制
public class GuardedObject<K,V> {

    private static final int TIMEOUT = 10;
    private static final Map<Object, GuardedObject> goMap = new ConcurrentHashMap<>();

    // 受保护对象
    private V obj;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();

    public static <K,V> GuardedObject create(K key) {
        GuardedObject go = new GuardedObject<K,V>();
        goMap.put(key, go);
        return go;
    }

    /**
     * 判断条件
     * @return
     */
    private boolean isArrived() {
        return obj != null;
    }

    /**
     * 唤醒等待的线程
     * @param key
     * @param obj
     */
    public static <K, V> void fireEvent(K key, V obj) {
        GuardedObject go = goMap.remove(key);
        if (go != null) {
            go.onChange(obj);
        }
    }

    /**
     * 获取受保护对象
     * @return
     * @throws WatcherException
     */
    public V get() throws WatcherException{
        return get(TIMEOUT);
    }

    /**
     * 获取受保护对象
     * @return
     * @throws WatcherException
     */
    public V get(long timeout) throws WatcherException{
        lock.lock();
        try {
            while (!isArrived()) {
                //等待,增加超时机制
                try {
                    done.await(timeout, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    throw new WatcherException(e);
                }

                if (!isArrived()) {
                    throw new WatcherException("timeout");
                }
            }
        } finally {
            lock.unlock();
        }
        return obj;
    }

    /**
     * 事件通知方法
     * @param obj
     */
    private void onChange(V obj) {
        lock.lock();
        try {
            this.obj = obj;
            done.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

如果超时的话,GuardedObject#get() 会抛出异常,在调用时捕获异常就可以了。

总结

笔者正好使用该模式,将某个串口调用的第三方库 (https://github.com/NeuronRobotics/nrjavaserial) 从原先只支持异步调用,改成了也可以支持同步调用,增加了超时的机制,并应用于生产环境中。

对于多线程的协作,当然还有其他方式。比如 A 线程轮询等待 B 线程结束后,再去执行 A 线程的任务。对于这种情况,肯定是使用 Guarded Suspension 模式更佳。或者通过 eventbus 这样的事件总线来实现多线程的协作。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-05-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Guarded Suspension 模式的介绍
    • Guarded Suspension 模式
      • 应用场景
        • 场景一
        • 场景二
    • Guarded Suspension 模式的使用
      • 通用的 Guarded Suspension 模式
        • 支持超时机制的 Guarded Suspension 模式
        • 总结
        相关产品与服务
        事件总线
        腾讯云事件总线(EventBridge)是一款安全,稳定,高效的云上事件连接器,作为流数据和事件的自动收集、处理、分发管道,通过可视化的配置,实现事件源(例如:Kafka,审计,数据库等)和目标对象(例如:CLS,SCF等)的快速连接,当前 EventBridge 已接入 100+ 云上服务,助力分布式事件驱动架构的快速构建。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档