前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >CompletableFuture、parallelStream学习测试对比

CompletableFuture、parallelStream学习测试对比

作者头像
发布2019-01-07 12:56:43
1.2K0
发布2019-01-07 12:56:43
举报
文章被收录于专栏:WD学习记录

参考链接:https://www.cnblogs.com/taiyonghai/p/9397394.html

代码语言:javascript
复制
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

public class Main {

  public static void main(String[] args) {

    System.out.println("Hello World!");
    testAllOfAnyOf();
    testParallelStream();
    testOneFunAsync();
    testOneFunAsyncWithThreadPool();
  }

  public static void printlnConsole(String msg){
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String time=sdf.format(new Date());
    System.out.println(String.format("[%s]%s", time, msg));
  }

  /**
   * 多任务单次异步执行
   */
  public static void testMyFun(){
    long start=System.currentTimeMillis();
    try{
      int id=1;
      printlnConsole("调度异步任务。。。。。");
      CompletableFuture futureClassCount=CompletableFuture.supplyAsync(()->getClassCount(id));
      CompletableFuture futureStudentCount=CompletableFuture.supplyAsync(()->getStudentCount(id));

      printlnConsole("获取异步过程。。。。");
      Object classCount=futureClassCount.get();
      Object studentCount=futureStudentCount.get();
      printlnConsole("异步任务结果获取完成");
      printlnConsole("classCount:"+classCount);
      printlnConsole("studentCount:"+studentCount);
    }catch (Exception e){
      e.printStackTrace();
    }
    long end=System.currentTimeMillis();
    printlnConsole("系统运行时间为:"+(end-start)/1000);
  }

  /**
   * 并行指定等待全部结果或等待任意结果
   */
  public static void testAllOfAnyOf(){
    long start=System.currentTimeMillis();
    try{
      List<Integer> ids = Arrays.asList(1, 2,3,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,8,9,0,3, 5);//准备的请求参数
      printlnConsole("调度异步任务。。。。。testAllOfAnyOf*******************************8");
      //创建异步方法数组
      CompletableFuture[] futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getClassName(id))).toArray(size -> new CompletableFuture[size]);
      //指定该异步方法数组的子任务线程等待类型
      //CompletableFuture.anyOf(futures).join();//anyOf()为任意一个子任务线程执行完毕后返回
      CompletableFuture.allOf(futures).join();//allOf()为等待所有子任务线程全部执行完毕后返回


      printlnConsole("获取异步任务结果:");
      for (CompletableFuture f : futures) {
        //Object obj = f.getNow(1);//getNow()表示我需要立即拿到结果,如果当前的线程并未执行完成,则使用我传入的值进行任务调用,参数为无法获取结果时使用我传入的值
        Object obj = f.get();//get()获取子线程运算的结果,会抛出检查到的异常
        //Object obj = f.join();//join()获取子线程运算的结果,不会抛出异常
        printlnConsole(String.valueOf(obj));
      }
    }catch (Exception e){
      e.printStackTrace();
    }
    long end=System.currentTimeMillis();
    printlnConsole("系统运行时间为:"+(end-start)/1000);
  }

  public static String getClassName(int id) {
    try {
      Thread.sleep(id * 1000);
      printlnConsole("getClassName(" + id + ")执行完毕");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return "taiyonghai-" + id;
  }

  public static void testParallelStream(){
    long start=System.currentTimeMillis();
    try{
      printlnConsole("调用异步任务...****************testParallelStream*************");
      List<Integer> ids = Arrays.asList(1, 2,3,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,8,9,0,3, 5);//准备的请求参数
      //串行执行会等待每一个方法执行完毕后在继续执行下一个
      //List<String> names = ids.stream().map(id -> getStudentName(id)).collect(Collectors.toList());
      //并行执行会同时调用多个方法待全部执行完毕后一起返回(parallelStream是非线程安全的,配合collect达到线程安全,后续验证一下)
      List<String> names = ids.parallelStream().map(id -> getClassName(id)).collect(Collectors.toList());

      printlnConsole("获取异步任务结果:");
      names.forEach(item -> printlnConsole(item));

    }catch (Exception e){
      e.printStackTrace();
    }
    long end=System.currentTimeMillis();
    printlnConsole("系统运行时间为:"+(end-start)/1000);
  }

  /**
   * 单任务多次异步执行
   */
  public static void testOneFunAsync(){
    long start=System.currentTimeMillis();
    try{
      printlnConsole("调用异步任务...****************testOneFunAsync*************");
      List<Integer> ids = Arrays.asList(1, 2,3,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,8,9,0,3, 5);//准备的请求参数
      List<CompletableFuture<String>> futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getClassName(id))).collect(Collectors.toList());

      //不用并行流parallelStream()调用时就不会阻断线程执行

      printlnConsole("获取异步任务结果:");
      futures.forEach(f -> {
        try {
          Object obj = f.get();
          printlnConsole(String.valueOf(obj));
        } catch (InterruptedException e) {
          e.printStackTrace();
        } catch (ExecutionException e) {
          e.printStackTrace();
        }
      });

    }catch (Exception e){
      e.printStackTrace();
    }
    long end=System.currentTimeMillis();
    printlnConsole("系统运行时间为:"+(end-start)/1000);
  }

  /**
   * 手动配置线程执行器的线程池大小
   */
  private final static Executor myExecutor = Executors.newFixedThreadPool(20, new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
      Thread t = new Thread(r);
      //使用守护线程保证不会阻止程序的关停
      t.setDaemon(true);
      return t;
    }
  });

  /**
   *
   */
  public static void testOneFunAsyncWithThreadPool(){
    long start=System.currentTimeMillis();
    try{
      printlnConsole("调用异步任务...****************testOneFunAsyncWithThreadPool带有线程池的*************");
      List<Integer> ids = Arrays.asList(1, 2,3,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,8,9,0,3, 5);//准备的请求参数
      List<CompletableFuture<String>> futures = ids.stream().map(id -> CompletableFuture.supplyAsync(() -> getClassName(id), myExecutor)).collect(Collectors.toList());

      //不用并行流parallelStream()调用时就不会阻断线程执行

      printlnConsole("获取异步任务结果:");
      futures.forEach(f -> {
        try {
          Object obj = f.get();
          printlnConsole(String.valueOf(obj));
        } catch (InterruptedException e) {
          e.printStackTrace();
        } catch (ExecutionException e) {
          e.printStackTrace();
        }
      });

    }catch (Exception e){
      e.printStackTrace();
    }
    long end=System.currentTimeMillis();
    printlnConsole("系统运行时间为:"+(end-start)/1000);
  }
  public static int getClassCount(int id) {

    try {
      Thread.sleep(2000);
      printlnConsole("getClassCount(" + id + ")执行完毕");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return 20;
  }

  public static int getStudentCount(int id) {
    try {
      Thread.sleep(1000);
      printlnConsole("getStudentCount(" + id + ")执行完毕");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return 100;
  }
}

结果:

调度异步任务。。。。。testAllOfAnyOf*******************************

系统运行时间为:38

调用异步任务...****************testParallelStream*************

系统运行时间为:29

调用异步任务...****************testOneFunAsync*************

系统运行时间为:38

调用异步任务...****************testOneFunAsyncWithThreadPool带有线程池的*************

系统运行时间为:9

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018年12月28日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档