作者:林冠宏 / 指尖下的幽灵。转载者,请: 务必标明出处。
博客:http://www.cnblogs.com/linguanh/
掘金:https://juejin.im/user/1785262612681997
GitHub : https://github.com/af913337456/
出版的书籍:
知识点
总结21年的第一篇文章,开源写作6年。
最近比特币
和以太坊
的价格也已然起飞,现在一个 BTC 已能全款辆某斯拉 model 3
汽车。离谱。
发布这篇文章:从区块链技术研发者的角度,说说我的区块链从业经历和对它的理解 的时候,是去年,现在回首去看最后那段话,一语成谶。
言归正传。
一般做数据池
之类的开发。比如:订单池,请求池...,传统的服务端思想
会引导我们直接向消息中间件
想去。使用各类消息组件
去实现,比如 RocketMQ,Redis,Kafka...
然而,在区块链公链应用中,现已知的多条公链,每一条,都有交易池
这么一个功能模块,且,它们的代码实现都没有引入消息中间件去实现。
早前在阅读以太坊公链源码的时候,我就对以太坊交易池这一块的实现思想感到新颖,今天总结下,分享给大家看看,区块链公链应用中不依赖消息中间件去实现交易池的做法及其特点。
以太坊交易池知识点
总结 _(BTW:面试的时候可死记):
配置变量
指定的地址,则认为是本地交易: 节点启动的时候,可以在配置文件指定,不开启本地交易的操作
。map[addr]TxList
;RPC请求
,SendTransaction 或 SendRawTransaction;定时
从 Pending 和 Queue 中选出本地交易存储到本地文件
;先 new
一个,再 rename
一波;即是更新
,也是删除
操作;rlp 编码
,不是 json。程序启动之初
的操作;上面第 4 点,中断恢复
,对比于传统后端服务的消息中间件,对消息的不丢失保障性,区块链公链的做法,完全是靠分布式来维持的,单节点的数据丢失,可以从其它节点同步过来。所以,它们交易池的实现的实现,相对来说,更加灵活,编码难点在消息同步部分。
要看注释。
源码文件:tx_pool.go
,config.Locals 由配置文件指定,是以太坊钱包地址数组。
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
...
for _, addr := range config.Locals { // 从配置文件添加 本地地址
log.Info("Setting new local account", "address", addr)
// 添加到 locals 变量里面,后面会用它来过滤出一个地址是否是本地地址
pool.locals.add(addr)
}
...
}
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
...
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
log.Info("Setting new local account", "address", addr)
pool.locals.add(addr)
}
...
// 上面添加完了
// If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != "" { // 如果配置开启了本地加载的需求
pool.journal = newTxJournal(config.Journal)
// load 是加载函数,pool.AddLocals 是实际添加函数
if err := pool.journal.load(pool.AddLocals); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
...
go pool.loop() // 循环处理事件
}
源码文件:tx_journal.go
func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
// Skip the parsing if the journal file doesn't exist at all
if _, err := os.Stat(journal.path); os.IsNotExist(err) {
return nil
}
// Open the journal for loading any past transactions
input, err := os.Open(journal.path) // 打开文件,读取流数据
if err != nil {
return err
}
...
stream := rlp.NewStream(input, 0) // 使用 rlp 编码算法解码数据
...
loadBatch := func(txs types.Transactions) {
for _, err := range add(txs) { // 调用 add 函数,进行添加
if err != nil {
log.Debug("Failed to add journaled transaction", "err", err)
dropped++
}
}
}
// loadBatch 在下面会被调用
...
}
pool.AddLocals
是实际的添加函数。内部的一系列调用后,最终到 tx_pool.add 函数。pool 的 queue 都是 map 结构,能根据相同 key 去重。
func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
...
// 下面的 if,如果已在 pool.pending 里面,那么证明之前已经添加过在 queue 里
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
...
pool.journalTx(from, tx) // 内部调用 journal.insert
return old != nil, nil
}
replaced, err = pool.enqueueTx(hash, tx) // 这里,会添加到 pool.enqueue 里面
if err != nil {
return false, err
}
pool.journalTx(from, tx) // 内部调用 journal.insert
...
}
func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
// 本地钱包地址,没指定的话,就跳过
if pool.journal == nil || !pool.locals.contains(from) {
return
}
// insert 会在造成重复添加,但是 load 出来的时候会根据 addr 去重
if err := pool.journal.insert(tx); err != nil {
log.Warn("Failed to journal local transaction", "err", err)
}
}
截止到上面,本地交易已经被添加到 pool 的 queue 里面了。
节点启动之初,除了会从本地 load 交易到 queue 外,还会不停地监听链的事件,比如接收交易,再 add 交易 到 queue 里。
loop 是触发的入口。除了主动的 journal.insert 达到了插入本地交易的目的之外。
下面的更新操作,也达到了包含插入的目的:以替换的手段,从文件删除旧交易,存储新交易到文件
func (pool *TxPool) loop() {
...
for {
select {
...
// Handle local transaction journal rotation
// journal 定时器,定时执行下面的本地交易数据文件的更新 journal.rotate
case <-journal.C:
if pool.journal != nil {
pool.mu.Lock()
if err := pool.journal.rotate(pool.local()); err != nil {
log.Warn("Failed to rotate local tx journal", "err", err)
}
pool.mu.Unlock()
}
}
}
}
journal.rotate 的做法,使用文件替换的方式,来从 pool 的交易 pending 和 queue 中存储 locals 钱包地址相关的交易到文件。注意,只存本地钱包地址的,其它的,不存。
//输入
func (pool *TxPool) local() map[common.Address]types.Transactions {
...
for addr := range pool.locals.accounts {
if pending := pool.pending[addr]; pending != nil {
// 添加 pending 的
txs[addr] = append(txs[addr], pending.Flatten()...)
}
if queued := pool.queue[addr]; queued != nil {
// 添加 queue 的
txs[addr] = append(txs[addr], queued.Flatten()...)
}
}
return txs
}
// all 参数,来源于上面 local()
func (journal *txJournal) rotate(all map[common.Address]types.Transactions) error {
...
// journal.path+".new" 后缀 .new
replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
return err
}
journaled := 0
for _, txs := range all {
for _, tx := range txs {
if err = rlp.Encode(replacement, tx); err != nil {
replacement.Close()
return err
}
}
journaled += len(txs)
}
replacement.Close()
// rename,重命名文件到原始的 path,达到更新,替换目的
if err = os.Rename(journal.path+".new", journal.path); err != nil {
return err
}
sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0755)
if err != nil {
return err
}
...
return nil
}
源码文件:eth/backend.go
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
...
if config.TxPool.Journal != "" {
config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
}
// 初始化交易池
eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)
...
// 使用 交易池指针对象 作为参数初始化 protocolManager
if eth.protocolManager, err = NewProtocolManager(
chainConfig, checkpoint, config.SyncMode, config.NetworkId,
eth.eventMux, `eth.txPool`, eth.engine,
eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil {
return nil, err
}
...
return eth, nil
}
func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
// 下面初始化 tx_fetcher,使用 txpool.AddRemotes 赋值给函数变量 addTxs
manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, fetchTx)
}
源码文件:eth/handler.go
func (pm *ProtocolManager) handleMsg(p *peer) error {
...
switch {
...
// 接收到其它节点的交易数据
case msg.Code == TransactionMsg || (msg.Code == PooledTransactionsMsg && p.version >= eth65):
...
// Enqueue 将交易添加到交易池
pm.txFetcher.Enqueue(p.id, txs, msg.Code == PooledTransactionsMsg)
}
...
}
// tx_fetcher.go 文件
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
...
errs := f.addTxs(txs) // 执行添加,这个函数其实就是 tx_pool.go 的 AddRemotes
...
}
// tx_pool.go
// addTxs 内部就会把交易添加到 Pending 和 Queue 里面
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
return pool.addTxs(txs, false, false)
}
更多以太坊的开发知识,见我的书籍: