Java并发编程实战系列8之线程池的使用

ThreadPoolExecutor UML图:

image

image

8.1 在任务和执行策略之间隐形耦合

避免Thread starvation deadlock

8.2 设置线程池大小

8.3 配置ThreadPoolExecutor

image

构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
 int maximumPoolSize,
 long keepAliveTime,
 TimeUnit unit,
 BlockingQueue<Runnable> workQueue,
 ThreadFactory threadFactory,
 RejectedExecutionHandler handler) { ... } 
  • 核心和最大池大小:如果运行的线程少于 corePoolSize,则创建新线程来处理请求(即一个Runnable实例),即使其它线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。
  • 保持活动时间:如果池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止。
  • 排队:如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列BlockingQueue,而不添加新的线程。
  • 被拒绝的任务:当 Executor 已经关闭,或者队列已满且线程数量达到maximumPoolSize时(即线程池饱和了),请求将被拒绝。这些拒绝的策略叫做Saturation Policy,即饱和策略。包括AbortPolicy, CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy.

另外注意:

  • 如果运行的线程少于 corePoolSize,ThreadPoolExecutor 会始终首选创建新的线程来处理请求;注意,这时即使有空闲线程也不会重复使用(这和数据库连接池有很大差别)。
  • 如果运行的线程等于或多于 corePoolSize,则 ThreadPoolExecutor 会将请求加入队列BlockingQueue,而不添加新的线程(这和数据库连接池也不一样)。
  • 如果无法将请求加入队列(比如队列已满),则创建新的线程来处理请求;但是如果创建的线程数超出 maximumPoolSize,在这种情况下,请求将被拒绝。

newCachedThreadPool使用了SynchronousQueue,并且是无界的。

线程工厂ThreadFactory

8.4 扩展ThreadPoolExecutor

重写beforeExecute和afterExecute方法。

8.5 递归算法的并行化

实际就是类似Number of Islands或者N-Queens等DFS问题的一种并行处理。

串行版本如下:

public class SequentialPuzzleSolver <P, M> {
    private final Puzzle<P, M> puzzle;
    private final Set<P> seen = new HashSet<P>();

    public SequentialPuzzleSolver(Puzzle<P, M> puzzle) {
        this.puzzle = puzzle;
    }

    public List<M> solve() {
        P pos = puzzle.initialPosition();
        return search(new PuzzleNode<P, M>(pos, null, null));
    }

    private List<M> search(PuzzleNode<P, M> node) {
        if (!seen.contains(node.pos)) {
            seen.add(node.pos);
            if (puzzle.isGoal(node.pos))
                return node.asMoveList();
            for (M move : puzzle.legalMoves(node.pos)) {
                P pos = puzzle.move(node.pos, move);
                PuzzleNode<P, M> child = new PuzzleNode<P, M>(pos, move, node);
                List<M> result = search(child);
                if (result != null)
                    return result;
            }
        }
        return null;
    }
}

并行版本如下:

public class ConcurrentPuzzleSolver <P, M> {
    private final Puzzle<P, M> puzzle;
    private final ExecutorService exec;
    private final ConcurrentMap<P, Boolean> seen;
    protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();

    public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
        this.puzzle = puzzle;
        this.exec = initThreadPool();
        this.seen = new ConcurrentHashMap<P, Boolean>();
        if (exec instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
            tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        }
    }

    private ExecutorService initThreadPool() {
        return Executors.newCachedThreadPool();
    }

    public List<M> solve() throws InterruptedException {
        try {
            P p = puzzle.initialPosition();
            exec.execute(newTask(p, null, null));
            // block until solution found
            PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
            return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
        } finally {
            exec.shutdown();
        }
    }

    protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
        return new SolverTask(p, m, n);
    }

    protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
        SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
            super(pos, move, prev);
        }

        public void run() {
            if (solution.isSet()
                    || seen.putIfAbsent(pos, true) != null)
                return; // already solved or seen this position
            if (puzzle.isGoal(pos))
                solution.setValue(this);
            else
                for (M m : puzzle.legalMoves(pos))
                    exec.execute(newTask(puzzle.move(pos, m), m, this));
        }
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏行者常至

016.多线程-线程池的四种创建方式

版权声明:本文为博主原创文章,允许转载,请标明出处。 https://blog.csdn.net/qwdafedv/article/deta...

1623
来自专栏Linyb极客之路

并发编程之CyclicBarrier

一、CyclicBarrier简介 CyclicBarrier也叫同步屏障,在JDK1.5被引入,可以让一组线程达到一个屏障时被阻塞,直到最后一个线程达到屏...

2933
来自专栏挖掘大数据

Spark整合Mongodb(附实例代码)

环境准备 mongodb下载 解压安装 启动mongodb服务 $MONGODB_HOME/bin/mongod --fork --dbpath=/root/...

4470
来自专栏Android相关

Java线程池---getTask方法解析

/** * Performs blocking or timed wait for a task, depending on * current confi...

3262
来自专栏向前进

vue-cli脚手架npm相关文件解读(9)config/index.js

系列文章传送门: 1、build/webpack.base.conf.js 2、build/webpack.prod.conf.js 3、build/webp...

3916
来自专栏Java帮帮-微信公众号-技术文章全总结

Java多线程详解5【面试+工作】

Java多线程详解【面试+工作】 Java线程:新特征-信号量 Java的信号量实际上是一个功能完毕的计数器,对控制一定资源的消费与回收有着很重要的意义,信号量...

42110
来自专栏xdecode

Java高并发之线程池详解

例如线程, jdbc连接等等, 在高并发场景中, 如果可以复用之前销毁的对象, 那么系统效率将大大提升.

1532
来自专栏搜云库

使用 Executors,ThreadPoolExecutor,创建线程池,源码分析理解

之前创建线程的时候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSi...

21010
来自专栏java 成神之路

java 线程池

2413
来自专栏小灰灰

Java并发学习之玩转线程池

线程池的使用姿势 基本上实际的项目不可能离开线程池,只是看你有没有注意到罢了 作为以业务需求为驱动,最顺溜的是写if-else的码农我来说,线程池就比较高端了...

2166

扫码关注云+社区

领取腾讯云代金券