前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >自己动手写数据库:并发管理组件lock_table的原理和实现

自己动手写数据库:并发管理组件lock_table的原理和实现

作者头像
望月从良
发布2022-12-02 20:27:47
2850
发布2022-12-02 20:27:47
举报
文章被收录于专栏:Coding迪斯尼

在前面章节,我们描述的并发控制的一些基本原理。其中一个重要原则就是“序列化”,也就数据库引擎要对交易提交的请求进行调度,调度的结果要使得每个交易就好像独占了引擎那样。要实现这样的效果就必须进行相应的加锁。但是加锁必然会降低高并发的效率,因此改进办法是实现两种锁,一种是互斥锁,他用于保证区块写入的安全性,一个区块加锁后其他任何操作,无论是读还是写,都不能执行,必须要等到互斥锁释放。另一种是共享锁,他运行多个读操作同时进行,但是不允许执行写操作,必须等到共享锁全部释放后才可以。

本节的目的就在于如何实现两种锁机制。尽管go语言提供了很多并发机制,他也有共享锁和互斥锁,但还不足以满足一个数据库的并发要求,这也是我们需要进行相应设计的原因。我们所设计的锁机制作用的对象是区块,因此我们需要一种对应机制,不同的区块要对应不同的锁,因此我们要实现一个类似map的对象,他也称为LockTable。

他要实现的目的有,一,让不同的区块对应不同的锁,因此引擎在读写区块1时,不影响对区块2的操作。第二,他要有超时回退机制,当引擎通过他读写某个区块,发现长时间得不到操作所需要的锁时,这个时候极有可能发生了死锁,因此他要能检测此种情况并实现操作的回滚。下面我们看看代码的实现,首先在tx模块中添加新文件命名为lock_table.go,然后先输入如下代码:

代码语言:javascript
复制
package tximport (    "errors"
    fm "file_manager"
    "sync"
    "time")const (
    MAX_WAITING_TIME = 3 //3用于测试,在正式使用时设置为10)func NewLockTable() *LockTable {    /*
        如果给定blk对应的值为-1,表明有互斥锁,如果大于0表明有相应数量的共享锁加在对应区块上,
        如果是0则表示没有锁
    */
    lock_table := &LockTable{
        lock_map:    make(map[*fm.BlockId]int64),
        notify_chan: make(map[*fm.BlockId]chan struct{}),
        notify_wg:   make(map[*fm.BlockId]*sync.WaitGroup),
    }    return lock_table
}

接下来我们要实现这样的功能,假设有3个线程编号分别为1,2,3,第一个线程要写入区块1,第二,三个线程要读取区块1,如果线程1先获得了互斥锁,那么线程2,3就必须挂起,等待线程1释放锁。如果在时间MAX_WAITING_TIME范围内线程1完成操作释放了互斥锁,那么线程2,3就能被唤醒,他们将获得共享锁,然后同时读取区块1的数据。如果在MAX_WAITING_TIME所表达的时间范围内依然无法读取区块1,那么他们就需要唤醒,然后放弃读取操作。

确切的说这里需要实现WaitGivenTimeOut功能,当调用这个函数时,对应的线程会在给定时间段内挂起,一旦超时后才被唤醒。同时我们还需要实现NotifyAll接口,一旦某个线程调用这个函数后,所有因为执行WaitGivenTimeOut而被挂起的线程都要被唤醒,然后往下执行。

如果了解go语言的同学可能知道,sync包中有个Cond类,他的Wait接口能实现调用线程挂起功能,同时他对应的Broadcast能实现唤醒所有因为调用Wait而挂起的线程。但是问题在于Wait接口不能实现挂起特定时间,因此一旦调用该接口后,必须等待其他线程调用Signal接口或是Broadcast接口才能实现唤醒。

由此我们要自己实现WaitGivenTimeOut对应的功能,相关代码如下:

代码语言:javascript
复制
type LockTable struct {
    lock_map    map[*fm.BlockId]int64           //将锁和区块对应起来
    notify_chan map[*fm.BlockId]chan struct{}   //用于实现超时回退的管道
    notify_wg   map[*fm.BlockId]*sync.WaitGroup //用于实现唤醒通知
    method_lock sync.Mutex                      //实现方法调用的线程安全,相当于java的synchronize关键字}func (l *LockTable) waitGivenTimeOut(blk *fm.BlockId) {
    wg, ok := l.notify_wg[blk]    if !ok {        var new_wg sync.WaitGroup
        l.notify_wg[blk] = &new_wg
        wg = &new_wg
    }
    wg.Add(1)    defer wg.Done()
    l.method_lock.Unlock() //挂起前释放方法锁
    select {    case <-time.After(MAX_WAITING_TIME * time.Second):    case <-l.notify_chan[blk]:
    }
    l.method_lock.Lock() //唤起后加上方法锁}func (l *LockTable) notifyAll(blk *fm.BlockId) {    go func() {        //等待所有线程返回后再重新设置channel
        l.notify_wg[blk].Wait()
        l.notify_chan[blk] = make(chan struct{})
    }()    close(l.notify_chan[blk])
}

函数waitGivenTimeOut需要输入模块对应的BlockId对象。notify_wg是一个map对象,他将WaitGoup和特定的区块号对应起来,如果有多个线程要访问同一个区块,那么对应的WaitGroup就会执行Add(1)操作。接下来的Select 语句用于实现线程挂起,第一个case用于将线程挂起给定时间,第二个case用于将线程唤醒。

假设线程2,3因为执行waitGivenTimeOut函数而被挂起,那么这两个线程会因为两种情况会被重新唤起,第一种情况就是超时,也就是time.After对应的管道会启动,从而将两个线程唤起。第二种情况是线程1调用了notifyAll,注意到这里关闭了对应区块的管道,于是select语句中第二个case得到执行,于是挂起的线程被唤醒。注意到notifyAll还启动了一个线程,他的作用是等待给定区块对应的WaitGroup能完成,当l.notify_wg[blk].Wait返回后,那意味着所有挂起的线程都完成了唤醒操作,这时他就重新给区块对应的管道重新赋值,以便于下次使用。

并发设计难度很大,也很容易出错,上面的做法可能存在一些问题,以后发现时我们再进行修改。接下来我们需要设计互斥锁和共享锁,互斥锁对应的接口为XLock, 共享锁对应的接口为SLock:

代码语言:javascript
复制
func (l *LockTable) initWaitingOnBlk(blk *fm.BlockId) {
    _, ok := l.notify_chan[blk]    if !ok {
        l.notify_chan[blk] = make(chan struct{})
    }

    _, ok = l.notify_wg[blk]    if !ok {
        l.notify_wg[blk] = &sync.WaitGroup{}
    }
}func (l *LockTable) SLock(blk *fm.BlockId) error {
    l.method_lock.Lock()    defer l.method_lock.Unlock()
    l.initWaitingOnBlk(blk)
    start := time.Now()    for l.hasXlock(blk) && !l.waitingTooLong(start) {
        l.waitGivenTimeOut(blk)
    }    //如果等待过长时间,有可能是产生了死锁
    if l.hasXlock(blk) {        return errors.New("SLock Exception: XLock on given blk")
    }

    val := l.getLockVal(blk)
    l.lock_map[blk] = val + 1
    return nil}func (l *LockTable) XLock(blk *fm.BlockId) error {
    l.method_lock.Lock()    defer l.method_lock.Unlock()
    l.initWaitingOnBlk(blk)
    start := time.Now()    for l.hasOtherSLocks(blk) && !l.waitingTooLong(start) {
        l.waitGivenTimeOut(blk)
    }    if l.hasOtherSLocks(blk) {        return errors.New("XLock error: SLock on given blk")
    }    //-1表示区块被加上互斥锁
    l.lock_map[blk] = -1

    return nil}func (l *LockTable) UnLock(blk *fm.BlockId) {
    l.method_lock.Lock()    defer l.method_lock.Unlock()

    val := l.getLockVal(blk)    if val >= 1 {
        l.lock_map[blk] = val - 1
    } else {        delete(l.lock_map, blk)        //通知所有等待给定区块的线程从Wait中恢复
        l.notifyAll(blk)
    }
}func (l *LockTable) hasXlock(blk *fm.BlockId) bool {    return l.getLockVal(blk) < 0}func (l *LockTable) hasOtherSLocks(blk *fm.BlockId) bool {    return l.getLockVal(blk) >= 1}func (l *LockTable) waitingTooLong(start time.Time) bool {
    elapsed := time.Since(start).Seconds()    if elapsed >= MAX_WAITING_TIME {        return true
    }    return false}func (l *LockTable) getLockVal(blk *fm.BlockId) int64 {
    val, ok := l.lock_map[blk]    if !ok {        return 0
    }    return val
}

在上面的代码实现中,需要注意的是,在调用XLock或是SLock时,首先需要判断给定区块是否被加上其他锁,也就是调用XLock时需要判断区块是否已经被加了共享锁,调用SLock时判断区块是否已经被加了互斥锁。

如果区块已经被加上了其他锁,那么线程就会先挂起一段时间,例如某个线程调用SLock给区块1加上共享锁,此时他发现区块1已经被加上了互斥锁,那么他就会调用waitGivenTimeOut来挂起一段时间,超时后他会自动唤醒,然后再次判断互斥锁是否已经解除,如果还没解除,那么他需要返回一个错误,于是对应的交易就要放弃读取给定区块的内容。如果它在挂起期间,另一个线程已经完成了写入操作,于是就会调用notifyAll接口,然后该线程就会从挂起中恢复,接下来他就会对区块加上共享锁,然后读取区块的数据。

这里我们实现共享锁和互斥锁的机制很简单,我们使用一个map来实现。如果给定区块被加上互斥锁,那么该区块对应数值就是-1,如果被加上共享锁,那么对应的数值就是一个大于0的数,如果有三个线程针对同一区块获取共享锁,那么该区块对应的数值就是3,当给定区块数值对应为0时,表示没有锁加在给定区块上。

下面我们需要对上面实现的逻辑进行检测,首先要检验waitGivenTimeOut和notifyAll的正确性,测试用例这么做,首先创建区块1,然后启动4个线程,第一个线程先在区块1上获取互斥锁,接下来启动线程2,3,4,后面三个线程针对区块1获得共享锁,由于此时区块1已经被加上互斥锁,因此后面三个线程会被挂起给定时长MAX_WAITING_TIME,一旦超时后他们被唤醒,然后发现区块1依然被加上了互斥锁,于是返回错误,下面是测试用例的实现:

代码语言:javascript
复制
func TestRoutinesWithSLockTimeout(t *testing.T) {    var err_array []error    var err_array_lock sync.Mutex
    blk := fm.NewBlockId("testfile", 1)
    lock_table := NewLockTable()
    lock_table.XLock(blk)    var wg sync.WaitGroup    for i := 0; i < 3; i++ {        go func() {
            wg.Add(1)            defer wg.Done()
            err_array_lock.Lock()            defer err_array_lock.Unlock()
            err := lock_table.SLock(blk)            if err == nil {
                fmt.Println("access slock ok")
            }
            err_array = append(err_array, err)
        }()
    }
    time.Sleep(1 * time.Second) //让线程都运行起来
    start := time.Now()
    wg.Wait()
    elapsed := time.Since(start).Seconds()
    require.Equal(t, elapsed >= MAX_WAITING_TIME, true)
    require.Equal(t, len(err_array), 3)    for i := 0; i < 3; i++ {
        require.Equal(t, err_array[i], errors.New("SLock Exception: XLock on given blk"))
    }
}

我们用如下命令执行上面的用例:

代码语言:javascript
复制
go test tx -run TestRoutinesWithSLockTimeout

运行后返回结果为:

代码语言:javascript
复制
ok      tx      9.753s

也就是说测试用例的执行是成功的。我们再看一个用例,线程1先获取互斥锁,然后启动3个线程去获取共享锁并进入挂起状态,线程1在挂起超时前释放互斥锁,调用notifyAll唤起所有挂起的线程,被唤起的线程都能获得共享锁并读取区块数据,代码如下:

代码语言:javascript
复制
func TestRoutinesWithSLockAfterXLockRelease(t *testing.T) {
    var err_array []error
    var err_array_lock sync.Mutex
    blk := fm.NewBlockId("testfile", 1)
    lock_table := NewLockTable()
    lock_table.XLock(blk)
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        go func() {
            wg.Add(1)
            defer wg.Done()
            err_array_lock.Lock()
            defer err_array_lock.Unlock()
            err := lock_table.SLock(blk)
            if err == nil {
                fmt.Println("access slock ok")
            }
            err_array = append(err_array, err)
        }()
    }
    time.Sleep(1 * time.Second) //让线程都运行起来
    lock_table.UnLock(blk)      //释放加在区块上的互斥锁
    start := time.Now()
    wg.Wait()
    elapsed := time.Since(start).Seconds()
    require.Equal(t, elapsed < MAX_WAITING_TIME, true)
    require.Equal(t, len(err_array), 3)
    for i := 0; i < 3; i++ {
        require.Nil(t, err_array[i]) //所有线程能获得共享锁然后读取数据
    }

    require.Equal(t, lock_table.lock_map[blk], int64(3))
}

上面的用例经过多次运行均能通过。更多更详细的视频演示和讲解请参看b站,搜索Coding迪斯尼。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-08-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Coding迪斯尼 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档