前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >以太坊 --- 交易池的特点 与 中断恢复

以太坊 --- 交易池的特点 与 中断恢复

作者头像
林冠宏-指尖下的幽灵
发布2021-01-13 15:13:47
5460
发布2021-01-13 15:13:47
举报
文章被收录于专栏:林冠宏的技术文章

作者:林冠宏 / 指尖下的幽灵。转载者,请: 务必标明出处。

博客:http://www.cnblogs.com/linguanh/

掘金:https://juejin.im/user/1785262612681997

GitHub : https://github.com/af913337456/

出版的书籍:


目录

  • 前序
  • 以太坊交易池知识点总结
  • 源码探秘
    • 本地交易
      • 本地钱包地址的初始化
      • 加载本地交易
      • pool.journal.load
      • pool.AddLocals
      • 本地交易文件的更新
    • 远程交易
      • P2P 通讯模块的初始化
      • 接收 P2P 消息
      • 添加远程交易到交易池
  • “ 彩蛋 ”

21年的第一篇文章,开源写作6年。

最近比特币以太坊的价格也已然起飞,现在一个 BTC 已能全款辆某斯拉 model 3汽车。离谱。

发布这篇文章:从区块链技术研发者的角度,说说我的区块链从业经历和对它的理解 的时候,是去年,现在回首去看最后那段话,一语成谶


言归正传。

一般做数据池之类的开发。比如:订单池,请求池...,传统的服务端思想会引导我们直接向消息中间件想去。使用各类消息组件去实现,比如 RocketMQ,Redis,Kafka...

然而,在区块链公链应用中,现已知的多条公链,每一条,都有交易池这么一个功能模块,且,它们的代码实现都没有引入消息中间件去实现。

早前在阅读以太坊公链源码的时候,我就对以太坊交易池这一块的实现思想感到新颖,今天总结下,分享给大家看看,区块链公链应用中不依赖消息中间件去实现交易池的做法及其特点。


以太坊交易池知识点总结 _(BTW:面试的时候可死记)

  1. 交易的分类:
    • 从本地文件存与不存的角度去看:
      1. 本地交易,若交易的发送者地址是配置变量指定的地址,则认为是本地交易:
        • 节点启动的时候,可以在配置文件指定,不开启本地交易的操作
      2. 远程交易,不满足 1 条件的交易。
    • 从内存存储的角度去看:
      1. Queue,待进入 Pending 的交易,结构是 map[addr]TxList
      2. Pending,待进入打包队列的交易,结构和 Queue 一样,由 1 转化而来。
  2. 交易的输入(产生):
    • 程序启动之初:
      1. 本地交易,从本地文件加载到内存,本地若没,自然是 0 输入;
      2. 远程交易,由 P2P 通讯模块,接收到交易数据,存储到内存。
    • 程序运行中:
      1. 自己接收交易的 RPC请求,SendTransaction 或 SendRawTransaction;
      2. 通过 P2P 通讯模块,接收其它节点的信息,包含的动作有:
        1. 旧交易的移除;
        2. 新交易的增加。
  3. 交易的持久化策略:
    • 本地交易:
      1. 定时从 Pending 和 Queue 中选出本地交易存储到本地文件
      2. 存储方式,文件替换,先 new 一个,再 rename 一波;
      3. 注意第 2 点,文件的替换,意味着即是更新也是删除操作;
      4. 编码方式,rlp 编码,不是 json。
    • 远程交易:
      1. 不存,不进行持久化,总是依赖由其它节点 P2P 通讯同步过来。
  4. 中断恢复:
    1. 本地交易,同上面 程序启动之初 的操作;
    2. 远程交易,没有恢复,内存中的交易丢了就是丢了,不影响。即使当初正在打包,即使当前节点挂了,其它节点还在工作。

上面第 4 点,中断恢复,对比于传统后端服务的消息中间件,对消息的不丢失保障性,区块链公链的做法,完全是靠分布式来维持的,单节点的数据丢失,可以从其它节点同步过来。所以,它们交易池的实现的实现,相对来说,更加灵活,编码难点在消息同步部分。


下面进入枯燥的源码分析阶段,读有余力的读者可以继续

要看注释。

本地交易

1. 本地钱包地址的初始化

源码文件:tx_pool.go,config.Locals 由配置文件指定,是以太坊钱包地址数组。

代码语言:javascript
复制
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
	...
	for _, addr := range config.Locals { // 从配置文件添加 本地地址
		log.Info("Setting new local account", "address", addr)
		// 添加到 locals 变量里面,后面会用它来过滤出一个地址是否是本地地址
		pool.locals.add(addr) 
	}
	...
}

2. 从本地文件,加载交易数据数据,即加载本地交易

代码语言:javascript
复制
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
	...
	pool.locals = newAccountSet(pool.signer)
	for _, addr := range config.Locals {
		log.Info("Setting new local account", "address", addr)
		pool.locals.add(addr)
	}
	...	
 	// 上面添加完了
	// If local transactions and journaling is enabled, load from disk
	if !config.NoLocals && config.Journal != "" { // 如果配置开启了本地加载的需求
		pool.journal = newTxJournal(config.Journal)
   		// load 是加载函数,pool.AddLocals 是实际添加函数
		if err := pool.journal.load(pool.AddLocals); err != nil {
			log.Warn("Failed to load transaction journal", "err", err)
		}
		if err := pool.journal.rotate(pool.local()); err != nil {
			log.Warn("Failed to rotate transaction journal", "err", err)
		}
	}
	...
    go pool.loop() // 循环处理事件
}

3. pool.journal.load

源码文件:tx_journal.go

代码语言:javascript
复制
func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
	// Skip the parsing if the journal file doesn't exist at all
	if _, err := os.Stat(journal.path); os.IsNotExist(err) {
		return nil
	}
	// Open the journal for loading any past transactions
	input, err := os.Open(journal.path) // 打开文件,读取流数据
	if err != nil {
		return err
	}
	...
	stream := rlp.NewStream(input, 0) // 使用 rlp 编码算法解码数据
	...
	loadBatch := func(txs types.Transactions) {
		for _, err := range add(txs) { // 调用 add 函数,进行添加
			if err != nil {
				log.Debug("Failed to add journaled transaction", "err", err)
				dropped++
			}
		}
	}
	// loadBatch 在下面会被调用
	...
}

4. pool.AddLocals

pool.AddLocals 是实际的添加函数。内部的一系列调用后,最终到 tx_pool.add 函数。pool 的 queue 都是 map 结构,能根据相同 key 去重。

代码语言:javascript
复制
func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
	...
 	// 下面的 if,如果已在 pool.pending 里面,那么证明之前已经添加过在 queue 里
	if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
		...
 		pool.journalTx(from, tx) // 内部调用 journal.insert
		return old != nil, nil
	}
	replaced, err = pool.enqueueTx(hash, tx) // 这里,会添加到 pool.enqueue 里面
	if err != nil {
		return false, err
	}
	pool.journalTx(from, tx) // 内部调用 journal.insert
	...
}

func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
	// 本地钱包地址,没指定的话,就跳过
	if pool.journal == nil || !pool.locals.contains(from) {
		return
	}
 	// insert 会在造成重复添加,但是 load 出来的时候会根据 addr 去重
	if err := pool.journal.insert(tx); err != nil {
		log.Warn("Failed to journal local transaction", "err", err)
	}
}

截止到上面,本地交易已经被添加到 pool 的 queue 里面了。

节点启动之初,除了会从本地 load 交易到 queue 外,还会不停地监听链的事件,比如接收交易,再 add 交易 到 queue 里。

5. 本地交易文件的更新 ( 插入 / 删除 )

loop 是触发的入口。除了主动的 journal.insert 达到了插入本地交易的目的之外。

下面的更新操作,也达到了包含插入的目的:以替换的手段,从文件删除旧交易,存储新交易到文件

代码语言:javascript
复制
func (pool *TxPool) loop() {
	...
	for {
		select {
		...
		// Handle local transaction journal rotation
 		// journal 定时器,定时执行下面的本地交易数据文件的更新 journal.rotate
		case <-journal.C:
			if pool.journal != nil {
				pool.mu.Lock()
				if err := pool.journal.rotate(pool.local()); err != nil {
					log.Warn("Failed to rotate local tx journal", "err", err)
				}
				pool.mu.Unlock()
			}
		}
	}
}

journal.rotate 的做法,使用文件替换的方式,来从 pool 的交易 pending 和 queue 中存储 locals 钱包地址相关的交易到文件。注意,只存本地钱包地址的,其它的,不存。

代码语言:javascript
复制
//输入
func (pool *TxPool) local() map[common.Address]types.Transactions {
	...
	for addr := range pool.locals.accounts {
		if pending := pool.pending[addr]; pending != nil {
 			// 添加 pending 的
			txs[addr] = append(txs[addr], pending.Flatten()...)
		}
		if queued := pool.queue[addr]; queued != nil {
 			// 添加 queue 的
			txs[addr] = append(txs[addr], queued.Flatten()...)
		}
	}
	return txs
}

// all 参数,来源于上面 local()
func (journal *txJournal) rotate(all map[common.Address]types.Transactions) error {
	...
	// journal.path+".new" 后缀 .new
	replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
	if err != nil {
		return err
	}
	journaled := 0
	for _, txs := range all {
		for _, tx := range txs {
			if err = rlp.Encode(replacement, tx); err != nil {
				replacement.Close()
				return err
			}
		}
		journaled += len(txs)
	}
	replacement.Close()
 	// rename,重命名文件到原始的 path,达到更新,替换目的
	if err = os.Rename(journal.path+".new", journal.path); err != nil {
		return err
	}
	sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0755)
	if err != nil {
		return err
	}
	...
	return nil
}

远程交易

P2P 通讯模块的初始化

源码文件:eth/backend.go

代码语言:javascript
复制
func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
	...
	if config.TxPool.Journal != "" {
		config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
	}
 	// 初始化交易池
	eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)
	...
 	// 使用 交易池指针对象 作为参数初始化 protocolManager
	if eth.protocolManager, err = NewProtocolManager(
    		chainConfig, checkpoint, config.SyncMode, config.NetworkId, 
            	eth.eventMux, `eth.txPool`, eth.engine, 
                eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil {
		return nil, err
	}
	...
	return eth, nil
}

func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
	// 下面初始化 tx_fetcher,使用 txpool.AddRemotes 赋值给函数变量 addTxs
	manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, fetchTx)
}

接收 P2P 消息

源码文件:eth/handler.go

代码语言:javascript
复制
func (pm *ProtocolManager) handleMsg(p *peer) error {
	...
    switch {
    ...
    // 接收到其它节点的交易数据
    case msg.Code == TransactionMsg || (msg.Code == PooledTransactionsMsg && p.version >= eth65):
		...
 		// Enqueue 将交易添加到交易池
		pm.txFetcher.Enqueue(p.id, txs, msg.Code == PooledTransactionsMsg)

    }
    ...
}
// tx_fetcher.go 文件
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
 	...
	errs := f.addTxs(txs) // 执行添加,这个函数其实就是 tx_pool.go 的 AddRemotes
	...
}

添加远程交易到交易池

代码语言:javascript
复制
// tx_pool.go
// addTxs 内部就会把交易添加到 Pending 和 Queue 里面
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
	return pool.addTxs(txs, false, false)
}

打完收工

更多以太坊的开发知识,见我的书籍:

《2.0-区块链DApp开发:基于以太坊和比特币公链》

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-01-11 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
    • 本地交易
      • 1. 本地钱包地址的初始化
      • 2. 从本地文件,加载交易数据数据,即加载本地交易
      • 3. pool.journal.load
      • 4. pool.AddLocals
      • 5. 本地交易文件的更新 ( 插入 / 删除 )
    • 远程交易
      • P2P 通讯模块的初始化
      • 接收 P2P 消息
      • 添加远程交易到交易池
  • 打完收工
相关产品与服务
区块链
云链聚未来,协同无边界。腾讯云区块链作为中国领先的区块链服务平台和技术提供商,致力于构建技术、数据、价值、产业互联互通的区块链基础设施,引领区块链底层技术及行业应用创新,助力传统产业转型升级,推动实体经济与数字经济深度融合。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档