前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >线程池整理

线程池整理

作者头像
算法之名
发布2019-08-20 16:10:39
5790
发布2019-08-20 16:10:39
举报
文章被收录于专栏:算法之名算法之名

一般在生产环境中,我们都不会直接new一个Thread,然后再去start(),因为这么做会不断频繁的创建线程,销毁线程,过大的线程会耗尽CPU和内存资源,大量的垃圾回收,也会给GC带来压力,延长GC停顿时间.

1、固定大小线程池

代码语言:javascript
复制
public class ThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        MyTask task = new MyTask();
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0;i < 10;i++) {
            es.submit(task);
        }
        es.shutdown();
    }
}

运行结果:

1539134496389:Thread ID:11 1539134496389:Thread ID:12 1539134496389:Thread ID:13 1539134496389:Thread ID:14 1539134496389:Thread ID:15 1539134497390:Thread ID:14 1539134497390:Thread ID:12 1539134497390:Thread ID:15 1539134497390:Thread ID:13 1539134497390:Thread ID:11

结果解读:运行结果并不是一次刷出来的,而是刷出了5个,中间会停顿1秒,再刷出5个,说明,并行处理是5个线程执行一次,然后再并行处理5个。

将Executors.newFixedThreadPool改成Executors.newCachedThreadPool()

代码语言:javascript
复制
public class ThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        MyTask task = new MyTask();
        ExecutorService es = Executors.newCachedThreadPool();
        for (int i = 0;i < 10;i++) {
            es.submit(task);
        }
        es.shutdown();
    }
}

结果相同,但是是同时并行处理的,中间没有停顿,说明newCachedThreadPool()是根据需要来分配线程数的。

2、计划任务

newScheduledThreadPool()有两个方法来调用线程对象,scheduleAtFixedRate()跟scheduleWithFixedDelay().他们之间的差别就是scheduleAtFixedRate()总共只占用调度时间,而scheduleWithFixedDelay()占用的是线程执行时间加调度时间.但如果scheduleAtFixedRate()的线程执行时间大于调度时间,也不会出现重复调度(即一个线程还没有执行完,另外一个线程会启动),而是一个线程执行完,另一个线程马上启动.

代码语言:javascript
复制
public class ScheduledExecutorServiceDemo {
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        ses.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    long start = System.currentTimeMillis();
                    Thread.sleep(2000);
                    System.out.println((System.currentTimeMillis() -start) +":" + Thread.currentThread().getName());
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },0,2, TimeUnit.SECONDS);
    }
}

运行结果(部分截取)

2001:pool-1-thread-1 2000:pool-1-thread-1 2000:pool-1-thread-2 2000:pool-1-thread-1 2001:pool-1-thread-3 2000:pool-1-thread-2

结果解读:尽管有时间调度,他们依然是不同的线程来运行的,每显示一条中间停顿2秒(线程运行时间也是2秒)

代码语言:javascript
复制
public class ScheduledExecutorServiceDemo {
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        ses.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    long start = System.currentTimeMillis();
                    Thread.sleep(2000);
                    System.out.println((System.currentTimeMillis() -start) +":" + Thread.currentThread().getName());
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },0,2, TimeUnit.SECONDS);
    }
}

运行结果与之前相同,但是每显示一条的时间间隔为4秒(线程运行时间依然为2秒),其中2秒为调度时间,2秒为运行时间.

3、核心线程池的内部实现。

其实不论是Executors工厂的哪种实现,都是调用了同一个类ThreadPoolExecutor,使用了不同的构造参数罢了.不同的构造参数可以产生不同种类的线程池,因此我们也可以自定义线程池.

JDK实现

代码语言:javascript
复制
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
代码语言:javascript
复制
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
代码语言:javascript
复制
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

拒绝策略

当线程池任务数量超过系统实际承载能力时,可以启用拒绝策略。

直接中断策略

代码语言:javascript
复制
public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

ThreadPoolExecutor()的最后一个参数为中断策略,上面的new ThreadPoolExecutor.AbortPolicy()为直接中断!

参数说明:

第一个参数corePoolSize:指定了线程池中的线程数量.

第二个参数maximumPoolSize:指定了线程池中的最大线程数量.

第三个参数KeepAliveTime:当线程池线程数量超过了corePoolSize时,多余的空闲线程的存活时间.即超过corePoolSize的空闲线程,在多长时间内会被销毁.

第四个参数unit:keepAliveTime的单位.

第五个参数workQueue:任务队列,被提交但尚未被执行的任务.

1,直接提交的队列:SynchronousQueue,无容量,每一个插入操作都要等待一个删除操作,提交的任务不会被真实保存,总是将新任务提交给线程执行.如果没有空闲进程,则尝试创建新的进程.如果进程数量达到最大,则执行拒绝策略.

2,有界的任务队列:ArrayBlockingQueue,必须带一个容量参数,表示该队列的最大容量.当线程池的实际线程数小于corePoolSize,会优先创建新的线程,若大于corePoolSize,则会将新任务加入到等待队列.若等待队列满的时候,无法加入,则在总线程数不大于maximumPoolSize的前提下,创建新的进程执行任务.若大于maximumPoolSize,执行拒绝策略.

3,无界的任务队列:LinkedBlockingQueue,除非系统资源耗尽,不存在任务入队失败的情况.当线程池的实际线程数小于corePoolSize,会优先创建新的线程,若大于corePoolSize,则会将新任务加入到等待队列,若任务的创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存.

4,优先任务队列:PriorityBlockingQueue,可以控制任务执行的先后顺序.是一个特殊的无界队列.无论是ArrayBlockingQueue还是LinkedBlockingQueue都是按照先进先出算法处理任务的,而PriorityBlockingQueue则可以根据任务自身的优先级顺序先后执行,总是确保高优先级的任务先执行.

第六个参数threadFactory:线程工厂,用于创建线程,一般用默认的即可.

第七个参数handler:拒绝策略,当任务太多,来不及处理,如何拒绝任务.

运行结果

1539153799420:Thread ID:11 1539153799430:Thread ID:12 1539153799440:Thread ID:13 1539153799450:Thread ID:14 1539153799460:Thread ID:15 1539153799520:Thread ID:11 1539153799530:Thread ID:12 1539153799540:Thread ID:13 1539153799550:Thread ID:14 1539153799560:Thread ID:15 1539153799621:Thread ID:11 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@45ee12a7 rejected from java.util.concurrent.ThreadPoolExecutor@330bedb4[Running, pool size = 5, active threads = 5, queued tasks = 9, completed tasks = 6] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) at com.guanjian.RejectThreadPoolDemo.main(RejectThreadPoolDemo.java:31)

结果解读:由于并发线程数量太大,Integer.MAX_VALUE,我们线程池的最大线程数只有5个,而无界任务队列LinkedBlockingQueue<Runnable>只有10个,无法满足快速的线程数量增长,拒绝策略发挥作用,抛出异常,阻止系统正常工作.

代码语言:javascript
复制
public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

new ThreadPoolExecutor.CallerRunsPolicy()只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务,但性能极有可能会急剧下降.

代码语言:javascript
复制
public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

ThreadPoolExecutor.DiscardOldestPolicy()该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务.

代码语言:javascript
复制
public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

ThreadPoolExecutor.DiscardPolicy()丢弃无法处理的任务,不予任何处理.

自定义拒绝策略

代码语言:javascript
复制
public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(r.toString() + " is discard");
                    }
                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

运行结果:

1539159379178:Thread ID:11 1539159379187:Thread ID:12 1539159379197:Thread ID:13 1539159379207:Thread ID:14 1539159379217:Thread ID:15 1539159379279:Thread ID:11 1539159379288:Thread ID:12 1539159379301:Thread ID:13 1539159379308:Thread ID:14 1539159379318:Thread ID:15 1539159379379:Thread ID:11 1539159379388:Thread ID:12 1539159379401:Thread ID:13 java.util.concurrent.FutureTask@45ee12a7 is discard 1539159379408:Thread ID:14 1539159379418:Thread ID:15 java.util.concurrent.FutureTask@330bedb4 is discard java.util.concurrent.FutureTask@2503dbd3 is discard java.util.concurrent.FutureTask@4b67cf4d is discard java.util.concurrent.FutureTask@7ea987ac is discard

这里只是比ThreadPoolExecutor.DiscardPolicy()多了打印出丢弃的任务.

自定义线程创建

代码语言:javascript
复制
public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactory() {
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setDaemon(true);
                        System.out.println("create " + t);
                        return t;
                    }
                });
        for (int i = 0; i < 5; i++) {
            es.submit(task);
        }
        Thread.sleep(2000);
    }
}

就是可以自己定义线程,如守护线程等等

运行结果:

create Thread[Thread-0,5,main] create Thread[Thread-1,5,main] create Thread[Thread-2,5,main] create Thread[Thread-3,5,main] create Thread[Thread-4,5,main] 1539159694414:Thread ID:11 1539159694414:Thread ID:12 1539159694414:Thread ID:13 1539159694414:Thread ID:14 1539159694414:Thread ID:15

扩展线程池

线程池可以扩展出线程执行前,执行后,终止的后续处理

代码语言:javascript
复制
public class ExtThreadPool {
    public static class MyTask implements Runnable {
        public String name;
        public MyTask(String name) {
            this.name = name;
        }
        public void run() {
            System.out.println("正在执行" + ":Thread ID" + Thread.currentThread().getId() + ",Task Name=" + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行:" + ((MyTask)r).name);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完成:" + ((MyTask)r).name);
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出!");
            }
        };
        for (int i = 0;i < 5;i++) {
            MyTask task = new MyTask("TASK-GEYM-" + i);
            es.execute(task);
            Thread.sleep(10);
        }
        es.shutdown();
    }
}

运行结果:

准备执行:TASK-GEYM-0 正在执行:Thread ID11,Task Name=TASK-GEYM-0 准备执行:TASK-GEYM-1 正在执行:Thread ID12,Task Name=TASK-GEYM-1 准备执行:TASK-GEYM-2 正在执行:Thread ID13,Task Name=TASK-GEYM-2 准备执行:TASK-GEYM-3 正在执行:Thread ID14,Task Name=TASK-GEYM-3 准备执行:TASK-GEYM-4 正在执行:Thread ID15,Task Name=TASK-GEYM-4 执行完成:TASK-GEYM-0 执行完成:TASK-GEYM-1 执行完成:TASK-GEYM-2 执行完成:TASK-GEYM-3 执行完成:TASK-GEYM-4 线程池退出!

在线程池中寻找堆栈

有时候线程执行时会出现Bug,抛出异常,如果使用submit()来提交线程时,不会打印异常信息,而使用execute()来执行线程时可以打印异常信息.

代码语言:javascript
复制
public class DivTask implements Runnable {
    int a,b;
    public DivTask(int a,int b) {
        this.a = a;
        this.b = b;
    }
    public void run() {
        double re = a / b;
        System.out.println(re);
    }

    public static void main(String[] args) {
//        ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS,
//                new SynchronousQueue<Runnable>());
        ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        for (int i = 0;i < 5;i++) {
            pools.submit(new DivTask(100,i));
        }
    }
}

这段代码中,5个并发线程会有一个线程有除0错误

运行结果:

100.0 50.0 33.0 25.0

结果没有任何提示,异常抛出.

代码语言:javascript
复制
public class DivTask implements Runnable {
    int a,b;
    public DivTask(int a,int b) {
        this.a = a;
        this.b = b;
    }
    public void run() {
        double re = a / b;
        System.out.println(re);
    }

    public static void main(String[] args) {
//        ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS,
//                new SynchronousQueue<Runnable>());
        ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        for (int i = 0;i < 5;i++) {
            pools.execute(new DivTask(100,i));
        }
    }
}

运行结果:

100.0 Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero 50.0 33.0 at com.guanjian.DivTask.run(DivTask.java:18) 25.0 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

有异常抛出

重写跟踪线程池,自定义跟踪

代码语言:javascript
复制
public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task,clientTrace(),Thread.currentThread().getName()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task,clientTrace(),Thread.currentThread().getName()));
    }

    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }

    private Runnable wrap(final Runnable task,final Exception clientStack,String clientThreadName) {
        return new Runnable() {
            public void run() {
                try {
                    task.run();
                } catch (Exception e) {
                    clientStack.printStackTrace();
                    try {
                        throw e;
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                }
            }
        };
    }
}
代码语言:javascript
复制
public class DivTask implements Runnable {
    int a,b;
    public DivTask(int a,int b) {
        this.a = a;
        this.b = b;
    }
    public void run() {
        double re = a / b;
        System.out.println(re);
    }

    public static void main(String[] args) {
        ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
//        ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS,
//                new SynchronousQueue<Runnable>());
        for (int i = 0;i < 5;i++) {
            pools.execute(new DivTask(100,i));
        }
    }
}

运行结果:

100.0 java.lang.Exception: Client stack trace 50.0 at com.guanjian.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:27) 33.0 25.0 at com.guanjian.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:18) at com.guanjian.DivTask.main(DivTask.java:28) java.lang.ArithmeticException: / by zero at com.guanjian.DivTask.run(DivTask.java:18) at com.guanjian.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:34) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

这样我们就可以知道是在哪里出了错.

4、分而治之,Fork/Join框架

将一个大任务拆分成各种较小规模的任务,进行并行处理,也许按照约定条件拆分的任务还是大于约定条件就继续拆分.有两种线程类型,一种是有返回值的RecursiveTask<T>,一种是没有返回值的RecursiveAction,他们都继承于ForkJoinTask<>,一个带泛型<T>,一个是<Void>.

代码语言:javascript
复制
/**
 * Created by Administrator on 2018/10/11.
 * 可以理解成一个Runnable(线程类)
 */
public class CountTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private long start;
    private long end;

    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * 可以理解成run()方法
     * @return
     */
    @Override
    protected Long compute() {
        long sum = 0;
        boolean canCompute = (end - start) < THRESHOLD;
        //最终计算,所有的最终拆分都是在这里计算
        if (canCompute) {
            for (long i = start;i <= end;i++) {
                sum += i;
            }
        }else {
            //并行计算的规模,拆分成100个并行计算
            long step = (start + end) /100;
            //创建子任务线程集合
            List<CountTask> subTasks = new ArrayList<CountTask>();
            //每个并行子任务的开始值
            long pos = start;
            //并行执行100个分叉线程
            for (int i = 0;i < 100;i++) {
                //每个并行子任务的结束值
                long lastOne = pos + step;
                if (lastOne > end) {
                    lastOne = end;
                }
                //建立一个子任务的线程
                CountTask subTask = new CountTask(pos,lastOne);
                //创建下一个并行子任务的开始值
                pos += step + 1;
                //将当前子任务线程添加到线程集合
                subTasks.add(subTask);
                //执行该线程,其实是一个递归,判断lastOne-pos是否小于THRESHOLD,小于则真正执行,否则继续分叉100个子线程
                subTask.fork();
            }
            for (CountTask t:subTasks) {
                //阻断每一次分叉前的上一级线程进行等待,并将最终并行的结果进行层层累加
                sum += t.join();
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0,200000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        try {
            long res = result.get();
            System.out.println("sum: " + res);
        }catch (InterruptedException e) {
            e.printStackTrace();
        }catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

sum: 20000100000

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
GPU 云服务器
GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档