星云链源代码-基本数据结构

在介绍Block区块之前,先介绍区块使用的一些通用的数据结构。

1)存储 Storage

Nebulas定义了存储(Storage)的接口,并实现了三种存储类型:RocksStorage,MemoryStorage, DiskStorage。相关的源代码在storage目录下。

1.// Storage interface of Storage.

2.type Storage interface {

3.// Get return the value to the key in Storage.

4.Get(key []byte) ([]byte, error) //读

5.

6.// Put put the key-value entry to Storage.

7.Put(key []byte, value []byte) error //写

8.

9.// Del delete the key entry in Storage.

10.Del(key []byte) error //删除

11.

12.// EnableBatch enable batch write.

13.EnableBatch() //开启Batch模式

14.

15.// DisableBatch disable batch write.

16.DisableBatch() //关闭Batch模式

17.

18.// Flush write and flush pending batch write.

19.Flush() error //刷缓存

20.}

RocksStorage是基于gorocksdb项目(RocksDB的go的封装)实现。DiskStorage是基于goleveldb项目实现。MemoryStorage是一个简单的基于内存的KV(key-value)实现。

RocksStorage能充分利用高速存储器的性能(比如flash,内存)。

2) Trie

Nebualas实现了Merkle Patrica Trie,代码在common/trie/trie.go文件中。Merkle Patrica Trie集成了Merkle树和Patrica树的优点,在以太坊中被设计和首次应用的。

1.type Triestruct{

2.rootHash []byte //树根的hash

3.storage storage.Storage //存储对象

4.changelog []*Entry //所有的changelog

5.needChangelogbool//是否需要changelog

6.}

7.

8.// Entry in log, [key, old value, new value]

9.type Entrystruct{

10.action Action //有三种操作:插入,更改和删除

11.key []byte //key

12.old []byte //old的value

13.update []byte //更新的value

14.}

如果保存了changelog,可以通过Replay函数重构Trie。

Trie中的节点由Node类型表示:

1.// Node in trie, three kinds,

2.// Branch Node [hash_0, hash_1, ..., hash_f]

3.// Extension Node [flag, encodedPath, next hash]

4.// Leaf Node [flag, encodedPath, value]

5.type nodestruct{

6.Hash []byte //node的hash值

7.Bytes []byte //node的系列化数据

8.Val [][]byte //node的类型和数值

9.}

Node有三种类型:

1)分支节点(BranchNode)

分支结点是由16个hash组成。

2)扩展节点(ExtentionNode)

扩展节点由三个字节数组组成:扩展标记,路径,以及hash。

3)叶子结点(Leaf Node)

叶子结点由三个字节数组组成:叶子标记,叶子的路径,以及hash。

以646f83,646f67,686f72三个key为例,三种结点的关系如下图所示:

注意:Nebulas的Trie的设计中没有考虑扩展节点,可能本身就是叶子的情况。因为在实际运行的时候,key都是hash值,很少遇到这样的情况。在以太坊最新的代码中,扩展节点本身也可能是也叶子结点(可以存储value值)。

Trie主要提供三类函数:

1)一般操作函数:“Put”,“Get”,“Del”等函数,对树添加,获取,删除结点。注意“Put”/“Del”函数会直接更改存储。

2)同步函数:SyncTrie和SyncPath,同步整棵树或者一个路径。

3)证明函数:Prove和Verify函数提供证明和验证某个节点在一棵树中。

以证明节点(646f83)为例,Prove函数的结果是N1到N5节点的Val的数组,也就是下图中的蓝线经过的节点的Val组成的数组。

3)MVCCDB

MVCCDB是Nebulas设计的多版本并发控制数据库,类型结构及其关系如下图所示。

MVCCDB支持如下的三种操作类型:

1)直接读写(Get/Put/Del)

MVCCDB直接从Storage读写数据。

2)事务处理(begin – Get/Put/Del – commit/rollback)

StagingTable会缓存一个事务中的Get/Put/Del操作,在commit中同步到Storage中。

3)预准备事务处理(begin – prepare – Get/Put/Del – update –commit/rollback)

预准备事务处理会创建预处理MVCCDB。update的时候,将预处理MVCCDB中的KV值更新到父MVCCDB中。每次更新会更新父StagingTable中的版本。

MVCCDB类型定义在core/mvccdb/mvccdb.go文件中:

1.// MVCCDB the data with MVCC supporting.

2.type MVCCDBstruct{

3.tid interface{} //MVCCDB的tid(事务编号)

4.storage storage.Storage //MVCCDB的底层存储(KV)

5.stagingTable *StagingTable //带版本信息的KV对

6.mutex sync.Mutex //同步锁

7.parentDB *MVCCDB //父亲MVCCDB

8.isInTransactionbool//是否在一次事务流程中?

9.isPreparedDBbool//是否是“预准备”DB

10.isPreparedDBClosedbool//“预准备”DB是否关闭

11.preparedDBs map[interface{}]*MVCCDB//所有“预准备”DB的MAP

12.isTrieSameKeyCompatibilitybool//trie中是否允许同样的key值

13.}

MVCCDB是个嵌套的类型,基本的属性包括:tid(事务编号),storage(存储类型)和stagingTable(在存入存储之前,在内存中缓存的带版本的KV对)。MVCCDB可以发起事务,为每个事务会创建一个MVCCDB,作为preparedDB。parentDB是preparedDB的父亲MVCCDB。

仔细分析一下begin/prepare/Get/Put/commit函数。

begin函数很简单,设置isInTransaction标志:

1.// Begin begin a transaction.

2.func (db *MVCCDB) Begin() error {

3.db.mutex.Lock()

4.defer db.mutex.Unlock()

5.

6.ifdb.isInTransaction {

7.returnErrUnsupportedNestedTransaction

8.}

9.

10.ifdb.isPreparedDB {

11.returnErrUnsupportedBeginInPreparedDB

12.}

13.

14.db.isInTransaction =true

15.

16.returnnil

17.}

prepare函数创建一个preparedDB,每个preparedDB都有唯一的标示tid:

1.// Prepare a nested transaction

2.func (db *MVCCDB) Prepare(tid interface{}) (*MVCCDB, error) {

3.db.mutex.Lock()

4.defer db.mutex.Unlock()

5.

6.if!db.isInTransaction { //只有在一个事务中,才能创建preparedDB

7.returnnil, ErrTransactionNotStarted

8.}

9.

10.iftid == nil { //每个preparedDB都必须有个唯一的tid

11.returnnil, ErrTidIsNil

12.}

13.

14.ifdb.preparedDBs[tid] != nil { //检查是否preparedDB已经创建

15.returnnil, ErrTidIsExist

16.}

17.//为preparedDB创建preparedStagingTable

18.preparedStagingTable, err := db.stagingTable.Prepare(tid)

19.iferr != nil {

20.returnnil, err

21.}

22.

23.preparedDB := &MVCCDB{

24.tid: tid,

25.storage: db.storage,

26.stagingTable: preparedStagingTable,

27.parentDB: db,

28.isInTransaction:true,

29.isPreparedDB:true,

30.isPreparedDBClosed:false,

31.preparedDBs: make(map[interface{}]*MVCCDB),

32.isTrieSameKeyCompatibility: db.isTrieSameKeyCompatibility,

33.}

34.

35.db.preparedDBs[tid] = preparedDB

36.returnpreparedDB, nil

37.}

Get/Put/Del函数相当简单,从storage或者stagingTable获取,设置,或者删除key值。

CheckAndUpdate函数,合并preparedDB的KV数据到父亲MVCCDB中。

1.// CheckAndUpdate merge current changes to `FinalVersionizedValues`.

2.func (db *MVCCDB) CheckAndUpdate() ([]interface{}, error) {

3.db.mutex.Lock()

4.defer db.mutex.Unlock()

5.

6.if!db.isInTransaction {

7.returnnil, ErrTransactionNotStarted

8.}

9.

10.if!db.isPreparedDB { //只有preparedDB允许update操作

11.returnnil, ErrDisallowedCallingInNoPreparedDB

12.}

13.

14.ifdb.isPreparedDBClosed {

15.returnnil, ErrPreparedDBIsClosed

16.}

17.

18.ret, err := db.stagingTable.MergeToParent() //合并到parent的MVCCDB的stagingTable中

19.

20.iferr == nil {

21.// cleanup.

22.db.stagingTable.Purge()

23.}

24.

25.returnret, err

26.}

commit函数讲stagingTable的KV值,存储到stoage中。也就是说,commit函数讲内存中的KV值存储到永久存储storage中。

带版本信息的KV值由StagingTable类型管理,定义在common/mvccdb/staging_table.go中:

1.// StagingTable a struct to store all staging changed key/value pairs.

2.// There are two map to store the key/value pairs. One are stored associated with tid,

3.// the other is `finalVersionizedValue`, record the `ready to commit` key/value pairs.

4.type StagingTablestruct{

5.storage storage.Storage //对应的持久化存储

6.globalVersion int64 //版本编号

7.parentStagingTable *StagingTable //父stagingTable

8.versionizedValues stagingValuesMap //KV对

9.tid interface{} //事务编号

10.mutex sync.Mutex //锁

11.prepareingGlobalVersion int64 //预准备时的版本编号

12.preparedStagingTables map[interface{}]*StagingTable //preparedDB对应的preparedStagingTables

13.isTrieSameKeyCompatibilitybool//是否允许相同的key

14.disableStrictGlobalVersionCheckbool// default `true`

15.}

StagingTable也是嵌套类型,和MVCCDB逻辑几乎一致。StagingValuesMap是带版本的KV对的map表。

1.type stagingValuesMap map[string]*VersionizedValueItem

2.

3.// VersionizedValueItem a struct for key/value pair, with version, dirty, deleted flags.

4.type VersionizedValueItemstruct{

5.tid interface{} //事务编号

6.key []byte //K

7.val []byte //V

8.versionint//版本号(每次更新,版本加一)

9.deletedbool//是否删除

10.dirtybool//是否修改

11.globalVersion int64 //全局版本号(每次update,版本加一)

12.}

一个带版本信息的KV对由VersionizedValueItem表示。具体分析一下MergeToParent函数:

1.// MergeToParent merge key/value pair of tid to `finalVersionizedValues` which the version of value are the same.

2.func (tbl *StagingTable) MergeToParent() ([]interface{}, error) {

3.iftbl.parentStagingTable == nil {

4.returnnil, ErrParentStagingTableIsNil

5.}

6.

7.tbl.parentStagingTable.mutex.Lock() //锁 parentStagingTable

8.defer tbl.parentStagingTable.mutex.Unlock()

9.

10.tbl.mutex.Lock() //锁当前的StagingTable

11.defer tbl.mutex.Unlock()

12.

13.dependentTids := make(map[interface{}]bool)

14.conflictKeys := make(map[string]interface{})

15.

16.// 1. check version.

17.targetValues := tbl.parentStagingTable.versionizedValues //获得parentStagingTable的所有KV对

18.//查看当前StagingTable的所有KV对

19.forkeyStr, fromValueItem := range tbl.versionizedValues {

20.targetValueItem := targetValues[keyStr]

21.

22.iftargetValueItem == nil {

23.continue

24.}

25.

26.// 1. record conflict.

27.iffromValueItem.isConflict(targetValueItem, tbl.isTrieSameKeyCompatibility) {

28.conflictKeys[keyStr] = targetValueItem.tid

29.continue

30.}

31.

32.// 2. record dependentTids.

33.

34.// skip default value loaded from storage.

35.iftargetValueItem.isDefault() {

36.continue

37.}

38.

39.// ignore same parent tid for dependentTids.

40.iftargetValueItem.tid == tbl.parentStagingTable.tid {

41.continue

42.}

43.

44.// ignore version check when TrieSameKeyCompatibility is enabled.

45.iftbl.isTrieSameKeyCompatibility {

46.continue

47.}

48.//如果存在targetValueItem,也就是说,parentStagingTable存在同样key的KV对,并且tid不一样,说明当前的tid和改tid有依赖关系

49.dependentTids[targetValueItem.tid] =true

50.}

51.

52.iflen(conflictKeys) > 0 {

53.logging.VLog().WithFields(logrus.Fields{

54."tid": tbl.tid,

55."parentTid": tbl.parentStagingTable.tid,

56."conflictKeys": conflictKeys,

57.}).Debug("Failed to be merged into parent.")

58.returnnil, ErrStagingTableKeyConfliction

59.}

60.

61.// 2. merge to final.

62.

63.// incr parentStagingTable.globalVersion.

64.tbl.parentStagingTable.globalVersion++ //更新parentStagingTable的全局版本

65.

66.forkeyStr, fromValueItem := range tbl.versionizedValues {

67.// ignore default value item.

68.iffromValueItem.isDefault() {

69.continue

70.}

71.

72.// ignore non-dirty.

73.if!fromValueItem.dirty {

74.continue

75.}

76.

77.// merge.

78.value := fromValueItem.CloneForMerge(tbl.parentStagingTable.globalVersion)

79.targetValues[keyStr] = value //Clone当前的KV对,并更新到parentStaingTable中

80.}

81.

82.tids := make([]interface{}, 0, len(dependentTids)) //收集所有的依赖关系

83.forkey := range dependentTids {

84.tids = append(tids, key)

85.}

86.

87.returntids, nil

88.}

特别注意的是,MergeToParent返回和当前tid依赖的其他tid。

4) DAG

DAG(有向无环图)定义在common/dag/dag.go文件中:

1.// Dag struct

2.type Dagstruct{

3.nodes map[interface{}]*Node

4.indexint

5.indexs map[int]interface{}

6.}

DAG由一系列的Node组成:

1.// Node struct

2.type Nodestruct{

3.key interface{}

4.indexint

5.children []*Node

6.parentCounterint

7.}

Dispatcher是在DAG类型基础上,提供的辅助类型。针对DAG的节点的依赖关系,Dispatcher能最大化地对节点进行并行化处理。相关的代码比较容易理解,可以查看具体的代码common/dag/dispatcher.go。

举个例子,说明Dispatcher的逻辑,假设DAG的节点关系如下:

Dispatcher会尽力先并行化处理1/2/3节点,在处理完后,再并行化处理其他节点。

1.// NewDispatcher create Dag Dispatcher instance.

2.func NewDispatcher(dag *Dag, concurrencyint, elapseInMs int64, context interface{}, cb Callback) *Dispatcher {

NewDispatcher函数用来创建一个Dispatcher,其中concurrency参数代表最大的并行个数,elapseInMs参数代表最大执行时间(ms为单位),cb参数是针对DAG中的节点的回调函数。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180731G1RSGI00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券