首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >自底向上分析boltdb源码之精简版

自底向上分析boltdb源码之精简版

作者头像
jaydenwen123
发布2021-07-19 16:02:54
5930
发布2021-07-19 16:02:54
举报
文章被收录于专栏:后台通用技术后台通用技术

前言

boltdb是一个纯go编写的磁盘型kv数据库、支持事务,底层采用b+树来组织数据。目前主要的用途是做分布式组件的wal,或者单机磁盘型数据存储。对数据库感兴趣的小伙伴,非常值得一读boltdb的源码。代码量不大只有3k~4k,但功能很强大,从中可以学到不少知识。boltdb项目还是蛮出名的,现在由etcd团队在维护,etcd维护的组件叫bbolt,从boltdb fork而来,此外还有其他的一些知名的开源项目在生产环境使用boltdb。本文最初是本着好奇心和兴趣的驱使,最后通过一种自底向上的方式对boltdb内部实现一探究竟。

本文采用自底向上的方式来介绍boltdb内部的实现原理。其实我们经常都在采用自底向上或者自顶向下这两种方式来思考和求解问题。

例如:我们阅读源码时,通常都是从最顶层的接口点进去,然后层层深入内部。这其实本质上就是一种自顶向下的方式。再比如我们平常做开发时,都是先将系统进行拆分、解耦。然后一般都会采用从下而上或者从上而下的方式来进行开发迭代。 又比如在执行OKR的时候,我们通常都是先定目标、然后依据该目标再层层分解。最后分解到可执行的单元为止。这其实是一种自顶向下的方式;在真正执行时,我们通常又是从原先分解到最底层的原子单元开始执行,然后层层递进。最终所有的原子单元执行完后,我们的目标也就实现了。这其实又是自底向上的完成任务方式。

前面所提到的上下其实是指某些事物、组件、目标内部是存在相互依赖、因果、先后、递进关系,而依赖方或者结果方则位于上,被依赖方、原因方位于下;个人认为自下而上的方式比较适合执行每一个任务或者解决子问题,每一层做完时,都可以独立的进行测试和验证。当我们层层自下而上开发。最后将前面的所有东西拼接在一起时,就构成了一个完整的组件或者实现了该目标。

回到最初的话题,为什么本文要采用自底向上的方式来写呢? 对于一个文件型数据库而言,所谓的上指的是暴露给用户侧的调用接口。所谓的下又指它的输出(数据)最终要落到磁盘这种存储介质上。采用自底向上的方式的话,也就意味着我们先从磁盘这一层进行分析。然后逐步衍生到内存;再到用户接口这一层。层层之间是被依赖的一种关系。这样的话,其实就比较好理解了。在本文中,本人采用自底向上的方式来介绍。希望阅读完后,有一种自己从0到1构建了一个数据库的快感。

当然也可以采用自顶向下的方式来介绍,这时我们就需要在介绍最上层时,先假设它所依赖的底层都已经就绪了,我们只分析当层内容。然后层层往下扩展。

之前和一位大佬进行过针对此问题的探讨,在不同的场景、不同的组件中。具体采用自底向上还是自顶向下来分析。见仁见智,也具体问题具体分析。

在开始主要内容之前,先交代一些题外话。

本文核心内容主要摘自个人上周完结的自底向上剖析boltdb源码 一书。属于原先书籍的精简版。精简版的目的在于保证核心内容完整的前提下,把不太重要的部分和代码进行裁剪。大家可以根据需要自行选择阅读在线版或者精简版。

关于为什么会产生该书的原因,感兴趣的童鞋可以点击下面文章进行查阅,此处不再过多赘述。

  1. https://gocn.vip/topics/11941
  2. https://studygolang.com/resources/15467

上述书籍完整内容目前有以下渠道可以阅读和交流。

  1. 书栈网在线or app端 https://www.bookstack.cn/books/jaydenwen123-boltdb_book
  2. 个人github博客 https://jaydenwen123.github.io/boltdb/
  3. 微信交流群
  1. 加群管理员进群

备注:boltdb交流学习

下面开始正文介绍。

1. boltdb简要介绍

本章是我们的开篇,我们主要从以下几个方面做一个讲述。希望这章让大家认识一下boltdb,知道它是什么?做什么?后续所有的内容都建立再此基础上,给大家详细介绍它内部是怎么做的。因此本章内容的定位是为后续章节做一个过渡和铺垫,对boltdb比较熟悉的童鞋可以直接跳过前三节,直接从第四节阅读。本章的主要内容从以下几个方面展开:

  1. boltdb是什么?
  2. 为什么要分析boltdb?
  3. boltdb的简单用法
  4. boltdb的整体数据组织结构

1.1 boltdb是什么?

在用自己的话介绍boltdb之前,我们先看下boltdb官方是如何自我介绍的呢?

Bolt is a pure Go key/value store inspired by [Howard Chu's][hyc_symas][LMDB project][lmdb]. The goal of the project is to provide a simple,fast, and reliable database for projects that don't require a full database server such as Postgres or MySQL.

Since Bolt is meant to be used as such a low-level piece of functionality,simplicity is key. The API will be small and only focus on getting values and setting values. That's it.

看完了官方的介绍,接下来让我用一句话对boltdb进行介绍:

boltdb是一个纯go编写的支持事务的文件型单机kv数据库。

下面对上述几个核心的关键词进行一一补充。

纯go: 意味着该项目只由golang语言开发,不涉及其他语言的调用。因为大部分的数据库基本上都是由c或者c++开发的,boltdb是一款难得的golang编写的数据库。 支持事务: boltdb数据库支持两类事务:读写事务只读事务。这一点就和其他kv数据库有很大区别。 文件型: boltdb所有的数据都是存储在磁盘上的,所以它属于文件型数据库。这里补充一下个人的理解,在某种维度来看,boltdb很像一个简陋版的innodb存储引擎。底层数据都存储在文件上,同时数据都涉及数据在内存和磁盘的转换。但不同的是,innodb在事务上的支持比较强大。 单机: boltdb不是分布式数据库,它是一款单机版的数据库。个人认为比较适合的场景是,用来做wal日志或者读多写少的存储场景。 kv数据库: boltdb不是sql类型的关系型数据库,它和其他的kv组件类似,对外暴露的是kv的接口,不过boltdb支持的数据类型key和value都是[]byte。

1.2 为什么要分析boltdb?

前文介绍完了什么是boltdb。那我们先扪心自问一下,为什么要学习、分析boltdb呢?闲的吗? 答案:当然不是。我们先看看其他几个人对这个问题是如何答复的。

github用户ZhengHe-MD是这么答复的:

要达到好的学习效果,就要有输出。以我平时的工作节奏,在闲暇时间依葫芦画瓢写一个键值数据库不太现实。于是我选择将自己对源码阅读心得系统地记录下来,最终整理成本系列文章,旨在尽我所能正确地描述 boltDB。 恰好我在多次尝试在网上寻找相关内容后,发现网上大多数的文章、视频仅仅是介绍 boltDB的用法和特性。因此,也许本系列文章可以作为它们以及 boltDB 官方文档 的补充,帮助想了解它的人更快地、深入地了解 boltDB。 如果你和我一样是初学者,相信它对你会有所帮助;如果你是一名经验丰富的数据库工程师,也许本系列文章对你来说没有太多新意。

微信公众号作者TheFutureIsOurs是这么答复的: boltdb源码阅读

最近抽时间看了boltdb的源码,代码量不大(大概4000行左右),而且支持事务,结构也很清晰,由于比较稳定,已经归档,确实是学习数据库的最佳选择。而且不少出名的开源项目在使用它,比如etcd,InfluxDB等。

下面我来以自身的角度来回答下这个问题:

首先在互联网里面,所有的系统、软件都离不开数据。而提到数据,我们就会想到数据的存储和数据检索。这些功能不就是一个数据库最基本的吗。从而数据库在计算机的世界里面有着无比重要的位置。作为一个有梦想的程序员,总是想知其然并知其所以然。这个是驱动我决定看源码的原因之一。

其次最近在组里高涨的系统学习mysql、redis的氛围下,我也加入了阵营。想着把这两块知识好好消化、整理一番。尤其是mysql,大家主要还是以核心学习innodb存储引擎为目标。本人也不例外,在我看完了从根儿上理解mysql后。整体上对innodb有了宏观和微观的了解和认识,但是更近一步去看mysql的代码。有几个难点:1.本人项目主要以golang为主。说实话看c和c++的项目或多或少有些理解难度;2.mysql作为上古神兽,虽然功能很完善,但是要想短期内看完源码基本上是不可能的,而工作之余的时间有限,因此性价比极低。而boltdb完美的符合了我的这两个要求。所以这就是选择boltdb的第二个原因,也是一个主要原因。

最后还是想通过分析这个项目,在下面两个方面有所提升。

  1. 一方面让自己加深原先学习的理论知识;
  2. 另外一方面也能真正的了解工程上是如何运用的,理论结合实践,然后对存储引擎有一个清晰的认识;

介绍完了boltdb是什么?为什么要分析boltdb后,我们就正式进入主题了。让我们先以一个简单例子认识下boltdb。

1.3 boltdb的简单用法

其实boltdb的用法很简单,从其项目github的文档里面就可以看得出来。它本身的定位是key/value(后面简称为kv)存储的嵌入式数据库,因此那提到kv我们自然而然能想到的最常用的操作,就是set(k,v)和get(k)了。确实如此boltdb也就是这么简单。

不过在详细介绍boltdb使用之前,我们先以日常生活中的一些场景来作为切入点,引入一些在boltdb中抽象出来的专属名词(DB、Bucket、Cursor、k/v等),下面将进入正文,前面提到boltdb的使用确实很简单,就是set和get。但它还在此基础上还做了一些额外封装。下面通过现实生活对比来介绍这些概念。

boltdb本质就是存放数据的,那这和现实生活中的柜子就有点类似了,如果我们把boltdb看做是一个存放东西的柜子的话,它里面可以存放各种各样的东西,确实是的,但是我们想一想,所有东西都放在一起会不会有什么问题呢?

咦,如果我们把钢笔、铅笔、外套、毛衣、短袖、餐具这些都放在一个柜子里的话,会有啥问题呢?这对于特那些别喜欢收拾屋子,东西归类放置的人而言,简直就是一个不可容忍的事情,因为所有的东西都存放在一起,当东西多了以后就会显得杂乱无章。

在生活中我们都有分类、归类的习惯,例如对功能类似的东西(钢笔、铅笔、圆珠笔等)放一起,或者同类型的东西(短袖、长袖等)放一起。把前面的柜子通过隔板来隔开,分为几个小的小柜子,第一个柜子可以放置衣服,第二个柜子可以放置书籍和笔等。当然了,这是很久以前的做法了,现在买的柜子,厂家都已经将其内部通过不同的存放东西的规格做好了分隔。大家也就不用为这些琐事操心了。既然这样,那把分类、归类这个概念往计算机中迁移过来,尤其是对于存放数据的数据库boltdb中,它也需要有分类、归类的思想,因为归根到底,它也是由人创造出来的嘛。

好了到这儿,我们引入我们的三大名词了“DB”“Bucket”“k/v”

DB: 对应我们上面的柜子。

Bucket: 对应我们将柜子分隔后的小柜子或者抽屉了。

k/v: 对应我们放在抽屉里的每一件东西。为了方便我们后面使用的时候便捷,我们需要给每个东西都打上一个标记,这个标记是可以区分每件东西的,例如k可以是该物品的颜色、或者价格、或者购买日期等,v就对应具体的东西啦。这样当我们后面想用的时候,就很容易找到。尤其是女同胞们的衣服和包包,哈哈

再此我们就可以得到一个大概的层次结构,一个柜子(DB)里面可以有多个小柜子(Bucket),每个小柜子里面存放的就是每个东西(k/v)啦。

那我们想一下,我们周末买了一件新衣服,回到家,我们要把衣服放在柜子里,那这时候需要怎么操作呢?

很简单啦,下面看看我们平常怎么做的。

第一步:如果家里没有柜子,那就得先买一个柜子;

第二步:在柜子里找找之前有没有放置衣服的小柜子,没有的话,那就分一块出来,总不能把新衣服和钢笔放在一块吧。

第三步:有了放衣服的柜子,那就里面找找,如果之前都没衣服,直接把衣服打上标签,然后丢进去就ok啦;如果之前有衣服,那我们就需要考虑要怎么放了,随便放还是按照一定的规则来放。这里我猜大部分人还是会和我一样吧。喜欢按照一定的规则放,比如按照衣服的新旧来摆放,或者按照衣服的颜色来摆放,或者按照季节来摆放,或者按照价格来摆放。哈哈

我们在多想一下,周一早上起来我们要找一件衣服穿着去上班,那这时候我们又该怎么操作呢?

第一步:去找家里存放东西的柜子,家里没柜子,那就连衣服都没了,尴尬...。所以我们肯定是有柜子的,对不对

第二步:找到柜子了,然后再去找放置衣服的小柜子,因为衣服在小柜子存放着。

第三步:找到衣服的柜子了,那就从里面找一件衣服了,找哪件呢!最新买的?最喜欢的?天气下雨了,穿厚一点的?天气升温了,穿薄一点的?今天没准可能要约会,穿最有气质的?.....

那这时候根据不同场景来确定了规则,明确了我们要找的衣服的标签,找起来就会很快了。我们一下子就能定位到要穿的衣服了。嗯哼,这就是排序、索引的威力了

如果之前放置的衣服没有按照这些规则来摆放。那这时候就很悲剧了,就得挨个挨个找,然后自己选了。哈哈,有点全表扫描的味道了

啰里啰嗦扯了一大堆,就是为了给大家科普清楚,一些boltdb中比较重要的概念,让大家对比理解。降低理解难度。下面开始介绍boltdb是如何简单使用的。

使用无外乎两个操作:setget

func main() {
   // 我们的大柜子
   db, err := bolt.Open("./my.db", 0600, nil)
   if err != nil {
      panic(err)
   }
   defer db.Close()
   // 往小柜子里放东西
   err = db.Update(func(tx *bolt.Tx) error {
      //我们的小柜子
      bucket, err := tx.CreateBucketIfNotExists([]byte("user"))
      if err != nil {
         log.Fatalf("CreateBucketIfNotExists err:%s", err.Error())
         return err
      }
      //放入东西
      if err = bucket.Put([]byte("hello"), []byte("world")); err != nil {
         log.Fatalf("bucket Put err:%s", err.Error())
         return err
      }
      return nil
   })
   if err != nil {
      log.Fatalf("db.Update err:%s", err.Error())
   }
   // 从柜子里取东西
   err = db.View(func(tx *bolt.Tx) error {
      //找到柜子
      bucket := tx.Bucket([]byte("user"))
      //找东西
      val := bucket.Get([]byte("hello"))
      log.Printf("the get val:%s", val)
      val = bucket.Get([]byte("hello2"))
      log.Printf("the get val2:%s", val)
      return nil
   })
   if err != nil {
      log.Fatalf("db.View err:%s", err.Error())
   }
}

1.4 boltdb的整体数据组织结构

下面这幅图完整的展示了boltdb中数据在磁盘文件(file)、文件中的每页(page)上的存储格式以及内存(bucket、node)中b+树形式的组织情况。先从整体上给大家展示一下,大家暂时看不懂不要紧,后面章节会详细的分析每一部分的内容。

1.5 总结

再此做一个小结,本章我们首先对boltdb进行一个简要的介绍,让大家知道boltdb是什么,做什么用的。接着又回答了为什么要分析boltdb。在第三节中通过类比现实生活的场景给大家介绍了一下boltdb的简单用法。既然我们用起来了boltdb,那我们就留下了一个悬念,它内部是如何运转的呢?在分析悬念之前,我们在第四节中,从整体对大家展示了boltdb中数据时如何组织存储的。后面的内容都是围绕着这张图来展开的。

2.boltdb的核心数据结构分析

从一开始,boltdb的定位就是一款文件数据库,顾名思义它的数据都是存储在磁盘文件上的,目前我们大部分场景使用的磁盘还是机械磁盘。而我们又知道数据落磁盘其实是一个比较慢的操作(此处的快慢是和操作内存想对比而言)。所以怎么样在这种硬件条件无法改变的情况下,如何提升性能就成了一个恒定不变的话题。而提升性能就不得不提到它的数据组织方式了。所以这部分我们主要来分析boltdb的核心数据结构。

我们都知道,操作磁盘之所以慢,是因为对磁盘的读写耗时主要包括:寻道时间+旋转时间+传输时间。而这儿的大头主要是在寻道时间上,因为寻道是需要移动磁头到对应的磁道上,通过马达驱动磁臂的移动是一种机械运动,比较耗时。我们往往对磁盘的操作都是随机读写,简而言之,随机读写的话,需要频繁移动磁头到对应的磁道。这种方式性能比较低。还有一种和它对应的方式:顺序读写。顺序读写的性能要比随机读写高很多。

因此,所谓的提升性能,无非就是尽可能的减少磁盘的随机读写,更大程度采用顺序读写的方式。这是主要矛盾,不管是mysql的innodb还是boltdb他们都是围绕这个核心来展开的。如何将用户写进来在内存中的数据尽可能采用顺序写的方式放进磁盘,同时在用户读时,将磁盘中保存的数据以尽可能少的IO调用次数加载到内存中,近而返回用户。这里面就涉及到具体的数据在磁盘、内存中的组织结构以及相互转换了。下面我们就对这一块进行详细的分析。

这里面主要包含几块内容:一个是它在磁盘上的数据组织结构page、一个是它在内存中的数据组织结构node、还有一个是page和node之间的相互转换关系。 这里先给大家直观的科普一点: set操作: 本质上对应的是 set->node->page->file的过程 get操作: 本质上对应的是 file->page->node->get的过程

2.1 boltdb page结构

在boltdb中,一个db对应一个真实的磁盘文件。而在具体的文件中,boltdb又是按照以page为单位来读取和写入数据的,也就是说所有的数据在磁盘上都是按照页(page)来存储的,而此处的页大小是保持和操作系统对应的内存页大小一致,也就是4k。每页由两部分数据组成:页头数据+真实数据,页头信息占16个字节,下面的页的结构定义

type pgid uint64
type page struct {
	// 页id 8字节
	id pgid
	// flags:页类型,可以是分支,叶子节点,元信息,空闲列表  2字节,该值的取值详细参见下面描述
	flags uint16
	// 个数 2字节,统计叶子节点、非叶子节点、空闲列表页的个数
	count uint16
	// 4字节,数据是否有溢出,主要在空闲列表上有用
	overflow uint32
	// 真实的数据
	ptr uintptr
}

其中,ptr是一个无类型指针,它就是表示每页中真实的存储的数据地址。而其余的几个字段(id、flags、count、overflow)为我们前面提到的页头信息。 下图展现的是boltdb中page的数据存储方式。

在boltdb中,它把页划分为四类:

page页类型

类型定义

类型值

用途

分支节点页

branchPageFlag

0x01

存储索引信息(页号、元素key值)

叶子节点页

leafPageFlag

0x02

存储数据信息(页号、插入的key值、插入的value值)

元数据页

metaPageFlag

0x04

存储数据库的元信息,例如空闲列表页id、放置桶的根页等

空闲列表页

freelistPageFlag

0x10

存储哪些页是空闲页,可以用来后续分配空间时,优先考虑分配

boltdb通过定义的常量来描述

// 页头的大小:16字节
const pageHeaderSize = int(unsafe.Offsetof(((*page)(nil)).ptr))

const minKeysPerPage = 2

//分支节点页中每个元素所占的大小
const branchPageElementSize = int(unsafe.Sizeof(branchPageElement{}))
//叶子节点页中每个元素所占的大小
const leafPageElementSize = int(unsafe.Sizeof(leafPageElement{}))

const (
	branchPageFlag   = 0x01 //分支节点页类型
	leafPageFlag     = 0x02 //叶子节点页类型
	metaPageFlag     = 0x04 //元数据页类型
	freelistPageFlag = 0x10 //空闲列表页类型
)

同时每页都有一个方法来判断该页的类型,我们可以清楚的看到每页时通过其flags字段来标识页的类型。

// typ returns a human readable page type string used for debugging.
func (p *page) typ() string {
	if (p.flags & branchPageFlag) != 0 {
		return "branch"
	} else if (p.flags & leafPageFlag) != 0 {
		return "leaf"
	} else if (p.flags & metaPageFlag) != 0 {
		return "meta"
	} else if (p.flags & freelistPageFlag) != 0 {
		return "freelist"
	}
	return fmt.Sprintf("unknown<%02x>", p.flags)
}

下面我们一一对其数据结构进行分析。

2.2 元数据页

每页有一个meta()方法,如果该页是元数据页的话,可以通过该方法来获取具体的元数据信息。

// meta returns a pointer to the metadata section of the page.
func (p *page) meta() *meta {
	// 将p.ptr转为meta信息
	return (*meta)(unsafe.Pointer(&p.ptr))
}

详细的元数据信息定义如下:

type meta struct {
	magic    uint32 //魔数
	version  uint32 //版本
	pageSize uint32 //page页的大小,该值和操作系统默认的页大小保持一致
	flags    uint32 //保留值,目前貌似还没用到
	root     bucket //所有小柜子bucket的根
	freelist pgid //空闲列表页的id
	pgid     pgid //元数据页的id
	txid     txid //最大的事务id
	checksum uint64 //用作校验的校验和
}

下图展现的是元信息存储方式。

下面我们重点关注该meta数据是如何写入到一页中的,以及如何从磁盘中读取meta信息并封装到meta中 1. meta->page

// write writes the meta onto a page.
func (m *meta) write(p *page) {
   if m.root.root >= m.pgid {
      panic(fmt.Sprintf("root bucket pgid (%d) above high water mark (%d)", m.root.root, m.pgid))
   } else if m.freelist >= m.pgid {
      panic(fmt.Sprintf("freelist pgid (%d) above high water mark (%d)", m.freelist, m.pgid))
   }

   // Page id is either going to be 0 or 1 which we can determine by the transaction ID.
   // 指定页id和页类型
   p.id = pgid(m.txid % 2)
   p.flags |= metaPageFlag

   // Calculate the checksum.
   m.checksum = m.sum64()
   // 这儿p.meta()返回的是p.ptr的地址,因此通过copy之后,meta信息就放到page中了
   m.copy(p.meta())
}

// copy copies one meta object to another.
func (m *meta) copy(dest *meta) {
   *dest = *m
}

2. page->meta

page.go
// meta returns a pointer to the metadata section of the page.
func (p *page) meta() *meta {
	// 将p.ptr转为meta信息
	return (*meta)(unsafe.Pointer(&p.ptr))
}

2.3 空闲列表页

空闲列表页中主要包含三个部分:所有已经可以重新利用的空闲页列表ids、将来很快被释放掉的事务关联的页列表pending、页id的缓存。详细定义在freelist.go文件中,下面给大家展示其空闲页的定义。

// freelist represents a list of all pages that are available for allocation.
// It also tracks pages that have been freed but are still in use by open transactions.
type freelist struct {
   // 已经可以被分配的空闲页
   ids []pgid // all free and available free page ids.
   // 将来很快能被释放的空闲页,部分事务可能在读或者写
   pending map[txid][]pgid // mapping of soon-to-be free page ids by tx.
   cache   map[pgid]bool   // fast lookup of all free and pending page ids.
}

// newFreelist returns an empty, initialized freelist.
func newFreelist() *freelist {
   return &freelist{
      pending: make(map[txid][]pgid),
      cache:   make(map[pgid]bool),
   }
}

下图展示的是空闲列表的存储方式。

1. freelist->page

将空闲列表转换成页信息,写到磁盘中,此处需要注意一个问题,页头中的count字段是一个uint16,占两个字节,其最大可以表示2^16 即65536个数字,当空闲页的个数超过65535时时,需要将p.ptr中的第一个字节用来存储其空闲页的个数,同时将p.count设置为0xFFFF。否则不超过的情况下,直接用count来表示其空闲页的个数

// write writes the page ids onto a freelist page. All free and pending ids are
// saved to disk since in the event of a program crash, all pending ids will
// become free.
//将 freelist信息写入到p中
func (f *freelist) write(p *page) error {
	// Combine the old free pgids and pgids waiting on an open transaction.

	// Update the header flag.
	// 设置页头中的页类型标识
	p.flags |= freelistPageFlag

	// The page.count can only hold up to 64k elements so if we overflow that
	// number then we handle it by putting the size in the first element.

	lenids := f.count()
	if lenids == 0 {
		p.count = uint16(lenids)
	} else if lenids < 0xFFFF {
		p.count = uint16(lenids)
		// 拷贝到page的ptr中
		f.copyall(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[:])
	} else {
		// 有溢出的情况下,后面第一个元素放置其长度,然后再存放所有的pgid列表
		p.count = 0xFFFF
		((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0] = pgid(lenids)
		// 从第一个元素位置拷贝
		f.copyall(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[1:])
	}
	return nil
}

// copyall copies into dst a list of all free ids and all pending ids in one sorted list.
// f.count returns the minimum length required for dst.
func (f *freelist) copyall(dst []pgid) {
	// 首先把pending状态的页放到一个数组中,并使其有序
	m := make(pgids, 0, f.pending_count())
	for _, list := range f.pending {
		m = append(m, list...)
	}
	sort.Sort(m)
	// 合并两个有序的列表,最后结果输出到dst中
	mergepgids(dst, f.ids, m)
}

// mergepgids copies the sorted union of a and b into dst.
// If dst is too small, it panics.
// 将a和b按照有序合并成到dst中,a和b有序
func mergepgids(dst, a, b pgids) {
	...
	// Assign lead to the slice with a lower starting value, follow to the higher value.
	lead, follow := a, b
	if b[0] < a[0] {
		lead, follow = b, a
	}

	// Continue while there are elements in the lead.
	for len(lead) > 0 {
		// Merge largest prefix of lead that is ahead of follow[0].
		n := sort.Search(len(lead), func(i int) bool { return lead[i] > follow[0] })
		merged = append(merged, lead[:n]...)
		if n >= len(lead) {
			break
		}

		// Swap lead and follow.
		lead, follow = follow, lead[n:]
	}
	// Append what's left in follow.
	_ = append(merged, follow...)
	...
}

2. page->freelist

从磁盘中加载空闲页信息,并转为freelist结构,转换时,也需要注意其空闲页的个数的判断逻辑,当p.count为0xFFFF时,需要读取p.ptr中的第一个字节来计算其空闲页的个数。否则则直接读取p.ptr中存放的数据为空闲页ids列表

//从磁盘中的page初始化freelist
// read initializes the freelist from a freelist page.
func (f *freelist) read(p *page) {
	// If the page.count is at the max uint16 value (64k) then it's considered
	// an overflow and the size of the freelist is stored as the first element.
	idx, count := 0, int(p.count)
	if count == 0xFFFF {
		idx = 1
		// 用第一个uint64来存储整个count的值
		count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0])
	}

	// Copy the list of page ids from the freelist.
	if count == 0 {
		f.ids = nil
	} else {
		ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx:count]
		f.ids = make([]pgid, len(ids))
		copy(f.ids, ids)

		// Make sure they're sorted.
		sort.Sort(pgids(f.ids))
	}

	// Rebuild the page cache.
	f.reindex()
}

// reindex rebuilds the free cache based on available and pending free lists.
func (f *freelist) reindex() {
	f.cache = make(map[pgid]bool, len(f.ids))
	for _, id := range f.ids {
		f.cache[id] = true
	}
	for _, pendingIDs := range f.pending {
		for _, pendingID := range pendingIDs {
			f.cache[pendingID] = true
		}
	}
}

2.4 分支节点页

分支节点主要用来构建索引,方便提升查询效率。下面我们来看看boltdb的分支节点的数据是如何存储的。

1. 分支节点页中元素定义: 分支节点在存储时,一个分支节点页上会存储多个分支页元素即branchPageElement。这个信息可以记做为分支页元素元信息。元信息中定义了具体该元素的页id(pgid)、该元素所指向的页中存储的最小key的值大小、最小key的值存储的位置距离当前的元信息的偏移量pos。下面是branchPageElement的详细定义:

// branchPageElement represents a node on a branch page.
type branchPageElement struct {
	pos   uint32 //该元信息和真实key之间的偏移量
	ksize uint32
	pgid  pgid
}

// key returns a byte slice of the node key.
func (n *branchPageElement) key() []byte {
	buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
	// pos~ksize
	return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos]))[:n.ksize]
}

2. 分支节点页page中获取下标为index的某一个element的信息和获取全部的elements信息

// branchPageElement retrieves the branch node by index
func (p *page) branchPageElement(index uint16) *branchPageElement {
	return &((*[0x7FFFFFF]branchPageElement)(unsafe.Pointer(&p.ptr)))[index]
}

// branchPageElements retrieves a list of branch nodes.
func (p *page) branchPageElements() []branchPageElement {
	if p.count == 0 {
		return nil
	}
	return ((*[0x7FFFFFF]branchPageElement)(unsafe.Pointer(&p.ptr)))[:]
}

下图展现的是非叶子节点存储方式。

在内存中,分支节点页和叶子节点页都是通过node来表示,只不过的区别是通过其node中的isLeaf这个字段来区分。下面和大家分析分支节点页page和内存中的node的转换关系。

下面在介绍具体的转换关系前,我们介绍一下内存中的分支节点和叶子节点是如何描述的。

// node represents an in-memory, deserialized page.
type node struct {
	bucket     *Bucket
	isLeaf     bool
	unbalanced bool
	spilled    bool
	key        []byte
	pgid       pgid
	parent     *node
	children   nodes
	inodes     inodes
}

在内存中,具体的一个分支节点或者叶子节点都被抽象为一个node对象,其中是分支节点还是叶子节点主要通通过其isLeaf字段来区分。下面对分支节点和叶子节点做两点说明:

  1. 对叶子节点而言,其没有children这个信息。同时也没有key信息。isLeaf字段为true,其上存储的key、value都保存在inodes中
  2. 对于分支节点而言,其具有key信息,同时children也不一定为空。isLeaf字段为false,同时该节点上的数据保存在inode中。

为了方便大家理解,node和page的转换,下面大概介绍下inode和nodes结构。我们在下一章会详细介绍node。


const (
	bucketLeafFlag = 0x01
)
type nodes []*node

func (s nodes) Len() int           { return len(s) }
func (s nodes) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
func (s nodes) Less(i, j int) bool { return bytes.Compare(s[i].inodes[0].key, s[j].inodes[0].key) == -1 }

// inode represents an internal node inside of a node.
// It can be used to point to elements in a page or point
// to an element which hasn't been added to a page yet.
type inode struct {
	// 表示是否是子桶叶子节点还是普通叶子节点。如果flags值为1表示子桶叶子节点,否则为普通叶子节点
	flags uint32
	// 当inode为分支元素时,pgid才有值,为叶子元素时,则没值
	pgid  pgid
	key   []byte
	// 当inode为分支元素时,value为空,为叶子元素时,才有值
	value []byte
}

type inodes []inode

3. page->node

通过分支节点页来构建node节点

// 根据page来初始化node
// read initializes the node from a page.
func (n *node) read(p *page) {
	n.pgid = p.id
	n.isLeaf = ((p.flags & leafPageFlag) != 0)
	n.inodes = make(inodes, int(p.count))

	for i := 0; i < int(p.count); i++ {
		inode := &n.inodes[i]
		if n.isLeaf {
			// 获取第i个叶子节点
			elem := p.leafPageElement(uint16(i))
			inode.flags = elem.flags
			inode.key = elem.key()
			inode.value = elem.value()
		} else {
			// 树枝节点
			elem := p.branchPageElement(uint16(i))
			inode.pgid = elem.pgid
			inode.key = elem.key()
		}
		_assert(len(inode.key) > 0, "read: zero-length inode key")
	}

	// Save first key so we can find the node in the parent when we spill.
	if len(n.inodes) > 0 {
		// 保存第1个元素的值
		n.key = n.inodes[0].key
		_assert(len(n.key) > 0, "read: zero-length node key")
	} else {
		n.key = nil
	}
}

4. node->page 将node中的数据写入到page中

// write writes the items onto one or more pages.
// 将node转为page
func (n *node) write(p *page) {
	// Initialize page.
	// 判断是否是叶子节点还是非叶子节点
	if n.isLeaf {
		p.flags |= leafPageFlag
	} else {
		p.flags |= branchPageFlag
	}

	// 这儿叶子节点不可能溢出,因为溢出时,会分裂
	if len(n.inodes) >= 0xFFFF {
		panic(fmt.Sprintf("inode overflow: %d (pgid=%d)", len(n.inodes), p.id))
	}
	p.count = uint16(len(n.inodes))

	// Stop here if there are no items to write.
	if p.count == 0 {
		return
	}

	// Loop over each item and write it to the page.
	// b指向的指针为提逃过所有item头部的位置
	b := (*[maxAllocSize]byte)(unsafe.Pointer(&p.ptr))[n.pageElementSize()*len(n.inodes):]
	for i, item := range n.inodes {
		_assert(len(item.key) > 0, "write: zero-length inode key")

		// Write the page element.
		// 写入叶子节点数据
		if n.isLeaf {
			elem := p.leafPageElement(uint16(i))
			elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem)))
			elem.flags = item.flags
			elem.ksize = uint32(len(item.key))
			elem.vsize = uint32(len(item.value))
		} else {
			// 写入分支节点数据
			elem := p.branchPageElement(uint16(i))
			elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem)))
			elem.ksize = uint32(len(item.key))
			elem.pgid = item.pgid
			_assert(elem.pgid != p.id, "write: circular dependency occurred")
		}

		// If the length of key+value is larger than the max allocation size
		// then we need to reallocate the byte array pointer.
		//
		// See: https://github.com/boltdb/bolt/pull/335
		klen, vlen := len(item.key), len(item.value)
		if len(b) < klen+vlen {
			b = (*[maxAllocSize]byte)(unsafe.Pointer(&b[0]))[:]
		}

		// Write data for the element to the end of the page.
		copy(b[0:], item.key)
		b = b[klen:]
		copy(b[0:], item.value)
		b = b[vlen:]
	}

	// DEBUG ONLY: n.dump()
}

2.5 叶子节点页

叶子节点主要用来存储实际的数据,也就是key+value了。下面看看具体的key+value是如何设计的。 在boltdb中,每一对key/value在存储时,都有一份元素元信息,也就是leafPageElement。其中定义了key的长度、value的长度、具体存储的值距离元信息的偏移位置pos。

// leafPageElement represents a node on a leaf page.
// 叶子节点既存储key,也存储value
type leafPageElement struct {
	flags uint32 //该值主要用来区分,是子桶叶子节点元素还是普通的key/value叶子节点元素。flags值为1时表示子桶。否则为key/value
	pos   uint32
	ksize uint32
	vsize uint32
}

// 叶子节点的key
// key returns a byte slice of the node key.
func (n *leafPageElement) key() []byte {
	buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
	// pos~ksize
	return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos]))[:n.ksize:n.ksize]
}

// 叶子节点的value
// value returns a byte slice of the node value.
func (n *leafPageElement) value() []byte {
	buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
	// key:pos~ksize
	// value:pos+ksize~pos+ksize+vsize
	return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos+n.ksize]))[:n.vsize:n.vsize]
}

下面是具体在叶子节点的page中获取下标为index的某个key/value的元信息。根据其元信息,就可以进一步获取其存储的key和value的值了,具体方法可以看上面的key()和value()

// leafPageElement retrieves the leaf node by index
func (p *page) leafPageElement(index uint16) *leafPageElement {
	n := &((*[0x7FFFFFF]leafPageElement)(unsafe.Pointer(&p.ptr)))[index]

	// 最原始的指针:unsafe.Pointer(&p.ptr)
	// 将其转为(*[0x7FFFFFF]leafPageElement)(unsafe.Pointer(&p.ptr))
	// 然后再取第index个元素 ((*[0x7FFFFFF]leafPageElement)(unsafe.Pointer(&p.ptr)))[index]
	// 最后返回该元素指针 &((*[0x7FFFFFF]leafPageElement)(unsafe.Pointer(&p.ptr)))[index]

	// ((*[0x7FFFFFF]leafPageElement)(unsafe.Pointer(&p.ptr)))
	// (*[0x7FFFFFF]leafPageElement)(unsafe.Pointer(&p.ptr))==>[]leafPageElement
	// &leafElements[index]
	return n
}

// leafPageElements retrieves a list of leaf nodes.
func (p *page) leafPageElements() []leafPageElement {
	if p.count == 0 {
		return nil
	}
	return ((*[0x7FFFFFF]leafPageElement)(unsafe.Pointer(&p.ptr)))[:]
}

下图展现的是叶子节点存储方式。

其具体叶子节点页page转换成node时的转变过程如同分支节点转换的方法一样,此处就不做赘述,可以参考2.4节介绍的read()和write()方法

2.6 总结

本章中我们重点分析了boltdb中的核心数据结构(page、freelist、meta、node)以及他们之间的相互转化。 在底层磁盘上存储时,boltdb是按照页的单位来存储实际数据的,页的大小取自于它运行的操作系统的页大小。在boltdb中,根据存储的数据的类型不同,将页page整体上分为4大类: 1. 元信息页(meta page) 2. 空闲列表页(freelist page) 3. 分支节点页(branch page) 4. 叶子节点页(leaf page)

在page的头信息中通过flags字段来区分。

在内存中同样有对应的结构来映射磁盘上的上述几种页。如元信息meta空闲列表freelist分支/叶子节点node(通过isLeaf区分分支节点还是叶子节点)。我们在每一节中先详细介绍其数据结构的定义。接着再重点分析在内存和磁盘上该类型的页时如何进行转换的。可以准确的说,数据结构属于boltdb核心中的核心。梳理清楚了每个数据结构存储的具体数据和格式后。下一章我们将重点分析其另外两个核心结构bucket和node。

3. boltdb的b+树(Bucket、node)分析

在第一章我们提到在boltdb中,一个db对应底层的一个磁盘文件。一个db就像一个大柜子一样,其中可以被分隔多个小柜子,用来存储同类型的东西。每个小柜子在boltdb中就是Bucket了。bucket英文为。很显然按照字面意思来理解,它在生活中也是存放数据的一种容器。目前为了方便大家理解,在boltdb中的Bucket可以粗略的认为,它里面主要存放的内容就是我们的k/v键值对啦。但这儿其实不准确,后面会详细说明。下面详细进行分析Bucket。在boltdb中定义有bucket、Bucket两个结构。我们在此处所指的Bucket都是指Bucket哈。请大家注意!

3.1 boltdb的Bucket结构

先来看官方文档的一段描述Bucket的话。

Bucket represents a collection of key/value pairs inside the database.

下面是Bucket的详细定义,本节我们先暂时忽略事务Tx,后面章节会详细介绍事务

// 16 byte
const bucketHeaderSize = int(unsafe.Sizeof(bucket{}))

// Bucket represents a collection of key/value pairs inside the database.
// 一组key/value的集合,也就是一个b+树
type Bucket struct {
	*bucket //在内联时bucket主要用来存储其桶的value并在后面拼接所有的元素,即所谓的内联
	tx       *Tx                // the associated transaction
	buckets  map[string]*Bucket // subbucket cache
	page     *page              // inline page reference,内联页引用
	rootNode *node              // materialized node for the root page.
	nodes    map[pgid]*node     // node cache

	// Sets the threshold for filling nodes when they split. By default,
	// the bucket will fill to 50% but it can be useful to increase this
	// amount if you know that your write workloads are mostly append-only.
	//
	// This is non-persisted across transactions so it must be set in every Tx.
	// 填充率,默认值0.5
	FillPercent float64
}

// bucket represents the on-file representation of a bucket.
// This is stored as the "value" of a bucket key. If the bucket is small enough,
// then its root page can be stored inline in the "value", after the bucket
// header. In the case of inline buckets, the "root" will be 0.
type bucket struct {
	root     pgid   // page id of the bucket's root-level page
	sequence uint64 // monotonically incrementing, used by NextSequence()
}

// newBucket returns a new bucket associated with a transaction.
func newBucket(tx *Tx) Bucket {
	var b = Bucket{tx: tx, FillPercent: DefaultFillPercent}
	if tx.writable {
		b.buckets = make(map[string]*Bucket)
		b.nodes = make(map[pgid]*node)
	}
	return b
}

下图展现的是数据在bucket中的存储方式。

上面是一个Bucket的定义,在开始下面的内容前,我们先提前介绍一下另一个角色Cursor,因为后面会频繁的用到它。大家在这里先知道,一个Bucket就是一个b+树就可以了。我们后面会对其进行详细的分析。

3.2 Bucket遍历之Cursor

本节我们先做一节内容的铺垫,暂时不讲如何创建、获取、删除一个Bucket。而是介绍一个boltdb中的新对象Cursor。

答案是:所有的上述操作都是建立在首先定位到一个Bucket所属的位置,然后才能对其进行操作。而定位一个Bucket的功能就是由Cursor来完成的。所以我们先这一节给大家介绍一下boltdb中的Cursor。

我们先看下官方文档对Cursor的描述

Cursor represents an iterator that can traverse over all key/value pairs in a bucket in sorted order.

用大白话讲,既然一个Bucket逻辑上是一颗b+树,那就意味着我们可以对其进行遍历。前面提到的set、get操作,无非是要在Bucket上先找到合适的位置,然后再进行操作。而“找”这个操作就是交由Cursor来完成的。简而言之对Bucket这颗b+树的遍历工作由Cursor来执行。一个Bucket对象关联一个Cursor。下面我们先看看Bucket和Cursor之间的关系。

// Cursor creates a cursor associated with the bucket.
// The cursor is only valid as long as the transaction is open.
// Do not use a cursor after the transaction is closed.
func (b *Bucket) Cursor() *Cursor {
	// Update transaction statistics.
	b.tx.stats.CursorCount++

	// Allocate and return a cursor.
	return &Cursor{
		bucket: b,
		stack:  make([]elemRef, 0),
	}
}
3.2.1 Cursor结构

从上面可以清楚的看到,在获取一个游标Cursor对象时,会将当前的Bucket对象传进去,同时还初始化了一个栈对象,结合数据结构中学习的树的知识。我们也就知道,它的内部就是对树进行遍历。下面我们详细介绍Cursor这个人物。

// Cursor represents an iterator that can traverse over all key/value pairs in a bucket in sorted order.
// Cursors see nested buckets with value == nil.
// Cursors can be obtained from a transaction and are valid as long as the transaction is open.
//
// Keys and values returned from the cursor are only valid for the life of the transaction.
//
// Changing data while traversing with a cursor may cause it to be invalidated
// and return unexpected keys and/or values. You must reposition your cursor
// after mutating data.
type Cursor struct {
	bucket *Bucket
	// 保存遍历搜索的路径
	stack []elemRef
}

// elemRef represents a reference to an element on a given page/node.
type elemRef struct {
	page  *page
	node  *node
	index int
}
3.2.2 Cursor对外接口

下面我们看一下Cursor对外暴露的接口有哪些。看之前也可以心里先想一下。针对一棵树我们需要哪些遍历接口呢?

主体也就是三类:定位到某一个元素的位置、在当前位置从前往后找、在当前位置从后往前找

// First moves the cursor to the first item in the bucket and returns its key and value.
// If the bucket is empty then a nil key and value are returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) First() (key []byte, value []byte)

// Last moves the cursor to the last item in the bucket and returns its key and value.
//...
func (c *Cursor) Last() (key []byte, value []byte)

// Next moves the cursor to the next item in the bucket and returns its key and value.
//...
func (c *Cursor) Next() (key []byte, value []byte)

// Prev moves the cursor to the previous item in the bucket and returns its key and value.
//...
func (c *Cursor) Prev() (key []byte, value []byte)

// Delete removes the current key/value under the cursor from the bucket.
// Delete fails if current key/value is a bucket or if the transaction is not writable.
func (c *Cursor) Delete() error

// Seek moves the cursor to a given key and returns it.
// If the key does not exist then the next key is used. If no keys follow, a nil key is returned.
// The returned key and value are only valid for the life of the transaction.
func (c *Cursor) Seek(seek []byte) (key []byte, value []byte)
3.2.3 Seek(key)、First()、Last()实现分析

由于篇幅有限,关于Seek()、First()、Last()、Next()、Prev()、Delete()这三个方法的内部实现。代码就不贴出来了。大家感兴趣可以点击此处或者点击此处 进入对应章节阅读。

3.3 node节点的相关操作

在开始分析node节点之前,我们先看一下官方对node节点的描述

node represents an in-memory, deserialized page

一个node节点,既可能是叶子节点,也可能是根节点,也可能是分支节点。是物理磁盘上读取进来的页page的内存表现形式。

3.3.1 node节点的定义
// node represents an in-memory, deserialized page.
type node struct {
	bucket     *Bucket // 关联一个桶
	isLeaf     bool
	unbalanced bool   // 值为true的话,需要考虑页合并
	spilled    bool   // 值为true的话,需要考虑页分裂
	key        []byte // 对于分支节点的话,保留的是最小的key
	pgid       pgid   // 分支节点关联的页id
	parent     *node  // 该节点的parent
	children   nodes  // 该节点的孩子节点
	inodes     inodes // 该节点上保存的索引数据
}

// inode represents an internal node inside of a node.
// It can be used to point to elements in a page or point
// to an element which hasn't been added to a page yet.
type inode struct {
	// 表示是否是子桶叶子节点还是普通叶子节点。如果flags值为1表示子桶叶子节点,否则为普通叶子节点
	flags uint32
	// 当inode为分支元素时,pgid才有值,为叶子元素时,则没值
	pgid pgid
	key  []byte
	// 当inode为分支元素时,value为空,为叶子元素时,才有值
	value []byte
}

type inodes []inode
3.3.2 node节点和page转换

在node对象上有两个方法,read(page)write(page),其中read(page)方法是用来通过page构建一个node节点;而write(page)方法则是将当前的node节点写入到page中,我们在前面他提到了node节点和page节点的相互转换,大家可以回到2.4节内容进行回顾。此处不再重复。

3.3.3 node节点的增删改查

put(k,v)

// put inserts a key/value.
// 如果put的是一个key、value的话,不需要指定pgid。
// 如果put的一个树枝节点,则需要指定pgid,不需要指定value
func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) {
	if pgid >= n.bucket.tx.meta.pgid {
		panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", pgid, n.bucket.tx.meta.pgid))
	} else if len(oldKey) <= 0 {
		panic("put: zero-length old key")
	} else if len(newKey) <= 0 {
		panic("put: zero-length new key")
	}

	// Find insertion index.
	index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, oldKey) != -1 })

	// Add capacity and shift nodes if we don't have an exact match and need to insert.
	exact := (len(n.inodes) > 0 && index < len(n.inodes) && bytes.Equal(n.inodes[index].key, oldKey))
	if !exact {
		n.inodes = append(n.inodes, inode{})
		copy(n.inodes[index+1:], n.inodes[index:])
	}

	inode := &n.inodes[index]
	inode.flags = flags
	inode.key = newKey
	inode.value = value
	inode.pgid = pgid
	_assert(len(inode.key) > 0, "put: zero-length inode key")
}

get(k)

在node中,没有get(k)的方法,其本质是在Cursor中就返回了get的数据。大家可以看看Cursor中的keyValue()方法。

del(k)

// del removes a key from the node.
func (n *node) del(key []byte) {
	// Find index of key.
	index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, key) != -1 })

	// Exit if the key isn't found.
	if index >= len(n.inodes) || !bytes.Equal(n.inodes[index].key, key) {
		return
	}

	// Delete inode from the node.
	n.inodes = append(n.inodes[:index], n.inodes[index+1:]...)

	// Mark the node as needing rebalancing.
	n.unbalanced = true
}
3.3.4 node节点的分裂和合并

上面我们看了对node节点的操作,包括put和del方法。经过这些操作后,可能会导致当前的page填充度过高或者过低。因此就引出了node节点的分裂和合并。下面简单介绍下什么是分裂和合并。

分裂: 当一个node中的数据过多时,最简单就是当超过了page的填充度时,就需要将当前的node拆分成两个,也就是底层会将一页数据拆分存放到两页中。具体实现在spill()方法中。

spill writes the nodes to dirty pages and splits nodes as it goes. Returns an error if dirty pages cannot be allocated.

合并: 当删除了一个或者一批对象时,此时可能会导致一页数据的填充度过低,此时空间可能会浪费比较多。所以就需要考虑对页之间进行数据合并。具体实现在rebalance()方法中。

rebalance attempts to combine the node with sibling nodes if the node fill size is below a threshold or if there are not enough keys.

由于内容过长,此处代码就不贴出来了。关于该部分的代码分析大家感兴趣可以点击此处 或者点击此处 进行阅读。

3.4 Bucket的相关操作

前面我们分析完了如何遍历、查找一个Bucket之后,下面我们来看看如何创建、获取、删除一个Bucket对象。

3.4.1 创建一个Bucket

1. CreateBucketIfNotExists()、CreateBucket()分析

根据指定的key来创建一个Bucket,如果指定key的Bucket已经存在,则会报错。如果指定的key之前有插入过元素,也会报错。否则的话,会在当前的Bucket中找到合适的位置,然后新建一个Bucket插入进去,最后返回给客户端。


// CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error) {
	child, err := b.CreateBucket(key)
	if err == ErrBucketExists {
		return b.Bucket(key), nil
	} else if err != nil {
		return nil, err
	}
	return child, nil
}

// CreateBucket creates a new bucket at the given key and returns the new bucket.
// Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
	// ...
	// 省去异常检查逻辑
	
	// Move cursor to correct position.
	// 拿到游标
	c := b.Cursor()
	// 开始遍历、找到合适的位置
	k, _, flags := c.seek(key)

	// Return an error if there is an existing key.
	if bytes.Equal(key, k) {
		// 是桶,已经存在了
		if (flags & bucketLeafFlag) != 0 {
			return nil, ErrBucketExists
		}
		// 不是桶、但key已经存在了
		return nil, ErrIncompatibleValue
	}

	// Create empty, inline bucket.
	var bucket = Bucket{
		bucket:      &bucket{},
		rootNode:    &node{isLeaf: true},
		FillPercent: DefaultFillPercent,
	}
	// 拿到bucket对应的value
	var value = bucket.write()

	// Insert into node.
	key = cloneBytes(key)
	// 插入到inode中
	// c.node()方法会在内存中建立这棵树,调用n.read(page)
	c.node().put(key, key, value, 0, bucketLeafFlag)

	// Since subbuckets are not allowed on inline buckets, we need to
	// dereference the inline page, if it exists. This will cause the bucket
	// to be treated as a regular, non-inline bucket for the rest of the tx.
	b.page = nil

	//根据key获取一个桶
	return b.Bucket(key), nil
}

// write allocates and writes a bucket to a byte slice.
// 内联桶的话,其value中bucketHeaderSize后面的内容为其page的数据
func (b *Bucket) write() []byte {
	// Allocate the appropriate size.
	var n = b.rootNode
	var value = make([]byte, bucketHeaderSize+n.size())

	// Write a bucket header.
	var bucket = (*bucket)(unsafe.Pointer(&value[0]))
	*bucket = *b.bucket

	// Convert byte slice to a fake page and write the root node.
	var p = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
	// 将该桶中的元素压缩存储,放在value中
	n.write(p)

	return value
}


// node returns the node that the cursor is currently positioned on.
func (c *Cursor) node() *node {
	_assert(len(c.stack) > 0, "accessing a node with a zero-length cursor stack")

	// If the top of the stack is a leaf node then just return it.
	if ref := &c.stack[len(c.stack)-1]; ref.node != nil && ref.isLeaf() {
		return ref.node
	}

	// Start from root and traverse down the hierarchy.
	var n = c.stack[0].node
	if n == nil {
		n = c.bucket.node(c.stack[0].page.id, nil)
	}
	// 非叶子节点
	for _, ref := range c.stack[:len(c.stack)-1] {
		_assert(!n.isLeaf, "expected branch node")
		n = n.childAt(int(ref.index))
	}
	_assert(n.isLeaf, "expected leaf node")
	return n
}

// put inserts a key/value.
// 如果put的是一个key、value的话,不需要指定pgid。
// 如果put的一个树枝节点,则需要指定pgid,不需要指定value
func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) {
	put方法的具体实现3.3节中的put(k,v)。
}

3.4.2 获取一个Bucket

根据指定的key来获取一个Bucket。如果找不到则返回nil。

// Bucket retrieves a nested bucket by name.
// Returns nil if the bucket does not exist.
// The bucket instance is only valid for the lifetime of the transaction.
func (b *Bucket) Bucket(name []byte) *Bucket {
	if b.buckets != nil {
		if child := b.buckets[string(name)]; child != nil {
			return child
		}
	}
	// Move cursor to key.
	// 根据游标找key
	c := b.Cursor()
	k, v, flags := c.seek(name)
	// Return nil if the key doesn't exist or it is not a bucket.
	if !bytes.Equal(name, k) || (flags&bucketLeafFlag) == 0 {
		return nil
	}
	// Otherwise create a bucket and cache it.
	// 根据找到的value来打开桶。
	var child = b.openBucket(v)
	// 加速缓存的作用
	if b.buckets != nil {
		b.buckets[string(name)] = child
	}

	return child
}

// Helper method that re-interprets a sub-bucket value
// from a parent into a Bucket
func (b *Bucket) openBucket(value []byte) *Bucket {
	var child = newBucket(b.tx)

	// If unaligned load/stores are broken on this arch and value is
	// unaligned simply clone to an aligned byte array.
	unaligned := brokenUnaligned && uintptr(unsafe.Pointer(&value[0]))&3 != 0

	if unaligned {
		value = cloneBytes(value)
	}

	// If this is a writable transaction then we need to copy the bucket entry.
	// Read-only transactions can point directly at the mmap entry.
	if b.tx.writable && !unaligned {
		child.bucket = &bucket{}
		*child.bucket = *(*bucket)(unsafe.Pointer(&value[0]))
	} else {
		child.bucket = (*bucket)(unsafe.Pointer(&value[0]))
	}

	// Save a reference to the inline page if the bucket is inline.
	// 内联桶
	if child.root == 0 {
		child.page = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
	}

	return &child
}
3.4.3 删除一个Bucket

DeleteBucket()方法用来删除一个指定key的Bucket。其内部实现逻辑是先递归的删除其子桶。然后再释放该Bucket的page,并最终从叶子节点中移除

// DeleteBucket deletes a bucket at the given key.
// Returns an error if the bucket does not exists, or if the key represents a non-bucket value.
func (b *Bucket) DeleteBucket(key []byte) error {
	//异常逻辑检查
	// Move cursor to correct position.
	c := b.Cursor()
	k, _, flags := c.seek(key)

	// Return an error if bucket doesn't exist or is not a bucket.
	if !bytes.Equal(key, k) {
		return ErrBucketNotFound
	} else if (flags & bucketLeafFlag) == 0 {
		return ErrIncompatibleValue
	}

	// Recursively delete all child buckets.
	child := b.Bucket(key)
	// 将该桶下面的所有桶都删除
	err := child.ForEach(func(k, v []byte) error {
		if v == nil {
			if err := child.DeleteBucket(k); err != nil {
				return fmt.Errorf("delete bucket: %s", err)
			}
		}
		return nil
	})
	if err != nil {
		return err
	}

	// Remove cached copy.
	delete(b.buckets, string(key))

	// Release all bucket pages to freelist.
	child.nodes = nil
	child.rootNode = nil
	child.free()

	// Delete the node if we have a matching key.
	c.node().del(key)

	return nil
}

// del removes a key from the node.
func (n *node) del(key []byte) {
	node的del()方法具体实现参考之前3.3节del(k)
}

3.5 key&value的插入、获取、删除

上面一节我们介绍了一下如何创建一个Bucket、如何获取一个Bucket。有了Bucket,我们就可以对我们最关心的key/value键值对进行增删改查了。其实本质上,对key/value的所有操作最终都要表现在底层的node上。因为node节点就是用来存储真实数据的。

3.5.1 插入一个key&value
// Put sets the value for a key in the bucket.
// If the key exist then its previous value will be overwritten.
// Supplied value must remain valid for the life of the transaction.
// Returns an error if the bucket was created from a read-only transaction,
// if the key is blank, if the key is too large, or if the value is too large.
func (b *Bucket) Put(key []byte, value []byte) error {
	if b.tx.db == nil {
		return ErrTxClosed
	} else if !b.Writable() {
		return ErrTxNotWritable
	} else if len(key) == 0 {
		return ErrKeyRequired
	} else if len(key) > MaxKeySize {
		return ErrKeyTooLarge
	} else if int64(len(value)) > MaxValueSize {
		return ErrValueTooLarge
	}

	// Move cursor to correct position.
	c := b.Cursor()
	k, _, flags := c.seek(key)

	// Return an error if there is an existing key with a bucket value.
	if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 {
		return ErrIncompatibleValue
	}

	// Insert into node.
	key = cloneBytes(key)
	c.node().put(key, key, value, 0, 0)

	return nil
}
3.5.2 获取一个key&value
// Get retrieves the value for a key in the bucket.
// Returns a nil value if the key does not exist or if the key is a nested bucket.
// The returned value is only valid for the life of the transaction.
func (b *Bucket) Get(key []byte) []byte {
	k, v, flags := b.Cursor().seek(key)
	// Return nil if this is a bucket.
	if (flags & bucketLeafFlag) != 0 {
		return nil
	}
	// If our target node isn't the same key as what's passed in then return nil.
	if !bytes.Equal(key, k) {
		return nil
	}
	return v
}
3.5.3 删除一个key&value
// Delete removes a key from the bucket.
// If the key does not exist then nothing is done and a nil error is returned.
// Returns an error if the bucket was created from a read-only transaction.
func (b *Bucket) Delete(key []byte) error {
	if b.tx.db == nil {
		return ErrTxClosed
	} else if !b.Writable() {
		return ErrTxNotWritable
	}
	// Move cursor to correct position.
	c := b.Cursor()
	_, _, flags := c.seek(key)

	// Return an error if there is already existing bucket value.
	if (flags & bucketLeafFlag) != 0 {
		return ErrIncompatibleValue
	}
	// Delete the node if we have a matching key.
	c.node().del(key)

	return nil
}
3.5.4 遍历Bucket中所有的key&value
// ForEach executes a function for each key/value pair in a bucket.
// If the provided function returns an error then the iteration is stopped and
// the error is returned to the caller. The provided function must not modify
// the bucket; this will result in undefined behavior.
func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
	if b.tx.db == nil {
		return ErrTxClosed
	}
	c := b.Cursor()
	// 遍历键值对
	for k, v := c.First(); k != nil; k, v = c.Next() {
		if err := fn(k, v); err != nil {
			return err
		}
	}
	return nil
}

3.6 Bucket的页分裂、页合并

关于本部分的内容过长,大家可以点击此处或者点击此处 进行阅读。

3.7 总结

本章我们主要介绍了boltdb中比较核心的两个数据结构:Bucketnode。为什么这两个数据结构放在一起介绍呢?答案是在boltdb中一个Bucket就对应一颗b+树。而b+树的结构(根节点、叶子节点、非叶子节点)、组织都是通过node来完成的。这也是为什么把他们放在一起介绍的主要原因。

在介绍中,我们主要围绕Bucket的创建获取删除遍历增删改kv等操作进行展开。其次在遍历时,就引入了Cursor数据结构,一个Bucket对象(一颗b+树)的遍历在boltdb中时通过一个栈来维护遍历的路径来完成的。这也是Cursor中stack的意义。

其次Bucket中对kv的操作都反应到底层的node上,因此我们又同时介绍了node的相关方法,例如putgetdelspillrebalance

最后到此为止,我们的数据时如何存储的、组织的。以及内存和磁盘数据时如何转换映射的,我们就清楚了。下章将介绍boltdb中的事务了。有了事务我们的数据库才称得上是一个完备的数据库。

4. boltdb事务控制

事务可以说是一个数据库必不可少的特性,对boltdb而言也不例外。我们都知道提到事务,必然会想到事务的四大特性。那么下面就让我们看看在boltdb中到底是怎么实现它的事务的呢?

4.1 boltdb事务简介

我们先看一下,boltdb官方文档中对事务的描述:

Bolt allows only one read-write transaction at a time but allows as many read-only transactions as you want at a time. Each transaction has a consistent view of the data as it existed when the transaction started.

Individual transactions and all objects created from them (e.g. buckets, keys) are not thread safe. To work with data in multiple goroutines you must start a transaction for each one or use locking to ensure only one goroutine accesses a transaction at a time. Creating transaction from the DB is thread safe.

Read-only transactions and read-write transactions should not depend on one another and generally shouldn't be opened simultaneously in the same goroutine. This can cause a deadlock as the read-write transaction needs to periodically re-map the data file but it cannot do so while a read-only transaction is open.

我们再简单总结下,在boltdb中支持两类事务:读写事务只读事务。同一时间有且只能有一个读写事务执行;但同一个时间可以允许有多个只读事务执行。每个事务都拥有自己的一套一致性视图。

此处需要注意的是,在boltdb中打开一个数据库时,有两个选项:只读模式读写模式。内部在实现时是根据不同的选项来底层加不同的锁(flock)。只读模式对应共享锁,读写模式对应互斥锁。具体加解锁的实现可以在bolt_unix.go 和bolt_windows.go中找到。

关于事务的ACID特性此处就不特别说明了。此部分内容大家可以自行查阅资料,下面我们进入主题。

4.2 boltdb事务Tx定义

// txid represents the internal transaction identifier.
type txid uint64

// Tx represents a read-only or read/write transaction on the database.
// Read-only transactions can be used for retrieving values for keys and creating cursors.
// Read/write transactions can create and remove buckets and create and remove keys.
// IMPORTANT: You must commit or rollback transactions when you are done with
// them. Pages can not be reclaimed by the writer until no more transactions
// are using them. A long running read transaction can cause the database to
// quickly grow.
// Tx 主要封装了读事务和写事务。其中通过writable来区分是读事务还是写事务
type Tx struct {
	writable       bool
	managed        bool
	db             *DB
	meta           *meta
	root           Bucket
	pages          map[pgid]*page
	stats          TxStats
	// 提交时执行的动作
	commitHandlers []func()

	// WriteFlag specifies the flag for write-related methods like WriteTo().
	// Tx opens the database file with the specified flag to copy the data.
	
	// By default, the flag is unset, which works well for mostly in-memory
	// workloads. For databases that are much larger than available RAM,
	// set the flag to syscall.O_DIRECT to avoid trashing the page cache.
	WriteFlag int
}

// init initializes the transaction.
func (tx *Tx) init(db *DB) {
	tx.db = db
	tx.pages = nil

	// Copy the meta page since it can be changed by the writer.
	// 拷贝元信息
	tx.meta = &meta{}
	db.meta().copy(tx.meta)

	// Copy over the root bucket.
	// 拷贝根节点
	tx.root = newBucket(tx)
	tx.root.bucket = &bucket{}
	// meta.root=bucket{root:3}
	*tx.root.bucket = tx.meta.root

	// Increment the transaction id and add a page cache for writable transactions.
	if tx.writable {
		tx.pages = make(map[pgid]*page)
		tx.meta.txid += txid(1)
	}
}

4.3 Begin()实现

此处需要说明一下:在boltdb中,事务的开启方法是绑定在DB对象上的,为了保证内容的完整性,我们还是把事务开启的Begin()方法补充到这个地方。 前面提到boltdb中事务分为两类,它的区分就是在开启事务时,根据传递的参数来内部执行不同的逻辑。 在读写事务中,开始事务时加锁,也就是db.rwlock.Lock()。在事务提交或者回滚时才释放锁:db.rwlock.UnLock()。同时也印证了我们前面说的,同一时刻只能有一个读写事务在执行。

// Begin starts a new transaction.
// Multiple read-only transactions can be used concurrently but only one
// write transaction can be used at a time. Starting multiple write transactions
// will cause the calls to block and be serialized until the current write
// transaction finishes.
//
// Transactions should not be dependent on one another. Opening a read
// transaction and a write transaction in the same goroutine can cause the
// writer to deadlock because the database periodically needs to re-mmap itself
// as it grows and it cannot do that while a read transaction is open.
//
// If a long running read transaction (for example, a snapshot transaction) is
// needed, you might want to set DB.InitialMmapSize to a large enough value
// to avoid potential blocking of write transaction.
//
// IMPORTANT: You must close read-only transactions after you are finished or
// else the database will not reclaim old pages.
func (db *DB) Begin(writable bool) (*Tx, error) {
	if writable {
		return db.beginRWTx()
	}
	return db.beginTx()
}

func (db *DB) beginTx() (*Tx, error) {
	// Lock the meta pages while we initialize the transaction. We obtain
	// the meta lock before the mmap lock because that's the order that the
	// write transaction will obtain them.
	db.metalock.Lock()

	// Obtain a read-only lock on the mmap. When the mmap is remapped it will
	// obtain a write lock so all transactions must finish before it can be
	// remapped.
	db.mmaplock.RLock()

	// Exit if the database is not open yet.
	if !db.opened {
		db.mmaplock.RUnlock()
		db.metalock.Unlock()
		return nil, ErrDatabaseNotOpen
	}

	// Create a transaction associated with the database.
	t := &Tx{}
	t.init(db)

	// Keep track of transaction until it closes.
	db.txs = append(db.txs, t)
	n := len(db.txs)

	// Unlock the meta pages.
	db.metalock.Unlock()

	// Update the transaction stats.
	db.statlock.Lock()
	db.stats.TxN++
	db.stats.OpenTxN = n
	db.statlock.Unlock()

	return t, nil
}

func (db *DB) beginRWTx() (*Tx, error) {
	// If the database was opened with Options.ReadOnly, return an error.
	if db.readOnly {
		return nil, ErrDatabaseReadOnly
	}

	// Obtain writer lock. This is released by the transaction when it closes.
	// This enforces only one writer transaction at a time.
	db.rwlock.Lock()

	// Once we have the writer lock then we can lock the meta pages so that
	// we can set up the transaction.
	db.metalock.Lock()
	defer db.metalock.Unlock()

	// Exit if the database is not open yet.
	if !db.opened {
		db.rwlock.Unlock()
		return nil, ErrDatabaseNotOpen
	}

	// Create a transaction associated with the database.
	t := &Tx{writable: true}
	t.init(db)
	db.rwtx = t

	// Free any pages associated with closed read-only transactions.
	var minid txid = 0xFFFFFFFFFFFFFFFF
	// 找到最小的事务id
	for _, t := range db.txs {
		if t.meta.txid < minid {
			minid = t.meta.txid
		}
	}
	if minid > 0 {
		// 将之前事务关联的page全部释放了,因为在只读事务中,没法释放,只读事务的页,因为可能当前的事务已经完成 ,但实际上其他的读事务还在用
		db.freelist.release(minid - 1)
	}

	return t, nil
}

4.4 Commit()实现

Commit()方法内部实现中,总体思路是:

  1. 先判定节点要不要合并、分裂
  2. 对空闲列表的判断,是否存在溢出的情况,溢出的话,需要重新分配空间
  3. 将事务中涉及改动的页进行排序(保证尽可能的顺序IO),排序后循环写入到磁盘中,最后再执行刷盘
  4. 当数据写入成功后,再将元信息页写到磁盘中,刷盘以保证持久化
  5. 上述操作中,但凡有失败,当前事务都会进行回滚
// Commit writes all changes to disk and updates the meta page.
// Returns an error if a disk write error occurs, or if Commit is
// called on a read-only transaction.
// 先更新数据然后再更新元信息
// 更新数据成功、元信息未来得及更新机器就挂掉了。数据如何恢复?
func (tx *Tx) Commit() error {
	// 此处省去异常逻辑检查..
	// 删除时,进行平衡,页合并
	// Rebalance nodes which have had deletions.
	var startTime = time.Now()
	tx.root.rebalance()
	if tx.stats.Rebalance > 0 {
		tx.stats.RebalanceTime += time.Since(startTime)
	}

	// 页分裂
	// spill data onto dirty pages.
	startTime = time.Now()
	// 这个内部会往缓存tx.pages中加page
	if err := tx.root.spill(); err != nil {
		tx.rollback()
		return err
	}
	tx.stats.SpillTime += time.Since(startTime)

	// Free the old root bucket.
	tx.meta.root.root = tx.root.root

	opgid := tx.meta.pgid

	// Free the freelist and allocate new pages for it. This will overestimate
	// the size of the freelist but not underestimate the size (which would be bad).
	// 分配新的页面给freelist,然后将freelist写入新的页面
	tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
	// 空闲列表可能会增加,因此需要重新分配页用来存储空闲列表
	// 因为在开启写事务的时候,有去释放之前读事务占用的页信息,因此此处需要判断是否freelist会有溢出的问题
	p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
	if err != nil {
		tx.rollback()
		return err
	}
	// 将freelist写入到连续的新页中
	if err := tx.db.freelist.write(p); err != nil {
		tx.rollback()
		return err
	}
	// 更新元数据的页id
	tx.meta.freelist = p.id

	// If the high water mark has moved up then attempt to grow the database.
	// 在allocate中有可能会更改meta.pgid
	if tx.meta.pgid > opgid {
		if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
			tx.rollback()
			return err
		}
	}

	// Write dirty pages to disk.
	startTime = time.Now()
	// 写数据
	if err := tx.write(); err != nil {
		tx.rollback()
		return err
	}

	// If strict mode is enabled then perform a consistency check.
	// Only the first consistency error is reported in the panic.
	if tx.db.StrictMode {
		//做一致性检查逻辑此处省略..
	}

	// Write meta to disk.
	// 元信息写入到磁盘
	if err := tx.writeMeta(); err != nil {
		tx.rollback()
		return err
	}
	tx.stats.WriteTime += time.Since(startTime)

	// Finalize the transaction.
	tx.close()

	// Execute commit handlers now that the locks have been removed.
	for _, fn := range tx.commitHandlers {
		fn()
	}

	return nil
}

// write writes any dirty pages to disk.
func (tx *Tx) write() error {
	// Sort pages by id.
	// 保证写的页是有序的
	pages := make(pages, 0, len(tx.pages))
	for _, p := range tx.pages {
		pages = append(pages, p)
	}
	// Clear out page cache early.
	tx.pages = make(map[pgid]*page)
	sort.Sort(pages)

	// Write pages to disk in order.
	for _, p := range pages {
		// 页数和偏移量
		size := (int(p.overflow) + 1) * tx.db.pageSize
		offset := int64(p.id) * int64(tx.db.pageSize)

		// Write out page in "max allocation" sized chunks.
		ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p))
		// 循环写某一页
		for {
			// Limit our write to our max allocation size.
			sz := size
			// 2^31=2G
			if sz > maxAllocSize-1 {
				sz = maxAllocSize - 1
			}

			// Write chunk to disk.
			buf := ptr[:sz]
			if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
				return err
			}

			// Update statistics.
			tx.stats.Write++

			// Exit inner for loop if we've written all the chunks.
			size -= sz
			if size == 0 {
				break
			}

			// Otherwise move offset forward and move pointer to next chunk.
			// 移动偏移量
			offset += int64(sz)
			// 同时指针也移动
			ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz]))
		}
	}

	// Ignore file sync if flag is set on DB.
	if !tx.db.NoSync || IgnoreNoSync {
		if err := fdatasync(tx.db); err != nil {
			return err
		}
	}

	// Put small pages back to page pool.
	for _, p := range pages {
		// Ignore page sizes over 1 page.
		// These are allocated using make() instead of the page pool.
		if int(p.overflow) != 0 {
			continue
		}

		buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:tx.db.pageSize]

		// See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
		// 清空buf,然后放入pagePool中
		for i := range buf {
			buf[i] = 0
		}
		tx.db.pagePool.Put(buf)
	}

	return nil
}

// writeMeta writes the meta to the disk.
func (tx *Tx) writeMeta() error {
	// Create a temporary buffer for the meta page.
	buf := make([]byte, tx.db.pageSize)
	p := tx.db.pageInBuffer(buf, 0)
	// 将事务的元信息写入到页中
	tx.meta.write(p)

	// Write the meta page to file.
	if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
		return err
	}
	if !tx.db.NoSync || IgnoreNoSync {
		if err := fdatasync(tx.db); err != nil {
			return err
		}
	}

	// Update statistics.
	tx.stats.Write++

	return nil
}

// allocate returns a contiguous block of memory starting at a given page.
// 分配一段连续的页
func (tx *Tx) allocate(count int) (*page, error) {
	p, err := tx.db.allocate(count)
	if err != nil {
		return nil, err
	}
	// Save to our page cache.
	tx.pages[p.id] = p

	// Update statistics.
	tx.stats.PageCount++
	tx.stats.PageAlloc += count * tx.db.pageSize

	return p, nil
}

4.5 Rollback()实现

Rollback()中,主要对不同事务进行不同操作:

  1. 如果当前事务是只读事务,则只需要从db中的txs中找到当前事务,然后移除掉即可。
  2. 如果当前事务是读写事务,则需要将空闲列表中和该事务关联的页释放掉,同时重新从freelist中加载空闲页。
// Rollback closes the transaction and ignores all previous updates. Read-only
// transactions must be rolled back and not committed.
func (tx *Tx) Rollback() error {
	_assert(!tx.managed, "managed tx rollback not allowed")
	if tx.db == nil {
		return ErrTxClosed
	}
	tx.rollback()
	return nil
}

func (tx *Tx) rollback() {
	if tx.db == nil {
		return
	}
	if tx.writable {
		// 移除该事务关联的pages
		tx.db.freelist.rollback(tx.meta.txid)
		// 重新从freelist页中读取构建空闲列表
		tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
	}
	tx.close()
}

func (tx *Tx) close() {
	if tx.db == nil {
		return
	}
	if tx.writable {
		// Grab freelist stats.
		var freelistFreeN = tx.db.freelist.free_count()
		var freelistPendingN = tx.db.freelist.pending_count()
		var freelistAlloc = tx.db.freelist.size()

		// Remove transaction ref & writer lock.
		tx.db.rwtx = nil
		tx.db.rwlock.Unlock()

		// Merge statistics.
		tx.db.statlock.Lock()
		tx.db.stats.FreePageN = freelistFreeN
		tx.db.stats.PendingPageN = freelistPendingN
		tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize
		tx.db.stats.FreelistInuse = freelistAlloc
		tx.db.stats.TxStats.add(&tx.stats)
		tx.db.statlock.Unlock()
	} else {
		// 只读事务
		tx.db.removeTx(tx)
	}

	// Clear all references.
	tx.db = nil
	tx.meta = nil
	tx.root = Bucket{tx: tx}
	tx.pages = nil
}

// removeTx removes a transaction from the database.
func (db *DB) removeTx(tx *Tx) {
	// Release the read lock on the mmap.
	db.mmaplock.RUnlock()

	// Use the meta lock to restrict access to the DB object.
	db.metalock.Lock()

	// Remove the transaction.
	for i, t := range db.txs {
		if t == tx {
			last := len(db.txs) - 1
			db.txs[i] = db.txs[last]
			db.txs[last] = nil
			db.txs = db.txs[:last]
			break
		}
	}
	n := len(db.txs)

	// Unlock the meta pages.
	db.metalock.Unlock()

	// Merge statistics.
	db.statlock.Lock()
	db.stats.OpenTxN = n
	db.stats.TxStats.add(&tx.stats)
	db.statlock.Unlock()
}

4.6 总结

本章主要详细分析了下,boltdb内部事务的实现机制,再此基础上对事务中核心的几个方法做了代码的分析。到此基本上一个数据库核心的部件都已经实现完毕。那剩下的功能就把各部分功能进行组装起来,实现一个完整对外可用的数据库了。下一章我们来详细分析下boltdb中DB对象的内部一些实现。

5. boltdb的DB对象分析

前面我们介绍了boltdb底层在磁盘上数据时如何组织存储(page)的,然后又介绍了磁盘中的数据在内存中又是如何存储(node)的。接着我们又介绍了管理kv数据集合的Bucket对象以及用来遍历Bucket的Cursor对象。最后我们详细的介绍了boltdb中事务是如何实现(Tx)的。到此boltdb中各个零散的部件我们都一一熟悉了,接下来是时候将他们组织在一起工作了。因而就有了boltdb中最上层的DB对象。本章主要介绍DB对象相关的方法以及其内部实现。

5.1 DB结构

DB在boltdb是一个结构体,里面封装了很多属性,部分属性添加了中文注释,其他部分属性,大家可以直接看英文注释,感觉英文表述的很通俗易懂。

//省略部分常量定义

// Default values if not set in a DB instance.
const (
	DefaultMaxBatchSize  int =s 1000
	DefaultMaxBatchDelay     = 10 * time.Millisecond
	// 16k
	DefaultAllocSize = 16 * 1024 * 1024
)

// default page size for db is set to the OS page size.
var defaultPageSize = os.Getpagesize()

// DB represents a collection of buckets persisted to a file on disk.
// All data access is performed through transactions which can be obtained through the DB.
// All the functions on DB will return a ErrDatabaseNotOpen if accessed before Open() is called.
type DB struct {
	
	//限于篇幅此处省略一些配置项(MaxBatchDelay、MaxBatchDelay、MmapFlags、NoGrowSync等)相关的参加定义。想了解全部内容的可以参考源码,或者文章末尾的书籍链接进行阅读。

	path     string
	file     *os.File  // 真实存储数据的磁盘文件
	lockfile *os.File // windows only
	dataref  []byte   // mmap'ed readonly, write throws SEGV
	// 通过mmap映射进来的地址
	data   *[maxMapSize]byte
	datasz int
	filesz int // current on disk file size
	//  元数据
	meta0 *meta
	meta1 *meta

	pageSize int
	opened   bool
	rwtx     *Tx  // 写事务锁
	txs      []*Tx  // 读事务数组
	freelist *freelist // 空闲列表
	stats    Stats

	pagePool sync.Pool

	batchMu sync.Mutex
	batch   *batch

	rwlock   sync.Mutex   // Allows only one writer at a time.
	metalock sync.Mutex   // Protects meta page access.
	mmaplock sync.RWMutex // Protects mmap access during remapping.
	statlock sync.RWMutex // Protects stats access.

	ops struct {
		writeAt func(b []byte, off int64) (n int, err error)
	}

	// Read only mode.
	// When true, Update() and Begin(true) return ErrDatabaseReadOnly immediately.
	readOnly bool
}

5.2 对外接口

1.Open()创建数据库接口

// Open creates and opens a database at the given path.
// If the file does not exist then it will be created automatically.
// Passing in nil options will cause Bolt to open the database with the default options.
// 创建数据库接口
func Open(path string, mode os.FileMode, options *Options) (*DB, error)

2.View()查询接口

// View executes a function within the context of a managed read-only transaction.
// Any error that is returned from the function is returned from the View() method.
// Attempting to manually rollback within the function will cause a panic.
func (db *DB) View(fn func(*Tx) error) error

3.Update()更新接口

// Update executes a function within the context of a read-write managed transaction.
// If no error is returned from the function then the transaction is committed.
// If an error is returned then the entire transaction is rolled back.
// Any error that is returned from the function or returned from the commit is
// returned from the Update() method.
// Attempting to manually commit or rollback within the function will cause a panic.
func (db *DB) Update(fn func(*Tx) error) error

4.Batch()批量更新接口

// Batch calls fn as part of a batch. It behaves similar to Update,
// except:
//
// 1. concurrent Batch calls can be combined into a single Bolt
// transaction.
//
// 2. the function passed to Batch may be called multiple times,
// regardless of whether it returns error or not.
//
// This means that Batch function side effects must be idempotent and
// take permanent effect only after a successful return is seen in
// caller.
//
// The maximum batch size and delay can be adjusted with DB.MaxBatchSize
// and DB.MaxBatchDelay, respectively.
//
// Batch is only useful when there are multiple goroutines calling it.
func (db *DB) Batch(fn func(*Tx) error) error

5.Begin()开启事务接口

// Begin starts a new transaction.
// Multiple read-only transactions can be used concurrently but only one
// write transaction can be used at a time. Starting multiple write transactions
// will cause the calls to block and be serialized until the current write
// transaction finishes.
//
// Transactions should not be dependent on one another. Opening a read
// transaction and a write transaction in the same goroutine can cause the
// writer to deadlock because the database periodically needs to re-mmap itself
// as it grows and it cannot do that while a read transaction is open.
//
// If a long running read transaction (for example, a snapshot transaction) is
// needed, you might want to set DB.InitialMmapSize to a large enough value
// to avoid potential blocking of write transaction.
//
// IMPORTANT: You must close read-only transactions after you are finished or
// else the database will not reclaim old pages.
func (db *DB) Begin(writable bool) (*Tx, error)

备注:Begin()的实现分析,参见事务4.3节内容,下面不在做分析。

下面我们将对上述接口做一一分析

5.3 Open()实现分析

Open()方法主要用来创建一个boltdb的DB对象,底层会执行新建或者打开存储数据的文件,当指定的文件不存在时, boltdb就会新建一个数据文件。否则的话,就直接加载指定的数据库文件内容。 值的注意是,boltdb会根据Open时,options传递的参数来判断到底加互斥锁还是共享锁。 新建时: 会调用init()方法,内部主要是新建一个文件,然后第0页、第1页写入元数据信息;第2页写入freelist信息;第3页写入bucket leaf信息。并最终刷盘。 加载时: 会读取第0页内容,也就是元信息。然后对其进行校验和校验,当校验通过后获取pageSize。否则的话,读取操作系统默认的pagesize(一般4k)

上述操作完成后,会通过mmap来映射数据。最后再根据磁盘页中的freelist数据初始化db的freelist字段。

// Open creates and opens a database at the given path.
// If the file does not exist then it will be created automatically.
// Passing in nil options will cause Bolt to open the database with the default options.
func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
	var db = &DB{opened: true}

	// Set default options if no options are provided.
	// 初始化一些配置项,此处省略
	// Open data file and separate sync handler for metadata writes.
	db.path = path
	var err error
	// 打开db文件
	if db.file, err = os.OpenFile(db.path, flag|os.O_CREATE, mode); err != nil {
		_ = db.close()
		return nil, err
	}

	// Lock file so that other processes using Bolt in read-write mode cannot
	// use the database  at the same time. This would cause corruption since
	// the two processes would write meta pages and free pages separately.
	// The database file is locked exclusively (only one process can grab the lock)
	// if !options.ReadOnly.
	// The database file is locked using the shared lock (more than one process may
	// hold a lock at the same time) otherwise (options.ReadOnly is set).

	// 只读加共享锁、否则加互斥锁
	if err := flock(db, mode, !db.readOnly, options.Timeout); err != nil {
		_ = db.close()
		return nil, err
	}

	// Default values for test hooks
	db.ops.writeAt = db.file.WriteAt

	// Initialize the database if it doesn't exist.
	if info, err := db.file.Stat(); err != nil {
		return nil, err
	} else if info.Size() == 0 {
		// Initialize new files with meta pages.
		// 初始化新db文件
		if err := db.init(); err != nil {
			return nil, err
		}
	} else {
		// 不是新文件,读取第一页元数据
		// Read the first meta page to determine the page size.
		// 2^12,正好是4k
		var buf [0x1000]byte
		if _, err := db.file.ReadAt(buf[:], 0); err == nil {
			// 仅仅是读取了pageSize
			m := db.pageInBuffer(buf[:], 0).meta()
			if err := m.validate(); err != nil {
				// If we can't read the page size, we can assume it's the same
				// as the OS -- since that's how the page size was chosen in the
				// first place.
				//
				// If the first page is invalid and this OS uses a different
				// page size than what the database was created with then we
				// are out of luck and cannot access the database.
				db.pageSize = os.Getpagesize()
			} else {
				db.pageSize = int(m.pageSize)
			}
		}
	}

	// Initialize page pool.
	db.pagePool = sync.Pool{
		New: func() interface{} {
			// 4k
			return make([]byte, db.pageSize)
		},
	}

	// Memory map the data file.
	// mmap映射db文件数据到内存
	if err := db.mmap(options.InitialMmapSize); err != nil {
		_ = db.close()
		return nil, err
	}

	// Read in the freelist.
	db.freelist = newFreelist()
	// db.meta().freelist=2
	// 读第二页的数据
	// 然后建立起freelist中
	db.freelist.read(db.page(db.meta().freelist))

	// Mark the database as opened and return.
	return db, nil
}


// init creates a new database file and initializes its meta pages.
func (db *DB) init() error {
	// Set the page size to the OS page size.
	db.pageSize = os.Getpagesize()

	// Create two meta pages on a buffer.
	buf := make([]byte, db.pageSize*4)
	for i := 0; i < 2; i++ {
		p := db.pageInBuffer(buf[:], pgid(i))
		p.id = pgid(i)
		// 第0页和第1页存放元数据
		p.flags = metaPageFlag

		// Initialize the meta page.
		m := p.meta()
		m.magic = magic
		m.version = version
		m.pageSize = uint32(db.pageSize)
		m.freelist = 2
		m.root = bucket{root: 3}
		m.pgid = 4
		m.txid = txid(i)
		m.checksum = m.sum64()
	}

	// Write an empty freelist at page 3.
	// 拿到第2页存放freelist
	p := db.pageInBuffer(buf[:], pgid(2))
	p.id = pgid(2)
	p.flags = freelistPageFlag
	p.count = 0

	// 第三块存放叶子page
	// Write an empty leaf page at page 4.
	p = db.pageInBuffer(buf[:], pgid(3))
	p.id = pgid(3)
	p.flags = leafPageFlag
	p.count = 0

	// Write the buffer to our data file.
	// 写入4页的数据
	if _, err := db.ops.writeAt(buf, 0); err != nil {
		return err
	}
	// 刷盘
	if err := fdatasync(db); err != nil {
		return err
	}

	return nil
}

// page retrieves a page reference from the mmap based on the current page size.
func (db *DB) page(id pgid) *page {
	pos := id * pgid(db.pageSize)
	return (*page)(unsafe.Pointer(&db.data[pos]))
}

// pageInBuffer retrieves a page reference from a given byte array based on the current page size.
func (db *DB) pageInBuffer(b []byte, id pgid) *page {
	return (*page)(unsafe.Pointer(&b[id*pgid(db.pageSize)]))
}

// mmap opens the underlying memory-mapped file and initializes the meta references.
// minsz is the minimum size that the new mmap can be.
func (db *DB) mmap(minsz int) error {
	// 此处主要完成mmap。内容省略,感兴趣的同学可以查阅在线书籍对应部分详细阅读。
}

5.4 db.View()实现分析

View()主要用来执行只读事务。事务的开启、提交、回滚都交由tx控制。

// View executes a function within the context of a managed read-only transaction.
// Any error that is returned from the function is returned from the View() method.
//
// Attempting to manually rollback within the function will cause a panic.
func (db *DB) View(fn func(*Tx) error) error {
	t, err := db.Begin(false)
	if err != nil {
		return err
	}

	// Make sure the transaction rolls back in the event of a panic.
	defer func() {
		if t.db != nil {
			t.rollback()
		}
	}()

	// Mark as a managed tx so that the inner function cannot manually rollback.
	t.managed = true

	// If an error is returned from the function then pass it through.
	err = fn(t)
	t.managed = false
	if err != nil {
		_ = t.Rollback()
		return err
	}

	if err := t.Rollback(); err != nil {
		return err
	}

	return nil
}

5.5 db.Update()实现分析

Update()主要用来执行读写事务。事务的开始、提交、回滚都交由tx内部控制

// Update executes a function within the context of a read-write managed transaction.
// If no error is returned from the function then the transaction is committed.
// If an error is returned then the entire transaction is rolled back.
// Any error that is returned from the function or returned from the commit is
// returned from the Update() method.
//
// Attempting to manually commit or rollback within the function will cause a panic.
func (db *DB) Update(fn func(*Tx) error) error {
	t, err := db.Begin(true)
	if err != nil {
		return err
	}

	// Make sure the transaction rolls back in the event of a panic.
	defer func() {
		if t.db != nil {
			t.rollback()
		}
	}()

	// Mark as a managed tx so that the inner function cannot manually commit.
	t.managed = true

	// If an error is returned from the function then rollback and return error.
	err = fn(t)
	t.managed = false
	if err != nil {
		_ = t.Rollback()
		return err
	}

	return t.Commit()
}

5.6 db.Batch()实现分析

现在对Batch()方法稍作分析,在DB定义的那一节中我们可以看到,一个DB对象拥有一个batch对象,该对象是全局的。当我们使用Batch()方法时,内部会对将传递进去的fn缓存在calls中。 其内部也是调用了Update,只不过是在Update内部遍历之前缓存的calls。

有两种情况会触发调用Update。

  1. 第一种情况是到达了MaxBatchDelay时间,就会触发Update
  2. 第二种情况是len(db.batch.calls) >= db.MaxBatchSize,即缓存的calls个数大于等于MaxBatchSize时,也会触发Update。

Batch的本质是: 将每次写、每次刷盘的操作转变成了多次写、一次刷盘,从而提升性能。

关于Batch的代码实现分析内容有点长,此处就不贴了,大家可以点击此处或者点击此处进行阅读。

5.7 总结

本章我们主要介绍了boltdb中最上层的DB对象的知识。首先介绍了DB的定义,然后介绍了下创建DB的Open()以及DB对外暴露的一些接口,这些接口基本上是平常使用最频繁的api。 在介绍了几个接口后,然后逐一对其内部的源码实现进行了分析。其实有了前几节的知识后,再来看这些接口的实现,相对比较简单。因为他们无非就是对之前的Tx、Bucket、node做的 一些封装。底层还是调用的之前介绍的哪些方法。到此我们所有和bolt相关的源码分析就告一段落了。

在第6章也给大家提供了一些其他技术大牛写的源码分析的文章,大家有兴趣可以进一步阅读和学习。

6. 参考资料

  1. 阅读 boltDB 源码后的小结
  2. 给boltdb源码添加注释仓库
  3. boltdb官方仓库
  4. 分析boltdb源码的微信公众号文章集合

7.结尾

在boltdb中它还自带了一个命令行工具主要功能用来查看boltdb中所有的页以及不同页上的数据信息,以及做性能测试等,后续抽时间也会将该工具支持的功能补充到该文章中。

在没有做这件事情之前,总感觉对框架或者组件的源码分析,基本上停留在给代码加一些注释、画图梳理的层面。当真正自己动手从头到尾来写时,才发现中间有太多太多的细节,需要重新理解和把握。总体来说,这算是一次不错的体验和收获了。

在最后,本文基本上都是按照个人的理解和阅读源码基础上完成的。文章中难免有错误和理解有误的地方,大家看的时候发现问题,可以及时反馈给我,同时欢迎大家一起交流学习。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 半路出家的后台技术人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 1. boltdb简要介绍
    • 1.1 boltdb是什么?
      • 1.2 为什么要分析boltdb?
        • 1.3 boltdb的简单用法
          • 1.4 boltdb的整体数据组织结构
            • 1.5 总结
            • 2.boltdb的核心数据结构分析
              • 2.1 boltdb page结构
                • 2.2 元数据页
                  • 2.3 空闲列表页
                    • 2.4 分支节点页
                      • 2.5 叶子节点页
                        • 2.6 总结
                        • 3. boltdb的b+树(Bucket、node)分析
                          • 3.1 boltdb的Bucket结构
                            • 3.2 Bucket遍历之Cursor
                              • 3.2.1 Cursor结构
                              • 3.2.2 Cursor对外接口
                              • 3.2.3 Seek(key)、First()、Last()实现分析
                            • 3.3 node节点的相关操作
                              • 3.3.1 node节点的定义
                              • 3.3.2 node节点和page转换
                              • 3.3.3 node节点的增删改查
                              • 3.3.4 node节点的分裂和合并
                            • 3.4 Bucket的相关操作
                              • 3.4.1 创建一个Bucket
                              • 3.4.2 获取一个Bucket
                              • 3.4.3 删除一个Bucket
                            • 3.5 key&value的插入、获取、删除
                              • 3.5.1 插入一个key&value
                              • 3.5.2 获取一个key&value
                              • 3.5.3 删除一个key&value
                              • 3.5.4 遍历Bucket中所有的key&value
                            • 3.6 Bucket的页分裂、页合并
                              • 3.7 总结
                              • 4. boltdb事务控制
                                • 4.1 boltdb事务简介
                                  • 4.2 boltdb事务Tx定义
                                    • 4.3 Begin()实现
                                      • 4.4 Commit()实现
                                        • 4.5 Rollback()实现
                                          • 4.6 总结
                                          • 5. boltdb的DB对象分析
                                            • 5.1 DB结构
                                              • 5.2 对外接口
                                                • 5.3 Open()实现分析
                                                  • 5.4 db.View()实现分析
                                                    • 5.5 db.Update()实现分析
                                                      • 5.6 db.Batch()实现分析
                                                        • 5.7 总结
                                                        • 6. 参考资料
                                                        • 7.结尾
                                                        相关产品与服务
                                                        对象存储
                                                        对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
                                                        领券
                                                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档