A Look at AsyncHttpClient's ChannelPool

Author: code4it
Published: 2023-12-11 17:26:31
Column: 码匠的流水账

This article takes a look at AsyncHttpClient's ChannelPool.

ChannelPool

org/asynchttpclient/channel/ChannelPool.java

public interface ChannelPool {

  /**
   * Add a channel to the pool
   *
   * @param channel      an I/O channel
   * @param partitionKey a key used to retrieve the cached channel
   * @return true if added.
   */
  boolean offer(Channel channel, Object partitionKey);

  /**
   * Remove the channel associated with the uri.
   *
   * @param partitionKey the partition used when invoking offer
   * @return the channel associated with the uri
   */
  Channel poll(Object partitionKey);

  /**
   * Remove all channels from the cache. A channel might have been associated
   * with several uri.
   *
   * @param channel a channel
   * @return the true if the channel has been removed
   */
  boolean removeAll(Channel channel);

  /**
   * Return true if a channel can be cached. A implementation can decide based
   * on some rules to allow caching Calling this method is equivalent of
   * checking the returned value of {@link ChannelPool#offer(Channel, Object)}
   *
   * @return true if a channel can be cached.
   */
  boolean isOpen();

  /**
   * Destroy all channels that has been cached by this instance.
   */
  void destroy();

  /**
   * Flush partitions based on a predicate
   *
   * @param predicate the predicate
   */
  void flushPartitions(Predicate<Object> predicate);

  /**
   * @return The number of idle channels per host.
   */
  Map<String, Long> getIdleChannelCountPerHost();
}

ChannelPool defines seven methods: offer, poll, removeAll, isOpen, destroy, flushPartitions, and getIdleChannelCountPerHost. It has two implementations, NoopChannelPool and DefaultChannelPool.
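To make the offer/poll contract concrete, here is a minimal caller-side sketch. It is not AsyncHttpClient code: `MiniPool`, `acquire`, and `release` are hypothetical names, and the type parameter `C` stands in for Netty's `Channel`. It only illustrates what the Javadoc implies: poll may return null, and offer may refuse the channel.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

final class PoolUsageSketch {

  // Hypothetical, trimmed-down stand-in for ChannelPool; C plays the role of Channel.
  interface MiniPool<C> {
    boolean offer(C channel, Object partitionKey);

    C poll(Object partitionKey);
  }

  // Reuse a pooled channel when one is available, otherwise open a new one.
  static <C> C acquire(MiniPool<C> pool, Object partitionKey, Supplier<C> connector) {
    C pooled = pool.poll(partitionKey);
    return pooled != null ? pooled : connector.get();
  }

  // Hand the channel back; if the pool refuses it, the caller has to close it.
  static <C> void release(MiniPool<C> pool, Object partitionKey, C channel, Consumer<C> closer) {
    if (!pool.offer(channel, partitionKey)) {
      closer.accept(channel);
    }
  }
}
```

With a pool that always refuses (as NoopChannelPool does), every acquire opens a fresh connection and every release closes it.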

NoopChannelPool

org/asynchttpclient/channel/NoopChannelPool.java

public enum NoopChannelPool implements ChannelPool {

  INSTANCE;

  @Override
  public boolean offer(Channel channel, Object partitionKey) {
    return false;
  }

  @Override
  public Channel poll(Object partitionKey) {
    return null;
  }

  @Override
  public boolean removeAll(Channel channel) {
    return false;
  }

  @Override
  public boolean isOpen() {
    return true;
  }

  @Override
  public void destroy() {
  }

  @Override
  public void flushPartitions(Predicate<Object> predicate) {
  }

  @Override
  public Map<String, Long> getIdleChannelCountPerHost() {
    return Collections.emptyMap();
  }
}

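NoopChannelPool is an enum, which gives it singleton semantics for free. Its methods are no-ops: offer and removeAll always return false, poll always returns null, and getIdleChannelCountPerHost returns an empty map, so no channel is ever cached. Note that isOpen still returns true.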
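The enum-as-singleton, do-nothing pattern can be sketched in isolation as follows. `NoopCacheSketch` is a hypothetical name used only for illustration; it is not part of the library.

```java
// A single-element enum: the JVM guarantees exactly one INSTANCE.
enum NoopCacheSketch {

  INSTANCE;

  // Never accepts anything, so callers always fall back to "not cached".
  boolean offer(Object value, Object key) {
    return false;
  }

  // Never has anything to hand out.
  Object poll(Object key) {
    return null;
  }
}
```

Callers can depend on the interface and get caching disabled simply by plugging in this instance, without any null checks.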

DefaultChannelPool

/**
 * A simple implementation of {@link ChannelPool} based on a {@link java.util.concurrent.ConcurrentHashMap}
 */
public final class DefaultChannelPool implements ChannelPool {

  private static final Logger LOGGER = LoggerFactory.getLogger(DefaultChannelPool.class);

  private final ConcurrentHashMap<Object, ConcurrentLinkedDeque<IdleChannel>> partitions = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<ChannelId, ChannelCreation> channelId2Creation;
  private final AtomicBoolean isClosed = new AtomicBoolean(false);
  private final Timer nettyTimer;
  private final int connectionTtl;
  private final boolean connectionTtlEnabled;
  private final int maxIdleTime;
  private final boolean maxIdleTimeEnabled;
  private final long cleanerPeriod;
  private final PoolLeaseStrategy poolLeaseStrategy;

  public DefaultChannelPool(AsyncHttpClientConfig config, Timer hashedWheelTimer) {
    this(config.getPooledConnectionIdleTimeout(),
            config.getConnectionTtl(),
            hashedWheelTimer,
            config.getConnectionPoolCleanerPeriod());
  }

  public DefaultChannelPool(int maxIdleTime,
                            int connectionTtl,
                            Timer nettyTimer,
                            int cleanerPeriod) {
    this(maxIdleTime,
            connectionTtl,
            PoolLeaseStrategy.LIFO,
            nettyTimer,
            cleanerPeriod);
  }

  public DefaultChannelPool(int maxIdleTime,
                            int connectionTtl,
                            PoolLeaseStrategy poolLeaseStrategy,
                            Timer nettyTimer,
                            int cleanerPeriod) {
    this.maxIdleTime = maxIdleTime;
    this.connectionTtl = connectionTtl;
    connectionTtlEnabled = connectionTtl > 0;
    channelId2Creation = connectionTtlEnabled ? new ConcurrentHashMap<>() : null;
    this.nettyTimer = nettyTimer;
    maxIdleTimeEnabled = maxIdleTime > 0;
    this.poolLeaseStrategy = poolLeaseStrategy;

    this.cleanerPeriod = Math.min(cleanerPeriod, Math.min(connectionTtlEnabled ? connectionTtl : Integer.MAX_VALUE, maxIdleTimeEnabled ? maxIdleTime : Integer.MAX_VALUE));

    if (connectionTtlEnabled || maxIdleTimeEnabled)
      scheduleNewIdleChannelDetector(new IdleChannelDetector());
  }

  //......
}  

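DefaultChannelPool implements ChannelPool on top of a ConcurrentHashMap that maps each partition key to a ConcurrentLinkedDeque of idle channels. Its main parameters are connectionTtl, maxIdleTime, cleanerPeriod, and poolLeaseStrategy (LIFO unless specified). The effective cleanerPeriod is the minimum of the cleanerPeriod passed in and whichever of connectionTtl and maxIdleTime are enabled (greater than 0). If either connectionTtl or maxIdleTime is enabled, the constructor registers an IdleChannelDetector with nettyTimer, to run after a delay of cleanerPeriod.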
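The cleanerPeriod calculation is easy to check by hand. The sketch below lifts the expression from the constructor into a standalone method; the class and method names are made up for illustration.

```java
final class CleanerPeriodSketch {

  // Mirrors the constructor's expression: a disabled limit (<= 0) is treated
  // as Integer.MAX_VALUE so that it never wins the min().
  static long effectiveCleanerPeriod(int maxIdleTime, int connectionTtl, int cleanerPeriod) {
    int ttlBound = connectionTtl > 0 ? connectionTtl : Integer.MAX_VALUE;
    int idleBound = maxIdleTime > 0 ? maxIdleTime : Integer.MAX_VALUE;
    return Math.min(cleanerPeriod, Math.min(ttlBound, idleBound));
  }
}
```

For example, with maxIdleTime = 60000, connectionTtl = -1, and cleanerPeriod = 1000 the result is 1000; lowering maxIdleTime to 500 makes the detector run every 500 ms instead.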

offer

  public boolean offer(Channel channel, Object partitionKey) {
    if (isClosed.get())
      return false;

    long now = unpreciseMillisTime();

    if (isTtlExpired(channel, now))
      return false;

    boolean offered = offer0(channel, partitionKey, now);
    if (connectionTtlEnabled && offered) {
      registerChannelCreation(channel, partitionKey, now);
    }

    return offered;
  }

  private boolean isTtlExpired(Channel channel, long now) {
    if (!connectionTtlEnabled)
      return false;

    ChannelCreation creation = channelId2Creation.get(channel.id());
    return creation != null && now - creation.creationTime >= connectionTtl;
  }  

  private boolean offer0(Channel channel, Object partitionKey, long now) {
    ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey);
    if (partition == null) {
      partition = partitions.computeIfAbsent(partitionKey, pk -> new ConcurrentLinkedDeque<>());
    }
    return partition.offerFirst(new IdleChannel(channel, now));
  }  

  private void registerChannelCreation(Channel channel, Object partitionKey, long now) {
    ChannelId id = channel.id();
    if (!channelId2Creation.containsKey(id)) {
      channelId2Creation.putIfAbsent(id, new ChannelCreation(now, partitionKey));
    }
  }  

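offer returns false right away if the pool is closed. It then checks isTtlExpired: if the channel has been alive for at least connectionTtl, it returns false. Otherwise it calls offer0, which pushes a new IdleChannel onto the head of the partition's ConcurrentLinkedDeque via offerFirst. If that succeeds and connectionTtl is enabled, registerChannelCreation records the creation time. Because it uses putIfAbsent, the timestamp of the first offer is kept when the same channel is offered again later.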
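The interplay between the TTL check and putIfAbsent can be sketched without Netty. In this simplified model a channel is identified by a String id and time is passed in explicitly; `TtlRegistrySketch` is a hypothetical name, and the deque insertion from offer0 is left out.

```java
import java.util.concurrent.ConcurrentHashMap;

final class TtlRegistrySketch {

  private final ConcurrentHashMap<String, Long> creationTimes = new ConcurrentHashMap<>();
  private final long ttlMillis;

  TtlRegistrySketch(long ttlMillis) {
    this.ttlMillis = ttlMillis;
  }

  // A channel never seen before has no record, so it cannot be expired.
  boolean isTtlExpired(String channelId, long now) {
    if (ttlMillis <= 0) {
      return false;
    }
    Long created = creationTimes.get(channelId);
    return created != null && now - created >= ttlMillis;
  }

  // Refuse expired channels; otherwise remember the time of the FIRST offer.
  boolean offer(String channelId, long now) {
    if (isTtlExpired(channelId, now)) {
      return false;
    }
    if (ttlMillis > 0) {
      creationTimes.putIfAbsent(channelId, now);
    }
    return true;
  }
}
```

One consequence visible in this model: the TTL clock starts at the first offer, not when the connection was actually opened.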

poll

  /**
   * {@inheritDoc}
   */
  public Channel poll(Object partitionKey) {

    IdleChannel idleChannel = null;
    ConcurrentLinkedDeque<IdleChannel> partition = partitions.get(partitionKey);
    if (partition != null) {
      while (idleChannel == null) {
        idleChannel = poolLeaseStrategy.lease(partition);

        if (idleChannel == null)
          // pool is empty
          break;
        else if (!Channels.isChannelActive(idleChannel.channel)) {
          idleChannel = null;
          LOGGER.trace("Channel is inactive, probably remotely closed!");
        } else if (!idleChannel.takeOwnership()) {
          idleChannel = null;
          LOGGER.trace("Couldn't take ownership of channel, probably in the process of being expired!");
        }
      }
    }
    return idleChannel != null ? idleChannel.channel : null;
  }

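poll looks up the ConcurrentLinkedDeque for the given partitionKey and then calls poolLeaseStrategy.lease(partition) in a loop. If lease returns null the partition is empty and the loop breaks. If the channel is no longer active (isChannelActive is false), or if idleChannel.takeOwnership() returns false because the channel is being expired concurrently, the candidate is discarded and the loop tries the next one.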
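The loop can be sketched with plain JDK types. The article does not show how takeOwnership is implemented, so the compare-and-set on an AtomicBoolean below is an assumption, as are the `Idle` class and its fixed `active` flag (the real code asks the channel).

```java
import java.util.Deque;
import java.util.concurrent.atomic.AtomicBoolean;

final class LeaseLoopSketch {

  // Hypothetical stand-in for IdleChannel.
  static final class Idle {
    final String name;
    final boolean active;
    private final AtomicBoolean owned = new AtomicBoolean(false);

    Idle(String name, boolean active) {
      this.name = name;
      this.active = active;
    }

    // Assumed semantics: only one caller can ever win the claim.
    boolean takeOwnership() {
      return owned.compareAndSet(false, true);
    }
  }

  // Keep leasing until a usable candidate is found or the partition is empty.
  static Idle poll(Deque<Idle> partition) {
    Idle candidate;
    while ((candidate = partition.pollFirst()) != null) {
      if (candidate.active && candidate.takeOwnership()) {
        return candidate;
      }
      // Inactive or already claimed: drop it and try the next one.
    }
    return null;
  }
}
```

Discarded candidates are not put back, so a poll also prunes dead entries from the front of the partition as a side effect.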

removeAll

  /**
   * {@inheritDoc}
   */
  public boolean removeAll(Channel channel) {
    ChannelCreation creation = connectionTtlEnabled ? channelId2Creation.remove(channel.id()) : null;
    return !isClosed.get() && creation != null && partitions.get(creation.partitionKey).remove(new IdleChannel(channel, Long.MIN_VALUE));
  }

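removeAll removes the given channel from channelId2Creation and from the ConcurrentLinkedDeque of the partition recorded when the channel was registered. Note that the creation record only exists when connectionTtl is enabled: with TTL disabled, creation is null and the method returns false without touching any partition. It also returns false once the pool has been closed.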
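Removing with a freshly built `new IdleChannel(channel, Long.MIN_VALUE)` only works if IdleChannel equality ignores the timestamp. The article does not show IdleChannel, so the sketch below assumes equals and hashCode are based on the channel alone; `Idle` is a hypothetical stand-in.

```java
import java.util.concurrent.ConcurrentLinkedDeque;

final class ProbeRemovalSketch {

  // Hypothetical stand-in for IdleChannel; equality deliberately ignores 'start'.
  static final class Idle {
    final String channel;
    final long start;

    Idle(String channel, long start) {
      this.channel = channel;
      this.start = start;
    }

    @Override
    public boolean equals(Object o) {
      return this == o || (o instanceof Idle && channel.equals(((Idle) o).channel));
    }

    @Override
    public int hashCode() {
      return channel.hashCode();
    }
  }

  // Build a throwaway probe whose timestamp is irrelevant, then remove by equality.
  static boolean remove(ConcurrentLinkedDeque<Idle> partition, String channel) {
    return partition.remove(new Idle(channel, Long.MIN_VALUE));
  }
}
```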

isOpen

  /**
   * {@inheritDoc}
   */
  public boolean isOpen() {
    return !isClosed.get();
  }

isOpen simply returns the negation of the isClosed flag.

destroy

  /**
   * {@inheritDoc}
   */
  public void destroy() {
    if (isClosed.getAndSet(true))
      return;

    partitions.clear();
    if (connectionTtlEnabled) {
      channelId2Creation.clear();
    }
  }

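destroy atomically sets isClosed to true and returns early if the pool was already closed. Otherwise it clears partitions and, when connectionTtl is enabled, channelId2Creation. The snippet itself only clears the maps; it does not close the pooled channels.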
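The getAndSet guard is what makes destroy safe to call more than once. A minimal sketch of that idiom, with a hypothetical `CloseOnceSketch` class and a counter added purely to observe how often the cleanup runs:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class CloseOnceSketch {

  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final Map<String, String> entries = new ConcurrentHashMap<>();
  // Only here so the behavior can be observed from outside.
  final AtomicInteger cleanups = new AtomicInteger();

  void put(String key, String value) {
    entries.put(key, value);
  }

  int size() {
    return entries.size();
  }

  boolean isOpen() {
    return !closed.get();
  }

  void destroy() {
    // getAndSet returns the previous value: true means someone already closed us.
    if (closed.getAndSet(true)) {
      return;
    }
    entries.clear();
    cleanups.incrementAndGet();
  }
}
```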

flushPartitions

  public void flushPartitions(Predicate<Object> predicate) {
    for (Map.Entry<Object, ConcurrentLinkedDeque<IdleChannel>> partitionsEntry : partitions.entrySet()) {
      Object partitionKey = partitionsEntry.getKey();
      if (predicate.test(partitionKey))
        flushPartition(partitionKey, partitionsEntry.getValue());
    }
  }

  private void flushPartition(Object partitionKey, ConcurrentLinkedDeque<IdleChannel> partition) {
    if (partition != null) {
      partitions.remove(partitionKey);
      for (IdleChannel idleChannel : partition)
        close(idleChannel.channel);
    }
  }

  private void close(Channel channel) {
    // FIXME pity to have to do this here
    Channels.setDiscard(channel);
    if (connectionTtlEnabled) {
      channelId2Creation.remove(channel.id());
    }
    Channels.silentlyCloseChannel(channel);
  }    

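flushPartitions iterates over partitions and evaluates predicate.test on each partition key. When the predicate returns true it calls flushPartition, which removes that key from partitions and then closes every idle channel in the partition one by one. close marks the channel as discarded, drops its entry from channelId2Creation when connectionTtl is enabled, and silently closes the channel.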
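A Netty-free sketch of the same shape is shown below. `FlushSketch` and the `closer` callback are illustrative assumptions; the callback stands in for the pool's private close method.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class FlushSketch {

  // Removes every partition whose key matches and closes its entries.
  // Returns the number of entries handed to the closer.
  static <C> int flush(ConcurrentHashMap<Object, ConcurrentLinkedDeque<C>> partitions,
                       Predicate<Object> shouldFlush,
                       Consumer<C> closer) {
    int closed = 0;
    // ConcurrentHashMap iterators are weakly consistent, so removing
    // entries while iterating does not throw.
    for (Map.Entry<Object, ConcurrentLinkedDeque<C>> entry : partitions.entrySet()) {
      if (shouldFlush.test(entry.getKey())) {
        partitions.remove(entry.getKey());
        for (C channel : entry.getValue()) {
          closer.accept(channel);
          closed++;
        }
      }
    }
    return closed;
  }
}
```

A typical predicate would select the partition keys that belong to one target, so that only connections to that target are dropped.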

getIdleChannelCountPerHost

  public Map<String, Long> getIdleChannelCountPerHost() {
    return partitions
            .values()
            .stream()
            .flatMap(ConcurrentLinkedDeque::stream)
            .map(idle -> idle.getChannel().remoteAddress())
            .filter(a -> a.getClass() == InetSocketAddress.class)
            .map(a -> (InetSocketAddress) a)
            .map(InetSocketAddress::getHostName)
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
  }

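getIdleChannelCountPerHost flattens all partitions into a single stream, maps each idle channel to its remoteAddress, keeps only InetSocketAddress instances, extracts the host name, and then groups by host name with a count.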
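The same stream pipeline can be tried out on plain addresses. This is a sketch under a few assumptions: partitions hold SocketAddress values directly instead of idle channels, the class name is made up, and it uses getHostString rather than the original's getHostName so that the example never triggers a name lookup.

```java
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Deque;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class IdleCountSketch {

  // Flatten all partitions, keep inet addresses, and count entries per host.
  static Map<String, Long> countPerHost(Collection<Deque<SocketAddress>> partitions) {
    return partitions
            .stream()
            .flatMap(Deque::stream)
            .filter(a -> a instanceof InetSocketAddress)
            .map(a -> ((InetSocketAddress) a).getHostString())
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
  }
}
```

Two partitions for the same host (for example with different proxy settings in the key) end up merged under one host name in the result.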

PoolLeaseStrategy

  public enum PoolLeaseStrategy {
    LIFO {
      public <E> E lease(Deque<E> d) {
        return d.pollFirst();
      }
    },
    FIFO {
      public <E> E lease(Deque<E> d) {
        return d.pollLast();
      }
    };

    abstract <E> E lease(Deque<E> d);
  }

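PoolLeaseStrategy is an enum with two constants, LIFO and FIFO. LIFO calls pollFirst on the Deque and FIFO calls pollLast. Since offer0 always inserts with offerFirst, the head of the deque holds the most recently returned channel, so pollFirst yields last-in-first-out order and pollLast yields first-in-first-out order.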
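The two orders are easy to see on a plain ArrayDeque. The helper below is only a demonstration (`LeaseOrderDemo` and `drain` are made-up names): it inserts at the head, as offer0 does, and then drains from the chosen end.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class LeaseOrderDemo {

  // Offer every element at the head, then lease until the deque is empty.
  static List<String> drain(boolean lifo, String... offered) {
    Deque<String> deque = new ArrayDeque<>();
    for (String s : offered) {
      deque.offerFirst(s);
    }
    List<String> leased = new ArrayList<>();
    String next;
    while ((next = lifo ? deque.pollFirst() : deque.pollLast()) != null) {
      leased.add(next);
    }
    return leased;
  }
}
```

Offering c1, c2, c3 and draining gives c3, c2, c1 with LIFO and c1, c2, c3 with FIFO. A plausible reason for LIFO as the default is that it keeps reusing the most recently used connections and lets the rest age out through the idle timeout, although the source code shown here does not state the rationale.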

IdleChannelDetector

  private final class IdleChannelDetector implements TimerTask {

    private boolean isIdleTimeoutExpired(IdleChannel idleChannel, long now) {
      return maxIdleTimeEnabled && now - idleChannel.start >= maxIdleTime;
    }

    private List<IdleChannel> expiredChannels(ConcurrentLinkedDeque<IdleChannel> partition, long now) {
      // lazy create
      List<IdleChannel> idleTimeoutChannels = null;
      for (IdleChannel idleChannel : partition) {
        boolean isIdleTimeoutExpired = isIdleTimeoutExpired(idleChannel, now);
        boolean isRemotelyClosed = !Channels.isChannelActive(idleChannel.channel);
        boolean isTtlExpired = isTtlExpired(idleChannel.channel, now);
        if (isIdleTimeoutExpired || isRemotelyClosed || isTtlExpired) {
          LOGGER.debug("Adding Candidate expired Channel {} isIdleTimeoutExpired={} isRemotelyClosed={} isTtlExpired={}", idleChannel.channel, isIdleTimeoutExpired, isRemotelyClosed, isTtlExpired);
          if (idleTimeoutChannels == null)
            idleTimeoutChannels = new ArrayList<>(1);
          idleTimeoutChannels.add(idleChannel);
        }
      }

      return idleTimeoutChannels != null ? idleTimeoutChannels : Collections.emptyList();
    }

    private List<IdleChannel> closeChannels(List<IdleChannel> candidates) {

      // lazy create, only if we hit a non-closeable channel
      List<IdleChannel> closedChannels = null;
      for (int i = 0; i < candidates.size(); i++) {
        // We call takeOwnership here to avoid closing a channel that has just been taken out
        // of the pool, otherwise we risk closing an active connection.
        IdleChannel idleChannel = candidates.get(i);
        if (idleChannel.takeOwnership()) {
          LOGGER.debug("Closing Idle Channel {}", idleChannel.channel);
          close(idleChannel.channel);
          if (closedChannels != null) {
            closedChannels.add(idleChannel);
          }

        } else if (closedChannels == null) {
          // first non closeable to be skipped, copy all
          // previously skipped closeable channels
          closedChannels = new ArrayList<>(candidates.size());
          for (int j = 0; j < i; j++)
            closedChannels.add(candidates.get(j));
        }
      }

      return closedChannels != null ? closedChannels : candidates;
    }

    public void run(Timeout timeout) {

      if (isClosed.get())
        return;

      if (LOGGER.isDebugEnabled())
        for (Object key : partitions.keySet()) {
          int size = partitions.get(key).size();
          if (size > 0) {
            LOGGER.debug("Entry count for : {} : {}", key, size);
          }
        }

      long start = unpreciseMillisTime();
      int closedCount = 0;
      int totalCount = 0;

      for (ConcurrentLinkedDeque<IdleChannel> partition : partitions.values()) {

        // store in intermediate unsynchronized lists to minimize
        // the impact on the ConcurrentLinkedDeque
        if (LOGGER.isDebugEnabled())
          totalCount += partition.size();

        List<IdleChannel> closedChannels = closeChannels(expiredChannels(partition, start));

        if (!closedChannels.isEmpty()) {
          if (connectionTtlEnabled) {
            for (IdleChannel closedChannel : closedChannels)
              channelId2Creation.remove(closedChannel.channel.id());
          }

          partition.removeAll(closedChannels);
          closedCount += closedChannels.size();
        }
      }

      if (LOGGER.isDebugEnabled()) {
        long duration = unpreciseMillisTime() - start;
        if (closedCount > 0) {
          LOGGER.debug("Closed {} connections out of {} in {} ms", closedCount, totalCount, duration);
        }
      }

      scheduleNewIdleChannelDetector(timeout.task());
    }
  }

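IdleChannelDetector implements Netty's TimerTask interface. Its run method iterates over partitions and uses expiredChannels to collect the IdleChannels that should go: a channel is a candidate if its idle timeout has expired (isIdleTimeoutExpired), it was closed by the remote side (isRemotelyClosed), or its TTL has expired (isTtlExpired). closeChannels then calls takeOwnership on each candidate and closes only those it manages to claim, which avoids closing a channel that poll has just leased. The closed channels are removed from channelId2Creation and from the partition, and finally the detector reschedules itself.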
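One sweep over a single partition can be sketched as a pure function with time passed in explicitly. Everything here is a simplified assumption: `Idle` is a hypothetical stand-in for IdleChannel, ownership is modeled with an AtomicBoolean, and the actual channel close is omitted.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class SweepSketch {

  static final class Idle {
    final String name;
    final long idleSince;
    final long createdAt;
    final boolean active;
    final AtomicBoolean owned = new AtomicBoolean(false);

    Idle(String name, long idleSince, long createdAt, boolean active) {
      this.name = name;
      this.idleSince = idleSince;
      this.createdAt = createdAt;
      this.active = active;
    }
  }

  // Returns the entries that were claimed and removed from the partition.
  static List<Idle> sweep(Deque<Idle> partition, long now, long maxIdle, long ttl) {
    List<Idle> closed = new ArrayList<>();
    for (Idle idle : partition) {
      boolean idleExpired = maxIdle > 0 && now - idle.idleSince >= maxIdle;
      boolean ttlExpired = ttl > 0 && now - idle.createdAt >= ttl;
      boolean remotelyClosed = !idle.active;
      // Claim before closing so a concurrently leased entry is left alone.
      if ((idleExpired || ttlExpired || remotelyClosed) && idle.owned.compareAndSet(false, true)) {
        closed.add(idle);
      }
    }
    partition.removeAll(closed);
    return closed;
  }
}
```

An entry that is expired but already claimed by someone else stays in the partition in this sketch; in the pool, the poll that claimed it has already taken it out of the deque.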

Summary

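AsyncHttpClient's ChannelPool defines the offer, poll, removeAll, isOpen, destroy, flushPartitions, and getIdleChannelCountPerHost methods, and it has two implementations, NoopChannelPool and DefaultChannelPool. DefaultChannelPool implements the interface on top of a ConcurrentHashMap; its main parameters are connectionTtl, maxIdleTime, cleanerPeriod, and poolLeaseStrategy. The effective cleanerPeriod is the minimum of the cleanerPeriod passed in and whichever of connectionTtl and maxIdleTime are enabled. If connectionTtl or maxIdleTime is enabled, an IdleChannelDetector is registered with nettyTimer to run after a delay of cleanerPeriod.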

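Connection liveness is maintained at three points. poll checks that a leased channel is still active and keeps leasing until it finds one that is. IdleChannelDetector runs periodically and closes channels whose idle timeout has expired, that were closed remotely, or whose TTL has expired. offer checks isTtlExpired and refuses channels that have outlived their TTL.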

Originally published on 2023-12-08 on the 码匠的流水账 WeChat official account.
