继续上一篇golang源码分析:grpc 链接池(1),本文从源码入手,按照连接池的建立、请求发起时获取连接、以及最终关闭连接三个流程依次进行分析。
1,创建连接的过程
源码入口位于google.golang.org/grpc@v1.46.0/clientconn.go
// Dial creates a ClientConn for target by delegating to DialContext with
// context.Background() as the context.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
resolverBuilder, err := cc.parseTargetAndFindResolver()
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
})
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
if cc.dopts.block {
for {
cc.Connect()
}
}
首先,获取域名解析器的构造器,比如拿到自定义的dns解析器的构造器;然后得到负载均衡器cc.balancerWrapper;最后通过newCCResolverWrapper得到解析器。
google.golang.org/grpc@v1.46.0/resolver_conn_wrapper.go
func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error)
ccr := &ccResolverWrapper{
cc: cc,
done: grpcsync.NewEvent(),
}
ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
如果我们在Dial的时候指定选项grpc.WithBlock(),就会在for循环里进行尝试连接,否则返回连接器,在请求到来的时候才进行真正的连接。连接函数的定义如下,将当前连接退出idle状态:
// Connect asks the balancer wrapper to leave the idle state. The only thing
// it does is call exitIdle on cc.balancerWrapper.
func (cc *ClientConn) Connect() {
cc.balancerWrapper.exitIdle()
}
连接的状态定义在google.golang.org/grpc@v1.46.0/connectivity/connectivity.go,可以看到有5个状态(未知状态会返回INVALID_STATE):
// String returns the upper-case name of the connectivity state s. A value
// that matches none of the listed states is logged as an error and reported
// as "INVALID_STATE".
func (s State) String() string {
switch s {
case Idle:
return "IDLE"
case Connecting:
return "CONNECTING"
case Ready:
return "READY"
case TransientFailure:
return "TRANSIENT_FAILURE"
case Shutdown:
return "SHUTDOWN"
default:
// Unknown value: log it and fall back to a sentinel name.
logger.Errorf("unknown connectivity state: %d", s)
return "INVALID_STATE"
}
}
在balancer里会启动一个协程watcher,监听服务端连接状态的变化
func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper
ccb := &ccBalancerWrapper{
cc: cc,
updateCh: buffer.NewUnbounded(),
resultCh: buffer.NewUnbounded(),
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
}
go ccb.watcher()
ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
根据状态channel里的类型执行相应的操作分发,代码位于google.golang.org/grpc@v1.46.0/balancer_conn_wrappers.go
// ccBalancerWrapper ties a ClientConn to its gracefulswitch balancer. It
// carries two unbounded buffers (updates in, results out) and two events
// that track whether close was requested and whether it has finished.
type ccBalancerWrapper struct {
cc *ClientConn
// Since these fields are accessed only from handleXxx() methods which are
// synchronized by the watcher goroutine, we do not need a mutex to protect
// these fields.
balancer *gracefulswitch.Balancer
curBalancerName string
updateCh *buffer.Unbounded // Updates written on this channel are processed by watcher().
resultCh *buffer.Unbounded // Results of calls to UpdateClientConnState() are pushed here.
closed *grpcsync.Event // Indicates if close has been called.
done *grpcsync.Event // Indicates if close has completed its work.
}
里面有两个channel,分别用来处理更新事件和执行结果
func (ccb *ccBalancerWrapper) watcher() {
case u := <-ccb.updateCh.Get():
ccb.updateCh.Load()
if ccb.closed.HasFired() {
break
}
switch update := u.(type) {
case *ccStateUpdate:
ccb.handleClientConnStateChange(update.ccs)
// handleSubConnStateChange forwards a SubConn state update to the balancer,
// packing the update's state and error into a balancer.SubConnState.
func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) {
ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err})
}
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
ccb.updateCh.Put(&scStateUpdate{
回过头来,我们就可以看到,我们发起连接的时候,exitIdle具体干了什么,就是往channel里发送更新的事件:
// exitIdle enqueues an exitIdleUpdate on updateCh; it does not call the
// balancer directly.
func (ccb *ccBalancerWrapper) exitIdle() {
ccb.updateCh.Put(&exitIdleUpdate{})
}
对应的处理事件如下:如果状态是Idle,执行ExitIdle,退出空闲状态
// handleExitIdle processes an exit-idle request. It does nothing unless the
// ClientConn is currently in the Idle state, in which case it calls ExitIdle
// on the balancer.
func (ccb *ccBalancerWrapper) handleExitIdle() {
if ccb.cc.GetState() != connectivity.Idle {
// Not idle, so there is nothing to exit.
return
}
ccb.balancer.ExitIdle()
}
channel的类型是一个无限长的队列,每次处理第一个,如果来不及处理,就放在队列末尾,类似nginx的backlog队列:
google.golang.org/grpc@v1.46.0/internal/buffer/unbounded.go
// Unbounded pairs a channel with a backlog slice. In the Put and Load
// excerpts that follow, values are sent on c and backlog holds values that
// are still waiting to be sent.
type Unbounded struct {
c chan interface{} // channel that Put and Load send values on
mu sync.Mutex // presumably guards backlog; the locking is not visible in this excerpt
backlog []interface{} // pending values; Load sends backlog[0] on c and drops it
}
func (b *Unbounded) Put(t interface{}) {
if len(b.backlog) == 0 {
select {
case b.c <- t:
b.backlog = append(b.backlog, t)
func (b *Unbounded) Load() {
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
b.backlog[0] = nil
b.backlog = b.backlog[1:]
func (b *Unbounded) Get() <-chan interface{} {
分析完队列后,我们看下ExitIdle的内容是什么,源码位于:
google.golang.org/grpc@v1.46.0/internal/balancer/gracefulswitch/gracefulswitch.go
func (gsb *Balancer) ExitIdle() {
for sc := range balToUpdate.subconns {
sc.Connect()
}
对每个subConn都尝试着去进行连接,这里才是发起服务端连接的真正地方。subconns保存在balancerWrapper里。
// balancerWrapper embeds a balancer.Balancer and records the SubConns that
// balancer created, along with a pointer to a gracefulswitch Balancer (gsb).
type balancerWrapper struct {
balancer.Balancer
gsb *Balancer // presumably the Balancer that owns this wrapper; confirm against the full source
lastState balancer.State // NOTE(review): looks like the latest state from the wrapped balancer; not shown in this excerpt
subconns map[balancer.SubConn]bool // subconns created by this balancer
}
当需要更新连接状态的时候,根据传入的状态,发送对应的消息,进行连接状态的更新:
func (bw *balancerWrapper) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
if state.ConnectivityState == connectivity.Shutdown {
delete(bw.subconns, sc)
bw.Balancer.UpdateSubConnState(sc, state)
真正执行状态更新的函数是:
func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
if gsb.balancerCurrent != nil && gsb.balancerCurrent.subconns[sc] {
balToUpdate = gsb.balancerCurrent
} else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {
balToUpdate = gsb.balancerPending
}
balToUpdate.UpdateSubConnState(sc, state)
SubConn的定义位于google.golang.org/grpc@v1.46.0/balancer/balancer.go
// SubConn is the interface for a single sub-connection: its address list can
// be updated and it can be told to start connecting.
type SubConn interface {
// UpdateAddresses updates the addresses used in this SubConn.
// gRPC checks if currently-connected address is still in the new list.
// If it's in the list, the connection will be kept.
// If it's not in the list, the connection will be gracefully closed, and
// a new connection will be created.
//
// This will trigger a state transition for the SubConn.
//
// Deprecated: This method is now part of the ClientConn interface and will
// eventually be removed from here.
UpdateAddresses([]resolver.Address)
// Connect starts the connecting for this SubConn.
Connect()
}
这里定义了Picker接口,用来从连接池中选择一个可用连接
// Picker chooses the connection to use for an RPC. Per the method comment
// below, pickers are provided by the balancer through ClientConn.UpdateState.
type Picker interface {
// Pick returns the connection to use for this RPC and related information.
//
// Pick should not block. If the balancer needs to do I/O or any blocking
// or time-consuming work to service this call, it should return
// ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when
// the Picker is updated (using ClientConn.UpdateState).
//
// If an error is returned:
//
// - If the error is ErrNoSubConnAvailable, gRPC will block until a new
// Picker is provided by the balancer (using ClientConn.UpdateState).
//
// - If the error is a status error (implemented by the grpc/status
// package), gRPC will terminate the RPC with the code and message
// provided.
//
// - For all other errors, wait for ready RPCs will wait, but non-wait for
// ready RPCs will be terminated with this error's Error() string and
// status code Unavailable.
Pick(info PickInfo) (PickResult, error)
}
并且定义了Balancer的接口,用户可以自定义balancer实现这个接口,包括UpdateClientConnState和UpdateSubConnState等方法
// Balancer is the set of callbacks gRPC invokes on a load balancer: it
// receives ClientConn state updates, resolver errors and SubConn state
// changes, and can be closed.
type Balancer interface {
// UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes. If the error returned is ErrBadResolverState, the ClientConn
// will begin calling ResolveNow on the active name resolver with
// exponential backoff until a subsequent call to UpdateClientConnState
// returns a nil error. Any other errors are currently ignored.
UpdateClientConnState(ClientConnState) error
// ResolverError is called by gRPC when the name resolver reports an error.
ResolverError(error)
// UpdateSubConnState is called by gRPC when the state of a SubConn
// changes.
UpdateSubConnState(SubConn, SubConnState)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
Close()
}
官方包里给了很多种balancer的实现,比如ringhash
google.golang.org/grpc@v1.46.0/xds/internal/balancer/ringhash/ringhash.go
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &ringhashBalancer{
cc: cc,
subConns: make(map[resolver.Address]*subConn),
scStates: make(map[balancer.SubConn]*subConn),
csEvltr: &connectivityStateEvaluator{},
}
2,用户发起客户端请求的时候的调用过程
源码入口位于:google.golang.org/grpc@v1.46.0/call.go
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
return invoke(ctx, method, args, reply, cc, opts...)
它会创建一个clientStream然后发送和接收消息:
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
google.golang.org/grpc@v1.46.0/stream.go
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
}
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
cs.finish(err)
return nil, err
}
func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
google.golang.org/grpc@v1.46.0/clientconn.go,从balancer中pick一个连接
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error)
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
Ctx: ctx,
FullMethodName: method,
})
google.golang.org/grpc@v1.46.0/picker_wrapper.go,通过wrapper调用用户自定义balancer里面的Pick方法,获取连接
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
pickResult, err := p.Pick(info)
acw, ok := pickResult.SubConn.(*acBalancerWrapper)
if t := acw.getAddrConn().getReadyTransport(); t != nil {
3,关闭连接的过程
源码入口位于:google.golang.org/grpc@v1.46.0/clientconn.go
func (cc *ClientConn) Close() error {
cc.csMgr.updateState(connectivity.Shutdown)
for ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
依次关闭连接池中的所有连接。
func (ac *addrConn) tearDown(err error) {
// tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
// will leak. In most cases, call cc.removeAddrConn() instead.
ac.updateConnectivityState(connectivity.Shutdown, nil)
func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
ac.state = s
ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
它也是通过更新状态的方式来影响连接池状态机的
// handleSubConnStateChange hands a SubConn's new state and error to the
// balancer wrapper via updateSubConnState.
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
cc.balancerWrapper.updateSubConnState(sc, s, err)
}
本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!