专栏首页后端技术学习CompletionService学习

CompletionService学习

前面已经说到Future的默认实现是FutureTask,因此你可以看到其在jdk1.5的时候采用的是AQS去实现的,因此具有阻塞性,但jdk1.6之后,可以看到其基于CAS实现的。之所以学习Future,除了其具备异步功能,同时其采用的思想也是在设计模式中有体现的,也即Future模式,而且可以在kafka源码中看到基于Future构建的异步编程。

前面说到其基于AQS具有阻塞性,但从源码中,可以看到在jdk1.6之后采用的是CAS:

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

可以看到awaitDone(true, unit.toNanos(timeout)))方法:

  /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {  //进行自旋
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;  //进行状态匹配
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

还有其run方法:

public void run() {//采用cas
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

也即可以看到jdk1.6之后,FutureTask采用park和cas来实现的。

下面来学习CompletionService。

如果向Executor提交一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的future,然后返回使用get方法,同时将timeout指定为0,从而通过轮询的方式来判断任务是否完成,但是这样有些繁琐。因此可以采用CompletionService,其将Executor和BlockingQueue的功能融合在一起,可以采用队列操作take、poll获取已完结的结果。--摘自《并发编程实战》

/**
 * @description: CompletionSerivce使用
 * <p>
 * 采用异步的方式一边处理新的任务,一边处理完成任务的结果
 * 也就是说在处理多个任务时,可以实现先处理的任务,先拿到结果
 * 采用 submit+take,不至于在一个任务没有完成的情况下,其余的结果不能处理
 * 你可以将其理解成Executor+BlockingQueue的结合体,此时你可以使用其实现
 * ExecutorCompletionService,进行异构并行
 * </p>
 * @author: lyz
 * @date: 2020/05/24 22:02
 **/
public class CompletionServiceTest {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        //开启5个线程
        ExecutorService exs = Executors.newFixedThreadPool(4);
        try {
            int taskCount = 10;
            // 结果集
            List<Integer> list = new ArrayList<Integer>();
            List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();

            // 1.定义CompletionService
            CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(exs);

            // 2.添加任务,需要执行的业务
            for (int i = 0; i < taskCount; i++) {
                Future<Integer> future = completionService.submit(new Task(i + 1));
                futureList.add(future);
            }

            // 3.获取结果
            for (int i = 0; i < taskCount; i++) {
                Integer result = completionService.take().get();
                System.out.println("任务i==" + result + "完成!" + new Date());
                list.add(result);
            }

            System.out.println("list=" + list);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭线程池
            exs.shutdown();
        }

    }

    //实现Callable接口,重写call方法
    static class Task implements Callable<Integer> {
        Integer i;

        public Task(Integer i) {
            super();
            this.i = i;
        }

        @Override
        public Integer call() throws Exception {
            if (i == 4) {
                Thread.sleep(5000);
            } else {
                Thread.sleep(1000);
            }
            System.out.println("线程:" + Thread.currentThread().getName() + "任务i=" + i + ",执行完成!");
            return i;
        }

    }
}

运行结果:

线程:pool-1-thread-2任务i=2,执行完成!
线程:pool-1-thread-1任务i=1,执行完成!
线程:pool-1-thread-3任务i=3,执行完成!
任务i==2完成!Sun May 24 22:50:01 CST 2020
任务i==1完成!Sun May 24 22:50:01 CST 2020
任务i==3完成!Sun May 24 22:50:01 CST 2020
线程:pool-1-thread-2任务i=5,执行完成!
任务i==5完成!Sun May 24 22:50:02 CST 2020
线程:pool-1-thread-1任务i=6,执行完成!
线程:pool-1-thread-3任务i=7,执行完成!
任务i==6完成!Sun May 24 22:50:02 CST 2020
任务i==7完成!Sun May 24 22:50:02 CST 2020
线程:pool-1-thread-2任务i=8,执行完成!
任务i==8完成!Sun May 24 22:50:03 CST 2020
线程:pool-1-thread-1任务i=9,执行完成!
线程:pool-1-thread-3任务i=10,执行完成!
任务i==9完成!Sun May 24 22:50:03 CST 2020
任务i==10完成!Sun May 24 22:50:03 CST 2020
线程:pool-1-thread-4任务i=4,执行完成!
任务i==4完成!Sun May 24 22:50:05 CST 2020
list=[2, 1, 3, 5, 6, 7, 8, 9, 10, 4]

可以看到其与FutureTask相比,运行的结果是不相同的,其返回结果的线程是乱序的,同时是先执行完的,先返回结果,不同于FutureTask的顺序返回。同时性能上优于FutureTask。当然,在JDK8中,我们可以是一个更为强大的CompletableFuture来实现异构应用。

本文分享自微信公众号 - 后端技术学习(gh_9f5627e6cc61),作者:路行的亚洲

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-05-26

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • ThreadPoolExecutor源码学习

    但点进去看newSingleThreadExecutor可以看到其会调用ThreadPoolExecutor里面的线程。因此有必要研究ThreadPoolExe...

    路行的亚洲
  • 对前端传入的json对象解析成多个对象

    multiRequestBodyDemo(@MultiRequestBody("dog")

    路行的亚洲
  • pmq学习一-一些典型的使用和套路

    pmq是信也科技开源的一款消息中间件,虽然没有RocketMQ和Kafka出名,但是里面的代码还是有值得我们学习的地方的。

    路行的亚洲
  • Java中的锁的使用和实现介绍

    锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源。 源代码基于 1.8.0

    用户7886150
  • 破解 Kotlin 协程(2) - 协程启动篇

    我相信现在接触 Kotlin 的开发者绝大多数都有 Java 基础,我们刚开始学习 Thread 的时候,一定都是这样干的:

    bennyhuo
  • 企业环境下MySQL5.5调优

    山山仙人
  • 谈谈ThreadPoolExecutor线程池

    线程池,凡是学过java的同学都不陌生,一两行简单的代码就能实现并发编程。但java.util.concurrent.ThreadPoolExecutor的源码...

    zhangheng
  • 4.线程池

    六月的雨
  • runloop的解读

    且行且珍惜_iOS
  • Android多线程之AsyncTask源码解析

    叶志陈

扫码关注云+社区

领取腾讯云代金券