关于golang http transport的讲解,网上有很多文章进行了解读,但都比较粗,很多代码实现并没有讲清楚。故给出更加详细的实现说明。整体看下来细节实现层面还是比较难懂的。
本次使用golang版本1.12.9
transport实现了RoundTripper接口,该接口只有一个方法RoundTrip(),故transport的入口函数就是RoundTrip()。transport的主要功能其实就是缓存了长连接,用于大量http请求场景下的连接复用,减少发送请求时TCP(TLS)连接建立的时间损耗,同时transport还能对连接做一些限制,如连接超时时间,每个host的最大连接数等。transport对长连接的缓存和控制仅限于TCP+(TLS)+HTTP1,不对HTTP2做缓存和限制。
tranport包含如下几个主要概念:
一对readLoop/writeLoop只能处理一条连接,如果这条连接上没有更多的请求,则关闭连接,退出循环,释放系统资源
下述代码都来自golang源码的src/net/httptransport.go文件
type RoundTripper interface {
// RoundTrip executes a single HTTP transaction, returning
// a Response for the provided Request.
//
// RoundTrip should not attempt to interpret the response. In
// particular, RoundTrip must return err == nil if it obtained
// a response, regardless of the response's HTTP status code.
// A non-nil err should be reserved for failure to obtain a
// response. Similarly, RoundTrip should not attempt to
// handle higher-level protocol details such as redirects,
// authentication, or cookies.
//
// RoundTrip should not modify the request, except for
// consuming and closing the Request's Body. RoundTrip may
// read fields of the request in a separate goroutine. Callers
// should not mutate or reuse the request until the Response's
// Body has been closed.
//
// RoundTrip must always close the body, including on errors,
// but depending on the implementation may do so in a separate
// goroutine even after RoundTrip returns. This means that
// callers wanting to reuse the body for subsequent requests
// must arrange to wait for the Close call before doing so.
//
// The Request's URL and Header fields must be initialized.
RoundTrip(*Request) (*Response, error)
}
Transport结构体中的主要成员如下(没有列出所有成员):
wantIdle 要求关闭所有idle的persistConn
reqCanceler map[*Request]func(error) 用于取消request
idleConn map[connectMethodKey][]*persistConn idle状态的persistConn连接池,最大值受maxIdleConnsPerHost限制
idleConnCh map[connectMethodKey]chan *persistConn 用于给调用者传递persistConn
connPerHostCount map[connectMethodKey]int 表示一类连接上的host数目,最大值受MaxConnsPerHost限制
connPerHostAvailable map[connectMethodKey]chan struct{} 与connPerHostCount配合使用,判断该类型的连接数目是否已经达到上限
idleLRU connLRU 长度受MaxIdleConns限制,队列方式保存所有idle的pconn
altProto atomic.Value nil or map[string]RoundTripper,key为URI scheme,表示处理该scheme的RoundTripper实现。注意与TLSNextProto的不同,前者表示URI的scheme,后者表示tls之上的协议。如前者不会体现http2,后者会体现http2
Proxy func(*Request) (*url.URL, error) 为request返回一个代理的url
DisableKeepAlives bool 是否取消长连接
DisableCompression bool 是否取消HTTP压缩
MaxIdleConns int 所有host的idle状态的最大连接数目,即idleConn中所有连接数
MaxIdleConnsPerHost int 每个host的idle状态的最大连接数目,即idleConn中的key对应的连接数
MaxConnsPerHost 每个host上的最大连接数目,含dialing/active/idle状态的connections。http2时,每个host只允许有一条idle的conneciton
DialContext func(ctx context.Context, network, addr string) (net.Conn, error) 创建未加密的tcp连接,比Dial函数增加了context控制
Dial func(network, addr string) (net.Conn, error) 创建未加密的tcp连接,废弃,使用DialContext
DialTLS func(network, addr string) (net.Conn, error) 为非代理模式的https创建连接的函数,如果该函数非空,则不会使用Dial函数,且忽略TLSClientConfig和TLSHandshakeTimeout;反之使用Dila和TLSClientConfig。即有限使用DialTLS进行tls协商
TLSClientConfig *tls.Config tls client用于tls协商的配置
IdleConnTimeout 连接保持idle状态的最大时间,超时关闭pconn
TLSHandshakeTimeout time.Duration tls协商的超时时间
ResponseHeaderTimeout time.Duration 发送完request后等待serve response的时间
TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper 在tls协商带NPN/ALPN的扩展后,transport如何切换到其他协议。指tls之上的协议(next指的就是tls之上的意思)
ProxyConnectHeader Header 在CONNECT请求时,配置request的首部信息,可选
MaxResponseHeaderBytes 指定server响应首部的最大字节数
Transport.roundTrip是主入口,它通过传入一个request参数,由此选择一个合适的长连接来发送该request并返回response。整个流程主要分为两步:
使用getConn函数来获得底层TCP(TLS)连接;调用roundTrip函数进行上层协议(HTTP)处理。
func (t *Transport) roundTrip(req *Request) (*Response, error) {
t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
ctx := req.Context()
trace := httptrace.ContextClientTrace(ctx)
if req.URL == nil {
req.closeBody()
return nil, errors.New("http: nil Request.URL")
}
if req.Header == nil {
req.closeBody()
return nil, errors.New("http: nil Request.Header")
}
scheme := req.URL.Scheme
isHTTP := scheme == "http" || scheme == "https"
// 下面判断request首部的有效性
if isHTTP {
for k, vv := range req.Header {
if !httpguts.ValidHeaderFieldName(k) {
return nil, fmt.Errorf("net/http: invalid header field name %q", k)
}
for _, v := range vv {
if !httpguts.ValidHeaderFieldValue(v) {
return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
}
}
}
}
// 判断是否使用注册的RoundTrip来处理对应的scheme。对于使用tcp+tls+http1(wss协议升级)的场景
// 不能使用注册的roundTrip。后续代码对tcp+tls+http1或tcp+http1进行了roundTrip处理
if t.useRegisteredProtocol(req) {
altProto, _ := t.altProto.Load().(map[string]RoundTripper)
if altRT := altProto[scheme]; altRT != nil {
if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
return resp, err
}
}
}
// 后续仅处理URL scheme为http或https的连接
if !isHTTP {
req.closeBody()
return nil, &badStringError{"unsupported protocol scheme", scheme}
}
if req.Method != "" && !validMethod(req.Method) {
return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
}
if req.URL.Host == "" {
req.closeBody()
return nil, errors.New("http: no Host in request URL")
}
// 下面for循环用于在request出现错误的时候进行请求重试。但不是所有的请求失败都会被尝试,如请求被取消(errRequestCanceled)
// 的情况是不会进行重试的。具体参见shouldRetryRequest函数
for {
select {
case <-ctx.Done():
req.closeBody()
return nil, ctx.Err()
default:
}
// treq gets modified by roundTrip, so we need to recreate for each retry.
treq := &transportRequest{Request: req, trace: trace}
// connectMethodForRequest函数通过输入一个request返回一个connectMethod(简称cm),该类型通过
// {proxyURL,targetScheme,tartgetAddr,onlyH1},即{代理URL,server端的scheme,server的地址,是否HTTP1}
// 来表示一个请求。一个符合connectMethod描述的request将会在Transport.idleConn中匹配到一类长连接。
cm, err := t.connectMethodForRequest(treq)
if err != nil {
req.closeBody()
return nil, err
}
// 获取一条长连接,如果连接池中有现成的连接则直接返回,否则返回一条新建的连接。该连接可能是HTTP2格式的,存放在persistCnn.alt中,
// 使用其自注册的RoundTrip处理。该函数描述参见下面内容。
// 从getConn的实现中可以看到,一个请求只能在idle的连接上执行,反之一条连接只能同时处理一个请求。
getConn用于返回一条长连接。长连接的来源有2种路径:连接池中获取;当连接池中无法获取到时会新建一条连接
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
req := treq.Request
trace := treq.trace
ctx := req.Context()
if trace != nil && trace.GetConn != nil {
trace.GetConn(cm.addr())
}
// 从连接池中找一条合适的连接,如果找到则返回该连接,否则新建连接
if pc, idleSince := t.getIdleConn(cm); pc != nil {
if trace != nil && trace.GotConn != nil {
trace.GotConn(pc.gotIdleConnTrace(idleSince))
}
// 此处设置transport.reqCanceler比较难理解,主要功能是做一个标记,用于判断当前到执行pconn.roundTrip
// 期间,request有没有被(如Request.Cancel,Request.Context().Done())取消,被取消的request将无需继续roundTrip处理
t.setReqCanceler(req, func(error) {})
return pc, nil
}
type dialRes struct {
pc *persistConn
err error
}
// 该chan中用于存放通过dialConn函数新创建的长连接persistConn(后续简称pconn),表示一条TCP(TLS)的底层连接.
dialc := make(chan dialRes)
// cmKey实际就是把connectMethod中的元素全部字符串化。cmKey作为一类连接的标识,如Transport.idleConn[cmKey]就表示一类特定的连接
cmKey := cm.key()
// Copy these hooks so we don't race on the postPendingDial in
// the goroutine we launch. Issue 11136.
testHookPrePendingDial := testHookPrePendingDial
testHookPostPendingDial := testHookPostPendingDial
// 在尝试获取连接的时候,如果此时正在创建一条连接,但最后没有选择这条新建的连接(有其它调用者释放了一条连接),
// 此时,handlePendingDial负责将这条新创建的连接放到Transport.idleConn连接池中
handlePendingDial := func() {
testHookPrePendingDial()
go func() {
if v := <-dialc; v.err == nil {
// 将一条连接放入连接池中,描述见下文--tryPutIdleConn
t.putOrCloseIdleConn(v.pc)
} else {
t.decHostConnCount(cmKey)
}
testHookPostPendingDial()
}()
}
cancelc := make(chan error, 1)
// 为request设置ReqCanceler。transport代码中不会主动调用该ReqCanceler函数(会在
// roundTrip中调用replaceReqCanceler将其覆盖),可能的原因是transport提供了一个对外API CancelRequest,
// 用户可以调用该函数取消连接,此时会调用该ReqCanceler。需要注意的是从CancelRequest的注释中可以看出,该API
// 已经被废弃,这段代码后面可能会被删除(如果有不同看法,请指出)
t.setReqCanceler(req, func(err error) { cancelc <- err })
// 如果对host上建立的连接有限制
if t.MaxConnsPerHost > 0 {
select {
// incHostConnCount会根据主机已经建立的连接是否达到t.MaxConnsPerHost来返回一个未关闭
// 的chan(连接数达到MaxConnsPerHost)或关闭的chan(连接数未达到MaxConnsPerHost),
// 返回未关闭的chan时会阻塞等待其他请求释放连接,不能新创建pconn;反之可以使用新创建的pconn
case <-t.incHostConnCount(cmKey):
// 等待获取某一类连接对应的chan。tryPutIdleConn函数中会尝试将新建或释放的连接放入到该chan中
case pc := <-t.getIdleConnCh(cm):
if trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
}
return pc, nil
// 下面2个case都表示request被取消,其中Cancel被废弃,建议使用Context来取消request
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
}
}
go func() {
// 新建连接,创建好后将其放入dialc chan中
pc, err := t.dialConn(ctx, cm)
dialc <- dialRes{pc, err}
}()
// 下面会通过两种途径来获得连接:从dialc中获得通过dialConn新建的连接;通过idleConnCh获得其他request释放的连接
// 如果首先获取到的是dialConn新建的连接,直接返回该连接即可;如果首先获取到的是其他request释放的连接,在返回该连接前
// 需要调用handlePendingDial来处理dialConn新建的连接。
idleConnCh := t.getIdleConnCh(cm)
select {
// 获取dialConn新建的连接
case v := <-dialc:
// Our dial finished.
if v.pc != nil {
if trace != nil && trace.GotConn != nil && v.pc.alt == nil {
trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})
}
return v.pc, nil
}
// 仅针对MaxConnsPerHost>0有效,对应上面的incHostConnCount()
t.decHostConnCount(cmKey)
// 下面用于返回更易读的错误信息
select {
case <-req.Cancel:
// It was an error due to cancelation, so prioritize that
// error value. (Issue 16049)
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
default:
// It wasn't an error due to cancelation, so
// return the original error message:
return nil, v.err
}
// 获取其他request释放的连接
case pc := <-idleConnCh:
// Another request finished first and its net.Conn
// became available before our dial. Or somebody
// else's dial that they didn't use.
// But our dial is still going, so give it away
// when it finishes:
handlePendingDial()
if trace != nil && trace.GotConn != nil {
trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
}
return pc, nil
// 如果request取消,也需要调用handlePendingDial处理新建的连接
case <-req.Cancel:
handlePendingDial()
return nil, errRequestCanceledConn
case <-req.Context().Done():
handlePendingDial()
return nil, req.Context().Err()
case err := <-cancelc:
handlePendingDial()
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
}
}
tryPutIdleConn函数用来将一条新创建或回收的连接放回连接池中,以便后续使用。与getIdleConnCh配合使用,后者用于获取一类连接对应的chan。在如下场景会将一个连接放回idleConn中
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
// 当不使用长连接或该主机上的连接数小于0(即不允许缓存任何连接)时,返回错误并关闭创建的连接(此处没有做关闭处理,
// 但存在不适用的连接时必须关闭,如使用putOrCloseIdleConn)。
// 可以看出当不使用长连接时,Transport不能缓存连接
if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
return errKeepAlivesDisabled
}
if pconn.isBroken() {
return errConnBroken
}
// 如果是HTTP2连接,则直接返回,不缓存该连接
if pconn.alt != nil {
return errNotCachingH2Conn
}
// 为新连接标记可重用状态,新创建的连接肯定是可以重用的,用于在Transport.roundTrip
// 中的shouldRetryRequest函数中判断连接是否可以重用
pconn.markReused()
// 该key对应Transport.idleConn中的key,标识特定的连接
key := pconn.cacheKey
t.idleMu.Lock()
defer t.idleMu.Unlock()
// idleConnCh中的chan元素用于存放可用的连接pconn,每类连接都有一个chan
waitingDialer := t.idleConnCh[key]
select {
// 如果此时有调用者等待一个连接,则直接将该连接传递出去,不进行保存,这种做法有利于提高效率
case waitingDialer <- pconn:
// We're done with this pconn and somebody else is
// currently waiting for a conn of this type (they're
// actively dialing, but this conn is ready
// first). Chrome calls this socket late binding. See
// https://insouciant.org/tech/connection-management-in-chromium/
return nil
default:
// 如果没有调用者等待连接,则清除该chan。删除map中的chan直接会关闭该chan
if waitingDialer != nil {
// They had populated this, but their dial won
// first, so we can clean up this map entry.
delete(t.idleConnCh, key)
}
}
// 与DisableKeepAlives有点像,当用户需要关闭所有idle的连接时,不会再缓存连接
if t.wantIdle {
return errWantIdle
}
if t.idleConn == nil {
t.idleConn = make(map[connectMethodKey][]*persistConn)
}
idles := t.idleConn[key]
// 当主机上该类连接数超过Transport.MaxIdleConnsPerHost时,不能再保存新的连接,返回错误并关闭连接
if len(idles) >= t.maxIdleConnsPerHost() {
return errTooManyIdleHost
}
// 需要缓存的连接与连接池中已有的重复,系统退出(这种情况下系统已经发生了混乱,直接退出)
for _, exist := range idles {
if exist == pconn {
log.Fatalf("dup idle pconn %p in freelist", pconn)
}
}
// 添加待缓存的连接
t.idleConn[key] = append(idles, pconn)
t.idleLRU.add(pconn)
// 受MaxIdleConns的限制,添加策略变为:添加新的连接,删除最老的连接。
// MaxIdleConns限制了所有类型的idle状态的最大连接数目,而MaxIdleConnsPerHost限制了host上单一类型的最大连接数目
// idleLRU中保存了所有的连接,此处的作用为,找出最老的连接并移除
if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
oldest := t.idleLRU.removeOldest()
oldest.close(errTooManyIdle)
t.removeIdleConnLocked(oldest)
}
// 为新添加的连接设置超时时间
if t.IdleConnTimeout > 0 {
if pconn.idleTimer != nil {
// 如果该连接是被释放的,则重置超时时间
pconn.idleTimer.Reset(t.IdleConnTimeout)
} else {
// 如果该连接时新建的,则设置超时时间并设置超时动作pconn.closeConnIfStillIdle
// closeConnIfStillIdle用于释放连接,从Transport.idleLRU和Transport.idleConn中移除并关闭该连接
pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
}
}
pconn.idleAt = time.Now()
return nil
}
dialConn用于新创建一条连接,并为该连接启动readLoop和writeLoop
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) {
pconn := &persistConn{
t: t,
cacheKey: cm.key(),
reqch: make(chan requestAndChan, 1),
writech: make(chan writeRequest, 1),
closech: make(chan struct{}),
writeErrCh: make(chan error, 1),
writeLoopDone: make(chan struct{}),
}
trace := httptrace.ContextClientTrace(ctx)
wrapErr := func(err error) error {
if cm.proxyURL != nil {
// Return a typed error, per Issue 16997
return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
}
return err
}
// 调用注册的DialTLS处理tls。使用自注册的TLS处理函数时,transport的TLSClientConfig和TLSHandshakeTimeout
// 参数会被忽略
if cm.scheme() == "https" && t.DialTLS != nil {
var err error
// 调用注册的连接函数创建一条连接,注意cm.addr()的实现,如果该连接存在proxy,则此处是与proxy建立TLS连接;否则直接连server。
// 存在proxy时,与server建立连接分为2步:与proxy建立TLP(TLS)连接;与server建立HTTP(HTTPS)连接
// func (cm *connectMethod) addr() string {
// if cm.proxyURL != nil {
// return canonicalAddr(cm.proxyURL)
// }
// return cm.targetAddr
// }
pconn.conn, err = t.DialTLS("tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
if pconn.conn == nil {
return nil, wrapErr(errors.New("net/http: Transport.DialTLS returned (nil, nil)"))
}
// 如果连接类型是TLS的,则需要处理TLS协商
if tc, ok := pconn.conn.(*tls.Conn); ok {
// Handshake here, in case DialTLS didn't. TLSNextProto below
// depends on it for knowing the connection state.
if trace != nil && trace.TLSHandshakeStart != nil {
trace.TLSHandshakeStart()
}
// 启动TLS协商,如果协商失败需要关闭连接
if err := tc.Handshake(); err != nil {
go pconn.conn.Close()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(tls.ConnectionState{}, err)
}
return nil, err
}
cs := tc.ConnectionState()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(cs, nil)
}
// 保存TLS协商结果
pconn.tlsState = &cs
}
} else {
// 使用默认方式创建连接,此时会用到transport的TLSClientConfig和TLSHandshakeTimeout参数。同样注意cm.addr()
conn, err := t.dial(ctx, "tcp", cm.addr())
if err != nil {
return nil, wrapErr(err)
}
pconn.conn = conn
// 如果scheme是需要TLS协商的,则处理TLS协商,否则为普通的HTTP连接
if cm.scheme() == "https" {
var firstTLSHost string
if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
return nil, wrapErr(err)
}
// 进行TLS协商,具体参见下文addTLS
if err = pconn.addTLS(firstTLSHost, trace); err != nil {
return nil, wrapErr(err)
}
}
}
// 处理proxy的情况
switch {
// 不存在proxy 直接跳过
case cm.proxyURL == nil:
case cm.proxyURL.Scheme == "socks5":
conn := pconn.conn
d := socksNewDialer("tcp", conn.RemoteAddr().String())
if u := cm.proxyURL.User; u != nil {
auth := &socksUsernamePassword{
Username: u.Username(),
}
auth.Password, _ = u.Password()
d.AuthMethods = []socksAuthMethod{
socksAuthMethodNotRequired,
socksAuthMethodUsernamePassword,
}
d.Authenticate = auth.Authenticate
}
if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
conn.Close()
return nil, err
}
// 如果存在proxy,且server的scheme为"http",如果需要代理认证,则设置认证信息
case cm.targetScheme == "http":
pconn.isProxy = true
if pa := cm.proxyAuth(); pa != "" {
pconn.mutateHeaderFunc = func(h Header) {
h.Set("Proxy-Authorization", pa)
}
}
// 如果存在proxy,且server的scheme为"https"。与"http"不同,在与server进行tls协商前,会给proxy
// 发送一个method为"CONNECT"的HTTP请求,如果请求通过(返回200),则可以继续与server进行TLS协商
case cm.targetScheme == "https":
// 该conn表示与proxy建立的连接
conn := pconn.conn
hdr := t.ProxyConnectHeader
if hdr == nil {
hdr = make(Header)
}
connectReq := &Request{
Method: "CONNECT",
URL: &url.URL{Opaque: cm.targetAddr},
Host: cm.targetAddr,
Header: hdr,
}
if pa := cm.proxyAuth(); pa != "" {
connectReq.Header.Set("Proxy-Authorization", pa)
}
// 发送"CONNECT" http请求
connectReq.Write(conn)
// Read response.
// Okay to use and discard buffered reader here, because
// TLS server will not speak until spoken to.
br := bufio.NewReader(conn)
resp, err := ReadResponse(br, connectReq)
if err != nil {
conn.Close()
return nil, err
}
// proxy返回非200,表示无法建立连接,可能情况如proxy认证失败
if resp.StatusCode != 200 {
f := strings.SplitN(resp.Status, " ", 2)
conn.Close()
if len(f) < 2 {
return nil, errors.New("unknown status code")
}
return nil, errors.New(f[1])
}
}
// 与proxy建立连接后,再与server进行TLS协商
if cm.proxyURL != nil && cm.targetScheme == "https" {
if err := pconn.addTLS(cm.tlsHost(), trace); err != nil {
return nil, err
}
}
// 后续进行TLS之上的协议处理,如果TLS之上的协议为注册协议,则使用注册的roundTrip进行处理
// TLS之上的协议为TLS协商过程中使用NPN/ALPN扩展协商出的协议,如HTTP2(参见golang.org/x/net/http2)
if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
return &persistConn{alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil
}
}
if t.MaxConnsPerHost > 0 {
pconn.conn = &connCloseListener{Conn: pconn.conn, t: t, cmKey: pconn.cacheKey}
}
// 创建读写通道,writeLoop用于发送request,readLoop用于接收响应。roundTrip函数中会通过chan给writeLoop发送
// request,通过chan从readLoop接口response。每个连接都有一个readLoop和writeLoop,连接关闭后,这2个Loop也会退出。
// pconn.br给readLoop使用,pconn.bw给writeLoop使用,注意此时已经建立了tcp连接。
pconn.br = bufio.NewReader(pconn)
pconn.bw = bufio.NewWriter(persistConnWriter{pconn})
go pconn.readLoop()
go pconn.writeLoop()
return pconn, nil
}
addTLS用于进行非注册协议下的TLS协商
func (pconn *persistConn) addTLS(name string, trace *httptrace.ClientTrace) error {
// Initiate TLS and check remote host name against certificate.
cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
if cfg.ServerName == "" {
cfg.ServerName = name
}
if pconn.cacheKey.onlyH1 {
cfg.NextProtos = nil
}
plainConn := pconn.conn
// 配置TLS client,包含一个TCP连接和TLC配置
tlsConn := tls.Client(plainConn, cfg)
errc := make(chan error, 2)
var timer *time.Timer
// 设置TLS超时时间,并在超时后往errc中写入一个tlsHandshakeTimeoutError{}
if d := pconn.t.TLSHandshakeTimeout; d != 0 {
timer = time.AfterFunc(d, func() {
errc <- tlsHandshakeTimeoutError{}
})
}
go func() {
if trace != nil && trace.TLSHandshakeStart != nil {
trace.TLSHandshakeStart()
}
// 执行TLS协商,如果协商没有超时,则将协商结果err放入errc中
err := tlsConn.Handshake()
if timer != nil {
timer.Stop()
}
errc <- err
}()
// 阻塞等待TLS协商结果,如果协商失败或协商超时,关闭底层连接
if err := <-errc; err != nil {
plainConn.Close()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(tls.ConnectionState{}, err)
}
return err
}
// 获取协商结果并设置到pconn.tlsState
cs := tlsConn.ConnectionState()
if trace != nil && trace.TLSHandshakeDone != nil {
trace.TLSHandshakeDone(cs, nil)
}
pconn.tlsState = &cs
pconn.conn = tlsConn
return nil
}
在获取到底层TCP(TLS)连接后在roundTrip中处理上层协议:即发送HTTP request,返回HTTP response。roundTrip给writeLoop提供request,从readLoop获取response。
一个roundTrip用于处理一类request。
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
testHookEnterRoundTrip()
// 此处与getConn中的"t.setReqCanceler(req, func(error) {})"相对应,用于判断request是否被取消
// 返回false表示request被取消,不必继续后续请求,关闭连接并返回错误
if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
pc.t.putOrCloseIdleConn(pc)
return nil, errRequestCanceled
}
pc.mu.Lock()
// 与readLoop配合使用,表示期望的响应的个数
pc.numExpectedResponses++
// dialConn中定义的函数,设置了proxy的认证信息
headerFn := pc.mutateHeaderFunc
pc.mu.Unlock()
if headerFn != nil {
headerFn(req.extraHeaders())
}
// Ask for a compressed version if the caller didn't set their
// own value for Accept-Encoding. We only attempt to
// uncompress the gzip stream if we were the layer that
// requested it.
requestedGzip := false
// 如果需要在request中设置可接受的解码方法,则在request中添加对应的首部。仅支持gzip方式且
// 仅在调用者没有设置这些首部时设置
if !pc.t.DisableCompression &&
req.Header.Get("Accept-Encoding") == "" &&
req.Header.Get("Range") == "" &&
req.Method != "HEAD" {
// Request gzip only, not deflate. Deflate is ambiguous and
// not as universally supported anyway.
// See: https://zlib.net/zlib_faq.html#faq39
//
// Note that we don't request this for HEAD requests,
// due to a bug in nginx:
// https://trac.nginx.org/nginx/ticket/358
// https://golang.org/issue/5522
//
// We don't request gzip if the request is for a range, since
// auto-decoding a portion of a gzipped document will just fail
// anyway. See https://golang.org/issue/8923
requestedGzip = true
req.extraHeaders().Set("Accept-Encoding", "gzip")
}
// 用于处理首部含"Expect: 100-continue"的request,客户端使用该首部探测服务器是否能够
// 处理request首部中的规格要求(如长度过大的request)。
var continueCh chan struct{}
if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
continueCh = make(chan struct{}, 1)
}
// HTTP1.1默认使用长连接,当transport设置DisableKeepAlives时会导致处理每个request时都会
// 新建一个连接。此处的处理逻辑是:如果transport设置了DisableKeepAlives,而request没有设置
// "Connection: close",则为request设置该首部。将底层表现与上层协议保持一致。
if pc.t.DisableKeepAlives && !req.wantsClose() {
req.extraHeaders().Set("Connection", "close")
}
// 用于在异常场景(如request取消)下通知readLoop,roundTrip是否已经退出,防止ReadLoop发送response阻塞
gone := make(chan struct{})
defer close(gone)
defer func() {
if err != nil {
pc.t.setReqCanceler(req.Request, nil)
}
}()
const debugRoundTrip = false
// Write the request concurrently with waiting for a response,
// in case the server decides to reply before reading our full
// request body.
// 表示发送了多少个字节的request,debug使用
startBytesWritten := pc.nwrite
// 给writeLoop封装并发送信息,注意此处的先后顺序。首先给writeLoop发送数据,阻塞等待writeLoop
// 接收,待writeLoop接收后才能发送数据给readLoop,因此发送request总会优先接收response
writeErrCh := make(chan error, 1)
pc.writech <- writeRequest{req, writeErrCh, continueCh}
// 给readLoop封装并发送信息
resc := make(chan responseAndError)
pc.reqch <- requestAndChan{
req: req.Request,
ch: resc,
addedGzip: requestedGzip,
continueCh: continueCh,
callerGone: gone,
}
var respHeaderTimer <-chan time.Time
cancelChan := req.Request.Cancel
ctxDoneChan := req.Context().Done()
// 该循环主要用于处理获取response超时和request取消时的条件跳转。正常情况下收到reponse
// 退出roundtrip函数
for {
testHookWaitResLoop()
select {
// writeLoop返回发送request后的结果
case err := <-writeErrCh:
if debugRoundTrip {
req.logf("writeErrCh resv: %T/%#v", err, err)
}
if err != nil {
pc.close(fmt.Errorf("write error: %v", err))
return nil, pc.mapRoundTripError(req, startBytesWritten, err)
}
// 设置一个接收response的定时器,如果在这段时间内没有接收到response(即没有进入下面代码
// 的"case re := <-resc:"分支),超时后进入""case <-respHeaderTimer:分支,关闭连接,
// 防止readLoop一直等待读取response,导致处理阻塞;没有超时则关闭定时器
if d := pc.t.ResponseHeaderTimeout; d > 0 {
if debugRoundTrip {
req.logf("starting timer for %v", d)
}
timer := time.NewTimer(d)
defer timer.Stop() // prevent leaks
respHeaderTimer = timer.C
}
// 处理底层连接关闭。"case <-cancelChan:"和”case <-ctxDoneChan:“为request关闭,request
// 关闭也会导致底层连接关闭,但必须处理非上层协议导致底层连接关闭的情况。
case <-pc.closech:
if debugRoundTrip {
req.logf("closech recv: %T %#v", pc.closed, pc.closed)
}
return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
// 等待获取response超时,关闭连接
case <-respHeaderTimer:
if debugRoundTrip {
req.logf("timeout waiting for response headers.")
}
pc.close(errTimeout)
return nil, errTimeout
// 接收到readLoop返回的response结果
case re := <-resc:
// 极异常情况,直接程序panic
if (re.res == nil) == (re.err == nil) {
panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
}
if debugRoundTrip {
req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
}
if re.err != nil {
return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
}
return re.res, nil
// request取消
case <-cancelChan:
pc.t.CancelRequest(req.Request)
// 将关闭之后的chan置为nil,用来防止select一直进入该case(close的chan不会阻塞读,读取的数据为0)
cancelChan = nil
case <-ctxDoneChan:
pc.t.cancelRequest(req.Request, req.Context().Err())
cancelChan = nil
ctxDoneChan = nil
}
}
}
writeLoop用于发送request请求
func (pc *persistConn) writeLoop() {
defer close(pc.writeLoopDone)
for {
// writeLoop会阻塞等待两个IO case:
// 循环等待并处理roundTrip发来的writeRequest数据,此时需要发送request;
// 如果底层连接关闭,则退出writeLoop
select {
case wr := <-pc.writech:
startBytesWritten := pc.nwrite
// 构造request并发送request请求。waitForContinue用于处理首部含"Expect: 100-continue"的request
err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
if bre, ok := err.(requestBodyReadError); ok {
err = bre.error
// Errors reading from the user's
// Request.Body are high priority.
// Set it here before sending on the
// channels below or calling
// pc.close() which tears town
// connections and causes other
// errors.
wr.req.setError(err)
}
if err == nil {
err = pc.bw.Flush()
}
// 请求失败时,需要关闭request和底层连接
if err != nil {
wr.req.Request.closeBody()
if pc.nwrite == startBytesWritten {
err = nothingWrittenError{err}
}
}
// 将结果发送给readLoop的pc.wroteRequest()函数处理
readLoop循环接收response响应,成功获得response后会将连接返回连接池,便于后续复用。当readLoop正常处理完一个response之后,会将连接重新放入到连接池中;
当readloop退出后,该连接会被关闭移除。
func (pc *persistConn) readLoop() {
closeErr := errReadLoopExiting // default value, if not changed below
// 当writeLoop或readLoop(异常)跳出循环后,都需要关闭底层连接。即一条连接包含writeLoop和readLoop两个
// 处理,任何一个loop退出(协议升级除外)则该连接不可用
// readLoo跳出循环的正常原因是连接上没有待处理的请求,此时关闭连接,释放资源
defer func() {
pc.close(closeErr)
pc.t.removeIdleConn(pc)
}()
// 尝试将连接放回连接池
tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
if err := pc.t.tryPutIdleConn(pc); err != nil {
closeErr = err
if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
trace.PutIdleConn(err)
}
return false
}
if trace != nil && trace.PutIdleConn != nil {
trace.PutIdleConn(nil)
}
return true
}
// eofc is used to block caller goroutines reading from Response.Body
// at EOF until this goroutines has (potentially) added the connection
// back to the idle pool.
// 从上面注释可以看出该变量主要用于阻塞调用者协程读取EOF的resp.body,
// 直到该连接重新放入连接池中。处理逻辑与上面先尝试放入连接池,然后返回response一样,
// 便于连接快速重用
eofc := make(chan struct{})
// 出现错误时也需要释放读取resp.Body的协程,防止调用者协程挂死
defer close(eofc) // unblock reader on errors
// Read this once, before loop starts. (to avoid races in tests)
testHookMu.Lock()
testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
testHookMu.Unlock()
alive := true
for alive {
// 获取允许的response首部的最大字节数
pc.readLimit = pc.maxHeaderResponseSize()
// 从接收buffer中peek一个字节来判断底层是否接收到response。roundTrip保证了request先于response发送。
// 此处peek会阻塞等待response(这也是roundtrip中设置response超时定时器的原因)。goroutine中的read/write
// 操作都是阻塞模式。
_, err := pc.br.Peek(1)
pc.mu.Lock()
// 如果期望的response为0,则直接退出readLoop并关闭连接,此时连接上没有需要处理的数据,
// 关闭连接,释放系统资源。
if pc.numExpectedResponses == 0 {
pc.readLoopPeekFailLocked(err)
pc.mu.Unlock()
return
}
pc.mu.Unlock()
// 阻塞等待roundTrip发来的数据
rc := <-pc.reqch
trace := httptrace.ContextClientTrace(rc.req.Context())
var resp *Response
// 如果有response数据,则读取并解析为Response格式
if err == nil {
resp, err = pc.readResponse(rc, trace)
} else {
// 可能的错误如server端关闭,发送EOF
err = transportReadFromServerError{err}
closeErr = err
}
// 底层没有接收到server的任何数据,断开该连接,可能原因是在client发出request的同时,server关闭
// 了连接。参见transportReadFromServerError的注释。
if err != nil {
if pc.readLimit <= 0 {
err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
}
// 传递错误信息给roundTrip并退出loop
select {
case rc.ch <- responseAndError{err: err}:
case <-rc.callerGone:
return
}
return
}
pc.readLimit = maxInt64 // effictively no limit for response bodies
pc.mu.Lock()
pc.numExpectedResponses--
pc.mu.Unlock()
// 判断response是否可写,在使用101 Switching Protocol进行协议升级时需要返回一个可写的resp.body
// 如果使用了101 Switching Protocol,升级完成后就与transport没有关系了(后续使用http2或websocket等)
bodyWritable := resp.bodyIsWritable()
// 判断response的body是否为空,如果body为空,则不必读取body内容(HEAD的resp.body没有数据)
hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
// 如果server关闭连接或client关闭连接或非预期的响应码或使用了协议升级,这几种情况下不能在该连接上继续
// 接收响应,退出readLoop
if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
// Don't do keep-alive on error if either party requested a close
// or we get an unexpected informational (1xx) response.
// StatusCode 100 is already handled above.
alive = false
}
// 此处用于处理body为空或协议升级场景,会尝试将连接放回连接池,对于后者,连接由调用者管理,退出readLoop
if !hasBody || bodyWritable {
pc.t.setReqCanceler(rc.req, nil)
// 在返回response前将连接放回连接池,快速回收利用。回收连接需要按顺序满足:
// 1.alive 为true
// 2.接收到EOF错误,此时底层连接关闭,该连接不可用
// 3.成功发送request;
// 此处的执行顺序很重要,将连接返回连接池的操作放到最后,即在协议升级的场景,服务端不再
// 发送数据的场景,以及request发送失败的场景下都不会将连接放回连接池,这些情况会导致
// alive为false,readLoop退出并关闭该连接(协议升级后的连接不能关闭)
alive = alive &&
!pc.sawEOF &&
pc.wroteRequest() &&
tryPutIdleConn(trace)
if bodyWritable {
// 协议升级之后还是会使用同一条连接,设置closeErr为errCallerOwnsConn,这样在readLoop
// return后不会被pc.close(closeErr)关闭连接
closeErr = errCallerOwnsConn
}
select {
// 1:将response成功返回后继续等待下一个response;
// 2:如果roundTrip退出,(此时无法返回给roundTrip response)则退出readLoop。
// 即roundTrip接收完response后退出不会影响readLoop继续运行
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
// Now that they've read from the unbuffered channel, they're safely
// out of the select that also waits on this goroutine to die, so
// we're allowed to exit now if needed (if alive is false)
testHookReadLoopBeforeNextRead()
continue
}
// 下面处理response body存在数据的场景,逻辑与body不存在数据的场景类似
waitForBodyRead := make(chan bool, 2)
// 初始化body的处理函数,读取完response会返回EOF,这类连接是可重用的
body := &bodyEOFSignal{
body: resp.Body,
earlyCloseFn: func() error {
waitForBodyRead <- false
<-eofc // will be closed by deferred call at the end of the function
return nil
},
fn: func(err error) error {
isEOF := err == io.EOF
waitForBodyRead <- isEOF
if isEOF {
<-eofc // see comment above eofc declaration
} else if err != nil {
if cerr := pc.canceled(); cerr != nil {
return cerr
}
}
return err
},
}
//返回的resp.Body类型变为了bodyEOFSignal,如果调用者在读取resp.Body后没有关闭,会导致
// readLoop阻塞在下面"case bodyEOF := <-waitForBodyRead:"中
resp.Body = body
if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
resp.Body = &gzipReader{body: body}
resp.Header.Del("Content-Encoding")
resp.Header.Del("Content-Length")
resp.ContentLength = -1
resp.Uncompressed = true
}
// 此处与处理不带resp.body的场景相同
select {
case rc.ch <- responseAndError{res: resp}:
case <-rc.callerGone:
return
}
// Before looping back to the top of this function and peeking on
// the bufio.Reader, wait for the caller goroutine to finish
// reading the response body. (or for cancelation or death)
select {
case bodyEOF := <-waitForBodyRead:
pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
alive = alive &&
// 如果读取完response的数据,则该连接可以被重用,否则直接释放。释放一个未读取完数据的连接会导致数据丢失。
//注意区分bodyEOF和pc.sawEOF的区别,一个是上层通道(http response.Body)关闭,一个是底层通道(TCP)关闭。
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
tryPutIdleConn(trace)
// 释放阻塞的读操作
if bodyEOF {
eofc <- struct{}{}
}
case <-rc.req.Cancel:
alive = false
pc.t.CancelRequest(rc.req)
case <-rc.req.Context().Done():
alive = false
pc.t.cancelRequest(rc.req, rc.req.Context().Err())
case <-pc.closech:
alive = false
}
testHookReadLoopBeforeNextRead()
}
}