Substrate源码分析:交易流程

从业务角度分析substrate源码,梳理了交易流程,包括发起交易,广播交易和打包交易。

1. 发起交易

交易的发起是通过客户端的RPC调用,这个主要是在author模块中。

  • Substrate中把外部交易称做extrinsic
  • RPC方法名是author_submitExtrinsic
  • 外部交易以十六进制编码形式,被导入交易池中。

具体代码(substrate/core/rpc/api/src/author/mod.rs)如下:

1.1 author模块

定义AuthorApi

pub trait AuthorApi<Hash, BlockHash> {
    #[rpc(name = "author_submitExtrinsic")]
    fn submit_extrinsic(&self, extrinsic: Bytes) -> Result<Hash>;
}

通过use关键字,导入交易模块的相关功能

use transaction_pool::{
    txpool::{
        ChainApi as PoolChainApi,
        BlockHash,
        ExHash,
        IntoPoolError,
        Pool,
        watcher::Status,
    },
};

定义Author结构体

pub struct Author<B, E, P, RA> where P: PoolChainApi + Sync + Send + 'static {
    client: Arc<Client<B, E, <P as PoolChainApi>::Block, RA>>,
    pool: Arc<Pool<P>>,
    subscriptions: Subscriptions,
    keystore: BareCryptoStorePtr,
}

Author结构体实现AuthorApi中的函数submit_extrinsic

impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> where
...
{
    ...
    fn submit_extrinsic(&self, ext: Bytes) -> Result<ExHash<P>> {
        let xt = Decode::decode(&mut &ext[..])?;
        let best_block_hash = self.client.info().chain.best_hash;
        self.pool
            .submit_one(&generic::BlockId::hash(best_block_hash), xt)
            .map_err(|e| e.into_pool_error()
                .map(Into::into)
                .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
            )
    }
}

1.2 提交交易 pool模块

我们通过Author这个结构体,知道self.pool.submit_one这个调用实现是在交易池模块的pool这个模块中。

  • 结构体pool,外部交易池。
  • 方法submit_one,导入一条未验证的外部交易到池中。
  • 方法submit_at,导入一批未验证的外部交易到池中。
    • 验证外部交易
    • 导入交易
    • 发送导入交易的notification
    • 监听导入交易处理结果

具体代码(/core/transaction-pool/graph/src/pool.rs)如下:

pub trait ChainApi: Send + Sync {
    fn validate_transaction(&self, at: &BlockId<Self::Block>, uxt: ExtrinsicFor<Self>) -> Result<TransactionValidity, Self::Error>;
}

pub struct Pool<B: ChainApi> {
    api: B,
    options: Options,
    listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>,
    pool: RwLock<base::BasePool<
        ExHash<B>,
        ExtrinsicFor<B>,
    >>,
    import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,
    rotator: PoolRotator<ExHash<B>>,
}

impl<B: ChainApi> Pool<B> {
    pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<ExHash<B>, B::Error> {
        Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed")?)
    }
    
    pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T) -> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error> where
        T: IntoIterator<Item=ExtrinsicFor<B>>
    {
        ...
        let results = xts
            .into_iter()
            .map(|xt| -> Result<_, B::Error> {          ...
            // 验证外部交易
            match self.api.validate_transaction(at, xt.clone())? {
                    Ok(validity) => if validity.provides.is_empty() {
                        Err(error::Error::NoTagsProvided.into())
                    } else {
                        Ok(base::Transaction {
                            data: xt,
                            bytes,
                            hash,
                            priority: validity.priority,
                            requires: validity.requires,
                            provides: validity.provides,
                            propagate: validity.propagate,
                            valid_till: block_number
                                .saturated_into::<u64>()
                                .saturating_add(validity.longevity),
                        })
                    },
                    ...
                }
            })
            .map(|tx| { // 导入交易
                let imported = self.pool.write().import(tx?)?;
                // 导入交易的notification
                if let base::Imported::Ready { .. } = imported {
                    self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok());
                }
                // 监听导入交易处理结果
                let mut listener = self.listener.write();
                fire_events(&mut *listener, &imported);
                Ok(imported.hash().clone())
            })
            .collect::<Vec<_>>();
        ...
    }
}

1.3 验证交易

在交易被导入池中前,会调用函数self.api.validate_transaction来验证交易,它来自traitChainApi。而这个函数的具体逻辑是由每个runtime自己实现的。

node-template中的代码(/substrate/node-template/runtime/src/lib.rs)如下:

pub type Executive = executive::Executive<Runtime, Block, system::ChainContext<Runtime>, Runtime, AllModules>;

impl client_api::TaggedTransactionQueue<Block> for Runtime {
    fn validate_transaction(tx: <Block as BlockT>::Extrinsic) -> TransactionValidity {
        Executive::validate_transaction(tx)
    }
}

其中,Executive是处理各种模块的调度器,其中有验证交易的具体实现,代码(substrate/srml/executive/src/lib.rs)如下:

// where 语句
UnsignedValidator: ValidateUnsigned<Call=CallOf<Block::Extrinsic, Context>>

pub fn validate_transaction(uxt: Block::Extrinsic) -> TransactionValidity {
    let encoded_len = uxt.using_encoded(|d| d.len());
    let xt = uxt.check(&Default::default())?;

    let dispatch_info = xt.get_dispatch_info();
    // 使用turbofish操作符
    // 指定泛型UnsignedValidator的方法validate的具体参数为ValidateUnsigned
    xt.validate::<UnsignedValidator>(dispatch_info, encoded_len)
}

validate方法的代码(/core/sr-primitives/src/generic/checked_extrinsic.rs)如下:

fn validate<U: ValidateUnsigned<Call = Self::Call>>(
    &self,
    info: DispatchInfo,
    len: usize,
) -> TransactionValidity {
    if let Some((ref id, ref extra)) = self.signed {
        Extra::validate(extra, id, &self.function, info, len)
    } else {
        let valid = Extra::validate_unsigned(&self.function, info, len)?;
        Ok(valid.combine_with(U::validate_unsigned(&self.function)?))
    }
}

可以看出验证交易的逻辑,就是检查给定交易签名的有效性。

1.4 导入交易 base_pool模块

导入交易调用的是base_pool模块中的函数self.pool.write().import。将交易导入池中:

  • 交易池中交易由两部分组成:FutureReady
    • 前者包含需要池中其他交易尚未提供的某些标记的交易。
    • 后者包含满足所有要求并准备包含在区块中的交易。
  • 将交易导入ready队列
    • 交易需要通过此队列中的事务满足所有(准备好)标记。
    • 返回由导入交易替换的交易。

具体实现代码(/core/transaction-pool/graph/src/base_pool.rs)如下:

pub fn import(&mut self,
    tx: Transaction<Hash, Ex>,
) -> error::Result<Imported<Hash, Ex>> {
    if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) {
        return Err(error::Error::AlreadyImported(Box::new(tx.hash.clone())))
    }
    ...
    self.import_to_ready(tx)
}

fn import_to_ready(&mut self, tx: WaitingTransaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {
    ...
    loop {
        ...
        // 导入交易
        let current_hash = tx.transaction.hash.clone();
        match self.ready.import(tx) {
            Ok(mut replaced) => {
                if !first {
                    promoted.push(current_hash);
                }
                removed.append(&mut replaced);
            },
            ...
    }
    ...
}

交易导入ready队列的代码()如下:

pub fn import(
    &mut self,
    tx: WaitingTransaction<Hash, Ex>,
) -> error::Result<Vec<Arc<Transaction<Hash, Ex>>>> {
    ...
    let replaced = self.replace_previous(&transaction)?;
    ...
    // 插入到 best
    if goes_to_best {
        self.best.insert(transaction.clone());
    }
    // 插入到 Ready
    ready.insert(hash, ReadyTx {
        transaction,
        unlocks: vec![],
        requires_offset: 0,
    });
    Ok(replaced)
}

重要的几个数据结构:

pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
    future: FutureTransactions<Hash, Ex>,
    ready: ReadyTransactions<Hash, Ex>,
    recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS],
    recently_pruned_index: usize,
}
  • futrueFuture交易
  • readyReady交易
  • recently_pruned:存储最近两次修剪过的标签。
pub struct ReadyTransactions<Hash: hash::Hash + Eq, Ex> {
    insertion_id: u64,
    provided_tags: HashMap<Tag, Hash>,
    ready: Arc<RwLock<HashMap<Hash, ReadyTx<Hash, Ex>>>>,
    best: BTreeSet<TransactionRef<Hash, Ex>>,
}
  • ready:已准备好的交易
  • best:使用的是有序集合BTreeSet,存放将被打包到区块中的交易(没有任何前置交易)。

2. 广播交易

当交易被添加到交易池中,网络模块的函数trigger_repropagate会被调用。

  • 发送消息PropagateExtrinsics
  • 传播交易propagate_extrinsics
    • 判断节点状态,完全同步才接受交易并传播,否则直接退出
    • 发送交易数据包

具体代码(/substrate/core/service/src/lib.rs)如下:

{
    let network = Arc::downgrade(&network);
    let transaction_pool_ = transaction_pool.clone();
    let events = transaction_pool.import_notification_stream()
        .map(|v| Ok::<_, ()>(v)).compat()
        .for_each(move |_| {
            if let Some(network) = network.upgrade() {
                network.trigger_repropagate();
            }
            let status = transaction_pool_.status();
            telemetry!(SUBSTRATE_INFO; "txpool.import";
                "ready" => status.ready,
                "future" => status.future
            );
            Ok(())
        })
        .select(exit.clone())
        .then(|_| Ok(()));

    let _ = to_spawn_tx.unbounded_send(Box::new(events));
}

以及trigger_repropagate代码(substrate/core/network/src/service.rs)如下:

pub fn trigger_repropagate(&self) {
    let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::PropagateExtrinsics);
}

ServerToWorkerMsg::PropagateExtrinsics =>
    self.network_service.user_protocol_mut().propagate_extrinsics(),

传播交易propagate_extrinsics,代码(substrate/core/network/src/protocol.rs)如下:

pub fn propagate_extrinsics(
    &mut self,
) {
    ...
    if self.sync.status().state != SyncState::Idle {
        return;
    }
    
    let extrinsics = self.transaction_pool.transactions();
    let mut propagated_to = HashMap::new();
    for (who, peer) in self.context_data.peers.iter_mut() {
        let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics
            .iter()
            .filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone()))
            .cloned()
            .unzip();

        if !to_send.is_empty() {
            for hash in hashes {
                propagated_to
                    .entry(hash)
                    .or_insert_with(Vec::new)
                    .push(who.to_base58());
            }
            trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
            self.behaviour.send_packet(who, GenericMessage::Transactions(to_send))
        }
    }

    self.transaction_pool.on_broadcasted(propagated_to);
}

3. 打包交易

交易的打包是在区块生成模块,代码(substrate/core/client/src/block_builder/api.rs)如下:

pub trait BlockBuilder {
    /// Apply the given extrinsics.
    fn apply_extrinsic(extrinsic: <Block as BlockT>::Extrinsic) -> ApplyResult;
    /// Finish the current block.
    #[renamed("finalise_block", 3)]
    fn finalize_block() -> <Block as BlockT>::Header;
    /// Generate inherent extrinsics. The inherent data will vary from chain to chain.
    fn inherent_extrinsics(inherent: InherentData) -> Vec<<Block as BlockT>::Extrinsic>;
    /// Check that the inherents are valid. The inherent data will vary from chain to chain.
    fn check_inherents(block: Block, data: InherentData) -> CheckInherentsResult;
    /// Generate a random seed.
    fn random_seed() -> <Block as BlockT>::Hash;
}

而这个函数的具体逻辑是由每个runtime自己实现的。

node-template中的代码(/substrate/node-template/runtime/src/lib.rs)如下:

impl block_builder_api::BlockBuilder<Block> for Runtime {
    fn apply_extrinsic(extrinsic: <Block as BlockT>::Extrinsic) -> ApplyResult {
        Executive::apply_extrinsic(extrinsic)
    }
}

其中Executive有打包交易的具体实现,代码(substrate/srml/executive/src/lib.rs)如下:

pub fn apply_extrinsic(uxt: Block::Extrinsic) -> ApplyResult {
    let encoded = uxt.encode();
    let encoded_len = encoded.len();
    Self::apply_extrinsic_with_len(uxt, encoded_len, Some(encoded))
}

这样处理完交易,后面就是区块出块,以及进行最终性共识。

本文分享自微信公众号 - Rust语言学习交流(rust-china)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-09-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券