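A transaction is initiated by an RPC call from the client, which is handled mainly in the author module. Substrate calls an external transaction an extrinsic. The RPC method name is author_submitExtrinsic. The relevant code (substrate/core/rpc/api/src/author/mod.rs) is as follows:
author module
Definition of AuthorApi: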
pub trait AuthorApi<Hash, BlockHash> {
#[rpc(name = "author_submitExtrinsic")]
fn submit_extrinsic(&self, extrinsic: Bytes) -> Result<Hash>;
}
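To make the RPC entry point concrete, here is a minimal sketch of how a client might build the JSON-RPC 2.0 request body for author_submitExtrinsic. The helper names `to_hex` and `build_submit_request` are hypothetical, and the sketch assumes the extrinsic has already been SCALE-encoded and signed elsewhere; the `Bytes` parameter is sent as a 0x-prefixed hex string.

```rust
/// Hex-encode bytes with a `0x` prefix (hypothetical helper).
fn to_hex(bytes: &[u8]) -> String {
    let mut out = String::with_capacity(2 + bytes.len() * 2);
    out.push_str("0x");
    for b in bytes {
        out.push_str(&format!("{:02x}", b));
    }
    out
}

/// Build a JSON-RPC 2.0 request body for `author_submitExtrinsic`.
/// Assumption: `encoded_extrinsic` is an already encoded and signed extrinsic.
fn build_submit_request(id: u64, encoded_extrinsic: &[u8]) -> String {
    format!(
        r#"{{"jsonrpc":"2.0","id":{},"method":"author_submitExtrinsic","params":["{}"]}}"#,
        id,
        to_hex(encoded_extrinsic)
    )
}
```

The resulting string would be posted to the node's RPC endpoint; the node answers with the extrinsic hash on success.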
The use keyword imports the relevant items from the transaction pool module:
use transaction_pool::{
txpool::{
ChainApi as PoolChainApi,
BlockHash,
ExHash,
IntoPoolError,
Pool,
watcher::Status,
},
};
Definition of the Author struct:
pub struct Author<B, E, P, RA> where P: PoolChainApi + Sync + Send + 'static {
client: Arc<Client<B, E, <P as PoolChainApi>::Block, RA>>,
pool: Arc<Pool<P>>,
subscriptions: Subscriptions,
keystore: BareCryptoStorePtr,
}
The function submit_extrinsic from AuthorApi is implemented for the Author struct:
impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> where
...
{
...
fn submit_extrinsic(&self, ext: Bytes) -> Result<ExHash<P>> {
let xt = Decode::decode(&mut &ext[..])?;
let best_block_hash = self.client.info().chain.best_hash;
self.pool
.submit_one(&generic::BlockId::hash(best_block_hash), xt)
.map_err(|e| e.into_pool_error()
.map(Into::into)
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
)
}
}
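pool module
From the Author struct we can see that the call self.pool.submit_one is implemented in the pool module of the transaction pool.
pool: the extrinsic pool.
submit_one: imports one unverified extrinsic into the pool.
submit_at: imports a batch of unverified extrinsics into the pool.
notification
The relevant code (/core/transaction-pool/graph/src/pool.rs) is as follows: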
pub trait ChainApi: Send + Sync {
fn validate_transaction(&self, at: &BlockId<Self::Block>, uxt: ExtrinsicFor<Self>) -> Result<TransactionValidity, Self::Error>;
}
pub struct Pool<B: ChainApi> {
api: B,
options: Options,
listener: RwLock<Listener<ExHash<B>, BlockHash<B>>>,
pool: RwLock<base::BasePool<
ExHash<B>,
ExtrinsicFor<B>,
>>,
import_notification_sinks: Mutex<Vec<mpsc::UnboundedSender<()>>>,
rotator: PoolRotator<ExHash<B>>,
}
impl<B: ChainApi> Pool<B> {
pub fn submit_one(&self, at: &BlockId<B::Block>, xt: ExtrinsicFor<B>) -> Result<ExHash<B>, B::Error> {
Ok(self.submit_at(at, ::std::iter::once(xt))?.pop().expect("One extrinsic passed; one result returned; qed")?)
}
pub fn submit_at<T>(&self, at: &BlockId<B::Block>, xts: T) -> Result<Vec<Result<ExHash<B>, B::Error>>, B::Error> where
T: IntoIterator<Item=ExtrinsicFor<B>>
{
...
let results = xts
.into_iter()
.map(|xt| -> Result<_, B::Error> { ...
// Validate the extrinsic
match self.api.validate_transaction(at, xt.clone())? {
Ok(validity) => if validity.provides.is_empty() {
Err(error::Error::NoTagsProvided.into())
} else {
Ok(base::Transaction {
data: xt,
bytes,
hash,
priority: validity.priority,
requires: validity.requires,
provides: validity.provides,
propagate: validity.propagate,
valid_till: block_number
.saturated_into::<u64>()
.saturating_add(validity.longevity),
})
},
...
}
})
.map(|tx| { // Import the transaction
let imported = self.pool.write().import(tx?)?;
// Notify subscribers that a transaction was imported
if let base::Imported::Ready { .. } = imported {
self.import_notification_sinks.lock().retain(|sink| sink.unbounded_send(()).is_ok());
}
// Fire listener events for the import result
let mut listener = self.listener.write();
fire_events(&mut *listener, &imported);
Ok(imported.hash().clone())
})
.collect::<Vec<_>>();
...
}
}
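The validation step in submit_at can be sketched in isolation. The following is a simplified model, not the pool's actual types: `Validity`, `PoolTx` and `to_pool_tx` are hypothetical names. It shows the two details visible in the code above: a transaction that provides no tags is rejected, and valid_till is the current block number plus the longevity, computed with saturating addition so it cannot overflow.

```rust
type Tag = Vec<u8>;

#[derive(Debug, PartialEq)]
enum PoolError {
    NoTagsProvided,
}

/// Simplified stand-in for the validity information returned by the runtime.
struct Validity {
    priority: u64,
    requires: Vec<Tag>,
    provides: Vec<Tag>,
    longevity: u64,
    propagate: bool,
}

/// Simplified stand-in for the transaction stored in the pool.
#[derive(Debug, PartialEq)]
struct PoolTx {
    priority: u64,
    requires: Vec<Tag>,
    provides: Vec<Tag>,
    propagate: bool,
    valid_till: u64,
}

/// Turn a validity result into a pool transaction, as submit_at does.
fn to_pool_tx(block_number: u64, v: Validity) -> Result<PoolTx, PoolError> {
    // A transaction that provides no tags can never unlock or replace anything.
    if v.provides.is_empty() {
        return Err(PoolError::NoTagsProvided);
    }
    Ok(PoolTx {
        priority: v.priority,
        requires: v.requires,
        provides: v.provides,
        propagate: v.propagate,
        // Saturating addition keeps a very large longevity from overflowing.
        valid_till: block_number.saturating_add(v.longevity),
    })
}
```

With block number 10 and longevity 64, the transaction stays valid until block 74; with a longevity of u64::MAX the result is capped at u64::MAX.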
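Before a transaction is imported into the pool, self.api.validate_transaction is called to validate it. This function comes from the trait ChainApi, and its actual logic is implemented by each runtime.
The code in node-template (/substrate/node-template/runtime/src/lib.rs) is as follows: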
pub type Executive = executive::Executive<Runtime, Block, system::ChainContext<Runtime>, Runtime, AllModules>;
impl client_api::TaggedTransactionQueue<Block> for Runtime {
fn validate_transaction(tx: <Block as BlockT>::Extrinsic) -> TransactionValidity {
Executive::validate_transaction(tx)
}
}
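Here Executive is the dispatcher that drives the various modules, and it contains the actual implementation of transaction validation. The code (substrate/srml/executive/src/lib.rs) is as follows:
// where clause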
UnsignedValidator: ValidateUnsigned<Call=CallOf<Block::Extrinsic, Context>>
pub fn validate_transaction(uxt: Block::Extrinsic) -> TransactionValidity {
let encoded_len = uxt.using_encoded(|d| d.len());
let xt = uxt.check(&Default::default())?;
let dispatch_info = xt.get_dispatch_info();
// The turbofish operator supplies UnsignedValidator as the type argument
// for the generic parameter of validate, which is bounded by ValidateUnsigned
xt.validate::<UnsignedValidator>(dispatch_info, encoded_len)
}
The code of the validate method (/core/sr-primitives/src/generic/checked_extrinsic.rs) is as follows:
fn validate<U: ValidateUnsigned<Call = Self::Call>>(
&self,
info: DispatchInfo,
len: usize,
) -> TransactionValidity {
if let Some((ref id, ref extra)) = self.signed {
Extra::validate(extra, id, &self.function, info, len)
} else {
let valid = Extra::validate_unsigned(&self.function, info, len)?;
Ok(valid.combine_with(U::validate_unsigned(&self.function)?))
}
}
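As the code shows, validation first checks the extrinsic in uxt.check, which includes verifying the signature of a signed transaction. validate then runs Extra::validate for a signed transaction, or combines the results of the unsigned validators for an unsigned one.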
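For the unsigned branch, the two validity results are merged with combine_with. The sketch below is a toy model of such a merge, with a hypothetical `Validity` struct; the merge rules used here (priorities added with saturation, tags concatenated, the shorter longevity kept, propagation only if both allow it) are stated as assumptions for illustration rather than a copy of the library code.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Validity {
    priority: u64,
    requires: Vec<Vec<u8>>,
    provides: Vec<Vec<u8>>,
    longevity: u64,
    propagate: bool,
}

impl Validity {
    /// Merge two validity results (assumed rules, see the text above).
    fn combine_with(mut self, mut other: Validity) -> Validity {
        // Concatenate the tag lists of both results.
        self.requires.append(&mut other.requires);
        self.provides.append(&mut other.provides);
        Validity {
            priority: self.priority.saturating_add(other.priority),
            requires: self.requires,
            provides: self.provides,
            // The combined result is only valid as long as both parts are.
            longevity: self.longevity.min(other.longevity),
            propagate: self.propagate && other.propagate,
        }
    }
}
```

Under these rules, a stricter validator can only shorten the lifetime or forbid propagation; it cannot extend what another validator allowed.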
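base_pool module
Importing a transaction calls self.pool.write().import in the base_pool module, which places the transaction into one of the pool's two queues: Future or Ready.
The implementation that leads to the ready queue (/core/transaction-pool/graph/src/base_pool.rs) is as follows: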
pub fn import(&mut self,
tx: Transaction<Hash, Ex>,
) -> error::Result<Imported<Hash, Ex>> {
if self.future.contains(&tx.hash) || self.ready.contains(&tx.hash) {
return Err(error::Error::AlreadyImported(Box::new(tx.hash.clone())))
}
...
self.import_to_ready(tx)
}
fn import_to_ready(&mut self, tx: WaitingTransaction<Hash, Ex>) -> error::Result<Imported<Hash, Ex>> {
...
loop {
...
// Import the transaction
let current_hash = tx.transaction.hash.clone();
match self.ready.import(tx) {
Ok(mut replaced) => {
if !first {
promoted.push(current_hash);
}
removed.append(&mut replaced);
},
...
}
...
}
The code that imports a transaction into the ready queue () is as follows:
pub fn import(
&mut self,
tx: WaitingTransaction<Hash, Ex>,
) -> error::Result<Vec<Arc<Transaction<Hash, Ex>>>> {
...
let replaced = self.replace_previous(&transaction)?;
...
// Insert into best
if goes_to_best {
self.best.insert(transaction.clone());
}
// Insert into ready
ready.insert(hash, ReadyTx {
transaction,
unlocks: vec![],
requires_offset: 0,
});
Ok(replaced)
}
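The interplay of the Future and Ready queues is easier to see in a toy model. The sketch below is a heavily simplified pool, with hypothetical names (`ToyPool`, `Tx`, `Imported`): a transaction is ready once every tag it requires has been provided by a ready transaction, otherwise it waits in future; importing a ready transaction may promote waiting ones. Replacement of existing transactions and pruning are left out.

```rust
use std::collections::{HashMap, HashSet};

type Tag = &'static str;

#[derive(Debug, Clone, PartialEq)]
struct Tx {
    hash: u32,
    requires: Vec<Tag>,
    provides: Vec<Tag>,
}

#[derive(Debug, PartialEq)]
enum Imported {
    /// Imported into ready; `promoted` lists future transactions it unlocked.
    Ready { promoted: Vec<u32> },
    Future,
}

#[derive(Debug, PartialEq)]
enum PoolError {
    AlreadyImported(u32),
}

#[derive(Default)]
struct ToyPool {
    provided: HashSet<Tag>,
    ready: HashMap<u32, Tx>,
    future: HashMap<u32, Tx>,
}

impl ToyPool {
    fn is_ready(&self, tx: &Tx) -> bool {
        tx.requires.iter().all(|t| self.provided.contains(t))
    }

    fn import(&mut self, tx: Tx) -> Result<Imported, PoolError> {
        if self.ready.contains_key(&tx.hash) || self.future.contains_key(&tx.hash) {
            return Err(PoolError::AlreadyImported(tx.hash));
        }
        if !self.is_ready(&tx) {
            self.future.insert(tx.hash, tx);
            return Ok(Imported::Future);
        }
        let mut promoted = Vec::new();
        let mut queue = vec![tx];
        let mut first = true;
        while let Some(next) = queue.pop() {
            // The tags this transaction provides may unlock waiting ones.
            self.provided.extend(next.provides.iter().copied());
            if !first {
                promoted.push(next.hash);
            }
            first = false;
            self.ready.insert(next.hash, next);
            let mut unlocked: Vec<u32> = self
                .future
                .values()
                .filter(|t| self.is_ready(t))
                .map(|t| t.hash)
                .collect();
            unlocked.sort();
            for hash in unlocked {
                if let Some(t) = self.future.remove(&hash) {
                    queue.push(t);
                }
            }
        }
        Ok(Imported::Ready { promoted })
    }
}
```

For example, a transaction that requires tag "a" is held in future until another transaction providing "a" is imported, at which point it is promoted to ready.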
Some important data structures:
pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
future: FutureTransactions<Hash, Ex>,
ready: ReadyTransactions<Hash, Ex>,
recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS],
recently_pruned_index: usize,
}
future: Future transactions.
ready: Ready transactions.
recently_pruned: stores the tags removed by the two most recent prunings.
pub struct ReadyTransactions<Hash: hash::Hash + Eq, Ex> {
insertion_id: u64,
provided_tags: HashMap<Tag, Hash>,
ready: Arc<RwLock<HashMap<Hash, ReadyTx<Hash, Ex>>>>,
best: BTreeSet<TransactionRef<Hash, Ex>>,
}
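ready: the transactions that are ready.
best: an ordered BTreeSet holding the transactions that will be packed into a block (those with no prerequisite transactions).
When a transaction is added to the transaction pool, the network module's trigger_repropagate function is called.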
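Returning to the best set described above: a BTreeSet keeps its elements sorted, so the order in which transactions are taken depends entirely on the Ord implementation of the element type. The sketch below uses a hypothetical `TxRef` and assumes an ordering in which a higher priority wins and, for equal priority, the earlier insertion wins; the real TransactionRef ordering may differ.

```rust
use std::cmp::Ordering;
use std::collections::BTreeSet;

#[derive(Debug, Clone, PartialEq, Eq)]
struct TxRef {
    priority: u64,
    insertion_id: u64,
    hash: u32,
}

impl Ord for TxRef {
    fn cmp(&self, other: &Self) -> Ordering {
        // Assumed rule: higher priority is greater; on a tie, the one
        // inserted earlier (smaller insertion_id) is greater.
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.insertion_id.cmp(&self.insertion_id))
            // Final tie-break keeps Ord consistent with the derived Eq.
            .then_with(|| self.hash.cmp(&other.hash))
    }
}

impl PartialOrd for TxRef {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Return hashes in the order a block author would take them:
/// greatest element first.
fn best_order(best: &BTreeSet<TxRef>) -> Vec<u32> {
    best.iter().rev().map(|t| t.hash).collect()
}
```

Because the set is always sorted, picking the next transaction is a matter of reading from one end; no separate sorting pass is needed at block-building time.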
The call chain is trigger_repropagate, then the PropagateExtrinsics message, then propagate_extrinsics. The code that wires this up (/substrate/core/service/src/lib.rs) is as follows:
{
let network = Arc::downgrade(&network);
let transaction_pool_ = transaction_pool.clone();
let events = transaction_pool.import_notification_stream()
.map(|v| Ok::<_, ()>(v)).compat()
.for_each(move |_| {
if let Some(network) = network.upgrade() {
network.trigger_repropagate();
}
let status = transaction_pool_.status();
telemetry!(SUBSTRATE_INFO; "txpool.import";
"ready" => status.ready,
"future" => status.future
);
Ok(())
})
.select(exit.clone())
.then(|_| Ok(()));
let _ = to_spawn_tx.unbounded_send(Box::new(events));
}
The code of trigger_repropagate (substrate/core/network/src/service.rs) is as follows:
pub fn trigger_repropagate(&self) {
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::PropagateExtrinsics);
}
ServerToWorkerMsg::PropagateExtrinsics =>
self.network_service.user_protocol_mut().propagate_extrinsics(),
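The pattern above, where a cheap handle posts a message and a worker reacts to it, can be sketched with the standard library's channels. This is only an illustration: the real code uses an unbounded futures channel, and `NetworkHandle`, `new_network` and `run_worker_once` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

#[derive(Debug, PartialEq)]
enum ServerToWorkerMsg {
    PropagateExtrinsics,
}

/// Handle held by the service side; it only posts messages.
struct NetworkHandle {
    to_worker: Sender<ServerToWorkerMsg>,
}

impl NetworkHandle {
    fn trigger_repropagate(&self) {
        // As in the original, a send failure (worker gone) is ignored.
        let _ = self.to_worker.send(ServerToWorkerMsg::PropagateExtrinsics);
    }
}

/// Create a handle together with the receiving end owned by the worker.
fn new_network() -> (NetworkHandle, Receiver<ServerToWorkerMsg>) {
    let (to_worker, from_service) = channel();
    (NetworkHandle { to_worker }, from_service)
}

/// Drain pending messages and return how many propagation rounds
/// were requested. A real worker would call propagate_extrinsics here.
fn run_worker_once(from_service: &Receiver<ServerToWorkerMsg>) -> usize {
    let mut rounds = 0;
    while let Ok(msg) = from_service.try_recv() {
        match msg {
            ServerToWorkerMsg::PropagateExtrinsics => rounds += 1,
        }
    }
    rounds
}
```

Decoupling the trigger from the work this way means the transaction pool never blocks on the network; it only leaves a note for the worker.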
The code of propagate_extrinsics, which propagates the transactions (substrate/core/network/src/protocol.rs), is as follows:
pub fn propagate_extrinsics(
&mut self,
) {
...
if self.sync.status().state != SyncState::Idle {
return;
}
let extrinsics = self.transaction_pool.transactions();
let mut propagated_to = HashMap::new();
for (who, peer) in self.context_data.peers.iter_mut() {
let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics
.iter()
.filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone()))
.cloned()
.unzip();
if !to_send.is_empty() {
for hash in hashes {
propagated_to
.entry(hash)
.or_insert_with(Vec::new)
.push(who.to_base58());
}
trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
self.behaviour.send_packet(who, GenericMessage::Transactions(to_send))
}
}
self.transaction_pool.on_broadcasted(propagated_to);
}
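The key idea in propagate_extrinsics is the per-peer known_extrinsics set: HashSet::insert returns false for an element that is already present, so using it as a filter both records the hash and skips transactions the peer has already seen. A minimal sketch with simplified types follows; `Peer` and `propagate` are hypothetical, peers are keyed by a plain string, and the sync-state check and the actual packet send are omitted.

```rust
use std::collections::{BTreeMap, HashMap, HashSet};

type Hash = u32;

#[derive(Default)]
struct Peer {
    known_extrinsics: HashSet<Hash>,
}

/// Work out which transactions go to which peers and return, per
/// transaction hash, the list of peers it was sent to.
fn propagate(
    peers: &mut BTreeMap<String, Peer>,
    pool: &[(Hash, Vec<u8>)],
) -> HashMap<Hash, Vec<String>> {
    let mut propagated_to: HashMap<Hash, Vec<String>> = HashMap::new();
    for (who, peer) in peers.iter_mut() {
        // insert() returns true only for hashes this peer has not seen yet.
        let (hashes, to_send): (Vec<Hash>, Vec<Vec<u8>>) = pool
            .iter()
            .filter(|(hash, _)| peer.known_extrinsics.insert(*hash))
            .cloned()
            .unzip();
        if !to_send.is_empty() {
            for hash in hashes {
                propagated_to
                    .entry(hash)
                    .or_insert_with(Vec::new)
                    .push(who.clone());
            }
            // A real implementation would send `to_send` to the peer here.
        }
    }
    propagated_to
}
```

Calling the function twice with the same pool sends nothing the second time, because every peer now knows every hash.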
Transactions are packed into blocks by the block builder module. The code (substrate/core/client/src/block_builder/api.rs) is as follows:
pub trait BlockBuilder {
/// Apply the given extrinsics.
fn apply_extrinsic(extrinsic: <Block as BlockT>::Extrinsic) -> ApplyResult;
/// Finish the current block.
#[renamed("finalise_block", 3)]
fn finalize_block() -> <Block as BlockT>::Header;
/// Generate inherent extrinsics. The inherent data will vary from chain to chain.
fn inherent_extrinsics(inherent: InherentData) -> Vec<<Block as BlockT>::Extrinsic>;
/// Check that the inherents are valid. The inherent data will vary from chain to chain.
fn check_inherents(block: Block, data: InherentData) -> CheckInherentsResult;
/// Generate a random seed.
fn random_seed() -> <Block as BlockT>::Hash;
}
The actual logic of these functions is implemented by each runtime.
The code in node-template (/substrate/node-template/runtime/src/lib.rs) is as follows:
impl block_builder_api::BlockBuilder<Block> for Runtime {
fn apply_extrinsic(extrinsic: <Block as BlockT>::Extrinsic) -> ApplyResult {
Executive::apply_extrinsic(extrinsic)
}
}
Executive contains the actual implementation of applying a transaction. The code (substrate/srml/executive/src/lib.rs) is as follows:
pub fn apply_extrinsic(uxt: Block::Extrinsic) -> ApplyResult {
let encoded = uxt.encode();
let encoded_len = encoded.len();
Self::apply_extrinsic_with_len(uxt, encoded_len, Some(encoded))
}
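apply_extrinsic computes the encoded length before delegating, which suggests the length matters to later checks. The sketch below is a toy block builder that shows one plausible use, a per-block size budget. The `ToyBlockBuilder` type, its `max_len` limit and the `TooLarge` error are all hypothetical and are not taken from the Executive code.

```rust
#[derive(Debug, PartialEq)]
enum ApplyError {
    /// Hypothetical error: the extrinsic does not fit in the block.
    TooLarge,
}

struct ToyBlockBuilder {
    max_len: usize,
    used_len: usize,
    applied: Vec<Vec<u8>>,
}

impl ToyBlockBuilder {
    fn new(max_len: usize) -> Self {
        ToyBlockBuilder { max_len, used_len: 0, applied: Vec::new() }
    }

    /// Apply one already encoded extrinsic, tracking its encoded length.
    fn apply_extrinsic(&mut self, encoded: Vec<u8>) -> Result<(), ApplyError> {
        let encoded_len = encoded.len();
        if self.used_len.saturating_add(encoded_len) > self.max_len {
            return Err(ApplyError::TooLarge);
        }
        self.used_len += encoded_len;
        self.applied.push(encoded);
        Ok(())
    }

    /// Finish the block and return the extrinsics that were applied.
    fn finalize_block(self) -> Vec<Vec<u8>> {
        self.applied
    }
}
```

A block author would call apply_extrinsic for each ready transaction, skip those that fail, and then call finalize_block.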
Once the transactions have been processed this way, what follows is block production and then finality consensus.