前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >迪奥布兰度正在挑战fgo 小说_god eater resurrection

迪奥布兰度正在挑战fgo 小说_god eater resurrection

作者头像
全栈程序员站长
发布2022-11-10 16:31:11
4290
发布2022-11-10 16:31:11
举报
文章被收录于专栏:全栈程序员必看

godis之aof持久化

文章目录

基本说明

在godis中,只有aof持久化,而没有rdb持久化。aof持久化分为三个基本的模块:

  1. 将命令持久化到aof文件
  2. 将aof文件的命令加载到内存
  3. aof文件重写

文件写入

handlerAof函数的作用是将命令持久化到aof文件中。它监听着aof通道并写入到aof文件,在初始化handler的时候,就开启一个子goroutine来执行这个函数。

代码语言:javascript
复制
// 监听aof通道并写入文件
func (handler *Handler) handleAof()  { 

// 序列化执行
handler.currentDB = 0
// 遍历通道中的命令
for p := range handler.aofChan { 

// 阻止其它协程暂停AOF
handler.pausingAof.RLock()
// 如果管道中的命令不属于当前选择的数据库,还要加上选择数据库的命令
if p.dbIndex != handler.currentDB { 

data := RESP.MakeMultiBulkReply(Utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
// 将选择数据库的指令写入aof文件
_, err := handler.aofFile.Write(data)
if err != nil { 

log.Error(err)
continue
}
handler.currentDB = p.dbIndex
}
// 将命令转化为字节数组
data := RESP.MakeMultiBulkReply(p.cmdLine).ToBytes()
_, err := handler.aofFile.Write(data)
if err != nil { 

log.Error(err)
}
handler.pausingAof.RUnlock()
}
// 将msg发送到主协程
handler.aofFinished <- struct{ 
}{ 
}
}

加载文件

LoadAof函数的作用是将aof文件中的数据加载到内存。函数的执行流程可以分为以下四个步骤:

  1. 删除aofChan以防止再次写入。
  2. 打开aof文件。
  3. 开始读文件。
  4. 创建一个伪客户端,并且遍历读取到的数据,通过伪客户端将命令发送到服务器的内存中。
代码语言:javascript
复制
// LoadAof 将文件中的数据加载到内存
func (handler *Handler) LoadAof(maxBytes int) { 

// 删除aofChan以防再次写入
aofChan := handler.aofChan
handler.aofChan = nil
defer func(aofChan chan *payload) { 

handler.aofChan = aofChan
}(aofChan)
// 打开aof文件
file, err := os.Open(handler.aofFilename)
if err != nil { 

// 如果是路径有错,就直接结束
if _, ok := err.(*os.PathError); ok { 

return
}
log.Error(err)
return
}
defer file.Close()
// 开始读文件
var reader io.Reader
if maxBytes > 0 { 

reader = io.LimitReader(file, int64(maxBytes))
} else { 

reader = file
}
// 从 reader 读取数据并通过通道发送payload
ch := RESP.ParseStream(reader)
// 创建一个伪客户端
fakeConn := &connection.FakeConn{ 
}
for p := range ch { 

if p.Err != nil { 

// 文件读取完毕就退出
if p.Err == io.EOF { 

break
}
log.Error("parse error: " + p.Err.Error())
continue
}
if p.Data == nil { 

log.Error("empty payload")
continue
}
// 这里的p.Data必须是一个字符串list
r, ok := p.Data.(*RESP.MultiBulkReply)
if !ok { 

log.Error("require multi bulk reply")
continue
}
// 使用伪客户端发送命令
ret := handler.db.Exec(fakeConn, r.Args)
// 检查格式是否正确
if ret.ToBytes()[0] == '-' { 

log.Error("exec err", err)
}
}
}

文件重写

因为golang不能fork一个子进程,所以不能像redis中那样使用子进程来执行重写功能,而采用读写一个临时文件来代替。

重写功能被拆分成了三个函数:

  • StartRewrite
    1. 首先暂停aof写入。
    2. 用fsync将缓冲区中的数据落盘。
    3. 获得当前aof文件大小。
    4. 创建临时文件。 函数返回创建的临时文件指针、aof文件大小以及重写开始时aof文件选中的数据库。
  • DoRewrite
    1. 将重写开始前的数据加载到内存。
    2. 将内存中的数据写入临时文件。
  • FinishRewrite
    1. 打开线上的aof文件并seek到重写开始时的位置。
    2. 写入一条select命令,使临时文件选中重写开始时刻线上aof文件选中的数据库。
    3. 对齐数据库以后,把重写过程中产生的数据复制到临时文件中。
    4. 用临时文件替换线上aof文件。
    5. 重新打开线上的aof文件,保证aof文件中的数据库与正在使用的数据库对齐。
代码语言:javascript
复制
// StartRewrite 为重写做准备
// 返回一个RewriteCtx, 包含了一个临时文件指针、文件大小以及当前数据库的编号
func (handler *Handler) StartRewrite() (*RewriteCtx, error) { 

// 暂停aof写入,数据会在aofChan中暂时堆积
handler.pausingAof.Lock()
defer handler.pausingAof.Unlock()
// 调用 fsync 将缓冲区中的数据落盘,防止 aof 文件不完整造成错误
err := handler.aofFile.Sync()
if err != nil { 

log.Error("fsync failed")
return nil, err
}
// 获得当前 aof 文件大小,用于判断哪些数据是 aof 重写过程中产生的
// handleAof 会保证每次写入完整的一条指令
fileInfo, _ := os.Stat(handler.aofFilename)
filesize := fileInfo.Size()
// 创建临时文件
// 系统会自动将*号替换成随机的字符
file, err := ioutil.TempFile("", "*.aof")
if err != nil { 

log.Error("tmp file create failed")
return nil, err
}
return &RewriteCtx{ 

tmpFile:  file,
fileSize: filesize,
dbIdx:    handler.currentDB,   // 重写开始时 aof 文件选中的数据库
}, nil
}
代码语言:javascript
复制
// DoRewrite 实际上重写的是aof文件
// 外部调用请使用 Rewrite 函数
func (handler *Handler) DoRewrite(ctx *RewriteCtx) error { 

tmpFile := ctx.tmpFile
// 将重写开始前的数据加载到内存
tmpAof := handler.newRewriteHandler()
tmpAof.LoadAof(int(ctx.fileSize))
// 将内存中的数据写入临时文件
for i := 0; i < Properties.Databases; i++ { 

// 选择数据库
data := RESP.MakeMultiBulkReply(Utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes()
// 写入文件
_, err := tmpFile.Write(data)
if err != nil { 

return err
}
// dump db
tmpAof.db.ForEach(i, func(key string, entity *database.DataEntity, expiration *time.Time) bool { 

// 序列化为命令
cmd := EntityToCmd(key, entity)
if cmd != nil { 

// 将命令写入到临时文件中
_, _ = tmpFile.Write(cmd.ToBytes())
}
// 如果命令已经过期,那就为命令添加上过期的标签并设置过期时间
if expiration != nil { 

cmd := MakeExpireCmd(key, *expiration)
if cmd != nil { 

_, _ = tmpFile.Write(cmd.ToBytes())
}
}
return true
})
}
return nil
}
代码语言:javascript
复制
// FinishRewrite 进行重写的善后操作
func (handler *Handler) FinishRewrite(ctx *RewriteCtx) { 

// 同样暂停 handleAof 的写入
handler.pausingAof.Lock()
defer handler.pausingAof.Unlock()
// 打开线上 aof 文件并 seek 到重写开始的位置
tmpFile := ctx.tmpFile
src, err := os.Open(handler.aofFilename)
if err != nil { 

log.Error("open aofFilename failed:" + err.Error())
return
}
defer func() { 

_ = src.Close()
}()
_, err = src.Seek(ctx.fileSize, 0)
if err != nil { 

log.Error("seek failed: " + err.Error())
return
}
// 写入一条 Select 命令,使 tmpAof 选中重写开始时刻线上 aof 文件选中的数据库
data := RESP.MakeMultiBulkReply(Utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIdx))).ToBytes()
_, err = tmpFile.Write(data)
if err != nil { 

log.Error("tmp file rewrite failed: " + err.Error())
return
}
// 对齐数据库后就可以把重写过程中产生的数据复制到 tmpAof 文件了
_, err = io.Copy(tmpFile, src)
if err != nil { 

log.Error("copy aof filed failed: " + err.Error())
return
}
// 使用 mv 命令用 tmpAof 代替线上 aof 文件
_ = handler.aofFile.Close()
_ = os.Rename(tmpFile.Name(), handler.aofFilename)
// 重新打开线上 aof
aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil { 

panic(err)
}
handler.aofFile = aofFile
// reset selected db 重新写入一次 select 指令保证 aof 中的数据库与 handler.currentDB 一致
data = RESP.MakeMultiBulkReply(Utils.ToCmdLine("SELECT", strconv.Itoa(handler.currentDB))).ToBytes()
_, err = handler.aofFile.Write(data)
if err != nil { 

panic(err)
}
}

数据转化为redis命令

在执行重写的时候需要将持久化的数据实体序列化为redis命令。

EntityToCmd函数的功能就是序列化数据。

代码语言:javascript
复制
// EntityToCmd 将数据实体序列化为 redis 命令
// DataEntity 存储绑定到一个键的数据,包括字符串、列表、哈希、集合等
func EntityToCmd(key string, entity *database.DataEntity) *RESP.MultiBulkReply { 

if entity == nil { 

return nil
}
var cmd *RESP.MultiBulkReply
// 用类型断言来判断键的类型,然后序列化为对应的命令并返回
switch val := entity.Data.(type) { 

case []byte:
cmd = stringToCmd(key, val)
case *list.LinkedList:
cmd = listToCmd(key, val)
case *set.Set:
cmd = setToCmd(key, val)
case dict.Dict:
cmd = hashToCmd(key, val)
case *sortedset.SortedSet:
cmd = zSetToCmd(key, val)
}
return cmd
}

外部调用

外部调用AddAof函数,将需要持久化的命令存入aofChan中。

代码语言:javascript
复制
// AddAof 通过通道向 aof goroutine 发送命令
func (handler *Handler) AddAof(dbIndex int, cmdLine CmdLine)  { 

// 如果持久化选项开启 且 aof通道被初始化
if Properties.AppendOnly == true && handler.aofChan != nil { 

handler.aofChan <- &payload{ 

cmdLine: cmdLine,
dbIndex: dbIndex,
}
}
}

完整代码

https://github.com/Gotosp/gotosp/tree/feature/aof/pkg/aof

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/184531.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022年10月8日 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • godis之aof持久化
    • 文章目录
      • 基本说明
        • 文件写入
          • 加载文件
            • 文件重写
              • 数据转化为redis命令
                • 外部调用
                  • 完整代码
                  相关产品与服务
                  云数据库 Redis®
                  腾讯云数据库 Redis®(TencentDB for Redis®)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档