前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >自己动手写数据库:实现表扫描

自己动手写数据库:实现表扫描

作者头像
望月从良
发布2022-12-02 20:31:59
3160
发布2022-12-02 20:31:59
举报
文章被收录于专栏:Coding迪斯尼Coding迪斯尼

在上一节我们实现了记录管理,本节我们看看记录的读取实现,也就是所谓的表扫描。我们将实现一个名为TableScan的类,它把表的记录当做数组来读取,通过挪到一个记录指针来遍历表的记录,它的作用类似于cursor。我们实现这个类的目的是增加一层抽象,前面我们实现的RecordPage暴露很多底层信息,例如记录的数据格式,区块号等,我们通过这个类将这些底层信息隐藏起来,通过它来读写记录可以避开对区块号,存储页面等底层对象。

一条记录可以由两个信息来确定,分别是区块号,其用来表明记录信息存储在哪个区块,另一个是插槽号,用来查找记录的起始偏移,我们先用一个类RecordIdentifier将这些信息封装起来,我们先看看其接口定义,在interface.go中添加代码如下:

代码语言:javascript
复制
type RIDInterface interface {
    BlockNumber() int //记录所在的区块号
    Slot()  int //记录的插槽号
}

TableScan类用来遍历给定表的记录,它首先定位”当前目录“,然后通过其提供的接口可以实现读取上一条或下一条目录,我们先看看其要实现的接口,在interface.go中添加代码如下:

代码语言:javascript
复制
type TableScanInterface interface {
    Close()
    HasField(field_name string) bool
    BeforeFirst()             //将指针放在第一条记录前
    Next() bool               //读取下一条记录
    MoveToRid(r RIDInterface) //跳转到给定目录
    Insert()

    GetInt(field_name string) int 
    GetString(field_name string) string 
    SetInt(field_name string, val int)
    SetString(field_name string, val string)
    CurrentRID() RIDInterface
    Delete() 
}

可以看到TableScan类对应接口跟RecordPage很像,只不过它不直接处理交易对象(Tx),区块号等底层数据。下面我们看看TableScan类的具体实现,增加名为table_scan.go文件,增加内容如下:

代码语言:javascript
复制
package record_manager

import (
    fm "file_manager"
    "tx"
)

type TableScan struct {
    tx           *tx.Transation
    layout       LayoutInterface
    rp           RecordManagerInterface
    file_name    string
    current_slot int
}

func NewTableScan(tx *tx.Transation, table_name string, layout LayoutInterface) *TableScan {
    table_scan := &TableScan{
        tx:        tx,
        layout:    layout,
        file_name: table_name + ".tbl",
    }

    size, err := tx.Size(table_scan.file_name)
    if err != nil {
        panic(err)
    }
    if size == 0 {
        //如果文件为空,那么增加一个区块
        table_scan.MoveToNewBlock()
    } else {
        //先读取第一个区块
        table_scan.MoveToBlock(0)
    }

    return table_scan
}

func (t *TableScan) Close() {
    if t.rp != nil {
        t.tx.UnPin(t.rp.Block())
    }
}

func (t *TableScan) BeforeFirst() {
    t.MoveToBlock(0)
}

func (t *TableScan) Next() bool {
    /*
        如果在当前区块找不到给定有效记录则遍历后续区块,直到所有区块都遍历为止
    */
    t.current_slot = t.rp.NextAfter(t.current_slot)
    for t.current_slot < 0 {
        if t.AtLastBlock() {
            //直到最后一个区块都找不到给定插槽
            return false
        }

        t.MoveToBlock(int(t.rp.Block().Number() + 1))
        t.current_slot = t.rp.NextAfter(t.current_slot)
    }

    return true
}

func (t *TableScan) GetInt(field_name string) int {
    return t.rp.GetInt(t.current_slot, field_name)
}

func (t *TableScan) GetString(field_name string) string {
    return t.rp.GetString(t.current_slot, field_name)
}

func (t *TableScan) GetVal(field_name string) *Constant {
    if t.layout.Schema().Type(field_name) == INTEGER {
        return NewConstantInt(t.GetInt(field_name))
    }

    return NewConstantString(t.GetString(field_name))
}

func (t *TableScan) HasField(field_name string) bool {
    return t.layout.Schema().HasFields(field_name)
}

func (t *TableScan) SetInt(field_name string, val int) {
    t.rp.SetInt(t.current_slot, field_name, val)
}

func (t *TableScan) SetString(field_name string, val string) {
    t.rp.SetString(t.current_slot, field_name, val)
}

func (t *TableScan) SetVal(field_name string, val *Constant) {
    if t.layout.Schema().Type(field_name) == INTEGER {
        t.SetInt(field_name, val.IVal)
    } else {
        t.SetString(field_name, val.SVal)
    }
}

func (t *TableScan) Insert() {
    /*
        将当前插槽号指向下一个可用插槽
    */
    t.current_slot = t.rp.InsertAfter(t.current_slot)
    for t.current_slot < 0 { //当前区块找不到可用插槽
        if t.AtLastBlock() {
            //如果当前处于最后一个区块,那么新增一个区块
            t.MoveToNewBlock()
        } else {
            t.MoveToBlock(int(t.rp.Block().Number() + 1))
        }

        t.current_slot = t.rp.InsertAfter(t.current_slot)
    }
}

func (t *TableScan) Delete() {
    t.rp.Delete(t.current_slot)
}

func (t *TableScan) MoveToRid(r RIDInterface) {
    t.Close()
    blk := fm.NewBlockId(t.file_name, uint64(r.BlockNumber()))
    t.rp = NewRecordPage(t.tx, blk, t.layout)
    t.current_slot = r.Slot()
}

func (t *TableScan) GetRid() RIDInterface {
    return NewRID(int(t.rp.Block().Number()), t.current_slot)
}

func (t *TableScan) MoveToBlock(blk_num int) {
    t.Close()
    blk := fm.NewBlockId(t.file_name, uint64(blk_num))
    t.rp = NewRecordPage(t.tx, blk, t.layout)
    t.current_slot = -1
}

func (t *TableScan) MoveToNewBlock() {
    t.Close()
    blk, err := t.tx.Append(t.file_name)
    if err != nil {
        panic(err)
    }
    t.rp = NewRecordPage(t.tx, blk, t.layout)
    t.rp.Format()
    t.current_slot = -1
}

func (t *TableScan) AtLastBlock() bool {
    size, err := t.tx.Size(t.file_name)
    if err != nil {
        panic(err)
    }
    return t.rp.Block().Number() == size-1
}

TableScan 主要是在RecordPage基础上对记录的读取进行了封装,用于依次读取给定表的记录。在使用它时,首先调用其BeforeFirst接口将记录的指针挪到首条记录,然后调用Next挪到记录指针指向下一条要读取的记录,最后使用GetInt,GetString来读取对应字段的内容。它的实现中还用到两个类,分别是Constant和RID,这两个类的实现都很简单,先创建Constant.go,实现内容如下:

代码语言:javascript
复制
package record_manager

import (
    "strconv"
)

type Constant struct {
    IVal  int
    SVal  string
    isInt bool
}

func NewConstantInt(val int) *Constant {
    return &Constant{
        IVal:  val,
        isInt: true,
    }
}

func NewConstantString(val string) *Constant {
    return &Constant{
        SVal:  val,
        isInt: false,
    }
}

func (c *Constant) AsInt() int {
    return c.IVal
}

func (c *Constant) AsString() string {
    return c.SVal
}

func (c *Constant) Equals(other *Constant) bool {
    if c.isInt {
        return c.IVal == other.IVal
    }

    return c.SVal == other.SVal
}

func (c *Constant) CompareTo(other *Constant) int {
    if c.isInt {
        return c.IVal - other.IVal
    }

    if c.SVal > other.SVal {
        return 1
    } else if c.SVal == other.SVal {
        return 0
    } else {
        return -1
    }
}

func (c *Constant) ToString() string {
    if c.isInt {
        return strconv.Itoa(c.IVal)
    }

    return c.SVal
}

创建rid.go,实现内容如下:

代码语言:javascript
复制
package record_manager

import (
    "fmt"
)

type RID struct {
    blk_num int
    slot    int
}

func NewRID(blk_num int, slot int) *RID {
    return &RID{
        blk_num: blk_num,
        slot:    slot,
    }
}

func (r *RID) BlockNumber() int {
    return r.blk_num
}

func (r *RID) Slot() int {
    return r.slot
}

func (r *RID) Equals(other RIDInterface) bool {
    return r.blk_num == other.BlockNumber() && r.slot == other.Slot()
}

func (r *RID) ToString() string {
    return fmt.Sprintf("[ %d , %d ]", r.blk_num, r.slot)
}

最后添加table_scan_test.go,实现TableScan对象的测试用例:

代码语言:javascript
复制
package record_manager

import (
    bmg "buffer_manager"
    fm "file_manager"
    "fmt"
    "github.com/stretchr/testify/require"
    lm "log_manager"
    "math/rand"
    "testing"
    "tx"
)

func TestTableScanInsertAndDelete(t *testing.T) {
    file_manager, _ := fm.NewFileManager("recordtest", 400)
    log_manager, _ := lm.NewLogManager(file_manager, "logfile.log")
    buffer_manager := bmg.NewBufferManager(file_manager, log_manager, 3)

    tx := tx.NewTransation(file_manager, log_manager, buffer_manager)
    sch := NewSchema()

    sch.AddIntField("A")
    sch.AddStringField("B", 9)
    layout := NewLayoutWithSchema(sch)
    for _, field_name := range layout.Schema().Fields() {
        offset := layout.Offset(field_name)
        fmt.Printf("%s has offset %d\n", field_name, offset)
    }

    ts := NewTableScan(tx, "T", layout)
    fmt.Println("Filling the table with 50 random records")
    ts.BeforeFirst()
    val_for_field_A := make([]int, 0)
    for i := 0; i < 50; i++ {
        ts.Insert() //指向一个可用插槽
        n := rand.Intn(50)
        ts.SetInt("A", n)
        val_for_field_A = append(val_for_field_A, n)
        s := fmt.Sprintf("rec%d", n)
        ts.SetString("B", s)
        fmt.Printf("inserting into slot %s: {%d , %s}\n", ts.GetRid().ToString(), n, s)
    }

    ts.BeforeFirst()
    //测试插入的内容是否正确
    slot := 0
    for ts.Next() {
        a := ts.GetInt("A")
        b := ts.GetString("B")
        require.Equal(t, a, val_for_field_A[slot])
        require.Equal(t, b, fmt.Sprintf("rec%d", a))
        slot += 1
    }

    fmt.Println("Deleting records with A-values < 25")
    count := 0
    ts.BeforeFirst()
    for ts.Next() {
        a := ts.GetInt("A")
        b := ts.GetString("B")
        if a < 25 {
            count += 1
            fmt.Printf("slot %s : { %d , %s}\n", ts.GetRid().ToString(), a, b)
            ts.Delete()
        }
    }

    fmt.Println("Here are the remaining records:")
    ts.BeforeFirst()
    for ts.Next() {
        a := ts.GetInt("A")
        b := ts.GetString("B")
        require.Equal(t, a >= 25, true)
        fmt.Printf("slot %s : { %d , %s}\n", ts.GetRid(), a, b)
    }

    ts.Close()
    tx.Commit()
}

其内容和RecordPage的测试用例一模一样,只不过调用的接口稍微有所变化而已。更详细的调试演示在B站搜索Coding迪斯尼。代码下载:链接: https://pan.baidu.com/s/1gaFleDPkR1FcPt6wwHCEdA 提取码: 2tji

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-10-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Coding迪斯尼 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档