前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JuiceFS 源码阅读-中

JuiceFS 源码阅读-中

作者头像
用户1260683
发布2021-08-12 14:53:39
7650
发布2021-08-12 14:53:39
举报

JuiceFS锁的实现

JuiceFS的锁实现,目前同时实现了BSD locks(对应Flock)和POSIX locks(对应Setlk)。细节上最大区别就是BSD locks只能以FD为最小控制单位(简单理解为单文件加锁,锁定的是文件描述符fd对应的文件),而POSIX locks可以在一个文件中以文件的offset+length的方式进行加锁(按文件内容进行范围加锁)。

BSD locks

代码语言:javascript
复制
//pkg/vfs/vfs_unix.go

func Flock(ctx Context, ino Ino, fh uint64, owner uint64, typ uint32, block bool) (err syscall.Errno) {
    var name string
    var reqid uint32
    defer func() { logit(ctx, "flock (%d,%d,%016X,%s,%t): %s", reqid, ino, owner, name, block, strerr(err)) }()
    switch typ {
    case syscall.F_RDLCK:
        name = "LOCKSH"
    case syscall.F_WRLCK:
        name = "LOCKEX"
    case syscall.F_UNLCK:
        name = "UNLOCK"
    default:
        err = syscall.EINVAL
        return
    }

    if IsSpecialNode(ino) {
        err = syscall.EPERM
        return
    }
    h := findHandle(ino, fh)
    if h == nil {
        err = syscall.EBADF
        return
    }
    h.addOp(ctx)
    defer h.removeOp(ctx)
    err = m.Flock(ctx, ino, owner, typ, block) //核心在元数据部分的控制,具体参考下面部分代码注释
    if err == 0 {
        h.Lock()
        if typ == syscall.F_UNLCK {
            h.locks &= 2
        } else {
            h.locks |= 1
            h.flockOwner = owner
        }
        h.Unlock()
    }
    return
}
代码语言:javascript
复制
//pkg/meta/sql_unix.go
func (m *dbMeta) Flock(ctx Context, inode Ino, owner uint64, ltype uint32, block bool) syscall.Errno {
    if ltype == syscall.F_UNLCK {//如果为解锁操作,则只需要删除对应的db记录即可
        return errno(m.txn(func(s *xorm.Session) error {
            _, err := s.Delete(&flock{Inode: inode, Owner: owner, Sid: m.sid})
            return err
        }))
    }
    var err syscall.Errno
    //循环处理加锁请求,分为阻塞(block=true)和非阻塞两种类型操作
    for {
        err = errno(m.txn(func(s *xorm.Session) error {
        //获取inode信息,避免锁指向的对象不存在,成为空锁。
            if exists, err := s.Get(&node{Inode: inode}); err != nil || !exists {
                if err == nil && !exists {
                    err = syscall.ENOENT
                }
                return err
            }
            //查询inode关联的全部锁信息
            rows, err := s.Rows(&flock{Inode: inode})
            if err != nil {
                return err
            }
            type key struct {
                sid uint64
                o   uint64
            }
            var locks = make(map[key]flock)
            var l flock
            for rows.Next() {
                if rows.Scan(&l) == nil {
                //执行迭代,将查询结果临时保存到locks数据结构中
                    locks[key{l.Sid, l.Owner}] = l
                }
            }
            rows.Close()
//判断需要加锁的类型是否为读锁,如果已经有写锁则加锁失败
            if ltype == syscall.F_RDLCK {
                for _, l := range locks {
                    if l.Ltype == 'W' {
                        return syscall.EAGAIN
                    }
                }
                //没有写锁冲突,则通过insert记录加上读锁
                return mustInsert(s, flock{Inode: inode, Owner: owner, Ltype: 'R', Sid: m.sid})
            }

            //加写入锁逻辑:先判断是否已经有写入锁(判断locks中是否有重复键值),如果有则更新锁的记录,否则直接insert插入对应的记录
            me := key{m.sid, owner}
            _, ok := locks[me]
            delete(locks, me)
            if len(locks) > 0 {
                return syscall.EAGAIN
            }
            if ok {
                _, err = s.Cols("Ltype").Update(&flock{Ltype: 'W'}, &flock{Inode: inode, Owner: owner, Sid: m.sid})
            } else {
                err = mustInsert(s, flock{Inode: inode, Owner: owner, Ltype: 'W', Sid: m.sid})
            }
            return err
        }))
    //非阻塞or报错直接返回结果
        if !block || err != syscall.EAGAIN {
            break
        }
        //阻塞情况下加写锁,则等待固定时长再进行下一轮加锁操作
        if ltype == syscall.F_WRLCK {
            time.Sleep(time.Millisecond * 1)
        } else {
            time.Sleep(time.Millisecond * 10)
        }
        if ctx.Canceled() {
            return syscall.EINTR
        }
    }
    return err
}

POSIX locks

按pid进行范围加锁,实现起来相对比较复杂,核心算法在updateLocks中实现。

代码语言:javascript
复制
func (m *dbMeta) Setlk(ctx Context, inode Ino, owner_ uint64, block bool, ltype uint32, start, end uint64, pid uint32) syscall.Errno {
    var err syscall.Errno
    lock := plockRecord{ltype, pid, start, end}//以pid为粒度,所以适合单机多进/线程模型,跨节点不太合适
    owner := int64(owner_)
    for {
        err = errno(m.txn(func(s *xorm.Session) error {
            if exists, err := s.Get(&node{Inode: inode}); err != nil || !exists {
                if err == nil && !exists {
                    err = syscall.ENOENT
                }
                return err
            }
            //unlock操作
            if ltype == F_UNLCK {
            //sid代表session ID,每个客户端的数据库连接都有一个独立的ID实例
                var l = plock{Inode: inode, Owner: owner, Sid: m.sid}
                ok, err := m.engine.Get(&l) //按inode、owner、sid三个字段组合,查询锁列表
                if err != nil {
                    return errno(err)
                }
                if !ok {
                    return nil
                }
                ls := loadLocks([]byte(l.Records)) //解析锁列表信息
                if len(ls) == 0 {
                    return nil
                }
                ls = updateLocks(ls, lock) //在已有所列表里面新增锁记录,有点复杂,之后详细介绍
                if len(ls) == 0 {
                    _, err = s.Delete(&plock{Inode: inode, Owner: owner, Sid: m.sid}) //删除锁记录
                } else {
                    _, err = s.Cols("records").Update(plock{Records: dumpLocks(ls)}, l) //更新已有所记录
                }
                return err
            }
            //以inode为关键字,查找已有的锁列表
            rows, err := s.Rows(&plock{Inode: inode})
            if err != nil {
                return errno(err)
            }
            type key struct {
                sid   uint64
                owner int64
            }
            var locks = make(map[key][]byte)
            var l plock
            //按查询结果构建锁map
            for rows.Next() {
                if rows.Scan(&l) == nil {
                    locks[key{l.Sid, l.Owner}] = dup(l.Records)
                }
            }
            rows.Close()
            //遍历map,判断是否有冲突锁
            lkey := key{m.sid, owner}
            for k, d := range locks {
                if k == lkey {
                    continue
                }
                ls := loadLocks([]byte(d))
                for _, l := range ls {
                    // find conflicted locks
                    if (ltype == F_WRLCK || l.ltype == F_WRLCK) && end > l.start && start < l.end {
                        return syscall.EAGAIN
                    }
                }
            }
            ls := updateLocks(loadLocks([]byte(locks[lkey])), lock) //更新锁列表信息
            var n int64
            //保存锁列表记录到DB
            if len(locks[lkey]) > 0 {
                n, err = s.Cols("records").Update(plock{Records: dumpLocks(ls)},
                    &plock{Inode: inode, Sid: m.sid, Owner: owner})
            } else {
                n, err = s.InsertOne(&plock{Inode: inode, Sid: m.sid, Owner: owner, Records: dumpLocks(ls)})
            }
            if err == nil && n == 0 {
                err = fmt.Errorf("insert/update failed")
            }
            return err
        }))
    //如果加锁失败且不进行阻塞,则直接返回结果
        if !block || err != syscall.EAGAIN {
            break
        }
        //加锁失败,阻塞,进入下一轮操作
        if ltype == F_WRLCK {
            time.Sleep(time.Millisecond * 1)
        } else {
            time.Sleep(time.Millisecond * 10)
        }
        if ctx.Canceled() {
            return syscall.EINTR
        }
    }
    return err
}

updateLocks 的代码逻辑如下,通过加上debug输出,更加容易观察其中细节

代码语言:javascript
复制
const (
    F_UNLCK = syscall.F_UNLCK
    F_RDLCK = syscall.F_RDLCK
    F_WRLCK = syscall.F_WRLCK
)

type plockRecord struct {
    ltype uint32
    pid   uint32
    start uint64
    end   uint64
}


func insertLocks(ls []plockRecord, i int, nl plockRecord) []plockRecord {
    //fmt.Println(i,"insertLocks before ls=",ls)
    nls := make([]plockRecord, len(ls)+1)
    copy(nls[:i], ls[:i])
    nls[i] = nl
    copy(nls[i+1:], ls[i:])
    ls = nls
    //fmt.Println(i,"insertLocks after ls=",ls)
    return ls
}

func updateLocks(ls []plockRecord, nl plockRecord) []plockRecord {
    // ls is ordered by l.start without overlap
    var i int
    for i < len(ls) && nl.end > nl.start {
        l := ls[i]
        if l.end < nl.start {
            fmt.Println("新增锁设定的区域超过当前锁范围,查找下一个")
        } else if l.start < nl.start {
            //fmt.Println("l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
            fmt.Println("1. 当前锁包含部分新锁区域,拆分成两个锁,调整当前锁范围从[",ls[i].start,"->",ls[i].end,"]调整为[",ls[i].start,"->",nl.start,"],并在当前位置之后插入新锁 [",nl.start,"->",l.end,"]")
            //fmt.Println("1-> l.start=",l.start," < nl.start=",nl.start,"l.end=",l.end," < nl.end=",nl.end )
            ls = insertLocks(ls, i+1, plockRecord{nl.ltype, nl.pid, nl.start, l.end})
            ls[i].end = nl.start
            i++
            nl.start = l.end
        } else if l.end < nl.end {
            //fmt.Println("2-> l.end < nl.end","nl.start=",nl.start,"ls[i].start=",ls[i].start)
            fmt.Println("2. 当前锁区间属于新锁区间,缩小当前锁范围,从[",ls[i].start,"->",ls[i].end,"]调整为[",nl.start,"->",ls[i].end,"]")
            ls[i].ltype = nl.ltype
            ls[i].start = nl.start
            nl.start = l.end
        } else if l.start < nl.end {
            //fmt.Println("3. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end=",nl.end)
            ls = insertLocks(ls, i, nl)
            fmt.Println("3. 新锁与当前锁有部分内容重叠,需要在当前位置插入新锁=[",nl.start,nl.end,"],并调整下一个锁的起始位置从[",ls[i+1].start,ls[i+1].end,"] -> [",nl.end,ls[i+1].end,"]")
            ls[i+1].start = nl.end
            nl.start = nl.end
        } else {
            fmt.Println("4. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
            fmt.Println("4. 新锁右侧和当前锁没有重叠(l.start>nl.end),仅需要在当前位置插入新锁=[",nl.start,nl.end,"]")
            ls = insertLocks(ls, i, nl)
            nl.start = nl.end
        }
        i++
    }

    if nl.start < nl.end {
        ls = append(ls, nl)
        fmt.Println("5. 仍然有部分尾部内容没有,补充末尾部分的锁内容,补充后=",ls)
    }

    i = 0
    //再次遍历锁列表,进行无效内容删除or区间合并操作。
    for i < len(ls) {
        if ls[i].ltype == F_UNLCK || ls[i].start == ls[i].end {
            // remove empty one
            //fmt.Println("删除锁列表从i=",i,"位置的内容,删除前ls=",ls)
            //fmt.Println("删除锁列表从i=",i,"位置的内容",ls[i:i+1])
            copy(ls[i:], ls[i+1:]) //从i位置开始左移1个单位
            ls = ls[:len(ls)-1] //删除末尾
            fmt.Println("6-1. 删除锁列表从i=",i,"位置的内容,删除后ls=",ls)
        } else {
            if i+1 < len(ls) && ls[i].ltype == ls[i+1].ltype && ls[i].end == ls[i+1].start {
                fmt.Println("6-2. 锁类型相同,且首尾相接,进行区间合并操作",ls)
                // combine continuous range
                ls[i].end = ls[i+1].end
                ls[i+1].start = ls[i+1].end
                //fmt.Println("锁类型相同,且首尾相接,进行区间合并操作2",ls)
            }
            i++
        }
    }
    return ls
}
小结

整个POSIX locks的算法主要是通过遍历已有的锁列表ls(数组结构),并按照一定规则进行新增锁记录的插入(简单理解为滑动窗口查找),其中nl代表窗口滑动范围锁,l代表当前已经有的锁。

-w900

根据代码注释,大概分为4种类型的锁处理。其中lodlock代表已有的锁记录(对应l),Newlock是新增锁的记录(对应nl)

类型1:

-w881

类型2:

-w899

类型3:

-w899

类型4:

-w899

发现问题

发现一个pid同步的bug,当新锁的内容覆盖旧锁时,并未更新对应的pid记录,导致加锁虽然成功,但是锁的pid还是指向旧的pid内容。复现代码如下

代码语言:javascript
复制
package main

import (
    "fmt"
    "syscall"
)

const (
    F_UNLCK = syscall.F_UNLCK
    F_RDLCK = syscall.F_RDLCK
    F_WRLCK = syscall.F_WRLCK
)

type plockRecord struct {
    ltype uint32
    pid   uint32
    start uint64
    end   uint64
}


func insertLocks(ls []plockRecord, i int, nl plockRecord) []plockRecord {
    //fmt.Println(i,"insertLocks before ls=",ls)
    nls := make([]plockRecord, len(ls)+1)
    copy(nls[:i], ls[:i])
    nls[i] = nl
    copy(nls[i+1:], ls[i:])
    ls = nls
    //fmt.Println(i,"insertLocks after ls=",ls)
    return ls
}

func updateLocks(ls []plockRecord, nl plockRecord) []plockRecord {
    // ls is ordered by l.start without overlap
    var i int
    for i < len(ls) && nl.end > nl.start {
        l := ls[i]
        if l.end < nl.start {
            fmt.Println("新增锁设定的区域超过当前锁范围,查找下一个")
        } else if l.start < nl.start {
            //fmt.Println("l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
            fmt.Println("1. 当前锁包含部分新锁区域,拆分成两个锁,调整当前锁范围从[",ls[i].start,"->",ls[i].end,"]调整为[",ls[i].start,"->",nl.start,"],并在当前位置之后插入新锁 [",nl.start,"->",l.end,"]")
            //fmt.Println("1-> l.start=",l.start," < nl.start=",nl.start,"l.end=",l.end," < nl.end=",nl.end )
            ls = insertLocks(ls, i+1, plockRecord{nl.ltype, nl.pid, nl.start, l.end})
            ls[i].end = nl.start
            i++
            nl.start = l.end
        } else if l.end < nl.end {
            //fmt.Println("2-> l.end < nl.end","nl.start=",nl.start,"ls[i].start=",ls[i].start)
            fmt.Println("2. 当前锁区间属于新锁区间,缩小当前锁范围,从[",ls[i].start,"->",ls[i].end,"]调整为[",nl.start,"->",ls[i].end,"]")
            ls[i].ltype = nl.ltype
            ls[i].start = nl.start
            //if ls[i].pid != nl.pid { //patch
            //  ls[i].pid = nl.pid
            //}
            nl.start = l.end
        } else if l.start < nl.end {
            //fmt.Println("3. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end=",nl.end)
            ls = insertLocks(ls, i, nl)
            fmt.Println("3. 新锁与当前锁有部分内容重叠,需要在当前位置插入新锁=[",nl.start,nl.end,"],并调整下一个锁的起始位置从[",ls[i+1].start,ls[i+1].end,"] -> [",nl.end,ls[i+1].end,"]")
            ls[i+1].start = nl.end
            nl.start = nl.end
        } else {
            fmt.Println("4. l.start=",l.start,"l.end=",l.end,"nl.start=",nl.start,"nl.end",nl.end)
            fmt.Println("4. 新锁右侧和当前锁没有重叠(l.start>nl.end),仅需要在当前位置插入新锁=[",nl.start,nl.end,"]")
            ls = insertLocks(ls, i, nl)
            nl.start = nl.end
        }
        i++
    }

    if nl.start < nl.end {
        ls = append(ls, nl)
        fmt.Println("5. 仍然有部分尾部内容没有,补充末尾部分的锁内容,补充后=",ls)
    }

    i = 0
    //再次遍历锁列表,进行无效内容删除or区间合并操作。
    for i < len(ls) {
        if ls[i].ltype == F_UNLCK || ls[i].start == ls[i].end {
            // remove empty one
            //fmt.Println("删除锁列表从i=",i,"位置的内容,删除前ls=",ls)
            //fmt.Println("删除锁列表从i=",i,"位置的内容",ls[i:i+1])
            copy(ls[i:], ls[i+1:]) //从i位置开始左移1个单位
            ls = ls[:len(ls)-1] //删除末尾
            fmt.Println("6-1. 删除锁列表从i=",i,"位置的内容,删除后ls=",ls)
        } else {
            if i+1 < len(ls) && ls[i].ltype == ls[i+1].ltype && ls[i].end == ls[i+1].start {
                fmt.Println("6-2. 锁类型相同,且首尾相接,进行区间合并操作",ls)
                // combine continuous range
                ls[i].end = ls[i+1].end
                ls[i+1].start = ls[i+1].end
                //fmt.Println("锁类型相同,且首尾相接,进行区间合并操作2",ls)
            }
            i++
        }
    }
    return ls
}


func Setlk( ltype uint32, start, end uint64, pid uint32) {

    lock := plockRecord{ltype, pid, start, end}
    //ls := []plockRecord{plockRecord{F_WRLCK, pid, 0, 4},{F_WRLCK, pid, 7, 10},{F_WRLCK, pid, 13, 16}}
    ls := []plockRecord{plockRecord{F_WRLCK, 100, 0, 4},{F_WRLCK, 102, 7, 10}}
    //ls := []plockRecord{plockRecord{F_WRLCK, pid, 1, 4}}
    fmt.Println("before updateLocks=",ls)
    ls = updateLocks(ls, lock)
    fmt.Println("after updateLocks=",ls)
}

func main(){
    Setlk(F_WRLCK,6,13,103) //理论上加锁以后的记录应该对应pid=103,上面的patch已经修复这个问题
}

输出内容如下:

代码语言:javascript
复制
before updateLocks= [{3 100 0 4} {3 102 7 10}]
新增锁设定的区域超过当前锁范围,查找下一个
2. 当前锁区间属于新锁区间,缩小当前锁范围,从[ 7 -> 10 ]调整为[ 6 -> 10 ]
5. 仍然有部分尾部内容没有,补充末尾部分的锁内容,补充后= [{3 100 0 4} {3 102 6 10} {3 103 10 13}]
6-2. 锁类型相同,且首尾相接,进行区间合并操作 [{3 100 0 4} {3 102 6 10} {3 103 10 13}]
6-1. 删除锁列表从i= 2 位置的内容,删除后ls= [{3 100 0 4} {3 102 6 13}]
after updateLocks= [{3 100 0 4} {3 102 6 13}] //理论上这里的pid=102是对应的是旧锁内容,应该被新增加的锁记录pid=103覆盖
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-08-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Ceph对象存储方案 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • JuiceFS锁的实现
    • BSD locks
      • POSIX locks
        • 小结
    • 发现问题
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档