前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >CompletionService学习

CompletionService学习

作者头像
路行的亚洲
发布2020-07-16 16:00:38
2380
发布2020-07-16 16:00:38
举报

前面已经说到Future的默认实现是FutureTask,因此你可以看到其在jdk1.5的时候采用的是AQS去实现的,因此具有阻塞性,但jdk1.6之后,可以看到其基于CAS实现的。之所以学习Future,除了其具备异步功能,同时其采用的思想也是在设计模式中有体现的,也即Future模式,而且可以在kafka源码中看到基于Future构建的异步编程。

前面说到其基于AQS具有阻塞性,但从源码中,可以看到在jdk1.6之后采用的是CAS:

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

可以看到awaitDone(true, unit.toNanos(timeout)))方法:

  /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {  //进行自旋
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;  //进行状态匹配
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

还有其run方法:

public void run() {//采用cas
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

也即可以看到jdk1.6之后,FutureTask采用park和cas来实现的。

下面来学习CompletionService。

如果向Executor提交一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的future,然后返回使用get方法,同时将timeout指定为0,从而通过轮询的方式来判断任务是否完成,但是这样有些繁琐。因此可以采用CompletionService,其将Executor和BlockingQueue的功能融合在一起,可以采用队列操作take、poll获取已完结的结果。--摘自《并发编程实战》

/**
 * @description: CompletionSerivce使用
 * <p>
 * 采用异步的方式一边处理新的任务,一边处理完成任务的结果
 * 也就是说在处理多个任务时,可以实现先处理的任务,先拿到结果
 * 采用 submit+take,不至于在一个任务没有完成的情况下,其余的结果不能处理
 * 你可以将其理解成Executor+BlockingQueue的结合体,此时你可以使用其实现
 * ExecutorCompletionService,进行异构并行
 * </p>
 * @author: lyz
 * @date: 2020/05/24 22:02
 **/
public class CompletionServiceTest {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        //开启5个线程
        ExecutorService exs = Executors.newFixedThreadPool(4);
        try {
            int taskCount = 10;
            // 结果集
            List<Integer> list = new ArrayList<Integer>();
            List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();

            // 1.定义CompletionService
            CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(exs);

            // 2.添加任务,需要执行的业务
            for (int i = 0; i < taskCount; i++) {
                Future<Integer> future = completionService.submit(new Task(i + 1));
                futureList.add(future);
            }

            // 3.获取结果
            for (int i = 0; i < taskCount; i++) {
                Integer result = completionService.take().get();
                System.out.println("任务i==" + result + "完成!" + new Date());
                list.add(result);
            }

            System.out.println("list=" + list);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭线程池
            exs.shutdown();
        }

    }

    //实现Callable接口,重写call方法
    static class Task implements Callable<Integer> {
        Integer i;

        public Task(Integer i) {
            super();
            this.i = i;
        }

        @Override
        public Integer call() throws Exception {
            if (i == 4) {
                Thread.sleep(5000);
            } else {
                Thread.sleep(1000);
            }
            System.out.println("线程:" + Thread.currentThread().getName() + "任务i=" + i + ",执行完成!");
            return i;
        }

    }
}

运行结果:

线程:pool-1-thread-2任务i=2,执行完成!
线程:pool-1-thread-1任务i=1,执行完成!
线程:pool-1-thread-3任务i=3,执行完成!
任务i==2完成!Sun May 24 22:50:01 CST 2020
任务i==1完成!Sun May 24 22:50:01 CST 2020
任务i==3完成!Sun May 24 22:50:01 CST 2020
线程:pool-1-thread-2任务i=5,执行完成!
任务i==5完成!Sun May 24 22:50:02 CST 2020
线程:pool-1-thread-1任务i=6,执行完成!
线程:pool-1-thread-3任务i=7,执行完成!
任务i==6完成!Sun May 24 22:50:02 CST 2020
任务i==7完成!Sun May 24 22:50:02 CST 2020
线程:pool-1-thread-2任务i=8,执行完成!
任务i==8完成!Sun May 24 22:50:03 CST 2020
线程:pool-1-thread-1任务i=9,执行完成!
线程:pool-1-thread-3任务i=10,执行完成!
任务i==9完成!Sun May 24 22:50:03 CST 2020
任务i==10完成!Sun May 24 22:50:03 CST 2020
线程:pool-1-thread-4任务i=4,执行完成!
任务i==4完成!Sun May 24 22:50:05 CST 2020
list=[2, 1, 3, 5, 6, 7, 8, 9, 10, 4]

可以看到其与FutureTask相比,运行的结果是不相同的,其返回结果的线程是乱序的,同时是先执行完的,先返回结果,不同于FutureTask的顺序返回。同时性能上优于FutureTask。当然,在JDK8中,我们可以是一个更为强大的CompletableFuture来实现异构应用。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档