btcd p2p 网络分析

btcd p2p 网络分析

比特币依赖于对等网络来实现信息的共享与传输,网络中的每个节点即可以是客户端也可以是服务端,本篇文章基于比特币go版本btcd探索比特币对等网络的实现原理,整个实现从底层到上层可以分为地址,连接,节点三层,每层都有自己的功能与职责。下面逐一的分析这三个部分的构成与功能

地址管理

连接管理对象结构,其中重要的两个成员是addrNew和addTried,前者维护了1024个地址桶,每个桶的尺寸为64,地址经过一个散列算法放入到桶里面,保存的是已经添加尚未确认的连接,后者则维护了64个list,每个确定完好的连接hash散列后放到里面。

    type AddrManager struct {
        mtx            sync.Mutex
        peersFile      string
        lookupFunc     func(string) ([]net.IP, error)
        rand           *rand.Rand
        key            [32]byte
        addrIndex      map[string]*KnownAddress // address key to ka for all addrs.
        addrNew        [newBucketCount]map[string]*KnownAddress
        addrTried      [triedBucketCount]*list.List
        started        int32
        shutdown       int32
        wg             sync.WaitGroup
        quit           chan struct{}
        nTried         int
        nNew           int
        lamtx          sync.Mutex
        localAddresses map[string]*localAddress
    }

当通过AddLocalAddress函数添加一个新的地址的时候,这个地址会先加到addrNew里面,GetAddress会有一半的几率从addrNew里面随机选取一个地址上来尝试进行网络连接校验,如果检验完成,则会调用Good方法,将这个地址从New移动到tred里面。

GetAddress函数选择addrNew和选择addrTried的几率一半一半,选择addrTried用于更新老的可用性差的节点。for循环保证能获取到节点,chance()用于用于桶内位置调整,访问的越频繁这个值越大,选中的可能性越小,这是为了避免多次访问重复的节点。为了防止经过多轮次也无法选中地址,通过factor变量来控制,随着轮次的增加factor增加进而减少上面的限制,意思是实在是找不到没用过的点到用老的也行了。

    func (a *AddrManager) GetAddress() *KnownAddress {
        //省略
        // Use a 50% chance for choosing between tried and new table entries.
        if a.nTried > 0 && (a.nNew == 0 || a.rand.Intn(2) == 0) {
            // Tried entry.
            large := 1 << 30
            factor := 1.0
            for {
                // pick a random bucket.
                bucket := a.rand.Intn(len(a.addrTried))
                if a.addrTried[bucket].Len() == 0 {
                    continue
                }


                randval := a.rand.Intn(large)
                //控制几率避免段时间访问重复节点
                if float64(randval) < (factor * ka.chance() * float64(large)) {
                    log.Tracef("Selected %v from tried bucket",
                        NetAddressKey(ka.na))
                    return ka
                }
                factor *= 1.2
            }
        } else {
            // new node.
            // XXX use a closure/function to avoid repeating this.
            large := 1 << 30
            factor := 1.0
            for {
                // Pick a random bucket.
                bucket := a.rand.Intn(len(a.addrNew))
                if len(a.addrNew[bucket]) == 0 {
                    continue
                }
                // Then, a random entry in it.
                var ka *KnownAddress
                nth := a.rand.Intn(len(a.addrNew[bucket]))
                for _, value := range a.addrNew[bucket] {
                    if nth == 0 {
                        ka = value
                    }
                    nth--
                }
                randval := a.rand.Intn(large)
                if float64(randval) < (factor * ka.chance() * float64(large)) {
                    log.Tracef("Selected %v from new bucket",
                        NetAddressKey(ka.na))
                    return ka
                }
                factor *= 1.2
            }
        }
    }

连接管理

连接主要控制地址的可达性,通过一个状态即来控制连接的生命周期,每个连接都有如下五种状态,

    ConnPending ConnState = iota
    ConnFailing
    ConnCanceled
    ConnEstablished
    ConnDisconnected

函数connHandler监控几个chanel()的变化来对应处理连接的状态。

1)ConnPending 通过NewConn函数创建的对象进入该分支处理,将这个新的连接状态标记为ConnPending,并把连接挂到挂起连接里面

    case registerPending:
        connReq := msg.c
        connReq.updateState(ConnPending)
        pending[msg.c.id] = connReq
        close(msg.done)

2)ConnFailing 当Connect函数确认连接出现错误的时候会进入handfail分支标记连接错误

    case handleFailed:
            connReq := msg.c
            //一种特殊情况,当存在的peer过少,为了保证系统可用会把这个操作当作失败的情况处理
            if _, ok := pending[connReq.id]; !ok {
                log.Debugf("Ignoring connection for "+
                    "canceled conn req: %v", connReq)
                continue
            }

            connReq.updateState(ConnFailing)
            log.Debugf("Failed to connect to %v: %v",
                connReq, msg.err)
            cm.handleFailedConn(connReq)

3)ConnCanceled 当用户主动断开连接且该连接尚未完全建立的时候视为ConnCanceled状态

    case handleDisconnected:
        connReq, ok := conns[msg.id]
        if !ok {
            connReq, ok = pending[msg.id]
            if !ok {
                log.Errorf("Unknown connid=%d",
                    msg.id)
                continue
            }

            // Pending connection was found, remove
            // it from pending map if we should
            // ignore a later, successful
            // connection.
            connReq.updateState(ConnCanceled)
            log.Debugf("Canceling: %v", connReq)
            delete(pending, msg.id)
            continue
        }

4)ConnEstablished 连接通过函数Connect确认建立成功后变成ConnEstablished

    case handleConnected:
                    connReq := msg.c

                    if _, ok := pending[connReq.id]; !ok {
                        if msg.conn != nil {
                            msg.conn.Close()
                        }
                        log.Debugf("Ignoring connection for "+
                            "canceled connreq=%v", connReq)
                        continue
                    }

                    connReq.updateState(ConnEstablished)
                    connReq.conn = msg.conn
                    conns[connReq.id] = connReq
                    log.Debugf("Connected to %v", connReq)
                    connReq.retryCount = 0
                    cm.failedAttempts = 0

                    delete(pending, connReq.id)

                    if cm.cfg.OnConnection != nil {
                        go cm.cfg.OnConnection(connReq, msg.conn)
                    }

5)ConnDisconnected 用户主动断开已经建立好的连接,该连接状态会变成ConnDisconnected

    case handleDisconnected:
        if connReq.conn != nil {
            connReq.conn.Close()
        }

        if cm.cfg.OnDisconnection != nil {
            go cm.cfg.OnDisconnection(connReq)
        }

        // All internal state has been cleaned up, if
        // this connection is being removed, we will
        // make no further attempts with this request.
        if !msg.retry {
            connReq.updateState(ConnDisconnected)
            continue
        }

连接管理里面还附带了一个用于考评连接质量的估分函数,该数值累加函数如下

    func (s *DynamicBanScore) increase(persistent, transient uint32, t time.Time) uint32 {
        s.persistent += persistent
        tu := t.Unix()
        dt := tu - s.lastUnix

        if transient > 0 {
            if Lifetime < dt {
                s.transient = 0
            } else if s.transient > 1 && dt > 0 {
                s.transient *= decayFactor(dt)
            }
            s.transient += float64(transient)
            s.lastUnix = tu
        }
        return s.persistent + uint32(s.transient)
    }

参数和具体的操作有关系,大致来说会随着操作不断累加,操作越多增长的越快,当超过一定阈值后,该peer会休息一段时间,主要是为了防止恶意流量攻击。调用increase的位置都是可能出现大流量的位置(GetData,MemPool...)。

协议

协议层定义了网络消息的读写格式与应答方式,该协议定义了如下的消息类型.

    CmdVersion      = "version"   版本
    CmdVerAck       = "verack"    联通
    CmdGetAddr      = "getaddr"   获取地址
    CmdAddr         = "addr"      发送地址
    CmdGetBlocks    = "getblocks" 获取区块
    CmdInv          = "inv"       发送inv(交易/区块)
    CmdGetData      = "getdata"   发送区块数据
    CmdNotFound     = "notfound"
    CmdBlock        = "block"     发送区块
    CmdTx           = "tx"        发送交易
    CmdGetHeaders   = "getheaders"获取区块头
    CmdHeaders      = "headers"   发送区块头
    CmdPing         = "ping"
    CmdPong         = "pong"      和ping成对使用 维护连接
    CmdAlert        = "alert"     无用
    CmdMemPool      = "mempool"   交易池(代码上看没有用处)
    CmdFilterAdd    = "filteradd"
    CmdFilterClear  = "filterclear"
    CmdFilterLoad   = "filterload"
    CmdMerkleBlock  = "merkleblock"
    CmdReject       = "reject"
    CmdSendHeaders  = "sendheaders"
    CmdFeeFilter    = "feefilter"
    CmdGetCFilters  = "getcfilters"
    CmdGetCFHeaders = "getcfheaders"
    CmdGetCFCheckpt = "getcfcheckpt"
    CmdCFilter      = "cfilter"
    CmdCFHeaders    = "cfheaders"
    CmdCFCheckpt    = "cfcheckpt"

inHandler

负责收消息,收到消息解析具体的消息类型,在调用一个更加具体的函数来处理这些消息,这个具体的函数常常是通过配置从上层传递下来的.

    for atomic.LoadInt32(&p.disconnect) == 0 {
            rmsg, buf, err := p.readMessage(p.wireEncoding)
            idleTimer.Stop()
            //略
            atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
            p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}

            // Handle each supported message type.
            p.stallControl <- stallControlMsg{sccHandlerStart, rmsg}
            switch msg := rmsg.(type) {
            case *wire.MsgVersion:
                // Limit to one version message per peer.
                p.PushRejectMsg(msg.Command(), wire.RejectDuplicate,
                    "duplicate version message", nil, true)
                break out

            case *wire.MsgVerAck:
                // No read lock is necessary because verAckReceived is not written
                // to in any other goroutine.
                if p.verAckReceived {
                    log.Infof("Already received 'verack' from peer %v -- "+
                        "disconnecting", p)
                    break out
                }
                p.flagsMtx.Lock()
                p.verAckReceived = true
                p.flagsMtx.Unlock()
                if p.cfg.Listeners.OnVerAck != nil {
                    p.cfg.Listeners.OnVerAck(p, msg)
                }
            ///略
            case *wire.MsgSendHeaders:
                p.flagsMtx.Lock()
                p.sendHeadersPreferred = true
                p.flagsMtx.Unlock()

                if p.cfg.Listeners.OnSendHeaders != nil {
                    p.cfg.Listeners.OnSendHeaders(p, msg)
                }

            default:
                log.Debugf("Received unhandled message of type %v "+
                    "from %v", rmsg.Command(), p)
            }
            p.stallControl <- stallControlMsg{sccHandlerDone, rmsg}

            // A message was received so reset the idle timer.
            idleTimer.Reset(idleTimeout)
        }

outHandler

通过for select监听sendQueue,如果有新消息进来调用writeMessage发送消息

序列化,反序列化

每条消息都是由消息头和消息体构成。每个消息都实现有一个Message接口,

    type Message interface {
        BtcDecode(io.Reader, uint32, MessageEncoding) error
        BtcEncode(io.Writer, uint32, MessageEncoding) error
        Command() string
        MaxPayloadLength(uint32) uint32
    }

序列化的过程中先调用BtcDecode方法序列化一个二进制消息体,这里逐个写入消息字段,有兴趣的看wire文件夹下面的每个消息的BtcDecode消息的具体实现。然后双重hash该消息体取其前四位作为消息体校验位,在把网络号,消息名,消息长度,校验位置合起来构成一个消息头。反序列化则是个相反的过程

    err := msg.BtcEncode(&bw, pver, encoding)
    if err != nil {
        return totalBytes, err
    }
    payload := bw.Bytes()
    lenp := len(payload)

    //略

    // Create header for the message.
    hdr := messageHeader{}
    hdr.magic = btcnet
    hdr.command = cmd
    hdr.length = uint32(lenp)
    copy(hdr.checksum[:], chainhash.DoubleHashB(payload)[0:4])

交易控制

在实际应用中节点之间互相广播交易会占用很多流量,为了提高网路性能,节点会控制交易的重发,具体实现为每个peer带有一个knownInventory对象,记录已经发送过的交易Id,发送的时候会检查是否发过,发过的不会在重发发送.这个机制有时候会带来些奇怪的问题.之前在部署网络时候遇到个奇怪的现象就是矿机断电重启后,有些交易需要很久才能打包,原因则在于该机器直接相连的机器认为已经发送过交易,不会在二次重发。

    for e := invSendQueue.Front(); e != nil; e = invSendQueue.Front() {
        iv := invSendQueue.Remove(e).(*wire.InvVect)

        // Don't send inventory that became known after
        // the initial check.
        if p.knownInventory.Exists(iv) {
            continue
        }

        invMsg.AddInvVect(iv)
        if len(invMsg.InvList) >= maxInvTrickleSize {
            waiting = queuePacket(
                outMsg{msg: invMsg},
                pendingMsgs, waiting)
            invMsg = wire.NewMsgInvSizeHint(uint(invSendQueue.Len()))
        }

        // Add the inventory that is being relayed to
        // the known inventory for the peer.
        p.AddKnownInventory(iv)
    }

peer管理

peer管理主要负责peer节点的维护,消息的应答方式等

peer维护

peerHandler中for select 监听几个chanel,每个chanel对应几种操作,增删改查之类的,即在系统内部使用也提供外部rpc功能.

    for {
        select {
        // New peers connected to the server.
        case p := <-s.newPeers:
            s.handleAddPeerMsg(state, p)

        // Disconnected peers.
        case p := <-s.donePeers:
            s.handleDonePeerMsg(state, p)

        // Block accepted in mainchain or orphan, update peer height.
        case umsg := <-s.peerHeightsUpdate:
            s.handleUpdatePeerHeights(state, umsg)

        // Peer to ban.
        case p := <-s.banPeers:
            s.handleBanPeerMsg(state, p)

        // New inventory to potentially be relayed to other peers.
        case invMsg := <-s.relayInv:
            s.handleRelayInvMsg(state, invMsg)

        // Message to broadcast to all connected peers except those
        // which are excluded by the message.
        case bmsg := <-s.broadcast:
            s.handleBroadcastMsg(state, &bmsg)

        case qmsg := <-s.query:
            s.handleQuery(state, qmsg)

        case <-s.quit:
            // Disconnect all peers on server shutdown.
            state.forAllPeers(func(sp *serverPeer) {
                srvrLog.Tracef("Shutdown peer %s", sp)
                sp.Disconnect()
            })
            break out
        }
    }

这里有个可以说下的是交易重发机制,主要是解决节点刚启动时候同步交易池的问题,只要连接到节点,等待一段时间,周边的节点就会进行广播,这时候就能得到交易了,考虑到上面说过的knownInv问题,这时候最好是找个新的节点连接比较安全

peer的产生

server 启动时候会调用connManager的Start方法,其中会产生许多的连接对象,

    for i := atomic.LoadUint64(&cm.connReqCount); i < uint64 (cm.cfg.TargetOutbound); i++ {
        go cm.NewConnReq()
    }

NewConnReq函数里面会走上面提过的几种状态变化,最后进到ConnEstablished状态分支里面,该分支会调用OnConnection函数,这个函数就是outboundPeerConnected函数。这里结构有点差,整体上是这样子的,connManager中配置了一个函数变量OnConnect,而在p2p servver启动的时候会赋值connManager的函数,这里就是把outboundPeerConnected函数赋值给connManager的OnConnection变量(OnConnect:PeerConnected)。

    func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
        sp := newServerPeer(s, c.Permanent)
        p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
        if err != nil {
            srvrLog.Debugf("Cannot create outbound peer %s: %v", c.Addr, err)
            s.connManager.Disconnect(c.ID())
        }
        sp.Peer = p
        sp.connReq = c
        sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
        sp.AssociateConnection(conn)
        go s.peerDoneHandler(sp)
        s.addrManager.Attempt(sp.NA())
    }

peer产生之后在调用AssociateConnection关联连接对象的时候就开始进行信息沟通. 调用流程是:AssociateConnection -> start -> negotiateOutboundProtocol -> readRemoteVersionMsg 具体代码就不贴了,该函数最后又会调用到server的OnVersion函数(OnVersion和OnConnect是相同的做法),该函数主要就是校验版本,服务之类的功能是否完整匹配,此后节点就建立成功,之后就可以进行数据的广播同步了.

    if !cfg.SimNet && !isInbound {
        addrManager.SetServices(remoteAddr, msg.Services)  //服务校验
    }

    // Ignore peers that have a protcol version that is too old.  The peer
    // negotiation logic will disconnect it after this callback returns.
    if msg.ProtocolVersion < int32(peer.MinAcceptableProtocolVersion) { //版本校验
        return nil
    }

    //这里是交换地址用的  协议是 	getaddr/addr  当你连到一个点的时候互相交换地址,
    hasTimestamp := sp.ProtocolVersion() >= wire.NetAddressTimeVersion
    if addrManager.NeedMoreAddresses() && hasTimestamp {
        sp.QueueMessage(wire.NewMsgGetAddr(), nil)
    }

    //这句上面说过会把newAddr移动到triedAddr 这句通过了才说能对象节点是完全可用的
    // Mark the address as a known good address.
    addrManager.Good(remoteAddr)

peer同步维持

有个SyncManager结构体负责区块,交易同步的维护,接上文,新的节点加进来之后刷新bestPeer(原则是高度最高的节点),之后向该节点发送getblocks请求,参数则是自己的高度状态

    if sm.nextCheckpoint != nil &&
        best.Height < sm.nextCheckpoint.Height &&
        sm.chainParams != &chaincfg.RegressionNetParams {

        bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
        sm.headersFirstMode = true
        log.Infof("Downloading headers for blocks %d to "+
            "%d from peer %s", best.Height+1,
            sm.nextCheckpoint.Height, bestPeer.Addr())
    } else {
        bestPeer.PushGetBlocksMsg(locator, &zeroHash)
    }

对方收到请求之后,比对高度,查找到自己的BlockHash构造InvTypeBlock类型NewMsgInv消息返回,然后在逐个的请求区块

发送方

接收方

PushGetBlocksMsg(当前自己高度)

InvTypeBlock类型NewMsgInv(返回对方没有的高度hash))

OnInv处理后发送NewMsgGetData(接受message后逐个getdata)

MsgBlock(区块详细信息)

OnBlock处理附加到链上

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区