A Look at AsyncHttpClient's RequestFilter

code4it
Published 2023-12-12 20:28:04
Included in the column: 码匠的流水账

This article examines the RequestFilter in AsyncHttpClient.

RequestFilter

org/asynchttpclient/filter/RequestFilter.java

/**
 * A Filter interface that gets invoked before making an actual request.
 */
public interface RequestFilter {

  /**
   * An {@link org.asynchttpclient.AsyncHttpClient} will invoke {@link RequestFilter#filter} and will use the
   * returned {@link FilterContext#getRequest()} and {@link FilterContext#getAsyncHandler()} to continue the request
   * processing.
   *
   * @param ctx a {@link FilterContext}
   * @param <T> the handler result type
   * @return {@link FilterContext}. The {@link FilterContext} instance may not the same as the original one.
   * @throws FilterException to interrupt the filter processing.
   */
  <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException;
}

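RequestFilter declares a single method, filter. It takes a FilterContext and returns a FilterContext (possibly a new instance) whose request and AsyncHandler the client uses to continue processing.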

ThrottleRequestFilter

org/asynchttpclient/filter/ThrottleRequestFilter.java

/**
 * A {@link org.asynchttpclient.filter.RequestFilter} throttles requests and block when the number of permits is reached,
 * waiting for the response to arrives before executing the next request.
 */
public class ThrottleRequestFilter implements RequestFilter {
  private static final Logger logger = LoggerFactory.getLogger(ThrottleRequestFilter.class);
  private final Semaphore available;
  private final int maxWait;

  public ThrottleRequestFilter(int maxConnections) {
    this(maxConnections, Integer.MAX_VALUE);
  }

  public ThrottleRequestFilter(int maxConnections, int maxWait) {
    this(maxConnections, maxWait, false);
  }

  public ThrottleRequestFilter(int maxConnections, int maxWait, boolean fair) {
    this.maxWait = maxWait;
    available = new Semaphore(maxConnections, fair);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException {
    try {
      if (logger.isDebugEnabled()) {
        logger.debug("Current Throttling Status {}", available.availablePermits());
      }
      if (!available.tryAcquire(maxWait, TimeUnit.MILLISECONDS)) {
        throw new FilterException(String.format("No slot available for processing Request %s with AsyncHandler %s",
                ctx.getRequest(), ctx.getAsyncHandler()));
      }
    } catch (InterruptedException e) {
      throw new FilterException(String.format("Interrupted Request %s with AsyncHandler %s",
              ctx.getRequest(), ctx.getAsyncHandler()));
    }

    return new FilterContext.FilterContextBuilder<>(ctx)
            .asyncHandler(ReleasePermitOnComplete.wrap(ctx.getAsyncHandler(), available))
            .build();
  }
}

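ThrottleRequestFilter implements RequestFilter and uses a Semaphore to throttle requests. If no permit can be acquired within maxWait milliseconds (Integer.MAX_VALUE when only maxConnections is given), it throws a FilterException; it does the same if the waiting thread is interrupted. Once a permit is acquired, the filter wraps the original asyncHandler with ReleasePermitOnComplete.wrap(ctx.getAsyncHandler(), available) so that the permit is released when the request finishes.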
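The acquire-with-timeout behaviour can be tried in isolation with nothing but the JDK. The following is a minimal sketch, not the library class: ThrottleSketch, tryEnter and leave are hypothetical names, and the interrupt is turned into a false return value here instead of a FilterException.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the throttling step; not part of AsyncHttpClient.
class ThrottleSketch {
  private final Semaphore available;
  private final int maxWaitMillis;

  ThrottleSketch(int maxConnections, int maxWaitMillis) {
    this.available = new Semaphore(maxConnections);
    this.maxWaitMillis = maxWaitMillis;
  }

  // Blocks the calling thread for up to maxWaitMillis.
  // Returns true if a permit was obtained, false on timeout or interrupt.
  boolean tryEnter() {
    try {
      return available.tryAcquire(maxWaitMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt flag
      return false;
    }
  }

  // Called when a request finishes, whether it succeeded or failed.
  void leave() {
    available.release();
  }

  int availablePermits() {
    return available.availablePermits();
  }

  public static void main(String[] args) {
    ThrottleSketch throttle = new ThrottleSketch(1, 50);
    System.out.println(throttle.tryEnter()); // true: the only permit is taken
    System.out.println(throttle.tryEnter()); // false: times out after 50 ms
    throttle.leave();                        // the first request completes
    System.out.println(throttle.tryEnter()); // true: the permit is free again
  }
}
```

Because tryAcquire blocks the caller, a large maxWait means the thread that submits the request can stall for as long as the limit is saturated.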

ReleasePermitOnComplete

org/asynchttpclient/filter/ReleasePermitOnComplete.java

/**
 * Wrapper for {@link AsyncHandler}s to release a permit on {@link AsyncHandler#onCompleted()}. This is done via a dynamic proxy to preserve all interfaces of the wrapped handler.
 */
public class ReleasePermitOnComplete {

  /**
   * Wrap handler to release the permit of the semaphore on {@link AsyncHandler#onCompleted()}.
   *
   * @param handler   the handler to be wrapped
   * @param available the Semaphore to be released when the wrapped handler is completed
   * @param <T>       the handler result type
   * @return the wrapped handler
   */
  @SuppressWarnings("unchecked")
  public static <T> AsyncHandler<T> wrap(final AsyncHandler<T> handler, final Semaphore available) {
    Class<?> handlerClass = handler.getClass();
    ClassLoader classLoader = handlerClass.getClassLoader();
    Class<?>[] interfaces = allInterfaces(handlerClass);

    return (AsyncHandler<T>) Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
        try {
          return method.invoke(handler, args);
        } finally {
          switch (method.getName()) {
            case "onCompleted":
            case "onThrowable":
              available.release();
            default:
          }
        }
    });
  }

  //......
}  

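ReleasePermitOnComplete.wrap creates a dynamic proxy around the original handler. Every call is delegated to the wrapped handler, and in the finally block available.release() runs when the invoked method is onCompleted or onThrowable.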
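The proxy technique can be reproduced with the JDK alone. This is a sketch under assumptions: SketchHandler is a hypothetical, trimmed-down interface standing in for AsyncHandler, and ReleaseOnTerminalCall and RecordingHandler are illustrative names. Unlike the quoted source, the sketch also unwraps InvocationTargetException so that callers see the handler's own exception.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.Semaphore;

// Hypothetical handler interface used only for this sketch.
interface SketchHandler {
  void onBodyPart(String part);
  String onCompleted();
  void onThrowable(Throwable t);
}

class ReleaseOnTerminalCall {
  // Wraps the handler in a proxy that releases one permit after
  // onCompleted or onThrowable has run, even if the call itself throws.
  static SketchHandler wrap(SketchHandler handler, Semaphore available) {
    return (SketchHandler) Proxy.newProxyInstance(
        SketchHandler.class.getClassLoader(),
        new Class<?>[] {SketchHandler.class},
        (proxy, method, args) -> {
          try {
            return method.invoke(handler, args);
          } catch (InvocationTargetException e) {
            throw e.getCause(); // addition: rethrow the handler's own exception
          } finally {
            String name = method.getName();
            if (name.equals("onCompleted") || name.equals("onThrowable")) {
              available.release();
            }
          }
        });
  }
}

// Simple handler that concatenates body parts, for demonstration.
class RecordingHandler implements SketchHandler {
  private final StringBuilder body = new StringBuilder();

  @Override
  public void onBodyPart(String part) {
    body.append(part);
  }

  @Override
  public String onCompleted() {
    return body.toString();
  }

  @Override
  public void onThrowable(Throwable t) {
    // nothing to clean up in this sketch
  }
}
```

Calling onBodyPart on the wrapped handler leaves the semaphore untouched; only the two terminal callbacks return a permit.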

preProcessRequest

org/asynchttpclient/DefaultAsyncHttpClient.java

  /**
   * Configure and execute the associated {@link RequestFilter}. This class
   * may decorate the {@link Request} and {@link AsyncHandler}
   *
   * @param fc {@link FilterContext}
   * @return {@link FilterContext}
   */
  private <T> FilterContext<T> preProcessRequest(FilterContext<T> fc) throws FilterException {
    for (RequestFilter asyncFilter : config.getRequestFilters()) {
      fc = asyncFilter.filter(fc);
      assertNotNull(fc, "filterContext");
    }

    Request request = fc.getRequest();
    if (fc.getAsyncHandler() instanceof ResumableAsyncHandler) {
      request = ResumableAsyncHandler.class.cast(fc.getAsyncHandler()).adjustRequestRange(request);
    }

    if (request.getRangeOffset() != 0) {
      RequestBuilder builder = new RequestBuilder(request);
      builder.setHeader("Range", "bytes=" + request.getRangeOffset() + "-");
      request = builder.build();
    }
    fc = new FilterContext.FilterContextBuilder<>(fc).request(request).build();
    return fc;
  }

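The preProcessRequest method of DefaultAsyncHttpClient iterates over config.getRequestFilters() and calls asyncFilter.filter(fc) on each filter in turn, passing the context returned by one filter to the next. Afterwards it lets a ResumableAsyncHandler adjust the request range and sets a Range header when the range offset is non-zero.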
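The chaining step, in which each filter receives the context returned by the previous one, can be sketched as follows. All types here (SketchContext, SketchFilter, FilterChainSketch) are hypothetical simplifications, with plain strings standing in for the request and the handler.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical immutable context: strings stand in for Request and AsyncHandler.
final class SketchContext {
  final String request;
  final String handler;

  SketchContext(String request, String handler) {
    this.request = request;
    this.handler = handler;
  }
}

// Hypothetical counterpart of RequestFilter for this sketch.
interface SketchFilter {
  SketchContext filter(SketchContext ctx);
}

class FilterChainSketch {
  // Runs the filters in order; each one may return a new context,
  // which then becomes the input of the next filter.
  static SketchContext preProcess(List<SketchFilter> filters, SketchContext ctx) {
    for (SketchFilter f : filters) {
      // Mirrors the assertNotNull check: a filter must not return null.
      ctx = Objects.requireNonNull(f.filter(ctx), "filterContext");
    }
    return ctx;
  }
}
```

One consequence of this design is that filter order matters: a filter that wraps the handler sees whatever request the earlier filters produced.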

executeRequest

org/asynchttpclient/DefaultAsyncHttpClient.java

  public <T> ListenableFuture<T> executeRequest(Request request, AsyncHandler<T> handler) {
    if (config.getCookieStore() != null) {
      try {
        List<Cookie> cookies = config.getCookieStore().get(request.getUri());
        if (!cookies.isEmpty()) {
          RequestBuilder requestBuilder = new RequestBuilder(request);
          for (Cookie cookie : cookies) {
            requestBuilder.addOrReplaceCookie(cookie);
          }
          request = requestBuilder.build();
        }
      } catch (Exception e) {
        handler.onThrowable(e);
        return new ListenableFuture.CompletedFailure<>("Failed to set cookies of request", e);
      }
    }

    if (noRequestFilters) {
      return execute(request, handler);
    } else {
      FilterContext<T> fc = new FilterContext.FilterContextBuilder<T>().asyncHandler(handler).request(request).build();
      try {
        fc = preProcessRequest(fc);
      } catch (Exception e) {
        handler.onThrowable(e);
        return new ListenableFuture.CompletedFailure<>("preProcessRequest failed", e);
      }

      return execute(fc.getRequest(), fc.getAsyncHandler());
    }
  }

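When noRequestFilters is false, executeRequest builds a FilterContext and runs preProcessRequest. If that throws, it calls handler.onThrowable(e) and returns a failed future; otherwise it executes the request with the request and handler taken from the returned context.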
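The error path is worth seeing in miniature: a filter failure is reported twice, once through the handler callback and once through the returned future. The sketch below is an assumption-laden model using only the JDK. ExecuteSketch is a hypothetical name, CompletableFuture stands in for ListenableFuture, and a UnaryOperator stands in for preProcessRequest.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical model of the executeRequest control flow; not the library code.
class ExecuteSketch {
  static CompletableFuture<String> executeRequest(
      String request,
      UnaryOperator<String> preProcess,
      Consumer<Throwable> onThrowable) {
    String processed;
    try {
      processed = preProcess.apply(request);
    } catch (RuntimeException e) {
      // The filter stage failed: notify the handler and return a failed future.
      onThrowable.accept(e);
      CompletableFuture<String> failed = new CompletableFuture<>();
      failed.completeExceptionally(e);
      return failed;
    }
    // The filter stage succeeded: continue with the (possibly modified) request.
    return CompletableFuture.completedFuture("sent: " + processed);
  }
}
```

A caller that only inspects the future and a caller that only implements onThrowable will both observe the rejection, for example when a throttling filter runs out of permits.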

Summary

AsyncHttpClient's RequestFilter declares a filter method. One implementation, ThrottleRequestFilter, uses a semaphore to throttle requests. When noRequestFilters is false, DefaultAsyncHttpClient's executeRequest runs preProcessRequest, which iterates over config.getRequestFilters() and calls asyncFilter.filter(fc) on each filter in turn.

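Original-content notice: this article is published on the Tencent Cloud developer community with the author's authorization and may not be reproduced without permission.

To report infringement, contact cloudcommunity@tencent.com to request removal.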
