前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >tidwall之raft-wal源码分析

tidwall之raft-wal源码分析

作者头像
jaydenwen123
发布2021-07-19 16:02:06
4400
发布2021-07-19 16:02:06
举报
文章被收录于专栏:后台通用技术后台通用技术

1. tidwall/wal模块简要介绍

在分析tidwall/raft-wal的源码前,我们先介绍一下tidwall/wal模块,raft-wal内部是采用tidwall/wal来实现的一套持久化方案

具体关于tidwall/wal的详细分析可以参考之前的这篇文章

2.tidwall/raft-wal的持久化数据存储结构

3.源码分析

所谓的持久化,无非就是几个接口:

  • 初始化
  • 写入
  • 读取
  • 删除

下面进行一一分析

3.1 核心数据结构介绍

代码语言:javascript
复制
// LogStore is a write ahead Raft log
type LogStore struct {
    mu    sync.Mutex
    // wal的日志对象
    log   *wal.Log
    buf   []byte
    batch wal.Batch
}
代码语言:javascript
复制
// Log entries are replicated to all members of the Raft cluster
// and form the heart of the replicated state machine.
type Log struct {
    // Index holds the index of the log entry.
    Index uint64

    // Term holds the election term of the log entry.
    Term uint64

    // Type holds the type of the log entry.
    Type LogType

    // Data holds the log entry's type-specific data.
    Data []byte

    // Extensions holds an opaque byte slice of information for middleware. It
    // is up to the client of the library to properly modify this as it adds
    // layers and remove those layers when appropriate. This value is a part of
    // the log, so very large values could cause timing issues.
    //
    // N.B. It is _up to the client_ to handle upgrade paths. For instance if
    // using this with go-raftchunking, the client should ensure that all Raft
    // peers are using a version that can handle that extension before ever
    // actually triggering chunking behavior. It is sometimes sufficient to
    // ensure that non-leaders are upgraded first, then the current leader is
    // upgraded, but a leader changeover during this process could lead to
    // trouble, so gating extension behavior via some flag in the client
    // program is also a good idea.
    Extensions []byte
}

3.2 Read、Write、DeleteRange源码分析

3.2.1 初始化、基础接口
代码语言:javascript
复制
var _ raft.LogStore = &LogStore{}

// Options for Open
type Options struct {
    // NoSync disables fsync after writes. This is less durable and puts the
    // log at risk of data loss when there's a server crash. Default false.
    NoSync bool
}

// Open the Raft log
func Open(path string, opts *Options) (*LogStore, error) {
    s := new(LogStore)
    // 默认选项
    wopts := *wal.DefaultOptions
    if opts != nil {
        wopts.NoSync = opts.NoSync
    }
    // opts.LogFormat = wal.JSON
    var err error
    s.log, err = wal.Open(path, &wopts)
    if err != nil {
        return nil, err
    }
    return s, nil
}

// Close the Raft log
func (s *LogStore) Close() error {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.log.Close()
}

// FirstIndex returns the first known index from the Raft log.
func (s *LogStore) FirstIndex() (uint64, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.log.FirstIndex()
}

// LastIndex returns the last known index from the Raft log.
func (s *LogStore) LastIndex() (uint64, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.log.LastIndex()
}
3.2.2 Read读取日志接口
代码语言:javascript
复制
// GetLog is used to retrieve a log from FastLogDB at a given index.
// 根据index读取日志
func (s *LogStore) GetLog(index uint64, log *raft.Log) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    data, err := s.log.Read(index)
    if err != nil {
        if err == wal.ErrNotFound {
            return raft.ErrLogNotFound
        }
        return err
    }
    // 设置index
    log.Index = index
    if len(data) == 0 {
        return wal.ErrCorrupt
    }
    // 日志读取解析的格式:Type+Term+len(Data)+Data+len(Ext)+Ext
    // 读取1个字节的type
    log.Type = raft.LogType(data[0])
    // 偏移data
    data = data[1:]
    var n int
    // 读取varint term
    log.Term, n = binary.Uvarint(data)
    if n <= 0 {
        return wal.ErrCorrupt
    }
    // 偏移data
    data = data[n:]
    // 读取数据长度
    size, n := binary.Uvarint(data)
    if n <= 0 {
        return wal.ErrCorrupt
    }

    // 偏移数据
    data = data[n:]
    if uint64(len(data)) < size {
        return wal.ErrCorrupt
    }
    // 读取数据
    log.Data = data[:size]
    // 偏移数据
    data = data[size:]
    // 读取扩展数据长度
    size, n = binary.Uvarint(data)
    if n <= 0 {
        return wal.ErrCorrupt
    }
    // 偏移数据
    data = data[n:]
    if uint64(len(data)) < size {
        return wal.ErrCorrupt
    }
    // 读取偏移数据内容
    log.Extensions = data[:size]
    // 偏移数据
    data = data[size:]
    if len(data) > 0 {
        return wal.ErrCorrupt
    }
    return nil
}

func appendUvarint(dst []byte, x uint64) []byte {
    var buf [10]byte
    n := binary.PutUvarint(buf[:], x)
    dst = append(dst, buf[:n]...)
    return dst
}
3.2.3 Write写入日志接口
代码语言:javascript
复制
// StoreLog is used to store a single raft log
func (s *LogStore) StoreLog(log *raft.Log) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.buf = s.buf[:0]
    s.buf = appendLog(s.buf, log)
    return s.log.Write(log.Index, s.buf)
}

// StoreLogs is used to store a set of raft logs
func (s *LogStore) StoreLogs(logs []*raft.Log) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.batch.Clear()
    for _, log := range logs {
        s.buf = s.buf[:0]
        s.buf = appendLog(s.buf, log)
        s.batch.Write(log.Index, s.buf)
    }
    return s.log.WriteBatch(&s.batch)
}
// 日志写入的格式:Type+Term+len(Data)+Data+len(Ext)+Ext
func appendLog(dst []byte, log *raft.Log) []byte {
    dst = append(dst, byte(log.Type))
    dst = appendUvarint(dst, log.Term)
    dst = appendUvarint(dst, uint64(len(log.Data)))
    dst = append(dst, log.Data...)
    dst = appendUvarint(dst, uint64(len(log.Extensions)))
    dst = append(dst, log.Extensions...)
    return dst
}
3.2.4 DeleteRange删除日志接口
代码语言:javascript
复制
// DeleteRange is used to delete logs within a given range inclusively.
func (s *LogStore) DeleteRange(min, max uint64) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    first, err := s.log.FirstIndex()
    if err != nil {
        return err
    }
    last, err := s.log.LastIndex()
    if err != nil {
        return err
    }
    if min == first {
        // 删除包括max在内的之前的数据
        if err := s.log.TruncateFront(max + 1); err != nil {
            return err
        }
    } else if max == last {
        // 删除包括min在内之后的数据
        if err := s.log.TruncateBack(min - 1); err != nil {
            return err
        }
    } else {
        return wal.ErrOutOfRange
    }
    return nil
}

// Sync performs an fsync on the log. This is not necessary when the
// durability is set to High.
func (s *LogStore) Sync() {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.log.Sync()
}

4.相关资料

  • tidwall/wal库github地址
  • tidwall/raft-wal库github地址
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-03-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 半路出家的后台技术人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. tidwall/wal模块简要介绍
  • 2.tidwall/raft-wal的持久化数据存储结构
  • 3.源码分析
    • 3.1 核心数据结构介绍
      • 3.2 Read、Write、DeleteRange源码分析
        • 3.2.1 初始化、基础接口
        • 3.2.2 Read读取日志接口
        • 3.2.3 Write写入日志接口
        • 3.2.4 DeleteRange删除日志接口
    • 4.相关资料
    相关产品与服务
    数据保险箱
    数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档