A Look at AsyncHttpClient's TimeoutTimerTask

Original article by code4it, published 2023-12-14 09:13:45
Part of the column: 码匠的流水账

This article takes a look at AsyncHttpClient's TimeoutTimerTask.

TimerTask

io/netty/util/TimerTask.java

/**
 * A task which is executed after the delay specified with
 * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
 */
public interface TimerTask {

    /**
     * Executed after the delay specified with
     * {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
     *
     * @param timeout a handle which is associated with this task
     */
    void run(Timeout timeout) throws Exception;
}

Netty's TimerTask interface declares a single run method, which takes a Timeout as its parameter.

Timeout

io/netty/util/Timeout.java

public interface Timeout {

    /**
     * Returns the {@link Timer} that created this handle.
     */
    Timer timer();

    /**
     * Returns the {@link TimerTask} which is associated with this handle.
     */
    TimerTask task();

    /**
     * Returns {@code true} if and only if the {@link TimerTask} associated
     * with this handle has been expired.
     */
    boolean isExpired();

    /**
     * Returns {@code true} if and only if the {@link TimerTask} associated
     * with this handle has been cancelled.
     */
    boolean isCancelled();

    /**
     * Attempts to cancel the {@link TimerTask} associated with this handle.
     * If the task has been executed or cancelled already, it will return with
     * no side effect.
     *
     * @return True if the cancellation completed successfully, otherwise false
     */
    boolean cancel();
}

The Timeout interface declares the timer(), task(), isExpired(), isCancelled(), and cancel() methods.

TimeoutTimerTask

org/asynchttpclient/netty/timeout/TimeoutTimerTask.java

public abstract class TimeoutTimerTask implements TimerTask {

  private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutTimerTask.class);

  protected final AtomicBoolean done = new AtomicBoolean();
  protected final NettyRequestSender requestSender;
  final TimeoutsHolder timeoutsHolder;
  volatile NettyResponseFuture<?> nettyResponseFuture;

  TimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, TimeoutsHolder timeoutsHolder) {
    this.nettyResponseFuture = nettyResponseFuture;
    this.requestSender = requestSender;
    this.timeoutsHolder = timeoutsHolder;
  }

  void expire(String message, long time) {
    LOGGER.debug("{} for {} after {} ms", message, nettyResponseFuture, time);
    requestSender.abort(nettyResponseFuture.channel(), nettyResponseFuture, new TimeoutException(message));
  }

  /**
   * When the timeout is cancelled, it could still be referenced for quite some time in the Timer. Holding a reference to the future might mean holding a reference to the
   * channel, and heavy objects such as SslEngines
   */
  public void clean() {
    if (done.compareAndSet(false, true)) {
      nettyResponseFuture = null;
    }
  }

  void appendRemoteAddress(StringBuilder sb) {
    InetSocketAddress remoteAddress = timeoutsHolder.remoteAddress();
    sb.append(remoteAddress.getHostName());
    if (!remoteAddress.isUnresolved()) {
      sb.append('/').append(remoteAddress.getAddress().getHostAddress());
    }
    sb.append(':').append(remoteAddress.getPort());
  }
}

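TimeoutTimerTask is an abstract class that implements TimerTask. Its expire method aborts the request through requestSender.abort with a TimeoutException. Its clean method sets done to true and nulls out nettyResponseFuture, so that a cancelled task that is still referenced by the Timer does not keep the future (and possibly its channel) reachable.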
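To make the clean() idiom concrete, here is a minimal JDK-only sketch. The class name CleanableTaskSketch, the heavyReference field, and the boolean return value are assumptions made for illustration; they are not part of AsyncHttpClient, whose clean() returns void.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for TimeoutTimerTask; not AsyncHttpClient's API.
final class CleanableTaskSketch {

  private final AtomicBoolean done = new AtomicBoolean();
  // Stands in for the NettyResponseFuture reference and whatever it keeps reachable.
  private volatile Object heavyReference;

  CleanableTaskSketch(Object heavyReference) {
    this.heavyReference = heavyReference;
  }

  // Mirrors clean(): only the first caller flips done to true and drops the reference.
  // Returning a boolean is an addition of this sketch, to make the effect observable.
  boolean clean() {
    if (done.compareAndSet(false, true)) {
      heavyReference = null;
      return true;
    }
    return false;
  }

  boolean isDone() {
    return done.get();
  }

  boolean holdsReference() {
    return heavyReference != null;
  }

  public static void main(String[] args) {
    CleanableTaskSketch task = new CleanableTaskSketch(new byte[1024]);
    System.out.println(task.clean());          // true: first call performs the cleanup
    System.out.println(task.clean());          // false: later calls are no-ops
    System.out.println(task.holdsReference()); // false: the reference was dropped
  }
}
```

The compareAndSet guard means the cleanup runs at most once, even if a timer thread and a caller race on the same task.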

ReadTimeoutTimerTask

org/asynchttpclient/netty/timeout/ReadTimeoutTimerTask.java

public class ReadTimeoutTimerTask extends TimeoutTimerTask {

  private final long readTimeout;

  ReadTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture,
                       NettyRequestSender requestSender,
                       TimeoutsHolder timeoutsHolder,
                       int readTimeout) {
    super(nettyResponseFuture, requestSender, timeoutsHolder);
    this.readTimeout = readTimeout;
  }

  public void run(Timeout timeout) {

    if (done.getAndSet(true) || requestSender.isClosed())
      return;

    if (nettyResponseFuture.isDone()) {
      timeoutsHolder.cancel();
      return;
    }

    long now = unpreciseMillisTime();

    long currentReadTimeoutInstant = readTimeout + nettyResponseFuture.getLastTouch();
    long durationBeforeCurrentReadTimeout = currentReadTimeoutInstant - now;

    if (durationBeforeCurrentReadTimeout <= 0L) {
      // idleConnectTimeout reached
      StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Read timeout to ");
      appendRemoteAddress(sb);
      String message = sb.append(" after ").append(readTimeout).append(" ms").toString();
      long durationSinceLastTouch = now - nettyResponseFuture.getLastTouch();
      expire(message, durationSinceLastTouch);
      // cancel request timeout sibling
      timeoutsHolder.cancel();

    } else {
      done.set(false);
      timeoutsHolder.startReadTimeout(this);
    }
  }
}

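ReadTimeoutTimerTask extends TimeoutTimerTask. Its run method computes currentReadTimeoutInstant from readTimeout and nettyResponseFuture.getLastTouch(), then checks whether that instant has already passed. If it has, the task calls expire and timeoutsHolder.cancel(); otherwise it resets done to false and calls timeoutsHolder.startReadTimeout(this) to re-schedule itself.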
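The expire-or-re-schedule decision comes down to a small piece of arithmetic. The sketch below isolates it as pure functions; the class and method names are hypothetical, and the timestamps are plain millisecond values rather than the library's unpreciseMillisTime().

```java
// Hypothetical helper that isolates the arithmetic in ReadTimeoutTimerTask.run().
final class ReadTimeoutDecisionSketch {

  // Time left before the read timeout instant, measured from nowMs.
  // Same arithmetic as run(): (readTimeout + lastTouch) - now.
  static long durationBeforeReadTimeout(long readTimeoutMs, long lastTouchMs, long nowMs) {
    return readTimeoutMs + lastTouchMs - nowMs;
  }

  // true means the request should expire; false means the task should be re-scheduled.
  static boolean shouldExpire(long readTimeoutMs, long lastTouchMs, long nowMs) {
    return durationBeforeReadTimeout(readTimeoutMs, lastTouchMs, nowMs) <= 0L;
  }

  public static void main(String[] args) {
    long readTimeoutMs = 5_000L;
    // The task fires at t=5000, but data arrived at t=3000: not idle long enough yet.
    System.out.println(shouldExpire(readTimeoutMs, 3_000L, 5_000L)); // false
    // No data since t=0: the connection has been idle for the whole readTimeout.
    System.out.println(shouldExpire(readTimeoutMs, 0L, 5_000L));     // true
  }
}
```

Because lastTouch moves forward whenever the future is touched, a connection that keeps receiving data keeps pushing the read timeout instant into the future.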

startReadTimeout

org/asynchttpclient/netty/timeout/TimeoutsHolder.java

  void startReadTimeout(ReadTimeoutTimerTask task) {
    if (requestTimeout == null || (!requestTimeout.isExpired() && readTimeoutValue < (requestTimeoutMillisTime - unpreciseMillisTime()))) {
      // only schedule a new readTimeout if the requestTimeout doesn't happen first
      if (task == null) {
        // first call triggered from outside (else is read timeout is re-scheduling itself)
        task = new ReadTimeoutTimerTask(nettyResponseFuture, requestSender, this, readTimeoutValue);
      }
      this.readTimeout = newTimeout(task, readTimeoutValue);

    } else if (task != null) {
      // read timeout couldn't re-scheduling itself, clean up
      task.clean();
    }
  }

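startReadTimeout schedules the read timeout through newTimeout in two cases: when there is no request timeout at all, or when the request timeout has not expired and readTimeoutValue is less than the time remaining before requestTimeoutMillisTime (in other words, the current time plus readTimeoutValue falls before requestTimeoutMillisTime). Otherwise, if a task was passed in, it calls task.clean().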
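The scheduling condition can be read as a predicate over a few numbers. The following is a sketch of that predicate only; the class, method, and parameter names are assumptions, with hasRequestTimeout=false modelling requestTimeout == null.

```java
// Hypothetical predicate mirroring the condition in TimeoutsHolder.startReadTimeout.
final class ReadTimeoutSchedulingSketch {

  static boolean shouldScheduleReadTimeout(boolean hasRequestTimeout,
                                           boolean requestTimeoutExpired,
                                           long readTimeoutMs,
                                           long requestDeadlineMs,
                                           long nowMs) {
    if (!hasRequestTimeout) {
      // Request timeout disabled: the read timeout is the only guard, always schedule it.
      return true;
    }
    long remainingBeforeRequestTimeout = requestDeadlineMs - nowMs;
    // Only schedule if the read timeout would fire before the request timeout does.
    return !requestTimeoutExpired && readTimeoutMs < remainingBeforeRequestTimeout;
  }

  public static void main(String[] args) {
    // 50 s left on the request, 5 s read timeout: schedule.
    System.out.println(shouldScheduleReadTimeout(true, false, 5_000L, 60_000L, 10_000L)); // true
    // 3 s left on the request, 5 s read timeout: the request timeout fires first.
    System.out.println(shouldScheduleReadTimeout(true, false, 5_000L, 60_000L, 57_000L)); // false
    // No request timeout configured: schedule regardless of the other values.
    System.out.println(shouldScheduleReadTimeout(false, false, 5_000L, -1L, 57_000L));    // true
  }
}
```

Skipping the read timeout near the end of the request budget avoids scheduling a timer that the request timeout would make redundant.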

RequestTimeoutTimerTask

org/asynchttpclient/netty/timeout/RequestTimeoutTimerTask.java

public class RequestTimeoutTimerTask extends TimeoutTimerTask {

  private final long requestTimeout;

  RequestTimeoutTimerTask(NettyResponseFuture<?> nettyResponseFuture,
                                 NettyRequestSender requestSender,
                                 TimeoutsHolder timeoutsHolder,
                                 int requestTimeout) {
    super(nettyResponseFuture, requestSender, timeoutsHolder);
    this.requestTimeout = requestTimeout;
  }

  public void run(Timeout timeout) {

    if (done.getAndSet(true) || requestSender.isClosed())
      return;

    // in any case, cancel possible readTimeout sibling
    timeoutsHolder.cancel();

    if (nettyResponseFuture.isDone())
      return;

    StringBuilder sb = StringBuilderPool.DEFAULT.stringBuilder().append("Request timeout to ");
    appendRemoteAddress(sb);
    String message = sb.append(" after ").append(requestTimeout).append(" ms").toString();
    long age = unpreciseMillisTime() - nettyResponseFuture.getStart();
    expire(message, age);
  }
}

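RequestTimeoutTimerTask extends TimeoutTimerTask. Its run method returns immediately if done was already true or the requestSender is closed. Otherwise it first calls timeoutsHolder.cancel() to cancel any sibling read timeout, returns if nettyResponseFuture.isDone(), and in the remaining case calls expire.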
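For a feel of what the resulting timeout message looks like, here is a JDK-only sketch that reproduces the formatting done by appendRemoteAddress and run. It uses a plain StringBuilder instead of the library's StringBuilderPool, and the class name, the resolved helper, the host name, and the IP address are all made up for illustration.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;

// Hypothetical helper reproducing the message layout of RequestTimeoutTimerTask.
final class TimeoutMessageSketch {

  static String requestTimeoutMessage(InetSocketAddress remoteAddress, long timeoutMs) {
    StringBuilder sb = new StringBuilder("Request timeout to ");
    sb.append(remoteAddress.getHostName());
    if (!remoteAddress.isUnresolved()) {
      // Resolved addresses also get the literal IP after a slash.
      sb.append('/').append(remoteAddress.getAddress().getHostAddress());
    }
    sb.append(':').append(remoteAddress.getPort());
    return sb.append(" after ").append(timeoutMs).append(" ms").toString();
  }

  // Builds a resolved address from explicit bytes, so no DNS lookup is needed.
  static InetSocketAddress resolved(String host, byte[] ip, int port) {
    try {
      return new InetSocketAddress(InetAddress.getByAddress(host, ip), port);
    } catch (UnknownHostException e) {
      throw new IllegalArgumentException("invalid IP address length", e);
    }
  }

  public static void main(String[] args) {
    InetSocketAddress unresolved = InetSocketAddress.createUnresolved("example.com", 8080);
    // prints "Request timeout to example.com:8080 after 60000 ms"
    System.out.println(requestTimeoutMessage(unresolved, 60_000L));

    InetSocketAddress withIp = resolved("example.com", new byte[] {10, 0, 0, 1}, 8080);
    // prints "Request timeout to example.com/10.0.0.1:8080 after 60000 ms"
    System.out.println(requestTimeoutMessage(withIp, 60_000L));
  }
}
```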

TimeoutsHolder

org/asynchttpclient/netty/timeout/TimeoutsHolder.java

public class TimeoutsHolder {

  private final Timeout requestTimeout;
  private final AtomicBoolean cancelled = new AtomicBoolean();
  private final Timer nettyTimer;
  private final NettyRequestSender requestSender;
  private final long requestTimeoutMillisTime;
  private final int readTimeoutValue;
  private volatile Timeout readTimeout;
  private volatile NettyResponseFuture<?> nettyResponseFuture;
  private volatile InetSocketAddress remoteAddress;

  public TimeoutsHolder(Timer nettyTimer, NettyResponseFuture<?> nettyResponseFuture, NettyRequestSender requestSender, AsyncHttpClientConfig config, InetSocketAddress originalRemoteAddress) {
    this.nettyTimer = nettyTimer;
    this.nettyResponseFuture = nettyResponseFuture;
    this.requestSender = requestSender;
    this.remoteAddress = originalRemoteAddress;

    final Request targetRequest = nettyResponseFuture.getTargetRequest();

    final int readTimeoutInMs = targetRequest.getReadTimeout();
    this.readTimeoutValue = readTimeoutInMs == 0 ? config.getReadTimeout() : readTimeoutInMs;

    int requestTimeoutInMs = targetRequest.getRequestTimeout();
    if (requestTimeoutInMs == 0) {
      requestTimeoutInMs = config.getRequestTimeout();
    }

    if (requestTimeoutInMs != -1) {
      requestTimeoutMillisTime = unpreciseMillisTime() + requestTimeoutInMs;
      requestTimeout = newTimeout(new RequestTimeoutTimerTask(nettyResponseFuture, requestSender, this, requestTimeoutInMs), requestTimeoutInMs);
    } else {
      requestTimeoutMillisTime = -1L;
      requestTimeout = null;
    }
  }

  //......
}  

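The TimeoutsHolder constructor resolves both timeouts from the target request, falling back to the AsyncHttpClientConfig value when the request-level value is 0. When the resulting requestTimeoutInMs is not -1, it creates a RequestTimeoutTimerTask and schedules it through newTimeout; when it is -1, no request timeout is scheduled and requestTimeout stays null.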
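The fallback and the -1 sentinel are easy to misread in the constructor, so the sketch below restates them as two pure functions. The class and method names are hypothetical, and nowMs is passed in explicitly instead of calling the library's unpreciseMillisTime().

```java
// Hypothetical helpers restating how the TimeoutsHolder constructor resolves timeouts.
final class EffectiveTimeoutSketch {

  // A request-level value of 0 means "not set": fall back to the client-wide config value.
  static int effectiveTimeout(int requestLevelMs, int configLevelMs) {
    return requestLevelMs == 0 ? configLevelMs : requestLevelMs;
  }

  // -1 disables the request timeout; otherwise the deadline is now + timeout.
  // Returns -1 when disabled, like requestTimeoutMillisTime = -1L in the constructor.
  static long requestDeadline(int requestTimeoutMs, long nowMs) {
    return requestTimeoutMs != -1 ? nowMs + requestTimeoutMs : -1L;
  }

  public static void main(String[] args) {
    int fromConfig = effectiveTimeout(0, 60_000);      // request did not set a value
    int fromRequest = effectiveTimeout(3_000, 60_000); // request-level value wins
    System.out.println(fromConfig);                    // 60000
    System.out.println(fromRequest);                   // 3000
    System.out.println(requestDeadline(fromConfig, 1_000L)); // 61000
    System.out.println(requestDeadline(-1, 1_000L));         // -1
  }
}
```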

scheduleRequestTimeout

org/asynchttpclient/netty/request/NettyRequestSender.java

  private <T> ListenableFuture<T> sendRequestWithOpenChannel(NettyResponseFuture<T> future,
                                                             AsyncHandler<T> asyncHandler,
                                                             Channel channel) {

    try {
      asyncHandler.onConnectionPooled(channel);
    } catch (Exception e) {
      LOGGER.error("onConnectionPooled crashed", e);
      abort(channel, future, e);
      return future;
    }

    SocketAddress channelRemoteAddress = channel.remoteAddress();
    if (channelRemoteAddress != null) {
      // otherwise, bad luck, the channel was closed, see bellow
      scheduleRequestTimeout(future, (InetSocketAddress) channelRemoteAddress);
    }

    future.setChannelState(ChannelState.POOLED);
    future.attachChannel(channel, false);

    if (LOGGER.isDebugEnabled()) {
      HttpRequest httpRequest = future.getNettyRequest().getHttpRequest();
      LOGGER.debug("Using open Channel {} for {} '{}'", channel, httpRequest.method(), httpRequest.uri());
    }

    // channelInactive might be called between isChannelValid and writeRequest
    // so if we don't store the Future now, channelInactive won't perform
    // handleUnexpectedClosedChannel
    Channels.setAttribute(channel, future);

    if (Channels.isChannelActive(channel)) {
      writeRequest(future, channel);
    } else {
      // bad luck, the channel was closed in-between
      // there's a very good chance onClose was already notified but the
      // future wasn't already registered
      handleUnexpectedClosedChannel(channel, future);
    }

    return future;
  }

  private void scheduleRequestTimeout(NettyResponseFuture<?> nettyResponseFuture,
                                      InetSocketAddress originalRemoteAddress) {
    nettyResponseFuture.touch();
    TimeoutsHolder timeoutsHolder = new TimeoutsHolder(nettyTimer, nettyResponseFuture, this, config,
            originalRemoteAddress);
    nettyResponseFuture.setTimeoutsHolder(timeoutsHolder);
  }

  public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {

    NettyRequest nettyRequest = future.getNettyRequest();
    HttpRequest httpRequest = nettyRequest.getHttpRequest();
    AsyncHandler<T> asyncHandler = future.getAsyncHandler();

    // if the channel is dead because it was pooled and the remote server decided to
    // close it,
    // we just let it go and the channelInactive do its work
    if (!Channels.isChannelActive(channel))
      return;

    try {
      if (asyncHandler instanceof TransferCompletionHandler) {
        configureTransferAdapter(asyncHandler, httpRequest);
      }

      boolean writeBody = !future.isDontWriteBodyBecauseExpectContinue()
              && httpRequest.method() != HttpMethod.CONNECT && nettyRequest.getBody() != null;

      if (!future.isHeadersAlreadyWrittenOnContinue()) {
        try {
          asyncHandler.onRequestSend(nettyRequest);
        } catch (Exception e) {
          LOGGER.error("onRequestSend crashed", e);
          abort(channel, future, e);
          return;
        }

        // if the request has a body, we want to track progress
        if (writeBody) {
          // FIXME does this really work??? the promise is for the request without body!!!
          ChannelProgressivePromise promise = channel.newProgressivePromise();
          ChannelFuture f = channel.write(httpRequest, promise);
          f.addListener(new WriteProgressListener(future, true, 0L));
        } else {
          // we can just track write completion
          ChannelPromise promise = channel.newPromise();
          ChannelFuture f = channel.writeAndFlush(httpRequest, promise);
          f.addListener(new WriteCompleteListener(future));
        }
      }

      if (writeBody)
        nettyRequest.getBody().write(channel, future);

      // don't bother scheduling read timeout if channel became invalid
      if (Channels.isChannelActive(channel)) {
        scheduleReadTimeout(future);
      }

    } catch (Exception e) {
      LOGGER.error("Can't write request", e);
      abort(channel, future, e);
    }
  }  

NettyRequestSender's sendRequestWithOpenChannel method calls scheduleRequestTimeout when channelRemoteAddress is not null; that creates a TimeoutsHolder, which in turn schedules the RequestTimeoutTimerTask. Its writeRequest method, after nettyRequest.getBody().write(channel, future), calls scheduleReadTimeout(future) to schedule the ReadTimeoutTimerTask if the channel is still active.

Summary

AsyncHttpClient's TimeoutTimerTask is an abstract class that implements netty's TimerTask interface. It defines an expire method, which calls requestSender.abort, and a clean method, which sets done to true and nulls out nettyResponseFuture. It has two concrete subclasses, RequestTimeoutTimerTask and ReadTimeoutTimerTask. AsyncHttpClient uses TimeoutsHolder to wrap these timeout timers. NettyRequestSender's sendRequestWithOpenChannel method triggers the scheduling of RequestTimeoutTimerTask, while its writeRequest method schedules ReadTimeoutTimerTask through scheduleReadTimeout(future) after nettyRequest.getBody().write(channel, future).

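As the code shows, requestTimeoutMillisTime is the deadline for the request as a whole, and that budget includes the readTimeoutValue wait that follows the write of the request data.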

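Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.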