异步任务执行的设计模式

参考:java的设计模式

异步执行方法回调的设计模式:异步方法调用是在等待任务结果时不阻塞调用线程的模式。该模式提供了多个独立的任务并行处理和取得任务结果或者等待所有任务结束。

  • 总览图如下

image.png

  • 下面为代码示例,首先是执行器接口
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: AsyncExecutor.java
 * @Description: 执行器executor的三个关联的对象,1:传入的参数线程task,2:传入的保存结果状态的callback
 * 3:返回值result。它也是整个模式的核心部分
 * @version: v1.0.0
 */
public interface AsyncExecutor {

//   开始执行任务,未持有callback则说明客户端不需要对返回结果做额外判断。返回异步结果
      <T> AsyncResult<T> startProcess(Callable<T> task);

//       开始执行任务,持有callback则说明客户端自定义实现额外判断。返回异步结果
      <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback);

//    结束异步任务,如果必要时阻塞当前的线程并返回结果结束任务
      <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException;
}
  • 异步执行返回结果接口
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: AsyncResult.java
 * @Description: executor执行器执行的返回结果。它应该提供执行状态、任务返回值、结果挂起
 * @version: v1.0.0
 */
public interface AsyncResult<T> {

//  线程任务是否完成
    boolean isCompleted();
//  获取任务的返回值
    T getValue() throws ExecutionException;

//    阻塞当前线程,直到异步任务完成,如果执行中断,抛出异常
    void await() throws InterruptedException;
}
  • 保存执行器executor执行结果(task任务状态,返回值),客户端可以进行自定义处理
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: AsynCallback.java
 * @Description: 保存执行器executor执行结果(task任务状态,返回值),可以由客户端进行自定义处理
 * @version: v1.0.0
 */
public interface AsynCallback<T> {

    //客户端实现,对executor执行结果后做自定义处理
    void onComplete(T val,Optional<Exception> ex);
}
  • 执行器的具体实现
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: ThreadAsyncExecutor.java
 * @Description: 
 * @version: v1.0.0
 */
public class ThreadAsyncExecutor implements AsyncExecutor {

//      为区别线程,为每个线程命名
      private final AtomicInteger idx = new AtomicInteger(0);

      @Override
      public <T> AsyncResult<T> startProcess(Callable<T> task) {
        return startProcess(task, null);
      }

      @Override
      public <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback) {
//      CompletableResult作为executor的返回结果,它会对callback传递参数让callback自行处理
        CompletableResult<T> result = new CompletableResult<>(callback);
//      启动一个线程去处理任务线程,并将任务线程的返回结果设置到result中
        new Thread(() -> {
          try {
            result.setValue(task.call());
          } catch (Exception ex) {
            result.setException(ex);
          }
        } , "executor-" + idx.incrementAndGet()).start();
        return result;
      }
//      结束任务,如果当前任务没有完成则让出cpu让其他任务使用。如果执行结束返回结果
      @Override
      public <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException {
        if (!asyncResult.isCompleted()) {
          asyncResult.await();
        }
        return asyncResult.getValue();
      }

      /**
       * Simple implementation of async result that allows completing it successfully with a value or exceptionally with an
       * exception. A really simplified version from its real life cousins FutureTask and CompletableFuture.
       *
       * @see java.util.concurrent.FutureTask
       * @see java.util.concurrent.CompletableFuture
       */
//   执行器executor的三个关联的对象,1:传入的参数线程task,2:传入的保存结果状态的callback,3:返回值result
//    异步执行的结果封装,持有callback对象(该对象可由客户端重写),这里是将执行的结果保存到callback中的value|exception
      private static class CompletableResult<T> implements AsyncResult<T> {
//      几种执行的状态
        static final int RUNNING = 1;
        static final int FAILED = 2;
        static final int COMPLETED = 3;
//      对象锁
        final Object lock;
//      Optional封装callback
        final Optional<AsyncCallback<T>> callback;
//      初始状态
        volatile int state = RUNNING;
//             执行结果
        T value;
//             执行异常情况
        Exception exception;

        CompletableResult(AsyncCallback<T> callback) {
          this.lock = new Object();
          this.callback = Optional.ofNullable(callback);
        }

        /**
         * Sets the value from successful execution and executes callback if available. Notifies any thread waiting for
         * completion.
         * 封装任务的返回结果
         * @param value
         *          value of the evaluated task
         */
        void setValue(T value) {
          this.value = value;
          this.state = COMPLETED;
          this.callback.ifPresent(ac -> ac.onComplete(value, Optional.<Exception>empty()));
          synchronized (lock) {
            lock.notifyAll();
          }
        }

        /**
         * Sets the exception from failed execution and executes callback if available. Notifies any thread waiting for
         * completion.
         * 设置异常
         * @param exception
         *          exception of the failed task
         */
        void setException(Exception exception) {
          this.exception = exception;
          this.state = FAILED;
          this.callback.ifPresent(ac -> ac.onComplete(null, Optional.of(exception)));
          synchronized (lock) {
            lock.notifyAll();
          }
        }
//      是否运行状态
        @Override
        public boolean isCompleted() {
          return state > RUNNING;
        }
//      取得任务结果
        @Override
        public T getValue() throws ExecutionException {
          if (state == COMPLETED) {
            return value;
          } else if (state == FAILED) {
            throw new ExecutionException(exception);
          } else {
            throw new IllegalStateException("Execution not completed yet");
          }
        }
//      未完成时不参与竞争
        @Override
        public void await() throws InterruptedException {
          synchronized (lock) {
            while (!isCompleted()) {
              lock.wait();
            }
          }
        }
      }
    }
  • 测试部分
public class App {
      public static void main(String[] args) throws Exception {
        // 新建一个executor执行器
        AsyncExecutor executor = new ThreadAsyncExecutor();

        // 开始执行一些任务
        AsyncResult<Integer> asyncResult1 = executor.startProcess(lazyval(10, 500));
        AsyncResult<String> asyncResult2 = executor.startProcess(lazyval("test", 300));
        AsyncResult<Long> asyncResult3 = executor.startProcess(lazyval(50L, 700));
        AsyncResult<Integer> asyncResult4 = executor.startProcess(lazyval(20, 400), callback("Callback result 4"));
        AsyncResult<String> asyncResult5 = executor.startProcess(lazyval("callback", 600), callback("Callback result 5"));

        // emulate processing in the current thread while async tasks are running in their own threads
        Thread.sleep(350); // Oh boy I'm working hard here
        log("Some hard work done");

        // wait for completion of the tasks
        Integer result1 = executor.endProcess(asyncResult1);
        String result2 = executor.endProcess(asyncResult2);
        Long result3 = executor.endProcess(asyncResult3);
//      下面的执行结果挂起
        asyncResult4.await();
        asyncResult5.await();

        // 打印线程结果
        log("Result 1: " + result1);
        log("Result 2: " + result2);
        log("Result 3: " + result3);
      }

      /**
       * Creates a callable that lazily evaluates to given value with artificial delay.
       * 创建一个任务
       * @param value
       *          value to evaluate
       * @param delayMillis
       *          artificial delay in milliseconds
       * @return new callable for lazy evaluation
       */
      private static <T> Callable<T> lazyval(T value, long delayMillis) {
        return () -> {
          Thread.sleep(delayMillis);
          log("Task completed with: " + value);
          return value;
        };
      }

      /**
       * 客户端自定义callback
       */
      private static <T> AsyncCallback<T> callback(String name) {
//        返回一个callback重写 void onComplete(T value, Optional<Exception> ex)的实现类对象
        return (value, ex) -> {
          if (ex.isPresent()) {
            log(name + " failed: " + ex.map(Exception::getMessage).orElse(""));
          } else {
            log(name + ": " + value);
          }
        };
      }
//      日志方法
      private static void log(String msg) {
        System.out.println(msg);
      }
    }
Task completed with: test
Some hard work done
Task completed with: 20
Callback result 4: 20
Task completed with: 10
Task completed with: callback
Callback result 5: callback
Task completed with: 50
Result 1: 10
Result 2: test
Result 3: 50

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏IT可乐

mybatis源码解读(二)——构建Configuration对象

1962
来自专栏逍遥剑客的游戏开发

MPQ文件系统优化(续)

1725
来自专栏菩提树下的杨过

struts2: 玩转 rest-plugin

近期使用struts2的rest-plugin,参考官方示例struts2-rest-showcase,做了一个restful service小项目,但官网提供...

3155
来自专栏androidBlog

java 解决文件名重复问题的两种方法

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/gdutxiaoxu/article/de...

3471
来自专栏Spark生态圈

[Spark SQL] 源码解析之Analyzer

Analyzer模块将Unresolved LogicalPlan结合元数据catalog进行绑定,最终转化为Resolved LogicalPlan。跟着代码...

1272
来自专栏程序猿DD

Spring框架中的设计模式(二)

在 上一篇 中我们在Spring中所谈到的设计模式涉及到了创建模式三剑客和1个行为模式(解释器模式)。这次我们会将眼光更多地关注在具有结构性和行为性的设计模式上...

4118
来自专栏Spark生态圈

[spark] Task执行流程

在文章TaskScheduler 任务提交与调度源码解析 中介绍了Task在executor上的逻辑分配,调用TaskSchedulerImpl的resourc...

1791
来自专栏服务端技术杂谈

dubbo源码学习笔记----monitor

核心类 public abstract class AbstractMonitorFactory implements MonitorFactory { ...

3418
来自专栏Java帮帮-微信公众号-技术文章全总结

Java类加载器(用户自定义类加载器实现)

java类加载器主要分为如下几种: jvm提供的类加载器 根类加载器:底层实现,主要加载java核心类库(如:java.lang.*) 扩展类加载器:使用jav...

3586
来自专栏wannshan(javaer,RPC)

dubbo通信消息解析过程分析(1)

由于rpc底层涉及网络编程接口,线程模型,网络数据结构,服务协议,细到字节的处理。牵涉内容较多,今天就先从一个点说起。 说说,dubbo通过netty框架做传...

5036

扫码关注云+社区

领取腾讯云代金券