前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >异步任务执行的设计模式

异步任务执行的设计模式

作者头像
用户1418372
发布2018-09-13 10:23:32
1.5K0
发布2018-09-13 10:23:32
举报
文章被收录于专栏:清晨我上码清晨我上码

参考:java的设计模式

异步执行方法回调的设计模式:异步方法调用是在等待任务结果时不阻塞调用线程的模式。该模式提供了多个独立的任务并行处理和取得任务结果或者等待所有任务结束。

  • 总览图如下

image.png

  • 下面为代码示例,首先是执行器接口
代码语言:javascript
复制
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: AsyncExecutor.java
 * @Description: 执行器executor的三个关联的对象,1:传入的参数线程task,2:传入的保存结果状态的callback
 * 3:返回值result。它也是整个模式的核心部分
 * @version: v1.0.0
 */
public interface AsyncExecutor {

//   开始执行任务,未持有callback则说明客户端不需要对返回结果做额外判断。返回异步结果
      <T> AsyncResult<T> startProcess(Callable<T> task);

//       开始执行任务,持有callback则说明客户端自定义实现额外判断。返回异步结果
      <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback);

//    结束异步任务,如果必要时阻塞当前的线程并返回结果结束任务
      <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException;
}
  • 异步执行返回结果接口
代码语言:javascript
复制
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: AsyncResult.java
 * @Description: executor执行器执行的返回结果。它应该提供执行状态、任务返回值、结果挂起
 * @version: v1.0.0
 */
public interface AsyncResult<T> {

//  线程任务是否完成
    boolean isCompleted();
//  获取任务的返回值
    T getValue() throws ExecutionException;

//    阻塞当前线程,直到异步任务完成,如果执行中断,抛出异常
    void await() throws InterruptedException;
}
  • 保存执行器executor执行结果(task任务状态,返回值),客户端可以进行自定义处理
代码语言:javascript
复制
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: AsynCallback.java
 * @Description: 保存执行器executor执行结果(task任务状态,返回值),可以由客户端进行自定义处理
 * @version: v1.0.0
 */
public interface AsynCallback<T> {

    //客户端实现,对executor执行结果后做自定义处理
    void onComplete(T val,Optional<Exception> ex);
}
  • 执行器的具体实现
代码语言:javascript
复制
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: ThreadAsyncExecutor.java
 * @Description: 
 * @version: v1.0.0
 */
public class ThreadAsyncExecutor implements AsyncExecutor {

//      为区别线程,为每个线程命名
      private final AtomicInteger idx = new AtomicInteger(0);

      @Override
      public <T> AsyncResult<T> startProcess(Callable<T> task) {
        return startProcess(task, null);
      }

      @Override
      public <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback) {
//      CompletableResult作为executor的返回结果,它会对callback传递参数让callback自行处理
        CompletableResult<T> result = new CompletableResult<>(callback);
//      启动一个线程去处理任务线程,并将任务线程的返回结果设置到result中
        new Thread(() -> {
          try {
            result.setValue(task.call());
          } catch (Exception ex) {
            result.setException(ex);
          }
        } , "executor-" + idx.incrementAndGet()).start();
        return result;
      }
//      结束任务,如果当前任务没有完成则让出cpu让其他任务使用。如果执行结束返回结果
      @Override
      public <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException {
        if (!asyncResult.isCompleted()) {
          asyncResult.await();
        }
        return asyncResult.getValue();
      }

      /**
       * Simple implementation of async result that allows completing it successfully with a value or exceptionally with an
       * exception. A really simplified version from its real life cousins FutureTask and CompletableFuture.
       *
       * @see java.util.concurrent.FutureTask
       * @see java.util.concurrent.CompletableFuture
       */
//   执行器executor的三个关联的对象,1:传入的参数线程task,2:传入的保存结果状态的callback,3:返回值result
//    异步执行的结果封装,持有callback对象(该对象可由客户端重写),这里是将执行的结果保存到callback中的value|exception
      private static class CompletableResult<T> implements AsyncResult<T> {
//      几种执行的状态
        static final int RUNNING = 1;
        static final int FAILED = 2;
        static final int COMPLETED = 3;
//      对象锁
        final Object lock;
//      Optional封装callback
        final Optional<AsyncCallback<T>> callback;
//      初始状态
        volatile int state = RUNNING;
//             执行结果
        T value;
//             执行异常情况
        Exception exception;

        CompletableResult(AsyncCallback<T> callback) {
          this.lock = new Object();
          this.callback = Optional.ofNullable(callback);
        }

        /**
         * Sets the value from successful execution and executes callback if available. Notifies any thread waiting for
         * completion.
         * 封装任务的返回结果
         * @param value
         *          value of the evaluated task
         */
        void setValue(T value) {
          this.value = value;
          this.state = COMPLETED;
          this.callback.ifPresent(ac -> ac.onComplete(value, Optional.<Exception>empty()));
          synchronized (lock) {
            lock.notifyAll();
          }
        }

        /**
         * Sets the exception from failed execution and executes callback if available. Notifies any thread waiting for
         * completion.
         * 设置异常
         * @param exception
         *          exception of the failed task
         */
        void setException(Exception exception) {
          this.exception = exception;
          this.state = FAILED;
          this.callback.ifPresent(ac -> ac.onComplete(null, Optional.of(exception)));
          synchronized (lock) {
            lock.notifyAll();
          }
        }
//      是否运行状态
        @Override
        public boolean isCompleted() {
          return state > RUNNING;
        }
//      取得任务结果
        @Override
        public T getValue() throws ExecutionException {
          if (state == COMPLETED) {
            return value;
          } else if (state == FAILED) {
            throw new ExecutionException(exception);
          } else {
            throw new IllegalStateException("Execution not completed yet");
          }
        }
//      未完成时不参与竞争
        @Override
        public void await() throws InterruptedException {
          synchronized (lock) {
            while (!isCompleted()) {
              lock.wait();
            }
          }
        }
      }
    }
  • 测试部分
代码语言:javascript
复制
public class App {
      public static void main(String[] args) throws Exception {
        // 新建一个executor执行器
        AsyncExecutor executor = new ThreadAsyncExecutor();

        // 开始执行一些任务
        AsyncResult<Integer> asyncResult1 = executor.startProcess(lazyval(10, 500));
        AsyncResult<String> asyncResult2 = executor.startProcess(lazyval("test", 300));
        AsyncResult<Long> asyncResult3 = executor.startProcess(lazyval(50L, 700));
        AsyncResult<Integer> asyncResult4 = executor.startProcess(lazyval(20, 400), callback("Callback result 4"));
        AsyncResult<String> asyncResult5 = executor.startProcess(lazyval("callback", 600), callback("Callback result 5"));

        // emulate processing in the current thread while async tasks are running in their own threads
        Thread.sleep(350); // Oh boy I'm working hard here
        log("Some hard work done");

        // wait for completion of the tasks
        Integer result1 = executor.endProcess(asyncResult1);
        String result2 = executor.endProcess(asyncResult2);
        Long result3 = executor.endProcess(asyncResult3);
//      下面的执行结果挂起
        asyncResult4.await();
        asyncResult5.await();

        // 打印线程结果
        log("Result 1: " + result1);
        log("Result 2: " + result2);
        log("Result 3: " + result3);
      }

      /**
       * Creates a callable that lazily evaluates to given value with artificial delay.
       * 创建一个任务
       * @param value
       *          value to evaluate
       * @param delayMillis
       *          artificial delay in milliseconds
       * @return new callable for lazy evaluation
       */
      private static <T> Callable<T> lazyval(T value, long delayMillis) {
        return () -> {
          Thread.sleep(delayMillis);
          log("Task completed with: " + value);
          return value;
        };
      }

      /**
       * 客户端自定义callback
       */
      private static <T> AsyncCallback<T> callback(String name) {
//        返回一个callback重写 void onComplete(T value, Optional<Exception> ex)的实现类对象
        return (value, ex) -> {
          if (ex.isPresent()) {
            log(name + " failed: " + ex.map(Exception::getMessage).orElse(""));
          } else {
            log(name + ": " + value);
          }
        };
      }
//      日志方法
      private static void log(String msg) {
        System.out.println(msg);
      }
    }
代码语言:javascript
复制
Task completed with: test
Some hard work done
Task completed with: 20
Callback result 4: 20
Task completed with: 10
Task completed with: callback
Callback result 5: callback
Task completed with: 50
Result 1: 10
Result 2: test
Result 3: 50
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.09.08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 异步执行方法回调的设计模式:异步方法调用是在等待任务结果时不阻塞调用线程的模式。该模式提供了多个独立的任务并行处理和取得任务结果或者等待所有任务结束。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档