前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Substrate源码分析:交易流程

Substrate源码分析:交易流程

作者头像
MikeLoveRust
发布2019-09-17 17:18:04
1.3K0
发布2019-09-17 17:18:04
举报

从业务角度分析substrate源码,梳理了交易流程,包括发起交易,广播交易和打包交易。

1. 发起交易

交易的发起是通过客户端的RPC调用,这个主要是在author模块中。

  • Substrate中把外部交易称做extrinsic
  • RPC方法名是author_submitExtrinsic
  • 外部交易以十六进制编码形式,被导入交易池中。

具体代码(substrate/core/rpc/api/src/author/mod.rs)如下:

1.1 author模块

定义AuthorApi

代码语言:javascript
复制
pub trait AuthorApi<Hash, BlockHash> {
    #[rpc(name = "author_submitExtrinsic")]
    fn submit_extrinsic(&self, extrinsic: Bytes) -> Result<Hash>;
}

通过use关键字,导入交易模块的相关功能

代码语言:javascript
复制
use transaction_pool::{
    txpool::{
        ChainApi as PoolChainApi,
        BlockHash,
        ExHash,
        IntoPoolError,
        Pool,
        watcher::Status,
    },
};

定义Author结构体

代码语言:javascript
复制
pub struct Author<B, E, P, RA> where P: PoolChainApi + Sync + Send + 'static {
    client: Arc<Client<B, E, <P as PoolChainApi>::Block, RA>>,
    pool: Arc<Pool<P>>,
    subscriptions: Subscriptions,
    keystore: BareCryptoStorePtr,
}

Author结构体实现AuthorApi中的函数submit_extrinsic

代码语言:javascript
复制
impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> where
...
{
    ...
    fn submit_extrinsic(&self, ext: Bytes) -> Result<ExHash<P>> {
        let xt = Decode::decode(&mut &ext[..])?;
        let best_block_hash = self.client.info().chain.best_hash;
        self.pool
            .submit_one(&generic::BlockId::hash(best_block_hash), xt)
            .map_err(|e| e.into_pool_error()
                .map(Into::into)
                .unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
            )
    }
}
1.2 提交交易 pool模块

我们通过Author这个结构体,知道self.pool.submit_one这个调用实现是在交易池模块的pool这个模块中。

  • 结构体pool,外部交易池。
  • 方法submit_one,导入一条未验证的外部交易到池中。
  • 方法submit_at,导入一批未验证的外部交易到池中。
    • 验证外部交易
    • 导入交易
    • 发送导入交易的notification
    • 监听导入交易处理结果

具体代码(/core/transaction-pool/graph/src/pool.rs)如下:

代码语言:javascript
复制
pub trait ChainApi: Send + Sync {
    fn validate_transaction(&self, at: &BlockId<Self::Block>, uxt: ExtrinsicFor<Self>) -> Result<TransactionValidity, Self::Error>;
}

pub struct Pool<B: ChainApi> {
    api: B,
    options: Options,
    listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>,
    pool: RwLock<base::BasePool<
        ExHash<B>,
        ExtrinsicFor<B>,
    >>,
    import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,
    rotator: PoolRotator<ExHash<B>>,
}

impl<B: ChainApi> Pool<B> {
    pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<ExHash<B>, B::Error> {
        Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed")?)
    }
    
    pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T) -> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error> where
        T: IntoIterator<Item=ExtrinsicFor<B>>
    {
        ...
        let results = xts
            .into_iter()
            .map(|xt| -> Result<_, B::Error> {          ...
            // 验证外部交易
            match self.api.validate_transaction(at, xt.clone())? {
                    Ok(validity) => if validity.provides.is_empty() {
                        Err(error::Error::NoTagsProvided.into())
                    } else {
                        Ok(base::Transaction {
                            data: xt,
                            bytes,
                            hash,
                            priority: validity.priority,
                            requires: validity.requires,
                            provides: validity.provides,
                            propagate: validity.propagate,
                            valid_till: block_number
                                .saturated_into::<u64>()
                                .saturating_add(validity.longevity),
                        })
                    },
                    ...
                }
            })
            .map(|tx| { // 导入交易
                let imported = self.pool.write().import(tx?)?;
                // 导入交易的notification
                if let base::Imported::Ready { .. } = imported {
                    self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok());
                }
                // 监听导入交易处理结果
                let mut listener = self.listener.write();
                fire_events(&mut *listener, &imported);
                Ok(imported.hash().clone())
            })
            .collect::<Vec<_>>();
        ...
    }
}
1.3 验证交易

在交易被导入池中前,会调用函数self.api.validate_transaction来验证交易,它来自traitChainApi。而这个函数的具体逻辑是由每个runtime自己实现的。

node-template中的代码(/substrate/node-template/runtime/src/lib.rs)如下:

代码语言:javascript
复制
pub type Executive = executive::Executive<Runtime, Block, system::ChainContext<Runtime>, Runtime, AllModules>;

impl client_api::TaggedTransactionQueue<Block> for Runtime {
    fn validate_transaction(tx: <Block as BlockT>::Extrinsic) -> TransactionValidity {
        Executive::validate_transaction(tx)
    }
}

其中,Executive是处理各种模块的调度器,其中有验证交易的具体实现,代码(substrate/srml/executive/src/lib.rs)如下:

代码语言:javascript
复制
// where 语句
UnsignedValidator: ValidateUnsigned<Call=CallOf<Block::Extrinsic, Context>>

pub fn validate_transaction(uxt: Block::Extrinsic) -> TransactionValidity {
    let encoded_len = uxt.using_encoded(|d| d.len());
    let xt = uxt.check(&Default::default())?;

    let dispatch_info = xt.get_dispatch_info();
    // 使用turbofish操作符
    // 指定泛型UnsignedValidator的方法validate的具体参数为ValidateUnsigned
    xt.validate::<UnsignedValidator>(dispatch_info, encoded_len)
}

validate方法的代码(/core/sr-primitives/src/generic/checked_extrinsic.rs)如下:

代码语言:javascript
复制
fn validate<U: ValidateUnsigned<Call = Self::Call>>(
    &self,
    info: DispatchInfo,
    len: usize,
) -> TransactionValidity {
    if let Some((ref id, ref extra)) = self.signed {
        Extra::validate(extra, id, &self.function, info, len)
    } else {
        let valid = Extra::validate_unsigned(&self.function, info, len)?;
        Ok(valid.combine_with(U::validate_unsigned(&self.function)?))
    }
}

可以看出验证交易的逻辑,就是检查给定交易签名的有效性。

1.4 导入交易 base_pool模块

导入交易调用的是base_pool模块中的函数self.pool.write().import。将交易导入池中:

  • 交易池中交易由两部分组成:FutureReady
    • 前者包含需要池中其他交易尚未提供的某些标记的交易。
    • 后者包含满足所有要求并准备包含在区块中的交易。
  • 将交易导入ready队列
    • 交易需要通过此队列中的事务满足所有(准备好)标记。
    • 返回由导入交易替换的交易。

具体实现代码(/core/transaction-pool/graph/src/base_pool.rs)如下:

代码语言:javascript
复制
pub fn import(&mut self,
    tx: Transaction<Hash, Ex>,
) -> error::Result<Imported<Hash, Ex>> {
    if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) {
        return Err(error::Error::AlreadyImported(Box::new(tx.hash.clone())))
    }
    ...
    self.import_to_ready(tx)
}

fn import_to_ready(&mut self, tx: WaitingTransaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {
    ...
    loop {
        ...
        // 导入交易
        let current_hash = tx.transaction.hash.clone();
        match self.ready.import(tx) {
            Ok(mut replaced) => {
                if !first {
                    promoted.push(current_hash);
                }
                removed.append(&mut replaced);
            },
            ...
    }
    ...
}

交易导入ready队列的代码()如下:

代码语言:javascript
复制
pub fn import(
    &mut self,
    tx: WaitingTransaction<Hash, Ex>,
) -> error::Result<Vec<Arc<Transaction<Hash, Ex>>>> {
    ...
    let replaced = self.replace_previous(&transaction)?;
    ...
    // 插入到 best
    if goes_to_best {
        self.best.insert(transaction.clone());
    }
    // 插入到 Ready
    ready.insert(hash, ReadyTx {
        transaction,
        unlocks: vec![],
        requires_offset: 0,
    });
    Ok(replaced)
}

重要的几个数据结构:

代码语言:javascript
复制
pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
    future: FutureTransactions<Hash, Ex>,
    ready: ReadyTransactions<Hash, Ex>,
    recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS],
    recently_pruned_index: usize,
}
  • futrueFuture交易
  • readyReady交易
  • recently_pruned:存储最近两次修剪过的标签。
代码语言:javascript
复制
pub struct ReadyTransactions<Hash: hash::Hash + Eq, Ex> {
    insertion_id: u64,
    provided_tags: HashMap<Tag, Hash>,
    ready: Arc<RwLock<HashMap<Hash, ReadyTx<Hash, Ex>>>>,
    best: BTreeSet<TransactionRef<Hash, Ex>>,
}
  • ready:已准备好的交易
  • best:使用的是有序集合BTreeSet,存放将被打包到区块中的交易(没有任何前置交易)。

2. 广播交易

当交易被添加到交易池中,网络模块的函数trigger_repropagate会被调用。

  • 发送消息PropagateExtrinsics
  • 传播交易propagate_extrinsics
    • 判断节点状态,完全同步才接受交易并传播,否则直接退出
    • 发送交易数据包

具体代码(/substrate/core/service/src/lib.rs)如下:

代码语言:javascript
复制
{
    let network = Arc::downgrade(&network);
    let transaction_pool_ = transaction_pool.clone();
    let events = transaction_pool.import_notification_stream()
        .map(|v| Ok::<_, ()>(v)).compat()
        .for_each(move |_| {
            if let Some(network) = network.upgrade() {
                network.trigger_repropagate();
            }
            let status = transaction_pool_.status();
            telemetry!(SUBSTRATE_INFO; "txpool.import";
                "ready" => status.ready,
                "future" => status.future
            );
            Ok(())
        })
        .select(exit.clone())
        .then(|_| Ok(()));

    let _ = to_spawn_tx.unbounded_send(Box::new(events));
}

以及trigger_repropagate代码(substrate/core/network/src/service.rs)如下:

代码语言:javascript
复制
pub fn trigger_repropagate(&self) {
    let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::PropagateExtrinsics);
}

ServerToWorkerMsg::PropagateExtrinsics =>
    self.network_service.user_protocol_mut().propagate_extrinsics(),

传播交易propagate_extrinsics,代码(substrate/core/network/src/protocol.rs)如下:

代码语言:javascript
复制
pub fn propagate_extrinsics(
    &mut self,
) {
    ...
    if self.sync.status().state != SyncState::Idle {
        return;
    }
    
    let extrinsics = self.transaction_pool.transactions();
    let mut propagated_to = HashMap::new();
    for (who, peer) in self.context_data.peers.iter_mut() {
        let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics
            .iter()
            .filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone()))
            .cloned()
            .unzip();

        if !to_send.is_empty() {
            for hash in hashes {
                propagated_to
                    .entry(hash)
                    .or_insert_with(Vec::new)
                    .push(who.to_base58());
            }
            trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
            self.behaviour.send_packet(who, GenericMessage::Transactions(to_send))
        }
    }

    self.transaction_pool.on_broadcasted(propagated_to);
}

3. 打包交易

交易的打包是在区块生成模块,代码(substrate/core/client/src/block_builder/api.rs)如下:

代码语言:javascript
复制
pub trait BlockBuilder {
    /// Apply the given extrinsics.
    fn apply_extrinsic(extrinsic: <Block as BlockT>::Extrinsic) -> ApplyResult;
    /// Finish the current block.
    #[renamed("finalise_block", 3)]
    fn finalize_block() -> <Block as BlockT>::Header;
    /// Generate inherent extrinsics. The inherent data will vary from chain to chain.
    fn inherent_extrinsics(inherent: InherentData) -> Vec<<Block as BlockT>::Extrinsic>;
    /// Check that the inherents are valid. The inherent data will vary from chain to chain.
    fn check_inherents(block: Block, data: InherentData) -> CheckInherentsResult;
    /// Generate a random seed.
    fn random_seed() -> <Block as BlockT>::Hash;
}

而这个函数的具体逻辑是由每个runtime自己实现的。

node-template中的代码(/substrate/node-template/runtime/src/lib.rs)如下:

代码语言:javascript
复制
impl block_builder_api::BlockBuilder<Block> for Runtime {
    fn apply_extrinsic(extrinsic: <Block as BlockT>::Extrinsic) -> ApplyResult {
        Executive::apply_extrinsic(extrinsic)
    }
}

其中Executive有打包交易的具体实现,代码(substrate/srml/executive/src/lib.rs)如下:

代码语言:javascript
复制
pub fn apply_extrinsic(uxt: Block::Extrinsic) -> ApplyResult {
    let encoded = uxt.encode();
    let encoded_len = encoded.len();
    Self::apply_extrinsic_with_len(uxt, encoded_len, Some(encoded))
}

这样处理完交易,后面就是区块出块,以及进行最终性共识。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-09-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Rust语言学习交流 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 从业务角度分析substrate源码,梳理了交易流程,包括发起交易,广播交易和打包交易。
  • 1. 发起交易
    • 1.1 author模块
      • 1.2 提交交易 pool模块
        • 1.3 验证交易
          • 1.4 导入交易 base_pool模块
          • 2. 广播交易
          • 3. 打包交易
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档