前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊AsyncHttpClient的ListenableFuture

聊聊AsyncHttpClient的ListenableFuture

作者头像
code4it
发布2023-12-19 17:20:21
1420
发布2023-12-19 17:20:21
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下AsyncHttpClient的ListenableFuture

ListenableFuture

org/asynchttpclient/ListenableFuture.java

代码语言:javascript
复制
public interface ListenableFuture<V> extends Future<V> {

  /**
   * Terminate and if there is no exception, mark this Future as done and release the internal lock.
   */
  void done();

  /**
   * Abort the current processing, and propagate the {@link Throwable} to the {@link AsyncHandler} or {@link Future}
   *
   * @param t the exception
   */
  void abort(Throwable t);

  /**
   * Touch the current instance to prevent external service to times out.
   */
  void touch();

  /**
   * Adds a listener and executor to the ListenableFuture.
   * The listener will be {@linkplain java.util.concurrent.Executor#execute(Runnable) passed
   * to the executor} for execution when the {@code Future}'s computation is
   * {@linkplain Future#isDone() complete}.
   * <br>
   * Executor can be <code>null</code>, in that case executor will be executed
   * in the thread where completion happens.
   * <br>
   * There is no guaranteed ordering of execution of listeners, they may get
   * called in the order they were added and they may get called out of order,
   * but any listener added through this method is guaranteed to be called once
   * the computation is complete.
   *
   * @param listener the listener to run when the computation is complete.
   * @param exec     the executor to run the listener in.
   * @return this Future
   */
  ListenableFuture<V> addListener(Runnable listener, Executor exec);

  CompletableFuture<V> toCompletableFuture();

  //......
}  

ListenableFuture继承了java.util.concurrent.Future,它定义了done、abort、touch、addListener、toCompletableFuture方法

CompletedFailure

org/asynchttpclient/ListenableFuture.java

代码语言:javascript
复制
  class CompletedFailure<T> implements ListenableFuture<T> {

    private final ExecutionException e;

    public CompletedFailure(Throwable t) {
      e = new ExecutionException(t);
    }

    public CompletedFailure(String message, Throwable t) {
      e = new ExecutionException(message, t);
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      return true;
    }

    @Override
    public boolean isCancelled() {
      return false;
    }

    @Override
    public boolean isDone() {
      return true;
    }

    @Override
    public T get() throws ExecutionException {
      throw e;
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws ExecutionException {
      throw e;
    }

    @Override
    public void done() {
    }

    @Override
    public void abort(Throwable t) {
    }

    @Override
    public void touch() {
    }

    @Override
    public ListenableFuture<T> addListener(Runnable listener, Executor exec) {
      if (exec != null) {
        exec.execute(listener);
      } else {
        listener.run();
      }
      return this;
    }

    @Override
    public CompletableFuture<T> toCompletableFuture() {
      CompletableFuture<T> future = new CompletableFuture<>();
      future.completeExceptionally(e);
      return future;
    }
  }

CompletedFailure实现了ListenableFuture接口,其cancel方法返回true、isDone返回true

NettyResponseFuture

org/asynchttpclient/netty/NettyResponseFuture.java

代码语言:javascript
复制
public final class NettyResponseFuture<V> implements ListenableFuture<V> {

  private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class);

  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> REDIRECT_COUNT_UPDATER = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "redirectCount");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CURRENT_RETRY_UPDATER = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "currentRetry");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_DONE_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "isDone");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_CANCELLED_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "isCancelled");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_AUTH_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "inAuth");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_PROXY_AUTH_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "inProxyAuth");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CONTENT_PROCESSED_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "contentProcessed");
  @SuppressWarnings("rawtypes")
  private static final AtomicIntegerFieldUpdater<NettyResponseFuture> ON_THROWABLE_CALLED_FIELD = AtomicIntegerFieldUpdater
          .newUpdater(NettyResponseFuture.class, "onThrowableCalled");
  @SuppressWarnings("rawtypes")
  private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> TIMEOUTS_HOLDER_FIELD = AtomicReferenceFieldUpdater
          .newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");
  @SuppressWarnings("rawtypes")
  private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> PARTITION_KEY_LOCK_FIELD = AtomicReferenceFieldUpdater
          .newUpdater(NettyResponseFuture.class, Object.class, "partitionKeyLock");

  private final long start = unpreciseMillisTime();
  private final ChannelPoolPartitioning connectionPoolPartitioning;
  private final ConnectionSemaphore connectionSemaphore;
  private final ProxyServer proxyServer;
  private final int maxRetry;
  private final CompletableFuture<V> future = new CompletableFuture<>();          

          //......

  @Override
  public V get() throws InterruptedException, ExecutionException {
    return future.get();
  }

  @Override
  public V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException, ExecutionException {
    return future.get(l, tu);
  }          
}          

NettyResponseFuture实现了ListenableFuture接口

done

代码语言:javascript
复制
  public final void done() {

    if (terminateAndExit())
      return;

    try {
      loadContent();
    } catch (ExecutionException ignored) {

    } catch (RuntimeException t) {
      future.completeExceptionally(t);
    } catch (Throwable t) {
      future.completeExceptionally(t);
      throw t;
    }
  }

  private boolean terminateAndExit() {
    releasePartitionKeyLock();
    cancelTimeouts();
    this.channel = null;
    this.reuseChannel = false;
    return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;
  }  

private void loadContent() throws ExecutionException {
    if (future.isDone()) {
      try {
        future.get();
      } catch (InterruptedException e) {
        throw new RuntimeException("unreachable", e);
      }
    }

    // No more retry
    CURRENT_RETRY_UPDATER.set(this, maxRetry);
    if (CONTENT_PROCESSED_FIELD.getAndSet(this, 1) == 0) {
      try {
        future.complete(asyncHandler.onCompleted());
      } catch (Throwable ex) {
        if (ON_THROWABLE_CALLED_FIELD.getAndSet(this, 1) == 0) {
          try {
            try {
              asyncHandler.onThrowable(ex);
            } catch (Throwable t) {
              LOGGER.debug("asyncHandler.onThrowable", t);
            }
          } finally {
            cancelTimeouts();
          }
        }
        future.completeExceptionally(ex);
      }
    }
    future.getNow(null);
  }  

done方法对于terminateAndExit返回true的直接返回,否则执行loadContent,它对于future.isDone()的执行future.get(),然后执行future.complete(asyncHandler.onCompleted())回调

abort

代码语言:javascript
复制
  public final void abort(final Throwable t) {

    if (terminateAndExit())
      return;

    future.completeExceptionally(t);

    if (ON_THROWABLE_CALLED_FIELD.compareAndSet(this, 0, 1)) {
      try {
        asyncHandler.onThrowable(t);
      } catch (Throwable te) {
        LOGGER.debug("asyncHandler.onThrowable", te);
      }
    }
  }

abort方法也是对于terminateAndExit返回true的直接返回,否则执行future.completeExceptionally(t),然后触发asyncHandler.onThrowable(t)回调

touch

代码语言:javascript
复制
  public void touch() {
    touch = unpreciseMillisTime();
  }

touch方法用当前时间戳更新touch属性

addListener

代码语言:javascript
复制
  public ListenableFuture<V> addListener(Runnable listener, Executor exec) {
    if (exec == null) {
      exec = Runnable::run;
    }
    future.whenCompleteAsync((r, v) -> listener.run(), exec);
    return this;
  }

addListener方法会执行future.whenCompleteAsync((r, v) -> listener.run(), exec)

toCompletableFuture

代码语言:javascript
复制
  public CompletableFuture<V> toCompletableFuture() {
    return future;
  }

toCompletableFuture方法直接返回future

newNettyResponseFuture

org/asynchttpclient/netty/request/NettyRequestSender.java

代码语言:javascript
复制
  private <T> NettyResponseFuture<T> newNettyResponseFuture(Request request,
                                                            AsyncHandler<T> asyncHandler,
                                                            NettyRequest nettyRequest,
                                                            ProxyServer proxyServer) {

    NettyResponseFuture<T> future = new NettyResponseFuture<>(
            request,
            asyncHandler,
            nettyRequest,
            config.getMaxRequestRetry(),
            request.getChannelPoolPartitioning(),
            connectionSemaphore,
            proxyServer);

    String expectHeader = request.getHeaders().get(EXPECT);
    if (HttpHeaderValues.CONTINUE.contentEqualsIgnoreCase(expectHeader))
      future.setDontWriteBodyBecauseExpectContinue(true);
    return future;
  }

  private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(Request request,
                                                                     AsyncHandler<T> asyncHandler,
                                                                     NettyResponseFuture<T> future,
                                                                     ProxyServer proxyServer,
                                                                     boolean performConnectRequest) {

    NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer,
            performConnectRequest);

    Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);

    return Channels.isChannelActive(channel)
            ? sendRequestWithOpenChannel(newFuture, asyncHandler, channel)
            : sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);
  }  

NettyRequestSender的newNettyResponseFuture创建的是NettyResponseFuture;sendRequestWithCertainForceConnect则将NettyResponseFuture传递给sendRequestWithOpenChannel或者sendRequestWithNewChannel来发送请求

小结

AsyncHttpClient的ListenableFuture继承了java.util.concurrent.Future,它定义了done、abort、touch、addListener、toCompletableFuture方法;它有两个实现类,分别是CompletedFailure及NettyResponseFuture;NettyRequestSender的sendRequest方法将NettyResponseFuture传递给sendRequestWithOpenChannel或者sendRequestWithNewChannel来发送请求。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-12-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ListenableFuture
  • CompletedFailure
  • NettyResponseFuture
    • done
      • abort
        • touch
          • addListener
            • toCompletableFuture
            • newNettyResponseFuture
            • 小结
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档