事务有ACID四个属性,它们的含义如下,虽然通常都说事务有这四大特性,但是这些特性之间并不是完全正交的。小编认为A、I、D是因,C是果,因为有AID的保障,所有才有一致性C.
注意,这里所说的事务的ACID是单一数据源的情况,在分布式环境中,一致性是指多个副本间数据的一致性。我们知道磁盘并不是长久不坏的,所以仅凭单个磁盘不能保证数据的可靠,在分布式环境下衍生出多个副本等冗余策略,来提升数据可靠性。
对于隔离性,在数据库系统中,一般有下面四种隔离级别,它们的隔离强度依次递减。
根据场景的不同,事务可以划分为多种,例如在分布下有分布式事务,本文这里介绍单数据源下两种典型实现方法。一种是通过写日志,另一种是通过shadow paging方式。
通过日志实现事务的原子性和持久性是现在的主流方案,像MySQL采用的就是写日志的方法。shadow paging方法在轻量级数据库SQLite version3中被采用,本文的boltdb数据库也采用的是这种方法。两种方法对比起来,shadow paging要比写日志更简单,但涉及隔离性与并发锁时,通过shadow paging实现事务的能力要弱些。
事务的原子性即一组数据库操作,要么全部修改成功,要么全部撤销,不存在部分操作成功部分失败的情况。boltdb是如何实现事务原子性的,可以从两个方面来分析。一方面在我们遇到问题的时候,可以主动调用tx.Rollback进行回滚,例如在执行tx.Commit过程中出现错误,调用回滚操作,undo该事务到目前为止的所有操作。另一方面是在执行事务的过程中,例如在向磁盘写数据的过程中,出现设备掉电等导致boltdb实例挂掉,这是通过boltdb采用的shadow paging方法实现的,在tx.Commit的时候,元信息page最后写磁盘,只有元信息写入成功,所有的改动才对用户可见。如果在写数据的时候宕机,这时元信息还没有写入,改动不会生效。
在没有执行tx.Commit操作之前,读写事务的写入操作都是修改内存中的node节点,内存中的node是原数据的副本,修改node不影响原始数据。如下图所示,Put操作是在叶子节点node d上,这时候修改的数据都在内存,如果此时执行tx.Rollback操作,只要丢弃这些内存中的node,不写入磁盘即可。
对于只读事务来说,不能执行tx.Commit操作,否则会panic, 但必须执行tx.Rollback操作,用于释放一些资源,清理一些引用,GC尽快进行垃圾回收。对于读写事务,执行tx.Rollback操作,会恢复空闲数据页freelist,恢复元数据页的txid,还有释放读写锁这步关键操作,使得其他读写事务可以获取到锁可以运行。
func (tx *Tx) Rollback() error {
_assert(!tx.managed, "managed tx rollback not allowed")
if tx.db == nil {
return ErrTxClosed
}
tx.rollback()
return nil
}
func (tx *Tx) rollback() {
if tx.db == nil {
return
}
if tx.writable {
// 恢复暂存的缓存
tx.db.freelist.rollback(tx.meta.txid)
// 从meta数据中恢复freelist
tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
}
tx.close()
}
tx.close操作释放一些资源,清理所有的引用。如果是只读事务,会将事务句柄tx从db中移除,如果是读写事务,会释放读写锁。
func (tx *Tx) close() {
if tx.db == nil {
return
}
if tx.writable {
...
tx.db.rwtx = nil
// 释放进行读写操作的读写锁
tx.db.rwlock.Unlock()
...
} else {
// 溢出当前的只读事务tx
tx.db.removeTx(tx)
}
// 清理所有引用
tx.db = nil
tx.meta = nil
tx.root = Bucket{tx: tx}
tx.pages = nil
}
如果在执行tx.Commit过程中出现异常情况,boltdb是如何保证操作原子性的呢?下面源码抽取关键的4步操作:
tx.db.freelist.free
语句。tx.allocate
和tx.db.freelist.write
操作,前者新分配合适大小的freelist页,并将当前哪些页是空闲的填入到新的freelist中。tx.close
操作func (tx *Tx) Commit() error {
...
tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
// 申请一片空间用于存储新的freelist信息
p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
if err != nil {
tx.rollback()
return err
}
// 将freelist页中的信息序列化到内存p中,并将脏页p加入到缓存中
if err := tx.db.freelist.write(p); err != nil {
tx.rollback()
return err
}
tx.meta.freelist = p.id
...
// 刷新脏页(包含新的freelist)到磁盘
if err := tx.write(); err != nil {
tx.rollback()
return err
}
...
// 刷新元数据到磁盘
if err := tx.writeMeta(); err != nil {
tx.rollback()
return err
}
tx.stats.WriteTime += time.Since(startTime)
// 关闭事务
tx.close()
...
}
「注意,上面关键的4步操作的顺序是不能调整的,刷新元数据落盘的操作一定要放到后面,元信息页作为 “全局指针”,以该指针的写入原子性来保证事务的原子性」。
第3步刷新元数据页到磁盘是最为关键的一步,writeMeta只有一处有返回错误,就是在执行tx.db.ops.writeAt的时候,其他操作都是内存中操作。writeMeta处理逻辑是先在内存中开辟一个4KB的空间,然后向里面填充page字段,在然后将tx.meta中的信息写入刚申请的空间中。注意这里pgid的确定,通过p.id = pgid(m.txid % 2)
操作,轮流写入,一次写入meat page0,另一次写入meta page1,相当于灾备写入,如果出现异常,可以通过另一个正常的meta恢复。
func (tx *Tx) writeMeta() error {
// 开辟一个4KB大小的字节数组
buf := make([]byte, tx.db.pageSize)
// 将字节数组转为page,page和buf都是同一片空间,转成page,是为了
// 方便向里面填充值
p := tx.db.pageInBuffer(buf, 0)
// 将DB.meta写入p(page)中
tx.meta.write(p)
// 将buf,也就是meta page写入到磁盘文件上
if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
return err
}
if !tx.db.NoSync || IgnoreNoSync {
if err := fdatasync(tx.db); err != nil {
return err
}
}
// Update statistics.
tx.stats.Write++
return nil
}
// 将meta中的内容输出到page中
func (m *meta) write(p *page) {
...
// 轮流写入,一次写入meat page0,另一次写入meta page1
p.id = pgid(m.txid % 2)
// 设置flags为元数据页标识
p.flags |= metaPageFlag
// 计算数据校验和
m.checksum = m.sum64()
// 将m中的内容填充到p.ptr中
m.copy(p.meta())
}
总结起来,对于tx.Commit执行过程中出现的异常,是通过tx.Commit特定的处理逻辑顺序巧妙处理,降低了出现不原子性结果的概率。
对于boltdb来说,不存在并发的写操作,所以事务的隔离被限定在了多个只读事务的隔离性和1个写事务操作和读事务操作的隔离性上。
本小节,主要来学习两面内容,一是怎么实现写事务操作只有一个,读事务有多个限制的;二是事务隔离,通过什么隔离,是怎么隔离的。
写事务只有一个,读事务操作有多个是通过锁来实现的。DB结构体中有5把锁,batchMu是执行批量事务操作用的,保护的对象是batch. rwlock是一个互斥锁,它是用来确保只有一个写事务的,开启一个写事务,需要获取rwlock的写锁,如果写锁已被别人获取,会被阻塞。metalock用于保护元素数据信息meta对象。mmaplock是一个读写锁,开启只读事务的时候,需要获取它的读锁,可以同时获取多个读锁,所以读事务操作是可以并发的。statlock是保护boltdb统计分析对象用的,这里不用过多关心。
type DB struct {
...
batchMu sync.Mutex
batch *batch
rwlock sync.Mutex
metalock sync.Mutex
mmaplock sync.RWMutex
statlock sync.RWMutex
...
}
开启只读事务,需要获取mmap读锁
func (db *DB) beginTx() (*Tx, error) {
...
// 获取mmap的读锁
db.mmaplock.RLock()
...
}
关闭只读事务,即执行tx.Rollback操作,释放mmaplock读锁. tx.Rollback-->tx.rollback-->tx.close-->tx.db.removeTx.
func (db *DB) removeTx(tx *Tx) {
db.mmaplock.RUnlock()
...
}
开启写事务,需要获取rwlock的写锁
func (db *DB) beginRWTx() (*Tx, error) {
...
db.rwlock.Lock()
...
}
执行回滚操作tx.Rollback,或者提交事务操作tx.Commit,释放rwlock的写锁
func (tx *Tx) close() {
...
if tx.writable {
...
// 释放进行读写操作的读写锁
tx.db.rwlock.Unlock()
...
} else {
...
}
...
}
「强调一点,只读事务必须执行tx.Rollback操作,否则可能阻塞写事务的tx.Commit操作」。前面分析了,只读事务会进行db.mmaplock.RLock()操作,也就是对mmaplock获取读锁,而在tx.Commit中会为修改的数据分配新page和分配新的freelist page都是通过tx.db.allocate(count)
来分配的,它里面会调用db.map,db.map中会获取db.mmaplock写锁。两条调用路径如下,所以如果只读事务中的读锁没有释放,下面获取写锁会阻塞。
1.
tx.root.spill()-->b.rootNode.spill()-->tx.allocate((node.size() / tx.db.pageSize) + 1)-->tx.db.allocate(count)-->db.mmap(minsz)-->db.mmaplock.Lock()
2.
tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)-->tx.db.allocate(count)-->db.mmap(minsz)-->db.mmaplock.Lock()
事务持久性需要将数据存储到硬盘等长久保存的设备中,boltdb采用的是写磁盘的方式,每次执行tx.Commit都将当前的修改的数据刷新到磁盘中。而写磁盘操作相比内存是很费时的,存在下面的问题:
所以boltdb使用于读多写少的场景,对于写操作多并且性能要求高的情况,boltdb就不太适合。boltdb采用B+Tree组织,节点的degree高达几百,所以它是个矮胖小子,加载叶子节点到内存不需要进行很多次IO操作,并且采用mmap映射,提升性能。
在事务提交时,需要将用户进行的一系列更新、插入和删除操作相关的node进行调整,按一定策略调整为B+Tree,使得它维持好的查询性质,最后将所有的node序列化为page写入磁盘,构成一颗新的平衡的B+Tree.上述核心实现在tx.root.rebalance()
和tx.root.spill()
,下面分析下tx.Commit中的这两个关键操作.
rebalance是再平衡操作,主要是如果node过小(key很少或者整体占用的空间不大),将其他与邻近的节点进行合并,可以提高页面的利用率。
rebalance先对当前Bucket缓存的所有node进行调整,在对它的孩子Bucket进行平衡调整。
func (b *Bucket) rebalance() {
// 对当前Bucket所有缓存的node进行调整
for _, n := range b.nodes {
n.rebalance()
}
// 在对孩子Bucket进行平衡操作
for _, child := range b.buckets {
child.rebalance()
}
}
核心在于对node进行合并操作,下面对源码关键做了详细注解,总结起来,有以下操作步骤:
func (n *node) rebalance() {
// 如果节点node n是平衡的,直接返回
if !n.unbalanced {
return
}
// 标记节点n是平衡的,经过下面的处理之后就是平衡的了
n.unbalanced = false
n.bucket.tx.stats.Rebalance++
// 阈值threshold为page页大小的1/4,page大小为4KB,即threshold为1KB
var threshold = n.bucket.tx.db.pageSize / 4
// 当节点n所占空间的大小大于1KB 并且 节点中的元素的个数超过最小值,对于页节点是1个
// 对于分支节点是2个,即叶子节点元素最少为2个,分支节点元素最少为3个
// 不用进行合并平衡处理
if n.size() > threshold && len(n.inodes) > n.minKeys() {
return
}
// n是根节点
if n.parent == nil {
// 如果该节点是根节点,且只有一个孩子节点,则将其和其唯一的孩子合并
if !n.isLeaf && len(n.inodes) == 1 {
child := n.bucket.node(n.inodes[0].pgid, n)
n.isLeaf = child.isLeaf
n.inodes = child.inodes[:]
n.children = child.children
// 修改inodes的父节点,从原来的child改为现在的n
for _, inode := range n.inodes {
if child, ok := n.bucket.nodes[inode.pgid]; ok {
child.parent = n
}
}
// 解除child关联的父节点
child.parent = nil
delete(n.bucket.nodes, child.pgid)
// 释放child节点,加入freelist
child.free()
}
return
}
// 节点n中没有任何元素
if n.numChildren() == 0 {
n.parent.del(n.key)
// 从节点n的父节点中将节点n移除
n.parent.removeChild(n)
// 删除bucket nodes中的缓存
delete(n.bucket.nodes, n.pgid)
// 释放节点n
n.free()
// 对节点n的父节点继续进行平衡操作,因为节点n被删除,可能会导致它的父节点存在不平衡
n.parent.rebalance()
return
}
_assert(n.parent.numChildren() > 1, "parent must have at least 2 children")
// 下面将节点n和它的兄弟节点进行合并,默认是与节点n的左边的兄弟节点合并,但是如果节点n
// 本来就是第一个节点,也就是它没有左兄弟,这种情况下,将节点n与它的右兄弟进行合并
var target *node
// 判断节点n是不是第一个节点,如果是则将它与它右边的兄弟节点合并
var useNextSibling = (n.parent.childIndex(n) == 0)
if useNextSibling {
// 选择与右兄弟节点进行合并
target = n.nextSibling()
} else {
// 选择与左兄弟节点进行合并
target = n.prevSibling()
}
// 与右兄弟进行合并
if useNextSibling {
// 调整target中所有孩子的节点父节点,从target调整为n,因为target和n会合并
// 合并之后的节点为n
for _, inode := range target.inodes {
if child, ok := n.bucket.nodes[inode.pgid]; ok {
// 从旧父节点中移除child
child.parent.removeChild(child)
// 调整child的新的父节点为当前的节点n
child.parent = n
// 将child加入到节点n的孩子节点集合中
child.parent.children = append(child.parent.children, child)
}
}
// 将target中的数据合并到节点n中
n.inodes = append(n.inodes, target.inodes...)
// 从当前节点n的父节点中删除target key
n.parent.del(target.key)
// 将target从B+Tree中移除
n.parent.removeChild(target)
// 缓存的node集合中删除target
delete(n.bucket.nodes, target.pgid)
// 释放target占用的空间
target.free()
} else {
// 操作与上述一致,只不过是将当前的节点n合并到target中
for _, inode := range n.inodes {
if child, ok := n.bucket.nodes[inode.pgid]; ok {
child.parent.removeChild(child)
child.parent = target
child.parent.children = append(child.parent.children, child)
}
}
target.inodes = append(target.inodes, n.inodes...)
n.parent.del(n.key)
n.parent.removeChild(n)
delete(n.bucket.nodes, n.pgid)
n.free()
}
// 上面的调整可能会导致节点的删除,因此向上递归看是否需要进一步进行平衡调整
n.parent.rebalance()
}
「对于n.parent.rebalance操作,也许有同学有疑问,如果处理的时候先处理的n.parent节点,这时不是将n.parent的unbalanced已经设置为false吗,那执行n.parent.rebalance操作不是在开头判断直接退出了吗?」,不会的,因为前面有执行n.parent.del(target.key)
操作,它的实现中会将unbalanced设置为true.
func (n *node) del(key []byte) {
...
// 给该节点n做一个不平衡的标记,在事务进行提交的时候,决定是否要对该节点进行rebalance操作
n.unbalanced = true
}
只有node.del()的调用才会导致n.unbalanced被标记为true。只有两个地方会调用node.del,一个地方就上面的rebalance操作的时候,另一个地方是在用户调用bucket.Delete函数删除数据的时候,而前者又是因为后者引起的,所以说只有用户在某次写事务中删除数据时,才会引起node.rebanlance逻辑的实际执行。
spill方法核心功能有两点,一是将占用空间过大的节点进行拆分,二是将节点转成page(脏页),为稍后写入磁盘做准备。采用自底向上的处理方式,先处理当前Bucket的子Bucket,然后再处理当前Bucket.如果一个Bucket中的内容很少,将会直接内嵌在父Bucket的叶子节点中。否则,先调用子Bucket的spill函数,将子Bucket的根节点pgid放在父节Bucket的叶子节点中。对于桶内的处理调用node.spill方法。
func (b *Bucket) spill() error {
// 先对子Bucket进行spill处理
for name, child := range b.buckets {
var value []byte
// 判断child是否可以被内联
if child.inlineable() {
// 可以被内联,释放child
child.free()
// 将child序列化内容放入到它父节点page中
value = child.write()
} else {
// 递归对child的子Bucket进行spill处理
if err := child.spill(); err != nil {
return err
}
// 重新获取child.bucket信息,因为在对child进行spill处理后
// 因为有分裂操作,可能会导致child.bucket发生变化
value = make([]byte, unsafe.Sizeof(bucket{}))
var bucket = (*bucket)(unsafe.Pointer(&value[0]))
*bucket = *child.bucket
}
if child.rootNode == nil {
continue
}
var c = b.Cursor()
k, _, flags := c.seek([]byte(name))
if !bytes.Equal([]byte(name), k) {
panic(fmt.Sprintf("misplaced bucket header: %x -> %x", []byte(name), k))
}
if flags&bucketLeafFlag == 0 {
panic(fmt.Sprintf("unexpected bucket header flag: %x", flags))
}
// 更新name的value
c.node().put([]byte(name), []byte(name), value, 0, bucketLeafFlag)
}
if b.rootNode == nil {
return nil
}
// 从根节点开始进行溢出处理
if err := b.rootNode.spill(); err != nil {
return err
}
b.rootNode = b.rootNode.root()
if b.rootNode.pgid >= b.tx.meta.pgid {
panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", b.rootNode.pgid, b.tx.meta.pgid))
}
// 更新b的根pgid
b.root = b.rootNode.pgid
return nil
}
node.spill方法是对Bucket内部节点按tx.db.pageSize进行切分,转成对应的page. 主要处理逻辑如下:
func (n *node) spill() error {
var tx = n.bucket.tx
// 如果节点已经处理过了,直接返回
if n.spilled {
return nil
}
// 先对孩子节点进行排序
sort.Sort(n.children)
// 先处理当前节点n的孩子节点,处理过程是一个自底向上的递归
for i := 0; i < len(n.children); i++ {
if err := n.children[i].spill(); err != nil {
return err
}
}
// 处理到这里的时候,节点n的孩子节点已经全部处理完并保存到分配的脏页page中,脏页page已经被保存到
// tx.pages中了,所以可以将n.children设置为nil,GC可以回收孩子节点node了
n.children = nil
// 按给定的pageSize对节点n进行切分
var nodes = n.split(tx.db.pageSize)
for _, node := range nodes {
// 不是新分配的node,也就是该node在db中有关联的page,需要将node对应的page加入到freelist中
// 在下一次开启读写事务时,此page可以被重新复用
if node.pgid > 0 {
tx.db.freelist.free(tx.meta.txid, tx.page(node.pgid))
node.pgid = 0
}
// 为节点n分配page,注意分配的page页此时还是位于内存中,尚未刷新到磁盘,
// 这里的count为啥要+1? 因为前面对节点n是按pageSize切分的,切分后node节点的大小有可能
// 小于pageSize,也有可能比pageSize大一点点,split逻辑里面是 size>threshold 才break的
// 所以需要+1才能满足。count的值最小为1,也有可能比较大,例如节点n中存在一个超大的key-value,
// 直接导致node.size()很大
p, err := tx.allocate((node.size() / tx.db.pageSize) + 1)
if err != nil {
return err
}
if p.id >= tx.meta.pgid {
panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", p.id, tx.meta.pgid))
}
// 分配的page id值赋值给node
node.pgid = p.id
// 将node中的元素内容序列化化到page p中
node.write(p)
// 标记节点node已经spill处理过了
node.spilled = true
// 将节点node添加到它的父节点中
if node.parent != nil {
var key = node.key
if key == nil {
key = node.inodes[0].key
}
node.parent.put(key, node.inodes[0].key, nil, node.pgid, 0)
node.key = node.inodes[0].key
_assert(len(node.key) > 0, "spill: zero-length node key")
}
tx.stats.Spill++
}
// 对根节点进行spill
if n.parent != nil && n.parent.pgid == 0 {
// 先将children设置nil,防止又递归spill
n.children = nil
// 对根节点进行spill
return n.parent.spill()
}
return nil
}
通过上面的spill实现可以看到,boltdb在维护B+Tree查找性质的时候,并不是像数据库上介绍的那样将分支节点的数量保持在一个固定的范围,而是直接按节点数据是否能够保存到一个page中来实现的,这样做的好处是可以减少page内部碎片,提升空间利用率。