前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Tendermint 启动流程

Tendermint 启动流程

作者头像
潇洒
发布2023-10-23 08:55:09
1630
发布2023-10-23 08:55:09
举报
文章被收录于专栏:石头岛

启动流程

Tendermint 的启动流程比较清析明了,各业务启动流程都在对应的实现代码,主启动流程加载所需配置,由各启动实现类启动自己对应业务,如节点启动相关在 nodeImpl,共识相关处理是 state 中进行处理。

流程大致:

  1. 加载配置 node.NewDefault
  2. 启动运行 Start
  3. 启动相关实现的 OnStart

先看启动流程

服务启动
服务启动

启动入口代码,这里使用到了一个命令行工具:cobra。 代码位置:cmd/tendermint/main.go

代码语言:javascript
复制
func main() {
    ...省略部份代码
	// NOTE:
	// Users wishing to:
	//	* Use an external signer for their validators
	//	* Supply an in-proc abci app
	//	* Supply a genesis doc file from another source
	//	* Provide their own DB implementation
	// can copy this file and use something other than the
	// node.NewDefault function
    // 创建节点为默认动行节点,这里是函数引用,并未执行
    // 在 cmd.NewRunNodeCmd 调用
	nodeFunc := node.NewDefault

	// Create & start node
    // 主要方法 cmd.NewRunNodeCmd
	rootCmd.AddCommand(cmd.NewRunNodeCmd(nodeFunc))

	cmd := cli.PrepareBaseCmd(rootCmd, "TM", os.ExpandEnv(filepath.Join("$HOME", config.DefaultTendermintDir)))
	if err := cmd.Execute(); err != nil {
		panic(err)
	}
}

创建运行节点

代码语言:javascript
复制
// NewRunNodeCmd returns the command that allows the CLI to start a node.
// It can be used with a custom PrivValidator and in-process ABCI application.
func NewRunNodeCmd(nodeProvider cfg.ServiceProvider) *cobra.Command {
	// 添加到命令行
	cmd := &cobra.Command{
		Use:     "start",
		Aliases: []string{"node", "run"},
		Short:   "Run the tendermint node",
		RunE: func(cmd *cobra.Command, args []string) error {
			if err := checkGenesisHash(config); err != nil {
				return err
			}
			// 这里调用 node.NewDefault 这个方法实现
            // 创建了节点
			n, err := nodeProvider(config, logger)
			if err != nil {
				return fmt.Errorf("failed to create node: %w", err)
			}
			// 启动服务
			if err := n.Start(); err != nil {
				return fmt.Errorf("failed to start node: %w", err)
			}

			logger.Info("started node", "node", n.String())

			// Stop upon receiving SIGTERM or CTRL-C.
			tmos.TrapSignal(logger, func() {
				if n.IsRunning() {
					if err := n.Stop(); err != nil {
						logger.Error("unable to stop the node", "error", err)
					}
				}
			})

			// Run forever.
			select {}
		},
	}

	AddNodeFlags(cmd)
	return cmd
}

启动需的默认配置一目了然

代码语言:javascript
复制
// DefaultConfig returns a default configuration for a Tendermint node
func DefaultConfig() *Config {
	return &Config{
		BaseConfig:      DefaultBaseConfig(),
		RPC:             DefaultRPCConfig(),
		P2P:             DefaultP2PConfig(),
		Mempool:         DefaultMempoolConfig(),
		StateSync:       DefaultStateSyncConfig(),
		BlockSync:       DefaultBlockSyncConfig(),
		Consensus:       DefaultConsensusConfig(),
		TxIndex:         DefaultTxIndexConfig(),
		Instrumentation: DefaultInstrumentationConfig(),
		PrivValidator:   DefaultPrivValidatorConfig(),
	}
}

启动服务

启动服务接口 Service 主要实现类是BaseService

service.go

代码语言:javascript
复制
// Service defines a service that can be started, stopped, and reset.
type Service interface {
	// Start the service.
	// If it's already started or stopped, will return an error.
	// If OnStart() returns an error, it's returned by Start()
	Start() error
	OnStart() error
}

启动 node

代码语言:javascript
复制
// Start implements Service by calling OnStart (if defined). An error will be
// returned if the service is already running or stopped. Not to start the
// stopped service, you need to call Reset.
func (bs *BaseService) Start() error {
	if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
		if atomic.LoadUint32(&bs.stopped) == 1 {
			bs.Logger.Error("not starting service; already stopped", "service", bs.name, "impl", bs.impl.String())
			atomic.StoreUint32(&bs.started, 0)
			return ErrAlreadyStopped
		}

		bs.Logger.Info("starting service", "service", bs.name, "impl", bs.impl.String())
        // 启动节点。BaseService 有很多实,都实现 OnStart。
        // 服务启动是:node.go OnStart
        // 共识启动是: state.go OnStart
		if err := bs.impl.OnStart(); err != nil {
			// revert flag
			atomic.StoreUint32(&bs.started, 0)
			return err
		}
		return nil
	}

	bs.Logger.Debug("not starting service; already started", "service", bs.name, "impl", bs.impl.String())
	return ErrAlreadyStarted
}

nodeImpl 实现启动流程,总的来说还是比较清晰。

代码语言:javascript
复制

// OnStart starts the Node. It implements service.Service.
func (n *nodeImpl) OnStart() error {
	now := tmtime.Now()
	genTime := n.genesisDoc.GenesisTime
	if genTime.After(now) {
		n.Logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
		time.Sleep(genTime.Sub(now))
	}

	// Start the RPC server before the P2P server
	// so we can eg. receive txs for the first block
	// 这里顺带说下,tendermint 的3种节点为类型
	// 	ModeFull      = "full" 数据转发节点
	//	ModeValidator = "validator"  数据验证节点
	//	ModeSeed      = "seed"   用来做节点发现
	if n.config.RPC.ListenAddress != "" && n.config.Mode != config.ModeSeed {
		// 启动 RPC
		listeners, err := n.startRPC()
		if err != nil {
			return err
		}
		n.rpcListeners = listeners
	}

	if n.config.Instrumentation.Prometheus &&
		n.config.Instrumentation.PrometheusListenAddr != "" {
		n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
	}

	// Start the transport.
	addr, err := types.NewNetAddressString(n.nodeKey.ID.AddressString(n.config.P2P.ListenAddress))
	if err != nil {
		return err
	}
	if err := n.transport.Listen(p2p.NewEndpoint(addr)); err != nil {
		return err
	}

	n.isListening = true

	// p2p 路由
	if err = n.router.Start(); err != nil {
		return err
	}

	if n.config.Mode != config.ModeSeed {
		if n.config.BlockSync.Enable {
			// 开启区块同步
			if err := n.bcReactor.Start(); err != nil {
				return err
			}
		}

		// Start the real consensus reactor separately since the switch uses the shim.
		if err := n.consensusReactor.Start(); err != nil {
			return err
		}

		// Start the real state sync reactor separately since the switch uses the shim.
		if err := n.stateSyncReactor.Start(); err != nil {
			return err
		}

		// Start the real mempool reactor separately since the switch uses the shim.
		if err := n.mempoolReactor.Start(); err != nil {
			return err
		}

		// Start the real evidence reactor separately since the switch uses the shim.
		if err := n.evidenceReactor.Start(); err != nil {
			return err
		}
	}

	if err := n.pexReactor.Start(); err != nil {
		return err
	}

	// Run state sync
	// TODO: We shouldn't run state sync if we already have state that has a
	// LastBlockHeight that is not InitialHeight
	if n.stateSync {
		bcR, ok := n.bcReactor.(consensus.BlockSyncReactor)
		if !ok {
			return fmt.Errorf("this blockchain reactor does not support switching from state sync")
		}

		// we need to get the genesis state to get parameters such as
		state, err := sm.MakeGenesisState(n.genesisDoc)
		if err != nil {
			return fmt.Errorf("unable to derive state: %w", err)
		}

		// TODO: we may want to move these events within the respective
		// reactors.
		// At the beginning of the statesync start, we use the initialHeight as the event height
		// because of the statesync doesn't have the concreate state height before fetched the snapshot.
		d := types.EventDataStateSyncStatus{Complete: false, Height: state.InitialHeight}
		if err := n.eventBus.PublishEventStateSyncStatus(d); err != nil {
			n.eventBus.Logger.Error("failed to emit the statesync start event", "err", err)
		}

		// FIXME: We shouldn't allow state sync to silently error out without
		// bubbling up the error and gracefully shutting down the rest of the node
		go func() {
			n.Logger.Info("starting state sync")
			state, err := n.stateSyncReactor.Sync(context.TODO())
			if err != nil {
				n.Logger.Error("state sync failed; shutting down this node", "err", err)
				// stop the node
				if err := n.Stop(); err != nil {
					n.Logger.Error("failed to shut down node", "err", err)
				}
				return
			}

			n.consensusReactor.SetStateSyncingMetrics(0)

			d := types.EventDataStateSyncStatus{Complete: true, Height: state.LastBlockHeight}
			if err := n.eventBus.PublishEventStateSyncStatus(d); err != nil {
				n.eventBus.Logger.Error("failed to emit the statesync start event", "err", err)
			}

			// TODO: Some form of orchestrator is needed here between the state
			// advancing reactors to be able to control which one of the three
			// is running
			if n.config.BlockSync.Enable {
				// FIXME Very ugly to have these metrics bleed through here.
				n.consensusReactor.SetBlockSyncingMetrics(1)
				if err := bcR.SwitchToBlockSync(state); err != nil {
					n.Logger.Error("failed to switch to block sync", "err", err)
					return
				}

				d := types.EventDataBlockSyncStatus{Complete: false, Height: state.LastBlockHeight}
				if err := n.eventBus.PublishEventBlockSyncStatus(d); err != nil {
					n.eventBus.Logger.Error("failed to emit the block sync starting event", "err", err)
				}

			} else {
				n.consensusReactor.SwitchToConsensus(state, true)
			}
		}()
	}

	return nil
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-11-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 启动流程
    • 创建运行节点
      • 启动服务
        • 启动 node
        相关产品与服务
        Prometheus 监控服务
        Prometheus 监控服务(TencentCloud Managed Service for Prometheus,TMP)是基于开源 Prometheus 构建的高可用、全托管的服务,与腾讯云容器服务(TKE)高度集成,兼容开源生态丰富多样的应用组件,结合腾讯云可观测平台-告警管理和 Prometheus Alertmanager 能力,为您提供免搭建的高效运维能力,减少开发及运维成本。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档