前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >HttpComponents HttpClient连接池(2)-连接的申请

HttpComponents HttpClient连接池(2)-连接的申请

作者头像
TA码字
发布2020-04-01 14:59:57
1.2K0
发布2020-04-01 14:59:57
举报
文章被收录于专栏:TA码字TA码字

上一篇文章里我们主要介绍了 httpclient 连接池的关键类和数据结构,在这里我们主要介绍http连接的申请和释放。

http连接的申请

http 连接的申请主要调用上一篇文章 Cpool 对象(在父类AbstractConnPool)的 lease() 方法,该方法会返回 Future<CPoolEntry> 对象,该对象调用 get 方法得到 CPoolEntry ,而在 get 方法里又调用 CPool 实例的 getPoolEntryBlocking() 方法,所以该方法是核心,代码如下:

代码语言:javascript
复制
private E getPoolEntryBlocking(final T route, final Object state, final long timeout, final TimeUnit timeUnit, final Future<E> future) throws IOException, InterruptedException, TimeoutException {
        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
        }
        this.lock.lock();
        try {
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry;
            for (;;) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                for (;;) {
                    entry = pool.getFree(state);
                    if (entry == null) {
                        break;
                    }
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }

                // New connection is needed
                final int maxPerRoute = getMax(route);
                // Shrink the pool prior to allocating a new connection
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }

                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        final C conn = this.connFactory.create(route);
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }

                boolean success = false;
                try {
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                    pool.queue(future);
                    this.pending.add(future);
                    if (deadline != null) {
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()) {
                        throw new InterruptedException("Operation interrupted");
                    }
                } finally {
                    // In case of 'success', we were woken up by the
                    // connection pool and should now have a connection
                    // waiting for us, or else we're shutting down.
                    // Just continue in the loop, both cases are checked.
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // check for spurious wakeup vs. timeout
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }

对以上代码分析如下:

  • 调用 getPool(route) 方法,根据当前 http 调用的 route 查找上一篇文章介绍的 individual 连接池对象 RouteSpecificPool ,如果没有就创建一个并加入到上一篇文章里 Cpool 对象中的 Map 里,以便后面可以重用。
  • 在一个循环里尝试获取上一篇文章介绍的池化对象 CpoolEntry 。
  • 在上述循环的子循环中调用连接池对象 pool.getFree() 方法尝试获取 CpoolEntry 对象。在 getFree() 方法里尝试从 individual 连接池对象的可用集合 available 里获取,如果有就从其中去掉,放入 individual leased 集合中,表示正在使用。如果没有就返回空值。
  • 如果 pool.getFree() 方法调用能够直接返回可用的 CpoolEntry 对象实例,首先通过调用方法entry.isExpired() 检查是否过期,过期则通过 entry.close() 关闭连接。
  • entry.isExpired() 本质是检查过期时间和当前时间对比,过期时间由 global 连接池 Cpool 对象的timeToLive决定,timeToLive的值可以通过HttpClientBuilder.setConnectionTimeToLive() 方法设置,没有设置默认为Long.MAX_VALUE,单位是毫秒。
  • entry.close() 本质是把上一篇文章中介绍的 ManagedHttpClientConnectin 里绑定的 socket 关闭输入输出流,关闭socket,设置绑定socket为null。entry.isOpen() 和 entry.isClosed() 就是判断绑定的 socket 是否为 null ,核心代码如下: public void close() throws IOException { final Socket socket = this.socketHolder.getAndSet(null); if (socket != null) { try { this.inBuffer.clear(); this.outbuffer.flush(); try { try { socket.shutdownOutput(); } catch (final IOException ignore) { } try { socket.shutdownInput(); } catch (final IOException ignore) { } } catch (final UnsupportedOperationException ignore) { // if one isn't supported, the other one isn't either } } finally { socket.close(); } } } public boolean isOpen() { return this.socketHolder.get() != null; } public boolean isClosed() { final HttpClientConnection conn = getConnection(); return !conn.isOpen(); }
  • 如果pool.getFree()方法调用返回可用连接CpoolEntry对象,通过entry.isClosed()检查其是否关闭,如果关闭则从 global Cpool 可用连接集合 available 里移除,并从 individual 连接池 RouteSpecificPool 正在使用的集合 leased 中移除。
  • 如果 pool.getFree() 方法调用直接返回可用连接 CpoolEntry 对象在上面的检查步骤中即没有过期也没有关闭,则表示可用,那么就从 global Cpool 可用连接集合 available 中移除,并加入 global Cpool 的正在使用连接集合 leased 中,然后返回,结束上面步骤中的循环。
  • 如果 pool.getFree() 调用返回为 null,表示没有可用连接,然后就会去检查当前是否超过 global Cpool 的限制,是否超过 individual 连接池的限制,如果没有则创建,并分别加入 global 连接池和 individual 连接池的正在使用集合 leased 里。然后返回,结束上面步骤中的循环。
  • 如果上述步骤中已经超过了连接池的限制,那么把请求对象分别加入 global 连接池和 individual 连接池的请求集合 pending 里。然后利用对象锁,使当前线程在该锁上等待。等待时间由RequestConfig.Builder.setConnectionRequestTimeout() 方法决定,默认不设置值为 0 ,为 0 的时候当前线程无限等待。
  • 上一步中超过了连接池的限制,则当前线程在该锁上等待,如果等待超时那么就意味申请可用连接失败,抛出异常TimeoutException("Timeout waiting for connection")。如果被其他线程唤醒,意味着有可用连接释放到池中,然后继续循环以上各个步骤尝试获取连接。
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-03-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 TA码字 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
腾讯云代码分析
腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,支撑团队传承代码文化。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档