前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >自己动手写数据库:实现交易对象和恢复管理器

自己动手写数据库:实现交易对象和恢复管理器

作者头像
望月从良
发布2022-06-21 16:10:25
2840
发布2022-06-21 16:10:25
举报
文章被收录于专栏:Coding迪斯尼Coding迪斯尼

前面一节我们完成了用于实现系统恢复的日志,本节我们看看如何基于日志内容实现系统恢复。我们将设计一个系统恢复管理器,它在系统启动时读取日志内容,根据读到的日志对数据进行恢复,由于所谓“恢复”其实是交易的回滚,因此我们首先实现交易对象,前面为了测试方便,我们简单的提供了交易对象的几个简单接口,这里我们将实现一个逻辑完整的交易对象,只不过我们暂时忽略其并发管理逻辑,并发功能我们将在后面的章节进行实现。

首先我们先了解交易对象的基本结构:

这里我们先忽略并发管理,它将在后一节进行针对性的研究,我们首先实现Transation,BufferList,和RecoverMgr。前面章节中我们使用Buffer对象来实现数据写入缓存页面或者存入磁盘,而Transation其实是对Buffer提供接口的封装和调用,它除了支持数据读写功能外,还在此基础上提供了并发控制,恢复控制等功能,后面其他模块都必须通过Transation对象来实现数据的写入和读取,首先我们在interface.go中增加一个常量定义:

代码语言:javascript
复制
const (
    UINT64_LENGTH = 8
    END_OF_FILE = -1
)

接着增加一个buffer_list.go,它用来记录或快速查询当前被pin的内存页面,其内容如下:

代码语言:javascript
复制
package tx

import (
    fm "file_manager"
    "fmt"
    bm "buffer_manager"
)

type BufferList struct {
    buffers map[*fm.BlockId]*bm.Buffer 
    buffer_mgr *bm.BufferManager
    pins []*fm.BlockId
}

func NewBufferList(buffer_mgr *bm.BufferManager) *BufferList {
    buffer_list := &BufferList{
        buffer_mgr : buffer_mgr,
        buffers: make(map[*fm.BlockId]*bm.Buffer ),
        pins: []fm.BlockId,
    }

    return buffer_list
}

func (b *BufferList) get_buffer(blk *fm.BlockId) *bm.Buffer {
    buff, _ := b.buffers[blk]
    return buff 
}

func (b *BufferList) Pin(blk *fm.BlockId) error{
    //一旦一个内存页被pin后,将其加入map进行追踪管理
    buff, err := b.buffer_mgr.Pin(blk)
    if err != nil {
        return err
    }
    s.buffers[blk] = buff
    b.pins = append(b.pins, blk)
    return nil 
}

func (b *BufferList)Unpin(blk *fm.BlockId) {
    buffer, ok := b.buffers[blk]
    if !ok {
        return 
    }

    b.buffer_mgr.Unpin(blk)
    for idx, pinned_blk := range b.pins {
        if pinned_blk == blk {
            b.pins = append(s.pins[:idx], s.pins[idx+1]...)
            break
        }
    }

    delete(s.buffers, blk)
}

func (b *BufferList)UnpinAll() {
    for _, blk in range b.pins {
        buffer := s.buffers[blk]
        s.buffer_mgr.Unpin(buffer)
    }

    s.buffers = make(map[*fm.BlockId]*bm.Buffer)
    s.pins = make([]*fm.BlockId)
}

下面我们看看交易对象的实现,增加transation.go,添加代码如下:

代码语言:javascript
复制
package tx

import (
    fm "file_manager"
    "fmt"
    lg "log_manager"
    bm "buffer_manager"
    "sync"
    "errors"
)

var tx_num_mu sync.Mutex
next_tx_num := int32(0)

func NxtTxNum() int32{
    tx_num_mu.Lock()
    defer tx_num_mu.Unlock()

    next_tx_num = next_tx_num + 1

    return next_tx_num
}

type  Transation struct{
    //concur_mgr  ConcurrentMgr*
    //recovery_mgr RecorveryMgr* 
    file_manager *fm.FileManager
    log_manager *lg.LogManager 
    buffer_manager *bm.BufferManager
    my_buffers  *BufferList 
    tx_num      int32
}

func NewTransation(file_manager *fm.FileManager, log_manager *lg.LogManager, 
    buffer_manager bm.BufferManager) *Transation {
        tx := &Transation {
            //创建同步管理器
            //创建恢复管理器
            file_manager: file_manager,
            log_manager: log_manager,
            buffer_manager: buffer_manager,
            my_buffers: NewBufferList(buffer_manager), 
        }

        return tx
}

func (t *Transation)Commit() {
    //调用恢复管理器执行commit
    //t.recovery_mgr.Commit()

    r := fmt.Sprintf("transation %d  committed", t.tx_num)
    fmt.Println(r)
    //释放同步管理器
    t.my_buffers.UnpinAll()
}

func (t *Transation) Rollback() {
    //调用恢复管理器rollback
    //t.recovery_mft.Rollback()
    r := fmt.Sprintf("transation %d roll back", t.tx_num)
    //释放同步管理器
    t.my_buffers.UnpinAll()
}

func(t *Transation)Recover() {
    //系统启动时会在所有交易执行前执行该函数
    t.bm.FlushAll(t.tx_num)
    //调用回复管理器的recover接口
    //t.recovery_mgr.Recover()
}

func (t *Transation)Pin(blk *fm.BlockId) {
    t.my_buffers.Pin(blk)
}

func (t *Transation) Unpin(blk *fm.BlockId) {
    t.my_buffers.Unpin(blk)
}

func (t *Transation) buffer_no_exist(blk *fm.BlockId) error{
    err_s := fmt.Sprintf("No buffer found for given blk : %d with file name: %s\n", 
    blk.Number(), blk.FileName())
    err := errors.New(err_s)
    return err 
}

func (t *Transation) GetInt(blk *fm.BlockId, offset uint64) int64, error {
    //调用同步管理器加s锁
    //t.concur_mgr.Slock(blk)

    buff := t.my_buffers.get_buffer(blk)
    if buff == nil {
        return -1, t.buffer_no_exist(blk) 
    }

    return int64(buff.Contents.GetInt(offset)), nil 
}

func(t *Transation) GetString(blk *fm.BlockId, offset uint64) string, error {
    //调用同步管理器加s锁
    //t.concur_mgr.Slock(blk)

    buff := t.my_buffers.get_buffer(blk)
    if buff == nil {
        return "", t.buffer_no_exist(blk) 
    }

    return buff.Contents().GetString(offset), nil 
}

func (t *Transation) SetInt(blk *fm.BlockId, offset uint64, val int64, okToLog bool) error {
    //调用同步管理器加x锁
    //t.concur_mgr.Xlock(blk)

    buff := t.my_buffers.get_buffer(blk)
    if buff == nil {
        return t.buffer_no_exist(blk)
    }

    lsn := 0
    if okToLog {
        //调用恢复管理器的SetInt方法
        //lsn = t.recovery_mgr.SetInt(buff, offset, val)
    }

    p = buff.Contents()
    p.SetInt(offset, uint64(val))
    buff.SetModified(t.tx_num, lsn)
}

func (t *Transation) SetString(blk *fm.BlockId, offset uint64, val string, okToLog bool) error {
    //使用同步管理器加x锁
    //t.concur_mgr.Xlock(blk)

    buff := t.my_buffers.get_buffer(blk)
    if buff == nil {
        return t.buffer_no_exist(blk)
    }

    lsn := 0
    if okToLog {
        //调用恢复管理器SetString方法
        //lsn = t.recovery_mgr.SetString(buff, offset, val)
    }

    p := buff.Contents()
    p.SetString(offset, val)
    buff.SetModified(t.tx_num, lsn)
}

func (t *Transation) Size(file_name string) uint64 {
    //调用同步管理器加锁
    //dummy_blk := fm.NewBlockId(file_name, END_OF_FILE)
    //t.concur_mgr.Slock(dummy_blk)
    return t.file_manager.Size(file_name)
}

func (t *Transation)Append(file_name string) *fm.BlockId{
    //调用同步管理器加锁
    //dummy_blk := fm.NewBlockId(file_name, END_OF_FILE)
    //t.concur_mgr.Xlock(dummy_blk)
    blk, err := t.file_manager.Append(file_name)
    if err != nil {
        return nil 
    }

    return blk 
}

func (t *Transation) BlockSize() uint64{
    return t.file_manager.BlockSize()
}

func (t *Transation) AvailableBuffers() uint64{
    return t.buffer_manager.Available()
}

由于交易对象在执行写入或读出时需要根据并发情况加相应的锁,而且它在写入数据时还需要调用恢复管理器记录写入状况以便未来执行恢复操作,但是并发管理器和恢复管理器目前尚未实现,因此我们在调用他们的接口时先注释掉。下面我们看看恢复管理器的实现,一旦完成恢复管理器的代码后,我们再将上面涉及到恢复管理器的注释进行返注释。

下面我们再看看恢复管理器的实现,添加recovery_mgr.go,添加代码如下:

代码语言:javascript
复制
package tx

import (
    bm "buffer_manager"
    fm "file_manager"
    lg "log_manager"
)

type RecoveryManager struct {
    log_manager    *lg.LogManager
    buffer_manager *bm.BufferManager
    tx             *Transation
    tx_num         int32
}

func NewRecoveryManager(tx *Transation, tx_num int32, log_manager *lg.LogManager,
    buffer_manager *bm.BufferManager) *RecoveryManager {
    recovery_mgr := &RecoveryManager{
        tx:             tx,
        log_manager:    log_manager,
        buffer_manager: buffer_manager,
    }

    p := fm.NewPageBySize(32)
    p.SetInt(0, uint64(START))
    p.SetInt(8, uint64(tx_num))
    start_record := NewStartRecord(p, log_manager)
    start_record.WriteToLog()

    return recovery_mgr
}

func (r *RecoveryManager) Commit() error {
    r.buffer_manager.FlushAll(r.tx_num)
    lsn, err := WriteCommitkRecordLog(r.log_manager, uint64(r.tx_num))
    if err != nil {
        return err
    }

    r.log_manager.FlushByLSN(lsn)
    return nil
}

func (r *RecoveryManager) Rollback() error {
    r.doRollback()
    r.buffer_manager.FlushAll(r.tx_num)
    lsn, err := WriteRollBackLog(r.log_manager, uint64(r.tx_num))
    if err != nil {
        return err
    }

    r.log_manager.FlushByLSN(lsn)
    return nil
}

func (r *RecoveryManager) Recover() error {
    r.doRecover()
    r.buffer_manager.FlushAll(r.tx_num)
    lsn, err := WriteCheckPointToLog(r.log_manager)
    if err != nil {
        return err
    }

    r.log_manager.FlushByLSN(lsn)
    return nil
}

func (r *RecoveryManager) SetInt(buffer *bm.Buffer, offset uint64, new_val int64) (uint64, error) {
    old_val := buffer.Contents().GetInt(offset)
    blk := buffer.Block()
    buffer.Contents().SetInt(offset, uint64(new_val))
    return WriteSetIntLog(r.log_manager, uint64(r.tx_num), blk, offset, old_val)
}

func (r *RecoveryManager) SetString(buffer *bm.Buffer, offset uint64, new_val string) (uint64, error) {
    old_val := buffer.Contents().GetString(offset)
    blk := buffer.Block()
    buffer.Contents().SetString(offset, new_val)
    return WriteSetStringLog(r.log_manager, uint64(r.tx_num), blk, offset, old_val)
}

func (r *RecoveryManager) CreateLogRecord(bytes []byte) LogRecordInterface {
    p := fm.NewPageByBytes(bytes)
    switch RECORD_TYPE(p.GetInt(0)) {
    case CHECKPOINT:
        return NewCheckPointRecord()
    case START:
        return NewStartRecord(p, r.log_manager)
    case COMMIT:
        return NewCommitkRecordRecord(p)
    case ROLLBACK:
        return NewRollBackRecord(p)
    case SETINT:
        return NewSetIntRecord(p)
    case SETSTRING:
        return NewSetStringRecord(p)
    default:
        panic("Unknow log interface")
    }
}

func (r *RecoveryManager) doRollback() {
    iter := r.log_manager.Iterator()
    for iter.HasNext() {
        rec := iter.Next()
        log_record := r.CreateLogRecord(rec)
        if log_record.TxNumber() == uint64(r.tx_num) {
            if log_record.Op() == START {
                return
            }

            log_record.Undo(r.tx)
        }
    }
}

func (r *RecoveryManager) doRecover() {
    finishedTxs := make(map[uint64]bool)
    iter := r.log_manager.Iterator()
    for iter.HasNext() {
        rec := iter.Next()
        log_record := r.CreateLogRecord(rec)
        if log_record.Op() == CHECKPOINT {
            return
        }
        if log_record.Op() == COMMIT || log_record.Op() == ROLLBACK {
            finishedTxs[log_record.TxNumber()] = true
        }
        existed, _ := finishedTxs[log_record.TxNumber()]
        if existed {
            log_record.Undo(r.tx)
        }
    }
}

完成上面代码后,我们记得取消掉transation.go里面关于恢复管理器的注释,为了检测我们的代码基本逻辑是否正确,我们在main.go中拟写如下代码:

代码语言:javascript
复制
package main

import (
    //"encoding/binary"
    fm "file_manager"
    lm "log_manager"

    bmg "buffer_manager"

    "fmt"

    "tx"
)

func main() {
    file_manager, _ := fm.NewFileManager("txtest", 400)
    log_manager, _ := lm.NewLogManager(file_manager, "logfile")
    buffer_manager := bmg.NewBufferManager(file_manager, log_manager, 3)

    tx1 := tx.NewTransation(file_manager, log_manager, buffer_manager)
    blk := fm.NewBlockId("testfile", 1)
    tx1.Pin(blk)
    //设置log为false,因为一开始数据没有任何意义,因此不能进行日志记录
    tx1.SetInt(blk, 80, 1, false) 
    tx1.SetString(blk, 40, "one", false)
    tx1.Commit() //执行回滚操作后,数据会还原到这里写入的内容

    tx2 := tx.NewTransation(file_manager, log_manager, buffer_manager)
    tx2.Pin(blk)
    ival, _ := tx2.GetInt(blk, 80)
    sval, _ := tx2.GetString(blk, 40)
    fmt.Println("initial value at location 80 = ", ival)
    fmt.Println("initial value at location 40 = ", sval)
    new_ival := ival + 1
    new_sval := sval + "!"
    tx2.SetInt(blk, 80, new_ival, true)
    tx2.SetString(blk, 40, new_sval, true)
    tx2.Commit() //尝试写入新的数据

    tx3 := tx.NewTransation(file_manager, log_manager, buffer_manager)
    tx3.Pin(blk)
    ival, _ = tx3.GetInt(blk, 80)
    sval, _ = tx3.GetString(blk, 40)
    fmt.Println("new ivalue at location 80: ", ival)
    fmt.Println("new svalue at location 40: ", sval)
    tx3.SetInt(blk, 80, 999, true)
    ival, _ = tx3.GetInt(blk, 80)
    //写入数据后检查是否写入正确
    fmt.Println("pre-rollback ivalue at location 80: ", ival)
    tx3.RollBack() //执行回滚操作,并确定回滚到第一次写入内容

    tx4 := tx.NewTransation(file_manager, log_manager, buffer_manager)
    tx4.Pin(blk)
    ival, _ = tx4.GetInt(blk, 80)
    fmt.Println("post-rollback at location 80 = ", ival)
    tx4.Commit() //执行到这里时,输出内容应该与第一次写入内容相同

}

完成后我们执行上面代码得到输出如下:

从输出结果看,我们先在给定位置写入数值1,然后再写入数组2,999,最后调用RollBack()执行回滚,然后再取出同样位置的数据进行打印,通过输出可以看到,RollBack执行后,给定位置的数值变成了最开始我们输入的数值,如此看来,恢复管理器的基本逻辑应该是正确的。代码下载:

链接: https://pan.baidu.com/s/1rmNZngVOEnuCpjdGKshkKg 提取码: t772

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-05-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Coding迪斯尼 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档