前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java里阻塞线程的三种实现方法

Java里阻塞线程的三种实现方法

作者头像
九州暮云
发布2019-08-21 14:29:27
2.8K0
发布2019-08-21 14:29:27
举报
文章被收录于专栏:九州牧云

在日常开发中,我们有时会遇到遇到多线程处理任务的情况,JDK里提供了便利的ThreadPoolExecutor以及其包装的工具类Executors。但是我们知道ExecutorService.excute(Runnable r)是异步的,超过线程池处理能力的线程会被加入到执行队列里。有时候为了保证任务提交的顺序性,我们不希望有这个执行队列,在线程池满的时候,则把主线程阻塞。那么,怎么实现呢?

最直接的想法是继承ThreadPoolExecutor,重载excute()方法,加入线程池是否已满的检查,若线程池已满,则等待直到上一个任务执行完毕。这里ThreadPoolExecutor提供了一个afterExecute(Runnable r, Throwable t)方法,每个任务执行结束时会调用这个方法。 同时,我们会用到concurrent包的ReentrantLock以及Condition.wait/notify方法。以下是实现代码(代码来自:http://www.cnblogs.com/steeven/archive/2005/12/08/293219.html):

代码语言:javascript
复制
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();

@Override
public void execute(Runnable command) {
	pauseLock.lock();
	try {
		while (getPoolSize() == getMaximumPoolSize() && getQueue().remainingCapacity() == 0)
			unpaused.await();
		super.execute(command);// 放到lock外面的话,在压力测试下会有漏网的!
	} catch (InterruptedException e) {
		log.warn(this, e);
	} finally {
		pauseLock.unlock();
	}
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
	super.afterExecute(r, t);
	try {
		pauseLock.lock();
		unpaused.signal();
	} finally {
		pauseLock.unlock();
	}
}

当然,有些熟悉JDK源码的人会说,自己实现这个太费劲了,不喜欢!有没有比较简单的方法呢?

这里介绍一下vela同学的方法: http://vela.diandian.com/post/2012-07-24/40031283329

研究ThreadPoolExecutor.excute()源码会发现,它调用了BlockingQueue.offer()来实现多余任务的入队。BlockingQueue有两个方法:BlockingQueue.offer()BlockingQueue.put(),前者在队列满时不阻塞,直接失败,后者在队列满时阻塞。那么,问题就很简单了,继承某个BlockingQueue,然后将offer()重写,改成调用put()就搞定了!最短的代码量,也能起到很好的效果哦!

代码语言:javascript
复制
package com.diandian.framework.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorsEx {

    /**
     * 创建一个堵塞队列
     * 
     * @param threadSize
     * @return
     */
    public static ExecutorService newFixedThreadPool(int threadSize) {
        return new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1) {

                    private static final long serialVersionUID = -9028058603126367678L;

                    @Override
                    public boolean offer(Runnable e) {
                        try {
                            put(e);
                            return true;
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                        }
                        return false;
                    }
                });
    }
}

当然这个方法有一点让人不快的地方,因为它与我们熟知的OO基本原则之一--里氏替换原则冲突了,即子类的方法与父类的方法有不同的行为。毕竟都是实现了BlockingQueue接口,offer()方法的行为被改变了。虽然只是一个匿名类,但是对于某些OOP的拥趸来说总有些不爽的地方吧!

没关系,我们还有JDK默认的解决方法:使用RejectedExecutionHandler。当ThreadPoolExecutor.excute执行失败时,会调用的RejectedExecutionHandler,这就是ThreadPoolExecutor的可定制的失败策略机制。JDK默认提供了4种失败策略: AbortPolicy(中止)、CallersRunPolicy(调用者运行)、DiscardPolicy(丢弃)、DiscardOldestPolicy(丢弃最旧的)。 其中值得说的是CallersRunPolicy,它会在excute失败后,尝试使用主线程(就是调用excute方法的线程)去执行它,这样就起到了阻塞的效果!于是一个完完全全基于JDK的方法诞生了:

代码语言:javascript
复制
public static ExecutorService newBlockingExecutorsUseCallerRun(int size) {
    return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
            new ThreadPoolExecutor.CallerRunsPolicy());
}

当然这个方法有一个问题:这样加上主线程,总是会比参数的size线程多上一个。要么在函数开始就把size-1,要么,我们可以尝试自己实现一个RejectedExecutionHandler:

代码语言:javascript
复制
public static ExecutorService newBlockingExecutorsUseCallerRun(int size) {
    return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(),
            new RejectedExecutionHandler() {

                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
}

怎么样,这下是不是感觉挺好了呢?

2013年9月22日更新:

事实证明,除了JDK的CallerRunsPolicy方案,其他的方案都存在一个隐患:

如果线程仍在执行,此时显式调用ExecutorService.shutdown()方法,会因为还有一个线程阻塞没有入队,而此时线程已经停止了,而这个元素才刚刚入队,最终会导致RejectedExecutionException。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档