专栏首页Linux内核及编程语言底层相关技术研究Java中的CompletableFuture究竟怎么用

Java中的CompletableFuture究竟怎么用

1. 对象的创建及完成。

static void complete() throws ExecutionException, InterruptedException {
  CompletableFuture<String> f = new CompletableFuture<>();
  new Thread() {
    @Override
    public void run() {
      // 该方法会将结果传给CompletableFuture,并将其设置为完成状态
      // 一般是异步调用
      f.complete("hello");
    }
  }.start();
  System.out.println(f.get()); // 输出 hello
}

2. 异步等待CompletableFuture的完成,并回调方法。

static void whenComplete() {
  CompletableFuture<String> f = new CompletableFuture<>();
  f.whenComplete(
      new BiConsumer<>() {
        @Override
        public void accept(String s, Throwable throwable) {
          if (throwable != null) {
            throwable.printStackTrace();
          } else {
            System.out.println(s);
          }
        }
      });
  // f.complete("hello"); // 正常完成
  f.completeExceptionally(new RuntimeException()); // 异常完成
}

3. 设置完成的超时时间。

static void timeout() throws InterruptedException {
  CompletableFuture<String> f = new CompletableFuture<>();
  f.whenComplete(
      new BiConsumer<>() {
        @Override
        public void accept(String s, Throwable throwable) {
          if (throwable != null) {
            throwable.printStackTrace();
          } else {
            System.out.println(s);
          }
        }
      });
  // f.orTimeout(1, TimeUnit.SECONDS); // 1秒内未完成就抛timeout异常给CompletableFuture
  f.completeOnTimeout(
      "timeout", 1, TimeUnit.SECONDS); // 一秒内未完成会把timeout字符串作为结果传给CompletableFuture

  Thread.sleep(2000); // 等待timeout的发生
}

4. 异步执行某任务,当任务完成时,将结果传给CompletableFuture。

static void supplyAsync() {
  ExecutorService exec = Executors.newSingleThreadExecutor();

  CompletableFuture<String> f =
      CompletableFuture.supplyAsync(
          new Supplier<>() {
            @Override
            public String get() {
              try {
                Thread.sleep(1000);
              } catch (InterruptedException e) {
                e.printStackTrace();
              }
              return "hello";
            }
          },
          exec);

  f.whenComplete(
      new BiConsumer<>() {
        @Override
        public void accept(String s, Throwable throwable) {
          System.out.println(s); // 输出hello
        }
      });

  exec.shutdown();
}

5. 等待其他的所有CompletableFuture完成。

static void allOf() {
  CompletableFuture<Integer> f1 = new CompletableFuture<>();
  CompletableFuture<Integer> f2 = new CompletableFuture<>();

  CompletableFuture<Void> f = CompletableFuture.allOf(f1, f2);
  f.whenComplete(
      new BiConsumer<>() {
        @Override
        public void accept(Void aVoid, Throwable throwable) {
          System.out.println(f1.getNow(0) + f2.getNow(0)); // 输出3
        }
      });

  f1.complete(1);
  f2.complete(2);
}

6. 将异常结果转成正常结果。

static void exceptionally() {
  CompletableFuture<String> f1 = new CompletableFuture<>();

  // 如果f1是正常结果,则原结果会传给f2
  // 如果f1是异常结果,就会调用下面的方法转成正常结果,然后再传给f2
  CompletableFuture<String> f2 =
      f1.exceptionally(
          new Function<>() {
            @Override
            public String apply(Throwable throwable) {
              return "exception";
            }
          });

  // thenAccept方法传入的函数只有在f2是正常结果时才会被调用
  f2.thenAccept(
      new Consumer<>() {
        @Override
        public void accept(String s) {
          System.out.println(s); // 输出exception
        }
      });

  f1.completeExceptionally(new RuntimeException());
}

7. 对结果做类型转换。

static void handle() {
  CompletableFuture<String> f = new CompletableFuture<>();

  // 当f完成后会执行handle传入的方法
  f.handle(
          new BiFunction<String, Throwable, Integer>() {
            @Override
            public Integer apply(String s, Throwable throwable) {
              if (throwable != null) {
                return -1;
              }
              return Integer.valueOf(s);
            }
          })
      .thenAccept(
          new Consumer<>() {
            @Override
            public void accept(Integer integer) {
              System.out.println(integer);
            }
          });

  // f.complete("1"); // 输出1
  f.completeExceptionally(new RuntimeException()); // 输出-1
}

8. 写个尽量完整的例子,看下各个方法是如何结合在一起使用的。

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class DataLoader {
  private static final AtomicInteger idGen = new AtomicInteger();
  private final ConcurrentMap<Integer, CompletableFuture<byte[]>> futureMap;
  private RemoteDataLoader loader;

  public DataLoader() {
    this.futureMap = new ConcurrentHashMap<>();
  }

  public void load(int dataID, Consumer<byte[]> dataConsumer, Executor executor) {
    int futureID = idGen.incrementAndGet();
    CompletableFuture<byte[]> f = new CompletableFuture<>();

    // 当future完成后,做些清理工作,然后将数据传给用户
    f.whenCompleteAsync(
        new BiConsumer<>() {
          @Override
          public void accept(byte[] data, Throwable throwable) {
            futureMap.remove(futureID);
            dataConsumer.accept(data);
          }
        },
        executor);

    // 3秒内没结果就返回null
    f.completeOnTimeout("null".getBytes(), 3, TimeUnit.SECONDS);

    // 将future放入map中
    futureMap.put(futureID, f);

    // 通知remote加载数据并将结果以回调remoteDataLoaded方法的形式返回
    loader.load(dataID, futureID);
  }

  public void remoteDataLoaded(int futureID, byte[] data) {
    CompletableFuture<byte[]> f = futureMap.get(futureID);
    if (f != null) {
      f.complete(data);
    }
  }

  public interface RemoteDataLoader {
    void load(int dataID, int futureID);
  }

  public static void main(String[] args) throws InterruptedException {
    DataLoader loader = new DataLoader();
    loader.loader =
        new RemoteDataLoader() {
          @Override
          public void load(int dataID, int futureID) {
            if (dataID > 0) {
              loader.remoteDataLoaded(futureID, "hello".getBytes());
            }
            // 如果dataID非法,则不返回数据,DataLoader里就会报timeout异常
          }
        };

    // 所有返回的数据都用该Executor执行输出操作
    ExecutorService exec = Executors.newCachedThreadPool();

    // 正常数据加载,输出hello
    loader.load(
        1,
        new Consumer<>() {
          @Override
          public void accept(byte[] data) {
            System.out.println(new String(data));
          }
        },
        exec);

    // 异常数据加载,发生timeout,输出null
    loader.load(
        0,
        new Consumer<>() {
          @Override
          public void accept(byte[] data) {
            System.out.println(new String(data));
          }
        },
        exec);

    // 等待timeout发生
    Thread.sleep(4000);

    // 关闭ExecutorService
    exec.shutdown();
  }
}

例子中的逻辑不是非常完善,但基本上可以展示CompletableFuture在项目中如何使用,当然,CompletableFuture还有更加复杂和强大的用法,这里就不一一介绍了,感兴趣的朋友可以点击阅读原文,查看其完整的api。

希望对你有所帮助。

完。

本文分享自微信公众号 - Linux内核及JVM底层相关技术研究(ytcode),作者:wangyuntao

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-09-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Linux根目录的文件系统是如何被挂载的 . 续

    该方法中的saved_root_name变量的值是在kernel启动时,由传给kernel的root参数决定的,对应的设置方法如下:

    wangyuntao
  • Linux tcp/ip 源码分析 - socket

    Linux下的tcp编程中,第一步就是要创建socket,本文将从源码角度看下socket是如何被创建的。

    wangyuntao
  • 系统调用mmap的内核实现分析

    执行该程序,输出mmap方法返回的内存地址,同时使用pmap命令输出该程序执行mmap之前以及之后的内存使用情况。

    wangyuntao
  • 使用JXL组件导入Excel文件数据到数据库

    三、上传(本人使用Struts2+iBatis+Spring框架,上传部分自然也是Struts2方式上传)

    Dunizb
  • Java IO

    java中涉及到的io流基本都是从以上四个抽象基类派生出来的,其子类都是以其父类的名字做后缀。

    万能青年
  • win10 uwp 气泡 WPF 气泡

    假设尖头宽度 10 高度 5 ,那么可以看到第一个点是 (0,5) 第二个点是 (5,0) 第三个点是 (10,5)

    林德熙
  • Java简单实现UDP和TCP

    TCP实现 TCP协议需要在双方之间建立连接,通过输入输出流来进行数据的交换,建立需要通过三次握手,断开需要四次挥手,保证了数据的完整性,但传输效率也会相应的降...

    李家酒馆酒保
  • Arcgis for JS之Cluster聚类分析的实现(基于区域范围的)

    咱们书接上文,在上文,实现了基于距离的空间聚类的算法实现,在本文,将继续介绍空间聚类之基于区域范围的实现方式,好了,闲言少叙,先看看具体的效果:

    lzugis
  • docker运行storm及wordcount实例

    本文简单介绍下怎么使用docker运行storm以及在springboot中使用storm。

    codecraft
  • Kafka 0.8.2.2 Producer报错:java.net.ConnectException: Connection timed out: no further information

    CoderJed

扫码关注云+社区

领取腾讯云代金券