Substrate源码分析:启动流程

我们在命令行启动 substrate 节点,到底发生了什么呢?本文基于 substrate 源码,对其启动流程进行了简单的分析。

命令行启动 substrate,主要是解析命令行参数并配置服务。

主程序在substrate/node/main.rs中,入口是main()函数。其中的关键代码如下:

fn main() {
    ...
    if let Err(e) = cli::run(::std::env::args(), Exit, version) {
        eprintln!("Error starting the node: {}\n\n{:?}", e, e);
        std::process::exit(1)
    }
}

这行代码调用的是node/cli/src/lib.rsrun函数。进入该run函数,有如下代码:

pub fn run<I, T, E>(args: I, exit: E, version: cli::VersionInfo) -> error::Result<()> where
    I: IntoIterator<Item = T>,
    T: Into<std::ffi::OsString> + Clone,
    E: IntoExit,
{
    match parse_and_prepare::<CustomSubcommands, NoCustom, _>(&version, "substrate-node", args) {
        ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit, |exit, _cli_args, _custom_args, config| {
            info!("{}", version.name);
            ...
}

parse_and_prepare函数(位于core/cli/src/lib.rs中),这个函数类似于所有的区块链启动,主要是对命令行参数进行解析,并启动相关的操作。

parse_and_prepare函数中,会根据不同的参数,返回不同类型的ParseAndPrepare

pub fn parse_and_prepare<'a, CC, RP, I>(
    version: &'a VersionInfo,
    impl_name: &'static str,
    args: I,
) -> ParseAndPrepare<'a, CC, RP>
where
    CC: StructOpt + Clone + GetLogFilter,
    RP: StructOpt + Clone + AugmentClap,
    I: IntoIterator,
    <I as IntoIterator>::Item: Into<std::ffi::OsString> + Clone,
{
    ...
    let matches = CoreParams::<CC, RP>::clap()
        .name(version.executable_name)
        .author(version.author)
        .about(version.description)
        .version(&(full_version + "\n")[..])
        .setting(AppSettings::GlobalVersion)
        .setting(AppSettings::ArgsNegateSubcommands)
        .setting(AppSettings::SubcommandsNegateReqs)
        .get_matches_from(args);
    let cli_args = CoreParams::<CC, RP>::from_clap(&matches);
    ...
    match cli_args {
        params::CoreParams::Run(params) => ParseAndPrepare::Run(
            ParseAndPrepareRun { params, impl_name, version }
        ),
    ...
}

各种参数在core/cli/src/param.rs中有相关的定义,部分代码如下:

pub enum CoreParams<CC, RP> {
    /// Run a node.
    Run(MergeParameters<RunCmd, RP>),

    /// Build a spec.json file, outputing to stdout.
    BuildSpec(BuildSpecCmd),

    /// Export blocks to a file.
    ExportBlocks(ExportBlocksCmd),

    /// Import blocks from file.
    ImportBlocks(ImportBlocksCmd),

    /// Revert chain to the previous state.
    Revert(RevertCmd),

    /// Remove the whole chain data.
    PurgeChain(PurgeChainCmd),

    /// Further custom subcommands.
    Custom(CC),
}

而对于枚举类型ParseAndPrepare,每一类结构体,均会实现各自的run方法,解析参数生成配置,并根据配置运行服务。

pub enum ParseAndPrepare<'a, CC, RP> {
    /// Command ready to run the main client.
    Run(ParseAndPrepareRun<'a, RP>),
    /// Command ready to build chain specs.
    BuildSpec(ParseAndPrepareBuildSpec<'a>),
    /// Command ready to export the chain.
    ExportBlocks(ParseAndPrepareExport<'a>),
    /// Command ready to import the chain.
    ImportBlocks(ParseAndPrepareImport<'a>),
    /// Command ready to purge the chain.
    PurgeChain(ParseAndPreparePurge<'a>),
    /// Command ready to revert the chain.
    RevertChain(ParseAndPrepareRevert<'a>),
    /// An additional custom command passed to `parse_and_prepare`.
    CustomCommand(CC),
}

以结构体ParseAndPrepareRun为例,其run函数的实现代码如下:

impl<'a, RP> ParseAndPrepareRun<'a, RP> {
    /// Runs the command and runs the main client.
    pub fn run<C, G, S, E, RS>(
        self,
        spec_factory: S,
        exit: E,
        run_service: RS,
    ) -> error::Result<()>
    where S: FnOnce(&str) -> Result<Option<ChainSpec<G>>, String>,
        RP: StructOpt + Clone,
        C: Default,
        G: RuntimeGenesis,
        E: IntoExit,
        RS: FnOnce(E, RunCmd, RP, Configuration<C, G>) -> Result<(), String>
    {
        let config = create_run_node_config(self.params.left.clone(), spec_factory, self.impl_name, self.version)?;

        run_service(exit, self.params.left, self.params.right, config).map_err(Into::into)
    }
}

其实执行服务,具体是在run函数的闭包函数中,代码如下:

ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit, |exit, _cli_args, _custom_args, config| {
    info!("{}", version.name);
    info!("  version {}", config.full_version());
    info!("  by Parity Technologies, 2017-2019");
    info!("Chain specification: {}", config.chain_spec.name());
    info!("Node name: {}", config.name);
    info!("Roles: {:?}", config.roles);
    let runtime = RuntimeBuilder::new().name_prefix("main-tokio-").build()
        .map_err(|e| format!("{:?}", e))?;
    match config.roles {
        ServiceRoles::LIGHT => run_until_exit(
            runtime,
            service::Factory::new_light(config).map_err(|e| format!("{:?}", e))?,
            exit
        ),
        _ => run_until_exit(
            runtime,
            service::Factory::new_full(config).map_err(|e| format!("{:?}", e))?,
            exit
        ),
    }.map_err(|e| format!("{:?}", e))
}),
  • 首先使用tokio库构建一个runtime
  • 然后根据节点角色配置,分别传入全节点或轻节点服务,调用run_until_exit函数。
  • 最后在函数中,调用tokio runtime启动线程,将由service构建出的`Future informant绑定到event loop上面定期轮询。代码如下:
fn run_until_exit<T, C, E>(
    mut runtime: Runtime,
    service: T,
    e: E,
) -> error::Result<()>
    where
        T: Deref<Target=substrate_service::Service<C>> + Future<Item = (), Error = ()> + Send + 'static,
        C: substrate_service::Components,
        E: IntoExit,
{
    ...
    let informant = cli::informant::build(&service);
    runtime.executor().spawn(exit.until(informant).map(|_| ()));
    ...
    Ok(())
}

代码中调用了core/cli/src/informant.rsbuild函数,创建了一个Futrue“线人”informant

基本上到这儿,相关的命令就全启动了。我们看下生成全节点或轻节点服务的具体细节。

construct_service_factory

在声明宏construct_service_factory的定义中,有如下代码:

}

node/cli/src/service.rs中,包含了service结合service中的对宏的调用,宏展开后,是执行的<Factory>::new(config),代码如下:

construct_service_factory! {
    struct Factory {
        ...
        FullService = FullComponents<Self> {
            |config: FactoryFullConfiguration<Self>| FullComponents::<Factory>::new(config)
        },
        ...
        LightService = LightComponents<Self>
            { |config| <LightComponents<Factory>>::new(config) },
        ...
}

服务组件service

core/service/src/components.rs中定义了substrate的服务组件:FullComponentsLightComponents。它们new函数的实现均调用了Servicenew函数,代码如下:

Ok(
    Self {
        service: Service::new(config)?,
    }
)

通过该函数创建substrate service,它会启动客户端,初始化session keys,构建网络,交易池以及RPC,并管理他们之间的通信,包括区块通知,交易通知等。关键代码如下:

let executor = NativeExecutor::new(config.default_heap_pages);
...
Components::RuntimeServices::generate_intial_session_keys(
    client.clone(),
    config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(),
)?;
...
let network_protocol = <Components::Factory>::build_network_protocol(&config)?;
let transaction_pool = Arc::new(
    Components::build_transaction_pool(config.transaction_pool.clone(), client.clone())?
);
...
let events = client.import_notification_stream()
    .map(|v| Ok::<_, ()>(v)).compat()
    .for_each(move |notification| {
        let number = *notification.header.number();
...
let events = transaction_pool.import_notification_stream()
    .for_each(move |_| {
...
Ok(Service {
    client,
    network,
    network_status_sinks,
    select_chain,
    transaction_pool,
    signal: Some(signal),
    to_spawn_tx,
    to_spawn_rx,
    to_poll: Vec::new(),
    config,
    exit,
    rpc_handlers,
    _rpc: rpc,
    _telemetry: telemetry,
    _offchain_workers: offchain_workers,
    _telemetry_on_connect_sinks: telemetry_connection_sinks.clone(),
    keystore,
})

这个有些类似于以太坊,在启动节点时把相关的网络服务都创建好。这样最后Ok返回整个Service

PS:源码分析是基于master分支(substrate 2.0)。

1. 其中对命令行参数的解析,使用了第三方库structopt,该库通过结构体来解析参数,并对clap库进行了补充。

2. 异步编程,使用了第三方库tokio,该库使用Reactor-Executor模式,是基于事件驱动的非阻塞I/O库。是 Rust 中的异步编程框架,它将复杂的异步编程抽象为 Futures、Tasks 和 Executor,并提供了 Timers 等基础设施。

本文分享自微信公众号 - Rust语言学习交流(rust-china)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-08-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券