Ethereum Block Synchronisation

Al1ex · 2021-07-21
Sync Modes

Ethereum block synchronisation supports the following three modes (a sketch of the corresponding mode constants follows the list):

  • full sync: downloads all block headers and block bodies from the network and replays every transaction in the blocks to rebuild the state data
  • fast sync: downloads all block headers, block bodies and state data from the network, but does not replay the transactions; the downloaded data is only validated
  • light sync: downloads only the block headers, neither the block bodies nor the state data; those are fetched from the network on demand when the corresponding block or state is actually needed
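Internally the downloader represents these strategies with a SyncMode enumeration. Below is a minimal sketch of such a type, following the names used in eth/downloader; the exact definition in the source may differ slightly:

// SyncMode enumerates the block synchronisation strategies (sketch).
type SyncMode uint32

const (
  FullSync  SyncMode = iota // replay all transactions to rebuild the state
  FastSync                  // download headers, bodies and state; validate but do not replay
  SnapSync                  // like fast sync, but retrieve state via the snap protocol
  LightSync                 // download headers only; fetch the rest on demand
)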
Block Download

The block download flow works roughly as follows:

Synchronisation starts with the Synchronise function, which calls synchronise and then syncWithPeer. findAncestor locates the common ancestor shared with the chosen peer, and syncing begins from that height. Several goroutines are then started to download the different kinds of data: headers, bodies and receipts. For a given block, say block 100, its header must be downloaded and validated before its body and receipts can be fetched. Each of these downloads is largely driven by fetchParts, which coordinates a number of channels and involves quite a few callback functions (a simplified pipeline sketch follows).
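As a minimal, self-contained illustration of the header-first pipeline (a sketch only, not the downloader's actual code), one stage produces header results and a second stage fetches bodies and receipts only for blocks whose header has already arrived, much like headerProcCh feeds the body and receipt fetchers:

package main

import "fmt"

func main() {
  headers := make(chan int, 8)    // block numbers whose headers are complete
  results := make(chan string, 8) // downloaded bodies/receipts

  // Header stage: pretend to download and validate headers 1..5.
  go func() {
    for n := 1; n <= 5; n++ {
      headers <- n
    }
    close(headers)
  }()

  // Body/receipt stage: only blocks with a finished header are requested.
  go func() {
    for n := range headers {
      results <- fmt.Sprintf("body+receipts for block %d", n)
    }
    close(results)
  }()

  for r := range results {
    fmt.Println(r)
  }
}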

Source Code Analysis
Data Structures

The Downloader struct is defined as follows:

// filedir:go-ethereum-1.10.2\eth\downloader\downloader.go  L96
type Downloader struct {
  // WARNING: The `rttEstimate` and `rttConfidence` fields are accessed atomically.
  // On 32 bit platforms, only 64-bit aligned fields can be atomic. The struct is
  // guaranteed to be so aligned, so take advantage of that. For more information,
  // see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
  rttEstimate   uint64 // Round trip time to target for download requests
  rttConfidence uint64 // Confidence in the estimated RTT (unit: millionths to allow atomic ops)

  mode uint32         // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
  mux  *event.TypeMux // Event multiplexer to announce sync operation events

  checkpoint uint64   // Checkpoint block number to enforce head against (e.g. fast sync)
  genesis    uint64   // Genesis block number to limit sync to (e.g. light client CHT)
  queue      *queue   // Scheduler for selecting the hashes to download
  peers      *peerSet // Set of active peers from which download can proceed

  stateDB    ethdb.Database  // Database to state sync into (and deduplicate via)
  stateBloom *trie.SyncBloom // Bloom filter for fast trie node and contract code existence checks

  // Statistics
  syncStatsChainOrigin uint64 // Origin block number where syncing started at
  syncStatsChainHeight uint64 // Highest block number known when syncing started
  syncStatsState       stateSyncStats
  syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields

  lightchain LightChain
  blockchain BlockChain

  // Callbacks
  dropPeer peerDropFn // Drops a peer for misbehaving

  // Status
  synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
  synchronising   int32
  notified        int32
  committed       int32
  ancientLimit    uint64 // The maximum block number which can be regarded as ancient data.

  // Channels
  headerCh      chan dataPack        // Channel receiving inbound block headers downloaded from the network
  bodyCh        chan dataPack        // Channel receiving inbound block bodies downloaded from the network
  receiptCh     chan dataPack        // Channel receiving inbound receipts downloaded from the network
  bodyWakeCh    chan bool            // Channel to signal the block body fetcher of new tasks
  receiptWakeCh chan bool            // Channel to signal the receipt fetcher of new tasks
  headerProcCh  chan []*types.Header // Channel to feed the header processor new tasks

  // State sync
  pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
  pivotLock   sync.RWMutex  // Lock protecting pivot header reads from updates

  snapSync       bool         // Whether to run state sync over the snap protocol
  SnapSyncer     *snap.Syncer // TODO(karalabe): make private! hack for now
  stateSyncStart chan *stateSync // Channel used to start a new state fetcher run
  trackStateReq  chan *stateReq
  stateCh        chan dataPack // Channel receiving inbound node state data downloaded from the network

  // Cancellation and termination
  cancelPeer string         // Identifier of the peer currently being used as the master (cancel on drop)
  cancelCh   chan struct{}  // Channel to cancel mid-flight syncs
  cancelLock sync.RWMutex   // Lock to protect the cancel channel and peer in delivers
  cancelWg   sync.WaitGroup // Make sure all fetcher goroutines have exited.

  quitCh   chan struct{} // Quit channel to signal termination
  quitLock sync.Mutex    // Lock to prevent double closes

  // Testing hooks
  syncInitHook     func(uint64, uint64)  // Method to call upon initiating a new sync run
  bodyFetchHook    func([]*types.Header) // Method to call upon starting a block body fetch
  receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
  chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
}
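The mode field is shared between goroutines and therefore read atomically. The d.getMode() accessor referred to in the comment above is essentially a thin wrapper around an atomic load; a minimal sketch consistent with the field declaration (assuming the sync/atomic package):

// getMode returns the current sync mode via an atomic load (sketch).
func (d *Downloader) getMode() SyncMode {
  return SyncMode(atomic.LoadUint32(&d.mode))
}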
Constructor

New initialises a Downloader object:

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
  if lightchain == nil {
    lightchain = chain
  }
  dl := &Downloader{
    stateDB:        stateDb,
    stateBloom:     stateBloom,
    mux:            mux,
    checkpoint:     checkpoint,
    queue:          newQueue(blockCacheMaxItems, blockCacheInitialItems),
    peers:          newPeerSet(),
    rttEstimate:    uint64(rttMaxEstimate),
    rttConfidence:  uint64(1000000),
    blockchain:     chain,
    lightchain:     lightchain,
    dropPeer:       dropPeer,
    headerCh:       make(chan dataPack, 1),
    bodyCh:         make(chan dataPack, 1),
    receiptCh:      make(chan dataPack, 1),
    bodyWakeCh:     make(chan bool, 1),
    receiptWakeCh:  make(chan bool, 1),
    headerProcCh:   make(chan []*types.Header, 1),
    quitCh:         make(chan struct{}),
    stateCh:        make(chan dataPack),
    SnapSyncer:     snap.NewSyncer(stateDb),
    stateSyncStart: make(chan *stateSync),
    syncStatsState: stateSyncStats{
      processed: rawdb.ReadFastTrieProgress(stateDb),
    },
    trackStateReq: make(chan *stateReq),
  }
  go dl.qosTuner()     // continuously recomputes rttEstimate and rttConfidence
  go dl.stateFetcher() // listens for and runs state sync tasks
  return dl
}
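qosTuner itself is not reproduced here. As a rough illustration of why rttEstimate and rttConfidence are plain uint64 fields updated through the sync/atomic package, an estimate can be blended with new round-trip samples in a lock-free loop; this is a simplified sketch, not the actual qosTuner logic:

// updateRTT folds a new round-trip sample into the running estimate (sketch).
func updateRTT(estimate *uint64, sample time.Duration) {
  for {
    old := atomic.LoadUint64(estimate)
    // Exponential moving average: keep 7/8 of the old value, add 1/8 of the sample.
    updated := old - old/8 + uint64(sample)/8
    if atomic.CompareAndSwapUint64(estimate, old, updated) {
      return
    }
  }
}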
Starting a Sync

Block synchronisation starts from the Synchronise function, which simply delegates to synchronise; if the sync fails with certain errors, the offending peer is dropped:

// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
  err := d.synchronise(id, head, td, mode)

  switch err {
  case nil, errBusy, errCanceled:
    return err
  }
  if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) ||
    errors.Is(err, errStallingPeer) || errors.Is(err, errUnsyncedPeer) || errors.Is(err, errEmptyHeaderSet) ||
    errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) {
    log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
    if d.dropPeer == nil {
      // The dropPeer method is nil when `--copydb` is used for a local copy.
      // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
      log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
    } else {
      d.dropPeer(id)
    }
    return err
  }
  log.Warn("Synchronisation failed, retrying", "err", err)
  return err
}

The synchronise function is implemented as follows:

// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
  // Mock out the synchronisation if testing
  if d.synchroniseMock != nil {
    return d.synchroniseMock(id, hash)
  }
  // Make sure only one goroutine is ever allowed past this point at once (i.e. check whether a sync is already running)
  if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
    return errBusy
  }
  defer atomic.StoreInt32(&d.synchronising, 0)

  // Post a user notification of the sync (only once per session)
  if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
    log.Info("Block synchronisation started")
  }
  // If we are already full syncing, but have a fast-sync bloom filter laying
  // around, make sure it doesn't use memory any more. This is a special case
  // when the user attempts to fast sync a new empty network.
  if mode == FullSync && d.stateBloom != nil {
    d.stateBloom.Close()
  }
  // If snap sync was requested, create the snap scheduler and switch to fast
  // sync mode. Long term we could drop fast sync or merge the two together,
  // but until snap becomes prevalent, we should support both. TODO(karalabe).
  if mode == SnapSync {
    if !d.snapSync {
      log.Warn("Enabling snapshot sync prototype")
      d.snapSync = true
    }
    mode = FastSync
  }
  // Reset the queue, peer set and wake channels to clean any internal leftover state  
  d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems) // reset the queue state
  d.peers.Reset()                                           // reset the peer set

  for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} { // drain d.bodyWakeCh and d.receiptWakeCh
    select {
    case <-ch:
    default:
    }
  }
  for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} { // drain d.headerCh, d.bodyCh and d.receiptCh
    for empty := false; !empty; {
      select {
      case <-ch:
      default:
        empty = true
      }
    }
  }
  for empty := false; !empty; { // drain headerProcCh
    select {
    case <-d.headerProcCh:
    default:
      empty = true
    }
  }
  // Create cancel channel for aborting mid-flight and mark the master peer
  d.cancelLock.Lock()
  d.cancelCh = make(chan struct{})
  d.cancelPeer = id
  d.cancelLock.Unlock()

  defer d.Cancel() // No matter what, we can't leave the cancel channel open

  // Atomically set the requested sync mode
  atomic.StoreUint32(&d.mode, uint32(mode))

  // Retrieve the origin peer and initiate the downloading process
  p := d.peers.Peer(id)
  if p == nil {
    return errUnknownPeer
  }
  return d.syncWithPeer(p, hash, td) // start block sync from the given peer and head hash, following the hash chain
}
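The drain loops above use Go's non-blocking select with a default case. A standalone sketch of the idiom, independent of the downloader:

package main

import "fmt"

// drain empties a buffered channel without ever blocking.
func drain(ch chan int) {
  for {
    select {
    case v := <-ch:
      fmt.Println("discarded stale value", v)
    default:
      return // channel is empty, stop draining
    }
  }
}

func main() {
  ch := make(chan int, 3)
  ch <- 1
  ch <- 2
  drain(ch)
  fmt.Println("remaining:", len(ch)) // remaining: 0
}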

The syncWithPeer function is shown below:

// filedir:go-ethereum-1.10.2\eth\downloader\downloader.go  L448
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
  d.mux.Post(StartEvent{})
  defer func() {
    // reset on error
    if err != nil {
      d.mux.Post(FailedEvent{err})
    } else {
      latest := d.lightchain.CurrentHeader()
      d.mux.Post(DoneEvent{latest})
    }
  }()
  if p.version < 64 {
    return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, 64)
  }
  mode := d.getMode()

  log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode)
  defer func(start time.Time) {
    log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start)))
  }(time.Now())

  // Look up the sync boundaries: the common ancestor and the target block
  latest, pivot, err := d.fetchHead(p)   
  if err != nil {
    return err
  }
  if mode == FastSync && pivot == nil {
    // If no pivot block was returned, the head is below the min full block
    // threshold (i.e. new chain). In that case we won't really fast sync
    // anyway, but still need a valid pivot block to avoid some code hitting
    // nil panics on an access.
    pivot = d.blockchain.CurrentBlock().Header()
  }
  height := latest.Number.Uint64()

  origin, err := d.findAncestor(p, latest) // locate the common ancestor with the peer to find a starting point for the sync
  if err != nil {
    return err
  }
  d.syncStatsLock.Lock()
  if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
    d.syncStatsChainOrigin = origin
  }
  d.syncStatsChainHeight = height
  d.syncStatsLock.Unlock()

  // Ensure our origin point is below any fast sync pivot point
  if mode == FastSync {
    if height <= uint64(fsMinFullBlocks) { // if the remote head is no higher than fsMinFullBlocks (64), start from the genesis block
      origin = 0
    } else { // otherwise the pivot sits fsMinFullBlocks (64) blocks below the remote head
      pivotNumber := pivot.Number.Uint64()
      if pivotNumber <= origin { // if the pivot is not above the common ancestor, pull the origin back to just below the pivot
        origin = pivotNumber - 1
      }
      // Write out the pivot into the database so a rollback beyond it will
      // reenable fast sync
      rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber)
    }
  }
  d.committed = 1
  if mode == FastSync && pivot.Number.Uint64() != 0 {
    d.committed = 0
  }
  if mode == FastSync {
    // Set the ancient data limitation.
    // If we are running fast sync, all block data older than ancientLimit will be
    // written to the ancient store. More recent data will be written to the active
    // database and will wait for the freezer to migrate.
    //
    // If there is a checkpoint available, then calculate the ancientLimit through
    // that. Otherwise calculate the ancient limit through the advertised height
    // of the remote peer.
    //
    // The reason for picking checkpoint first is that a malicious peer can give us
    // a fake (very high) height, forcing the ancient limit to also be very high.
    // The peer would start to feed us valid blocks until head, resulting in all of
    // the blocks might be written into the ancient store. A following mini-reorg
    // could cause issues.
    if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
      d.ancientLimit = d.checkpoint
    } else if height > fullMaxForkAncestry+1 {
      d.ancientLimit = height - fullMaxForkAncestry - 1
    } else {
      d.ancientLimit = 0
    }
    frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.

    // If a part of blockchain data has already been written into active store,
    // disable the ancient style insertion explicitly.
    if origin >= frozen && frozen != 0 {
      d.ancientLimit = 0
      log.Info("Disabling direct-ancient mode", "origin", origin, "ancient", frozen-1)
    } else if d.ancientLimit > 0 {
      log.Debug("Enabling direct-ancient mode", "ancient", d.ancientLimit)
    }
    // Rewind the ancient store and blockchain if reorg happens.
    if origin+1 < frozen {
      if err := d.lightchain.SetHead(origin + 1); err != nil {
        return err
      }
    }
  }
  // Initiate the sync using a concurrent header and content retrieval algorithm
  d.queue.Prepare(origin+1, mode) // start the queue at common ancestor + 1, i.e. sync blocks from just above the common ancestor
  if d.syncInitHook != nil {
    d.syncInitHook(origin, height)
  }
  fetchers := []func() error{
    func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
    func() error { return d.fetchBodies(origin + 1) },   // Bodies are retrieved during normal and fast sync
    func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
    func() error { return d.processHeaders(origin+1, td) },
  }
  if mode == FastSync { // add extra processing stages depending on the sync mode
    d.pivotLock.Lock()
    d.pivotHeader = pivot
    d.pivotLock.Unlock()

    fetchers = append(fetchers, func() error { return d.processFastSyncContent() })
  } else if mode == FullSync {
    fetchers = append(fetchers, d.processFullSyncContent)
  }
  return d.spawnSync(fetchers)
}
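To make the fast-sync adjustment concrete, here is a small self-contained sketch (a hypothetical helper, not part of geth) that reproduces the origin/pivot arithmetic with fsMinFullBlocks = 64:

// adjustFastSyncOrigin mirrors the fast-sync origin adjustment above (sketch).
// With the remote head at `height` and the pivot 64 blocks below it, the sync
// origin is pulled back so that the pivot block itself is always re-fetched.
func adjustFastSyncOrigin(height, pivot, origin uint64) uint64 {
  const fsMinFullBlocks = 64
  if height <= fsMinFullBlocks {
    return 0 // chain too short: sync everything from the genesis block
  }
  if pivot <= origin {
    return pivot - 1 // keep the origin strictly below the pivot
  }
  return origin
}

// Example: head = 1000, pivot = 936, common ancestor = 990:
// adjustFastSyncOrigin(1000, 936, 990) == 935, so blocks 936..1000 are downloaded again.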

spawnSync starts a goroutine for each fetcher and then blocks until one of them returns an error:

// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
func (d *Downloader) spawnSync(fetchers []func() error) error {
  errc := make(chan error, len(fetchers))
  d.cancelWg.Add(len(fetchers))
  for _, fn := range fetchers {
    fn := fn
    go func() { defer d.cancelWg.Done(); errc <- fn() }()
  }
  // Wait for the first error, then terminate the others.
  var err error
  for i := 0; i < len(fetchers); i++ {
    if i == len(fetchers)-1 {
      // Close the queue when all fetchers have exited.
      // This will cause the block processor to end when
      // it has processed the queue.
      d.queue.Close()
    }
    if err = <-errc; err != nil && err != errCanceled {
      break
    }
  }
  d.queue.Close()
  d.Cancel()
  return err
}
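The same fan-out/first-error pattern can be expressed with golang.org/x/sync/errgroup; a rough equivalent (a sketch only, it does not manage the download queue the way spawnSync does):

import "golang.org/x/sync/errgroup"

// runFetchers runs all fetcher functions concurrently and returns the first
// non-nil error once every goroutine has finished (sketch).
func runFetchers(fetchers []func() error) error {
  var g errgroup.Group
  for _, fn := range fetchers {
    fn := fn // capture the loop variable, just like spawnSync does
    g.Go(fn)
  }
  return g.Wait()
}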
State Sync

The state is the Ethereum world state, which stores every account's balance and other account data. The stateFetcher goroutine started in New listens for state sync tasks:

// filedir: go-ethereum-1.10.2\eth\downloader\statesync.go
// stateFetcher manages the active state sync and accepts requests
// on its behalf.
func (d *Downloader) stateFetcher() {
  for {
    select {
    case s := <-d.stateSyncStart:
      for next := s; next != nil; {
        next = d.runStateSync(next)
      }
    case <-d.stateCh:
      // Ignore state responses while no sync is running.
    case <-d.quitCh:
      return
    }
  }
}

runStateSync runs a state synchronisation until it completes or a switch to another root hash is requested:

// runStateSync runs a state synchronisation until it completes or another root
// hash is requested to be switched over to.
func (d *Downloader) runStateSync(s *stateSync) *stateSync {
  var (
    active   = make(map[string]*stateReq) // Currently in-flight requests
    finished []*stateReq                  // Completed or failed requests
    timeout  = make(chan *stateReq)       // Timed out active requests
  )
  log.Trace("State sync starting", "root", s.root)

  defer func() {
    // Cancel active request timers on exit. Also set peers to idle so they're
    // available for the next sync.
    for _, req := range active {
      req.timer.Stop()
      req.peer.SetNodeDataIdle(int(req.nItems), time.Now())
    }
  }()
  go s.run()
  defer s.Cancel()

  // Listen for peer departure events to cancel assigned tasks
  peerDrop := make(chan *peerConnection, 1024)
  peerSub := s.d.peers.SubscribePeerDrops(peerDrop)
  defer peerSub.Unsubscribe()

  for {
    // Enable sending of the first buffered element if there is one.
    var (
      deliverReq   *stateReq
      deliverReqCh chan *stateReq
    )
    if len(finished) > 0 {
      deliverReq = finished[0]
      deliverReqCh = s.deliver
    }

    select {
    // The stateSync lifecycle:
    case next := <-d.stateSyncStart:
      d.spindownStateSync(active, finished, timeout, peerDrop)
      return next

    case <-s.done:
      d.spindownStateSync(active, finished, timeout, peerDrop)
      return nil

    // Send the next finished request to the current sync:
    case deliverReqCh <- deliverReq:
      // Shift out the first request, but also set the emptied slot to nil for GC
      copy(finished, finished[1:])
      finished[len(finished)-1] = nil
      finished = finished[:len(finished)-1]

    // Handle incoming state packs:
    case pack := <-d.stateCh:
      // Discard any data not requested (or previously timed out)
      req := active[pack.PeerId()]
      if req == nil {
        log.Debug("Unrequested node data", "peer", pack.PeerId(), "len", pack.Items())
        continue
      }
      // Finalize the request and queue up for processing
      req.timer.Stop()
      req.response = pack.(*statePack).states
      req.delivered = time.Now()

      finished = append(finished, req)
      delete(active, pack.PeerId())

    // Handle dropped peer connections:
    case p := <-peerDrop:
      // Skip if no request is currently pending
      req := active[p.id]
      if req == nil {
        continue
      }
      // Finalize the request and queue up for processing
      req.timer.Stop()
      req.dropped = true
      req.delivered = time.Now()

      finished = append(finished, req)
      delete(active, p.id)

    // Handle timed-out requests:
    case req := <-timeout:
      // If the peer is already requesting something else, ignore the stale timeout.
      // This can happen when the timeout and the delivery happens simultaneously,
      // causing both pathways to trigger.
      if active[req.peer.id] != req {
        continue
      }
      req.delivered = time.Now()
      // Move the timed out data back into the download queue
      finished = append(finished, req)
      delete(active, req.peer.id)

    // Track outgoing state requests:
    case req := <-d.trackStateReq:
      // If an active request already exists for this peer, we have a problem. In
      // theory the trie node schedule must never assign two requests to the same
      // peer. In practice however, a peer might receive a request, disconnect and
      // immediately reconnect before the previous times out. In this case the first
      // request is never honored, alas we must not silently overwrite it, as that
      // causes valid requests to go missing and sync to get stuck.
      if old := active[req.peer.id]; old != nil {
        log.Warn("Busy peer assigned new state fetch", "peer", old.peer.id)
        // Move the previous request to the finished set
        old.timer.Stop()
        old.dropped = true
        old.delivered = time.Now()
        finished = append(finished, old)
      }
      // Start a timer to notify the sync loop if the peer stalled.
      req.timer = time.AfterFunc(req.timeout, func() {
        timeout <- req
      })
      active[req.peer.id] = req
    }
  }
}
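Note how deliverReqCh stays nil while the finished list is empty: a nil channel can never be selected, so the delivery case is effectively disabled until there is something to hand over. A standalone sketch of the idiom, independent of the downloader:

package main

import "fmt"

func main() {
  finished := []string{}          // completed requests waiting to be delivered
  deliver := make(chan string, 1) // delivery channel

  for round := 0; round < 2; round++ {
    // Only arm the delivery case when something is ready to send.
    var deliverCh chan string
    var next string
    if len(finished) > 0 {
      deliverCh = deliver
      next = finished[0]
    }
    select {
    case deliverCh <- next: // never taken while deliverCh is nil
      finished = finished[1:]
      fmt.Println("delivered:", <-deliver)
    default:
      finished = append(finished, "request-A") // pretend a request just completed
    }
  }
}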
Fetching the Head

fetchHead asks the remote peer for its head header and, during fast sync, the pivot header fsMinFullBlocks (64) blocks below it, by requesting two headers starting from the head hash with a skip of fsMinFullBlocks-1 in reverse order:
// fetchHead retrieves the head header and prior pivot block (if available) from
// a remote peer.
func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *types.Header, err error) {
  p.log.Debug("Retrieving remote chain head")
  mode := d.getMode()

  // Request the advertised remote head block and wait for the response
  latest, _ := p.peer.Head()
  fetch := 1
  if mode == FastSync {
    fetch = 2 // head + pivot headers
  }
  go p.peer.RequestHeadersByHash(latest, fetch, fsMinFullBlocks-1, true)

  ttl := d.requestTTL()
  timeout := time.After(ttl)
  for {
    select {
    case <-d.cancelCh:
      return nil, nil, errCanceled

    case packet := <-d.headerCh:
      // Discard anything not from the origin peer
      if packet.PeerId() != p.id {
        log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
        break
      }
      // Make sure the peer gave us at least one and at most the requested headers
      headers := packet.(*headerPack).headers
      if len(headers) == 0 || len(headers) > fetch {
        return nil, nil, fmt.Errorf("%w: returned headers %d != requested %d", errBadPeer, len(headers), fetch)
      }
      // The first header needs to be the head, validate against the checkpoint
      // and request. If only 1 header was returned, make sure there's no pivot
      // or there was not one requested.
      head := headers[0]
      if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint {
        return nil, nil, fmt.Errorf("%w: remote head %d below checkpoint %d", errUnsyncedPeer, head.Number, d.checkpoint)
      }
      if len(headers) == 1 {
        if mode == FastSync && head.Number.Uint64() > uint64(fsMinFullBlocks) {
          return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer)
        }
        p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", head.Hash())
        return head, nil, nil
      }
      // At this point we have 2 headers in total and the first is the
      // validated head of the chain. Check the pivot number and return,
      pivot := headers[1]
      if pivot.Number.Uint64() != head.Number.Uint64()-uint64(fsMinFullBlocks) {
        return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-uint64(fsMinFullBlocks))
      }
      return head, pivot, nil

    case <-timeout:
      p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
      return nil, nil, errTimeout

    case <-d.bodyCh:
    case <-d.receiptCh:
      // Out of bounds delivery, ignore
    }
  }
}
Processing Headers

processHeaders consumes batches of headers from headerProcCh, inserts them into the header chain and schedules the corresponding bodies and receipts for retrieval:
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
  // Keep a count of uncertain headers to roll back
  var (
    rollback    uint64 // Zero means no rollback (fine as you can't unroll the genesis)
    rollbackErr error
    mode        = d.getMode()
  )
  defer func() {
    if rollback > 0 {
      lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
      if mode != LightSync {
        lastFastBlock = d.blockchain.CurrentFastBlock().Number()
        lastBlock = d.blockchain.CurrentBlock().Number()
      }
      if err := d.lightchain.SetHead(rollback - 1); err != nil { // -1 to target the parent of the first uncertain block
        // We're already unwinding the stack, only print the error to make it more visible
        log.Error("Failed to roll back chain segment", "head", rollback-1, "err", err)
      }
      curFastBlock, curBlock := common.Big0, common.Big0
      if mode != LightSync {
        curFastBlock = d.blockchain.CurrentFastBlock().Number()
        curBlock = d.blockchain.CurrentBlock().Number()
      }
      log.Warn("Rolled back chain segment",
        "header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
        "fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
        "block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
    }
  }()
  // Wait for batches of headers to process
  gotHeaders := false

  for {
    select {
    case <-d.cancelCh:
      rollbackErr = errCanceled
      return errCanceled

    case headers := <-d.headerProcCh:
      // Terminate header processing if we synced up
      if len(headers) == 0 {
        // Notify everyone that headers are fully processed
        for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
          select {
          case ch <- false:
          case <-d.cancelCh:
          }
        }
        // If no headers were retrieved at all, the peer violated its TD promise that it had a
        // better chain compared to ours. The only exception is if its promised blocks were
        // already imported by other means (e.g. fetcher):
        //
        // R <remote peer>, L <local node>: Both at block 10
        // R: Mine block 11, and propagate it to L
        // L: Queue block 11 for import
        // L: Notice that R's head and TD increased compared to ours, start sync
        // L: Import of block 11 finishes
        // L: Sync begins, and finds common ancestor at 11
        // L: Request new headers up from 11 (R's TD was higher, it must have something)
        // R: Nothing to give
        if mode != LightSync {
          head := d.blockchain.CurrentBlock()
          if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
            return errStallingPeer
          }
        }
        // If fast or light syncing, ensure promised headers are indeed delivered. This is
        // needed to detect scenarios where an attacker feeds a bad pivot and then bails out
        // of delivering the post-pivot blocks that would flag the invalid content.
        //
        // This check cannot be executed "as is" for full imports, since blocks may still be
        // queued for processing when the header download completes. However, as long as the
        // peer gave us something useful, we're already happy/progressed (above check).
        if mode == FastSync || mode == LightSync {
          head := d.lightchain.CurrentHeader()
          if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
            return errStallingPeer
          }
        }
        // Disable any rollback and return
        rollback = 0
        return nil
      }
      // Otherwise split the chunk of headers into batches and process them
      gotHeaders = true
      for len(headers) > 0 {
        // Terminate if something failed in between processing chunks
        select {
        case <-d.cancelCh:
          rollbackErr = errCanceled
          return errCanceled
        default:
        }
        // Select the next chunk of headers to import
        limit := maxHeadersProcess
        if limit > len(headers) {
          limit = len(headers)
        }
        chunk := headers[:limit]

        // In case of header only syncing, validate the chunk immediately
        if mode == FastSync || mode == LightSync {
          // If we're importing pure headers, verify based on their recentness
          var pivot uint64

          d.pivotLock.RLock()
          if d.pivotHeader != nil {
            pivot = d.pivotHeader.Number.Uint64()
          }
          d.pivotLock.RUnlock()

          frequency := fsHeaderCheckFrequency
          if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
            frequency = 1
          }
          if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
            rollbackErr = err

            // If some headers were inserted, track them as uncertain
            if (mode == FastSync || frequency > 1) && n > 0 && rollback == 0 {
              rollback = chunk[0].Number.Uint64()
            }
            log.Warn("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
            return fmt.Errorf("%w: %v", errInvalidChain, err)
          }
          // All verifications passed, track all headers within the allotted limits
          if mode == FastSync {
            head := chunk[len(chunk)-1].Number.Uint64()
            if head-rollback > uint64(fsHeaderSafetyNet) {
              rollback = head - uint64(fsHeaderSafetyNet)
            } else {
              rollback = 1
            }
          }
        }
        // Unless we're doing light chains, schedule the headers for associated content retrieval
        if mode == FullSync || mode == FastSync {
          // If we've reached the allowed number of pending headers, stall a bit
          for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
            select {
            case <-d.cancelCh:
              rollbackErr = errCanceled
              return errCanceled
            case <-time.After(time.Second):
            }
          }
          // Otherwise insert the headers for content retrieval
          inserts := d.queue.Schedule(chunk, origin)
          if len(inserts) != len(chunk) {
            rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk))
            return fmt.Errorf("%w: stale headers", errBadPeer)
          }
        }
        headers = headers[limit:]
        origin += uint64(limit)
      }
      // Update the highest block number we know if a higher one is found.
      d.syncStatsLock.Lock()
      if d.syncStatsChainHeight < origin {
        d.syncStatsChainHeight = origin - 1
      }
      d.syncStatsLock.Unlock()

      // Signal the content downloaders of the availability of new tasks
      for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
        select {
        case ch <- true:
        default:
        }
      }
    }
  }
}
Fetching Bodies

fetchBodies wires a set of body-specific callbacks into the generic fetchParts retrieval loop:
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
  log.Debug("Downloading block bodies", "origin", from)

  var (
    deliver = func(packet dataPack) (int, error) {
      pack := packet.(*bodyPack)
      return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
    }
    expire   = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) }
    fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
    capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) }
    setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
  )
  err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
    d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ReserveBodies,
    d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")

  log.Debug("Block body download terminated", "err", err)
  return err
}
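fetchParts is deliberately generic: everything body- or receipt-specific (how to deliver, expire, fetch, throttle and idle peers) is injected as function values. A minimal sketch of that pattern, using made-up names rather than the downloader's actual types:

// fetchLoop is a generic retrieval loop; the task-specific steps are passed in
// as closures, much like fetchBodies/fetchReceipts feed callbacks into fetchParts (sketch).
func fetchLoop(kind string, nextBatch func() []uint64, fetch func([]uint64) error) error {
  for batch := nextBatch(); len(batch) > 0; batch = nextBatch() {
    if err := fetch(batch); err != nil {
      return fmt.Errorf("%s fetch failed: %w", kind, err)
    }
  }
  return nil
}

Bodies arriving from the network layer are injected back into the downloader through DeliverBodies and the shared deliver helper: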
// DeliverBodies injects a new batch of block bodies received from a remote node.
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) error {
  return d.deliver(d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
}
// deliver injects a new batch of data received from a remote node.
func (d *Downloader) deliver(destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
  // Update the delivery metrics for both good and failed deliveries
  inMeter.Mark(int64(packet.Items()))
  defer func() {
    if err != nil {
      dropMeter.Mark(int64(packet.Items()))
    }
  }()
  // Deliver or abort if the sync is canceled while queuing
  d.cancelLock.RLock()
  cancel := d.cancelCh
  d.cancelLock.RUnlock()
  if cancel == nil {
    return errNoSyncActive
  }
  select {
  case destCh <- packet:
    return nil
  case <-cancel:
    return errNoSyncActive
  }
}
fetchParts itself is the shared retrieval loop used by both the body and the receipt fetcher:
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
  expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
  fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
  idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {

  // Create a ticker to detect expired retrieval tasks
  ticker := time.NewTicker(100 * time.Millisecond)
  defer ticker.Stop()

  update := make(chan struct{}, 1)

  // Prepare the queue and fetch block parts until the block header fetcher's done
  finished := false
  for {
    select {
    case <-d.cancelCh:
      return errCanceled

    case packet := <-deliveryCh:
      deliveryTime := time.Now()
      // If the peer was previously banned and failed to deliver its pack
      // in a reasonable time frame, ignore its message.
      if peer := d.peers.Peer(packet.PeerId()); peer != nil {
        // Deliver the received chunk of data and check chain validity
        accepted, err := deliver(packet)
        if errors.Is(err, errInvalidChain) {
          return err
        }
        // Unless a peer delivered something completely else than requested (usually
        // caused by a timed out request which came through in the end), set it to
        // idle. If the delivery's stale, the peer should have already been idled.
        if !errors.Is(err, errStaleDelivery) {
          setIdle(peer, accepted, deliveryTime)
        }
        // Issue a log to the user to see what's going on
        switch {
        case err == nil && packet.Items() == 0:
          peer.log.Trace("Requested data not delivered", "type", kind)
        case err == nil:
          peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
        default:
          peer.log.Debug("Failed to deliver retrieved data", "type", kind, "err", err)
        }
      }
      // Blocks assembled, try to update the progress
      select {
      case update <- struct{}{}:
      default:
      }

    case cont := <-wakeCh:
      // The header fetcher sent a continuation flag, check if it's done
      if !cont {
        finished = true
      }
      // Headers arrive, try to update the progress
      select {
      case update <- struct{}{}:
      default:
      }

    case <-ticker.C:
      // Sanity check update the progress
      select {
      case update <- struct{}{}:
      default:
      }

    case <-update:
      // Short circuit if we lost all our peers
      if d.peers.Len() == 0 {
        return errNoPeers
      }
      // Check for fetch request timeouts and demote the responsible peers
      for pid, fails := range expire() {
        if peer := d.peers.Peer(pid); peer != nil {
          // If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
          // ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal times
          // out that sync wise we need to get rid of the peer.
          //
          // The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth
          // and latency of a peer separately, which requires pushing the measured capacity a bit and seeing
          // how response times react, so it always requests one more than the minimum (i.e. min 2).
          if fails > 2 {
            peer.log.Trace("Data delivery timed out", "type", kind)
            setIdle(peer, 0, time.Now())
          } else {
            peer.log.Debug("Stalling delivery, dropping", "type", kind)

            if d.dropPeer == nil {
              // The dropPeer method is nil when `--copydb` is used for a local copy.
              // Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
              peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid)
            } else {
              d.dropPeer(pid)

              // If this peer was the master peer, abort sync immediately
              d.cancelLock.RLock()
              master := pid == d.cancelPeer
              d.cancelLock.RUnlock()

              if master {
                d.cancel()
                return errTimeout
              }
            }
          }
        }
      }
      // If there's nothing more to fetch, wait or terminate
      if pending() == 0 {
        if !inFlight() && finished {
          log.Debug("Data fetching completed", "type", kind)
          return nil
        }
        break
      }
      // Send a download request to all idle peers, until throttled
      progressed, throttled, running := false, false, inFlight()
      idles, total := idle()
      pendCount := pending()
      for _, peer := range idles {
        // Short circuit if throttling activated
        if throttled {
          break
        }
        // Short circuit if there is no more available task.
        if pendCount = pending(); pendCount == 0 {
          break
        }
        // Reserve a chunk of fetches for a peer. A nil can mean either that
        // no more headers are available, or that the peer is known not to
        // have them.
        request, progress, throttle := reserve(peer, capacity(peer))
        if progress {
          progressed = true
        }
        if throttle {
          throttled = true
          throttleCounter.Inc(1)
        }
        if request == nil {
          continue
        }
        if request.From > 0 {
          peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
        } else {
          peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
        }
        // Fetch the chunk and make sure any errors return the hashes to the queue
        if fetchHook != nil {
          fetchHook(request.Headers)
        }
        if err := fetch(peer, request); err != nil {
          // Although we could try and make an attempt to fix this, this error really
          // means that we've double allocated a fetch task to a peer. If that is the
          // case, the internal state of the downloader and the queue is very wrong so
          // better hard crash and note the error instead of silently accumulating into
          // a much bigger issue.
          panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
        }
        running = true
      }
      // Make sure that we have peers available for fetching. If all peers have been tried
      // and all failed throw an error
      if !progressed && !throttled && !running && len(idles) == total && pendCount > 0 {
        return errPeersUnavailable
      }
    }
  }
}
Fetching Receipts

fetchReceipts follows the same pattern as fetchBodies, only with receipt-specific callbacks:
// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
  log.Debug("Downloading transaction receipts", "origin", from)

  var (
    deliver = func(packet dataPack) (int, error) {
      pack := packet.(*receiptPack)
      return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
    }
    expire   = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) }
    fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
    capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) }
    setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) {
      p.SetReceiptsIdle(accepted, deliveryTime)
    }
  )
  err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
    d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ReserveReceipts,
    d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")

  log.Debug("Transaction receipt download terminated", "err", err)
  return err
}
// DeliverReceipts injects a new batch of receipts received from a remote node.
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) error {
  return d.deliver(d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
}
DeliverReceipts routes the packet through the same deliver helper shown earlier for block bodies.

Processing Content

In full sync, processFullSyncContent drains completed fetch results from the queue and imports them into the local chain:
// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func (d *Downloader) processFullSyncContent() error {
  for {
    results := d.queue.Results(true)
    if len(results) == 0 {
      return nil
    }
    if d.chainInsertHook != nil {
      d.chainInsertHook(results)
    }
    if err := d.importBlockResults(results); err != nil {
      return err
    }
  }
}
In fast sync, processFastSyncContent writes the fetched blocks to the database and also drives the state sync of the pivot block:
// processFastSyncContent takes fetch results from the queue and writes them to the
// database. It also controls the synchronisation of state nodes of the pivot block.
func (d *Downloader) processFastSyncContent() error {
  // Start syncing state of the reported head block. This should get us most of
  // the state of the pivot block.
  d.pivotLock.RLock()
  sync := d.syncState(d.pivotHeader.Root)
  d.pivotLock.RUnlock()

  defer func() {
    // The `sync` object is replaced every time the pivot moves. We need to
    // defer close the very last active one, hence the lazy evaluation vs.
    // calling defer sync.Cancel() !!!
    sync.Cancel()
  }()

  closeOnErr := func(s *stateSync) {
    if err := s.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled && err != snap.ErrCancelled {
      d.queue.Close() // wake up Results
    }
  }
  go closeOnErr(sync)

  // To cater for moving pivot points, track the pivot block and subsequently
  // accumulated download results separately.
  var (
    oldPivot *fetchResult   // Locked in pivot block, might change eventually
    oldTail  []*fetchResult // Downloaded content after the pivot
  )
  for {
    // Wait for the next batch of downloaded data to be available, and if the pivot
    // block became stale, move the goalpost
    results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
    if len(results) == 0 {
      // If pivot sync is done, stop
      if oldPivot == nil {
        return sync.Cancel()
      }
      // If sync failed, stop
      select {
      case <-d.cancelCh:
        sync.Cancel()
        return errCanceled
      default:
      }
    }
    if d.chainInsertHook != nil {
      d.chainInsertHook(results)
    }
    // If we haven't downloaded the pivot block yet, check pivot staleness
    // notifications from the header downloader
    d.pivotLock.RLock()
    pivot := d.pivotHeader
    d.pivotLock.RUnlock()

    if oldPivot == nil {
      if pivot.Root != sync.root {
        sync.Cancel()
        sync = d.syncState(pivot.Root)

        go closeOnErr(sync)
      }
    } else {
      results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
    }
    // Split around the pivot block and process the two sides via fast/full sync
    if atomic.LoadInt32(&d.committed) == 0 {
      latest := results[len(results)-1].Header
      // If the height is above the pivot block by 2 sets, it means the pivot
      // become stale in the network and it was garbage collected, move to a
      // new pivot.
      //
      // Note, we have `reorgProtHeaderDelay` number of blocks withheld, Those
      // need to be taken into account, otherwise we're detecting the pivot move
      // late and will drop peers due to unavailable state!!!
      if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*uint64(fsMinFullBlocks)-uint64(reorgProtHeaderDelay) {
        log.Warn("Pivot became stale, moving", "old", pivot.Number.Uint64(), "new", height-uint64(fsMinFullBlocks)+uint64(reorgProtHeaderDelay))
        pivot = results[len(results)-1-fsMinFullBlocks+reorgProtHeaderDelay].Header // must exist as lower old pivot is uncommitted

        d.pivotLock.Lock()
        d.pivotHeader = pivot
        d.pivotLock.Unlock()

        // Write out the pivot into the database so a rollback beyond it will
        // reenable fast sync
        rawdb.WriteLastPivotNumber(d.stateDB, pivot.Number.Uint64())
      }
    }
    P, beforeP, afterP := splitAroundPivot(pivot.Number.Uint64(), results)
    if err := d.commitFastSyncData(beforeP, sync); err != nil {
      return err
    }
    if P != nil {
      // If new pivot block found, cancel old state retrieval and restart
      if oldPivot != P {
        sync.Cancel()
        sync = d.syncState(P.Header.Root)

        go closeOnErr(sync)
        oldPivot = P
      }
      // Wait for completion, occasionally checking for pivot staleness
      select {
      case <-sync.done:
        if sync.err != nil {
          return sync.err
        }
        if err := d.commitPivotBlock(P); err != nil {
          return err
        }
        oldPivot = nil

      case <-time.After(time.Second):
        oldTail = afterP
        continue
      }
    }
    // Fast sync done, pivot commit done, full import
    if err := d.importBlockResults(afterP); err != nil {
      return err
    }
  }
}
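To make the staleness check concrete: assuming fsMinFullBlocks = 64 and reorgProtHeaderDelay = 2 (the values used in this version), the pivot is considered stale once the newest downloaded header is at least 2*64-2 = 126 blocks above it, and the new pivot is placed 62 blocks below that header. A small sketch (hypothetical helper, not geth code):

// pivotIsStale reports whether the pivot should move, and where to (sketch).
func pivotIsStale(latest, pivot uint64) (stale bool, newPivot uint64) {
  const (
    fsMinFullBlocks      = 64
    reorgProtHeaderDelay = 2
  )
  if latest >= pivot+2*fsMinFullBlocks-reorgProtHeaderDelay {
    return true, latest - fsMinFullBlocks + reorgProtHeaderDelay
  }
  return false, pivot
}

// Example: pivot = 1000, latest = 1126 -> stale, new pivot = 1064.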
References

https://www.jianshu.com/p/427fbc3a25f9

https://blog.csdn.net/pulong0748/article/details/111574388
