首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JUC系列(七) ForkJion任务拆分与异步回调

JUC系列(七) ForkJion任务拆分与异步回调

作者头像
冷环渊
发布2022-12-03 08:56:54
2700
发布2022-12-03 08:56:54
举报

ForkJion

什么是ForkJoin

ForkJoin 下 JDK 1.7 并行执行任务的,数量越大,效率越高

比如 :大数据 Map Reduce(把大任务拆分成小任务)

image-20220304004113183
image-20220304004113183

ForkJoin 特点: 工作窃取

举例子:

PS: 维护的是双端队列 Deuue

A线程执行任务到 第二个

B线程执行完毕,那么B线程回去讲A线程的东西拿来执行,从而提高效率

image-20220304004235249
image-20220304004235249

认识forkjion

ForkJoin 使用两个类来完成以上两件事情:

  • ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:
    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask :用于有返回结果的任务。
  • ForkJoinPool :ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
image-20220304005022113
image-20220304005022113

代码实例

task 类 里面编写的是我们继承了 递归任务继承的实现方法

public class forkjoinDemo extends RecursiveTask<Long> {
    /* 解决方案 也是有三六九等的,比如案例 求和
     * 最低等 就是直接for循环求和
     * 中等 使用forkjion
     * 高等 stream 并行流
     * */
//开始
    private long start;
    //结束
    private long end;
    //到多少值,才开始分开任务
    private long threshold = 10000L;

    public forkjoinDemo(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        //判断超过阈值的时候 开始使用 fork join
        if (end - start > threshold) {
            long sum = 0L;
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            //    求出中间值
            long mid = (start - end) / 2;
            forkjoinDemo task1 = new forkjoinDemo(start, mid);

            //拆分任务,把任务压入线程队列
            task1.fork();
            forkjoinDemo task2 = new forkjoinDemo(mid + 1, end);
            task2.fork();
            return task1.join() + task2.join();
        }
    }
}

测试类 三种方法的速度

public class test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //test1(); 7042;
        //test2(); 969
        //test3(); 179;
    }

    public static void test1() {
        Long sum = 0L;
        long start = System.currentTimeMillis();
        for (long i = 1L; i <= 10_0000_0000; i++) {
            sum += i;
        }
        long end = System.currentTimeMillis();
        System.out.println("sum" + sum + "=> 执行时间" + (end - start));
    }


    public static void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> task = new forkjoinDemo(0L, 10_0000_0000L);
        ForkJoinTask<Long> submit = forkJoinPool.submit(task);
        Long sum = submit.get();
        long end = System.currentTimeMillis();
        System.out.println("sum" + sum + "=> 执行时间" + (end - start));
    }

    public static void test3() {

        long start = System.currentTimeMillis();
        //并行流
        long reduce = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
        long end = System.currentTimeMillis();
        System.out.println("sum" + reduce + "=> 执行时间" + (end - start));
    }
}

异步回调

什么是future

常见的两种创建线程的方式。一种是直接继承Thread,另外一种就是实现Runnable接口。

这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。

从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Future模式的核心思想是能够让主线程将原来需要同步等待的这段时间用来做其他的事情。(因为可以异步获得执行结果,所以不用一直同步等待去获得执行结果)

image-20220304014056949
image-20220304014056949

上图简单描述了不使用Future和使用Future的区别,不使用Future模式,主线程在invoke完一些耗时逻辑之后需要等待,这个耗时逻辑在实际应用中可能是一次RPC调用,可能是一个本地IO操作等。B图表达的是使用Future模式之后,我们主线程在invoke之后可以立即返回,去做其他的事情,回头再来看看刚才提交的invoke有没有结果。

Future接口的局限性

当我们得到包含结果的Future时,我们可以使用get方法等待线程完成并获取返回值,注意我加粗的地方,Future的get() 方法会阻塞主线程。即使我们使用isDone()方法轮询去查看线程执行状态,但是这样也非常浪费cpu资源。

image-20220304014316398
image-20220304014316398

我们需要新的,更强大的拓展,CompletableFuture

在Java 8中, 新增加了一个包含50个方法左右的类: CompletableFuture,结合了Future的优点,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

CompletableFuture被设计在Java中进行异步编程。异步编程意味着在主线程之外创建一个独立的线程,与主线程分隔开,并在上面运行一个非阻塞的任务,然后通知主线程进展,成功或者失败。

通过这种方式,你的主线程不用为了任务的完成而阻塞/等待,你可以用主线程去并行执行其他的任务。 使用这种并行方式,极大地提升了程序的表现。

实例化:

有两种格式,一种是supply开头的方法,一种是run开头的方法

  • supply开头:这种方法,可以返回异步线程执行之后的结果
  • run开头:这种不会返回结果,就只是执行线程任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
获取结果

同步获取结果

public T    get()
public T    get(long timeout, TimeUnit unit)
public T    getNow(T valueIfAbsent)
public T    join()

简单的例子

CompletableFuture<Integer> future = new CompletableFuture<>();
Integer integer = future.get();

get() 方法同样会阻塞直到任务完成,上面的代码,主线程会一直阻塞,因为这种方式创建的future从未完成。有兴趣的小伙伴可以打个断点看看,状态会一直是not completed

代码使用案例

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        没有返回值的异步回调, runAsync
        //CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {
        //    System.out.println(Thread.currentThread().getName() + "runAsync=> Void");
        //});
        //System.out.println("1111");
        获取执行结果
        //completableFuture.get();
     
     
        //    有返回值的
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "runAsync=>integer");
            int i = 10 / 0;
            return 1024;
        });
        completableFuture.whenComplete((t, u) -> {
            //t是正常的返回结果
            //u是返回报错信息
            System.out.println("t=>" + t);
            System.out.println("u=>" + u);
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return 233;
        }).get();
    }

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-12-02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ForkJion
    • 什么是ForkJoin
      • 代码实例
      • 异步回调
        • 什么是future
          • Future接口的局限性
            • 我们需要新的,更强大的拓展,CompletableFuture
              • 实例化:
              • 获取结果
            • 简单的例子
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档