前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >6.824 raft Lab 3 kvRaft

6.824 raft Lab 3 kvRaft

原创
作者头像
冰寒火
修改2022-10-17 10:06:11
7680
修改2022-10-17 10:06:11
举报
文章被收录于专栏:软件设计软件设计

一、背景

前面实现了raft协议,本文实现一个单机键-值数据库,并通过raft建立主从架构,使得能够容错,但是没有分片。

1 基本功能

这个键-值数据库需要实现以下几点功能:

server端:

  1. 能够支持Get、Put、Append三种操作。
  2. 实现一个状态机存放数据。
  3. 支持客户端重试,实现幂等性。
  4. 实现线性读。
  5. 实现快照压缩数据。

client端:

  1. 能够定位到Leader,并且支持重试。
  2. 单个client实例串行调用server端。

2 测试结果

Lab 3A 测试结果
Lab 3A 测试结果
Lab 3B 测试结果
Lab 3B 测试结果

二、交互流程和详细设计

交互图
交互图

client端:

  1. 客户端需要负载均衡并记录上一次server id。
  2. 将读请求按照写请求的形式执行,实现线性读。

server端:

  1. 写请求处理过程是异步的,需要向raft写入日志,然后异步等待日志提交到状态机。 a. 先记录上下文,然后通过raft.Start写入日志,最后阻塞在waitCh上等待唤醒。
  2. 日志应用协程会不断的从applyCh中读取已提交的日志,此时需要考虑幂等性,相同的请求不要处理两次。 a. 添加去重表,如果日志的Request Id小于等于该client的lastRequestId,表示已经执行过,则过滤掉,达到幂等性效果,否则就写入到状态机,并更新lastRequestId。然后唤醒写协程,让其响应client。
  3. 根据日志数据量触发主动压缩日志。 a. 需要考虑持久化的字段,包括状态机、压缩点、去重表。去重表是:向状态机应用已经提交的日志时过滤已经执行过的请求。如果server宕掉了,就会先读快照,然后重新向状态机apply已经提交的日志进行,此时还是需要去重,所以去重表需要持久化。

三、代码实现

1 client实现

1.1 定义

代码语言:go
复制
var clientGerarator int32
//UniqueRequestId=clientId<<32+nextRequestId
type Clerk struct {
	mu              sync.Mutex
	servers         []*labrpc.ClientEnd
	lastRpcServerId int //上次请求的server
	clientId        int //唯一
	nextRequestId   uint64 //递增
}

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
	ck := new(Clerk)
	ck.servers = servers
	ck.mu = sync.Mutex{}
	ck.lastRpcServerId = 0
	ck.clientId = int(atomic.AddInt32(&clientGerarator, 1))
	ck.nextRequestId = 0
	return ck
}

func (ck *Clerk) currentRpcServerId() int {
	//ck.mu.Lock()
	//defer ck.mu.Unlock()
	return ck.lastRpcServerId
}
func (ck *Clerk) setRpcServerId(rpcServerId int) {
	//ck.mu.Lock()
	//defer ck.mu.Unlock()
	ck.lastRpcServerId = rpcServerId
	ck.lastRpcServerId %= len(ck.servers)
}

1.2 实现

代码语言:go
复制
func (ck *Clerk) Get(key string) string {

	start := time.Now()
	defer func() {
		DPrintf("client Get cost: %v", time.Now().Sub(start).Milliseconds())
	}()
	ck.mu.Lock()
	defer ck.mu.Unlock()
	args := &GetArgs{
		ClientId:  ck.clientId,
		RequestId: atomic.AddUint64(&ck.nextRequestId, 1), //
		Key:       key,
	}
	rpcServerId := ck.currentRpcServerId()

	for {
		reply := &GetReply{}
		ok := ck.servers[rpcServerId].Call("KVServer.Get", args, reply)
		//DPrintf("client Get, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
		if !ok {
			rpcServerId++
			rpcServerId %= len(ck.servers)
		} else if reply.Err == OK {
			ck.setRpcServerId(rpcServerId) //记录调用成功的server,便于下次调用
			return reply.Value
		} else {
			rpcServerId++
			rpcServerId %= len(ck.servers)
		}
		time.Sleep(time.Millisecond * 1)
	}
}

func (ck *Clerk) PutAppend(key string, value string, op string) {

	start := time.Now()

	ck.mu.Lock()
	defer ck.mu.Unlock()
	args := &PutAppendArgs{
		ClientId:  ck.clientId,
		RequestId: atomic.AddUint64(&ck.nextRequestId, 1),
		Key:       key,
		Value:     value,
		Op:        op,
	}
	defer func() {
		DPrintf("client Put cost: %v,op: %v, key: %v, value: %v, args: %v", time.Now().Sub(start).Milliseconds(), op, key, value, mr.Any2String(args))
	}()
	rpcServerId := ck.currentRpcServerId()

	for {
		reply := &PutAppendReply{}
		ok := ck.servers[rpcServerId].Call("KVServer.PutAppend", args, reply)
		//DPrintf("client Put, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
		if !ok {
			DPrintf("client Put, ok: %v, rpcServerId: %d, args: %v, reply: %v", ok, rpcServerId, mr.Any2String(args), mr.Any2String(reply))
			rpcServerId++
			rpcServerId %= len(ck.servers)
		} else if reply.Err == OK {
			ck.setRpcServerId(rpcServerId)
			DPrintf("client Put, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
			return
		} else {
			DPrintf("client Put, rpcServerId: %d, args: %v, reply: %v", rpcServerId, mr.Any2String(args), mr.Any2String(reply))
			rpcServerId++
			rpcServerId %= len(ck.servers)
		}
		time.Sleep(time.Millisecond * 1)
	}
}

func (ck *Clerk) Put(key string, value string) {
	ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
	ck.PutAppend(key, value, "Append")
}

客户端实现比较简单,两者都不断的重试,直至请求成功。

2 server实现

2.1 定义

代码语言:go
复制
type OpType string

const (
	OpTypeGet    = "Get"
	OpTypePut    = "Put"
	OpTypeAppend = "Append"
)

type Op struct {
	ClientId       int
	RequestId      uint64
	OpType         OpType
	Key            string
	Value          string
	StartTimestamp int64
}

type OpContext struct {
	ClientId        int
	RequestId       uint64
	UniqueRequestId uint64 //两者结合才是唯一ID
	Op              *Op
	Term            int
	WaitCh          chan string //用于实现写协程和日志应用循环协程的交互
}

func NewOpContext(op *Op, term int) *OpContext {
	return &OpContext{
		ClientId:        op.ClientId,
		RequestId:       op.RequestId,
		UniqueRequestId: UniqueRequestId(op.ClientId, op.RequestId),
		Op:              op,
		Term:            term,
		WaitCh:          make(chan string, 1), //缓冲区1是防止阻塞日志应用协程
	}
}

type KVServer struct {
	mu      sync.Mutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg
	dead    int32 // set by Kill()

	maxraftstate int // snapshot if log grows this big

	// Your definitions here.
	kvStore          map[string]string     //k-v对,状态机
	opContextMap     map[uint64]*OpContext //用于每个请求的上下文
	lastRequestIdMap map[int]uint64        //clientId-->lastRequestId,维持幂等性,需要客户端能够保证串行
}

2.2 PutAppend实现

代码语言:go
复制
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {

	op := &Op{
		ClientId:       args.ClientId,
		RequestId:      args.RequestId,
		OpType:         OpType(args.Op),
		Key:            args.Key,
		Value:          args.Value,
		StartTimestamp: time.Now().UnixMilli(),
	}
	term := 0
	isLeader := false
	reply.Err = ErrWrongLeader
	if term, isLeader = kv.rf.GetState(); !isLeader {
		return
	}

	start := time.Now()
	defer func() {
		DPrintf("server PutAppend cost: %v, requestId: %d, node: %d, leaderId: %d", time.Now().Sub(start).Milliseconds(), op.RequestId, kv.me, kv.rf.LeaderId())
	}()
	kv.mu.Lock()
	//可能存在前一次请求超时,但是这个请求实际上执行成功了,那么就直接return掉
	if lastRequestId, ok := kv.lastRequestIdMap[op.ClientId]; ok && lastRequestId >= op.RequestId {
		reply.Err = OK
		kv.mu.Unlock()
		return
	}
	opContext := NewOpContext(op, term)
	kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)] = opContext
	kv.mu.Unlock()
	_, _, ok := kv.rf.Start(*op)
	defer func() {
		//DPrintf("server PutAppend, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
		kv.mu.Lock()
		delete(kv.opContextMap, UniqueRequestId(op.ClientId, op.RequestId))
		kv.mu.Unlock()
	}()
	if !ok {
		return
	}
	//阻塞等待
	select {
	case <-opContext.WaitCh:
		reply.Err = OK
	case <-time.After(time.Millisecond * 1000):
		reply.Err = ErrTimeout
	}
}

请求执行前需要判断是否已经执行过了,如果已经执行,就直接返回OK。如果没执行过,则通过raft同步到其他节点上,然后阻塞在waitCh上等待该条日志提交,需要有超时机制,不要一直阻塞。最后执行结束时需要删除掉上下文,要不然会有内存泄漏。

2.3 Get

代码语言:go
复制
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
	op := &Op{
		ClientId:       args.ClientId,
		RequestId:      args.RequestId,
		OpType:         OpTypeGet,
		Key:            args.Key,
		StartTimestamp: time.Now().UnixMilli(),
	}
	//Append不能先append然后将日志传给raft
	term := 0
	isLeader := false
	if term, isLeader = kv.rf.GetState(); !isLeader {
		reply.Err = ErrWrongLeader
		return
	}
	start := time.Now()
	defer func() {
		DPrintf("server Get cost: %v, node: %v, leaderId: %d", time.Now().Sub(start).Milliseconds(), kv.me, kv.rf.LeaderId())
	}()
	kv.mu.Lock()
	opContext := NewOpContext(op, term)
	kv.opContextMap[opContext.UniqueRequestId] = opContext
	kv.mu.Unlock()
	_, _, ok := kv.rf.Start(*op)

	defer func() {
		//DPrintf("server Get, args: %v, reply: %v", mr.Any2String(args), mr.Any2String(reply))
		kv.mu.Lock()
		delete(kv.opContextMap, opContext.UniqueRequestId)
		kv.mu.Unlock()
	}()
	if !ok {
		reply.Err = ErrWrongLeader
		return
	}
	//阻塞等待
	select {
	case c := <-opContext.WaitCh:
		reply.Err = OK
		reply.Value = c
	case <-time.After(time.Millisecond * 1000):
		reply.Err = ErrTimeout
	}
}

为了实现线性读,将读请求当成写请求执行,流程大体和写请求一样,但不需要幂等性逻辑。

2.4 日志应用循环

代码语言:go
复制
//串行写状态机
func (kv *KVServer) applyStateMachineLoop() {

	for !kv.killed() {

		select {
		case applyMsg := <-kv.applyCh:
			if applyMsg.CommandValid {
				func() {
					kv.mu.Lock()
					defer kv.mu.Unlock()
					op := applyMsg.Command.(Op)
					//保证幂等性
					if op.RequestId <= kv.lastRequestIdMap[op.ClientId] {
						return
					}
					switch op.OpType {
					case OpTypePut:
						kv.kvStore[op.Key] = op.Value
						kv.lastRequestIdMap[op.ClientId] = op.RequestId
					case OpTypeAppend:
						kv.kvStore[op.Key] += op.Value
						kv.lastRequestIdMap[op.ClientId] = op.RequestId
					case OpTypeGet:
						//Get请求不需要更新lastRequestId
					}
					DPrintf("op: %v, value: %v, node: %v cost: %v,requestId: %v, stateMachine: %v", mr.Any2String(op), kv.kvStore[op.Key], kv.me, time.Now().UnixMilli()-op.StartTimestamp, op.RequestId, mr.Any2String(kv.kvStore))
					val := kv.kvStore[op.Key]
					//使得写入的client能够响应
					if c, ok := kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)]; ok {
						c.WaitCh <- val
					}
				}()
			}
		}
	}
}

另起协程不断的从applyCh中读取已提交的日志。首先,幂等性逻辑,如果op.RequestId <= kv.lastRequestIdMapop.ClientId,则表示该请求已经执行过,不能再次执行。然后更新状态机,最后唤醒写协程。

3 snapshot

在Lab 2D中已经讲解过raft如何进行压缩、同步snapshot等,而在本实验主要考虑:

  1. 应用层主动压缩snapshot。
  2. apply从leader拉取到的snapshot。

3.1 压缩字段

上文讲过需要保存压缩点、状态机、去重表。

代码语言:go
复制
func (kv *KVServer) maybeSnapshot(index int) {
	if kv.maxraftstate == -1 {
		return
	}
	if kv.persister.RaftStateSize() > kv.maxraftstate {
		DPrintf("maybeSnapshot starting, index: %v", index)
		kv.rf.Snapshot(index, kv.encodeSnapshot(index))
	}
}

//上层加锁
func (kv *KVServer) encodeSnapshot(lastIncludedIndex int) []byte {
	
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(kv.kvStore)
	e.Encode(lastIncludedIndex)
	e.Encode(kv.lastRequestIdMap) //持久化每个client的最大已执行过的写请求
	return w.Bytes()
}

//上层加锁
func (kv *KVServer) decodeSnapshot(snapshot []byte) bool {

	if len(snapshot) == 0 {
		return true
	}
	r := bytes.NewBuffer(snapshot)
	d := labgob.NewDecoder(r)

	if err := d.Decode(&kv.kvStore); err != nil {
		return false
	}
	if err := d.Decode(&kv.lastIncludedIndex); err != nil {
		return false
	}
	//持久化每个client的最大已执行过的写请求
	if err := d.Decode(&kv.lastRequestIdMap); err != nil {
		return false
	}
	return true
}

3.2 apply快照

代码语言:go
复制
//串行写状态机
func (kv *KVServer) applyStateMachineLoop() {

	for !kv.killed() {

		select {
		case applyMsg := <-kv.applyCh:
			if applyMsg.CommandValid {
				//...省略
			} else if applyMsg.SnapshotValid {
				func() {
					kv.mu.Lock()
					defer kv.mu.Unlock()
					//将snapshot apply到状态机
					if kv.decodeSnapshot(applyMsg.Snapshot) {
						//截断日志
						kv.rf.CondInstallSnapshot(applyMsg.SnapshotTerm, applyMsg.SnapshotIndex, applyMsg.Snapshot)
					}
				}()
			}
		}
		DPrintf("snapshot size: %v, stateMachine: %v", kv.persister.SnapshotSize(), mr.Any2String(kv.kvStore))
	}
}

3.3 主动触发压缩

在将日志apply到状态机时根据日志数据量决定是否日志压缩。

代码语言:go
复制
func (kv *KVServer) applyStateMachineLoop() {

	for !kv.killed() {

		select {
		case applyMsg := <-kv.applyCh:
			if applyMsg.CommandValid {
				func() {
					kv.mu.Lock()
					defer kv.mu.Unlock()
					op := applyMsg.Command.(Op)
					//保证幂等性
					if op.RequestId <= kv.lastRequestIdMap[op.ClientId] {
						return
					}
					//过滤掉snapshot前的日志
					if applyMsg.CommandIndex <= kv.lastIncludedIndex && op.OpType != OpTypeGet {
						if c, ok := kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)]; ok {
							c.WaitCh <- "0"
						}
						return
					}
					switch op.OpType {
					case OpTypePut:
						kv.kvStore[op.Key] = op.Value
						kv.lastRequestIdMap[op.ClientId] = op.RequestId
						//尝试触发日志压缩
						kv.maybeSnapshot(applyMsg.CommandIndex)
					case OpTypeAppend:
						kv.kvStore[op.Key] += op.Value
						kv.lastRequestIdMap[op.ClientId] = op.RequestId
						//尝试触发日志压缩
						kv.maybeSnapshot(applyMsg.CommandIndex)
					case OpTypeGet:
						//Get请求不需要更新lastRequestId
					}
					DPrintf("op: %v, value: %v, node: %v cost: %v,requestId: %v, stateMachine: %v", mr.Any2String(op), kv.kvStore[op.Key], kv.me, time.Now().UnixMilli()-op.StartTimestamp, op.RequestId, mr.Any2String(kv.kvStore))
					val := kv.kvStore[op.Key]
					//使得写入的client能够响应
					if c, ok := kv.opContextMap[UniqueRequestId(op.ClientId, op.RequestId)]; ok {
						c.WaitCh <- val
					}
				}()
			} else if applyMsg.SnapshotValid {
				func() {
					kv.mu.Lock()
					defer kv.mu.Unlock()
					if kv.decodeSnapshot(applyMsg.Snapshot) {
						kv.rf.CondInstallSnapshot(applyMsg.SnapshotTerm, applyMsg.SnapshotIndex, applyMsg.Snapshot)
					}
				}()
			}
		}
		DPrintf("snapshot size: %v, stateMachine: %v", kv.persister.SnapshotSize(), mr.Any2String(kv.kvStore))
	}
}

四、优化和小结

1 优化

1.1 线性读

线性读方案有很多,本文以写请求形式处理读请求,简单但性能不是很好。本文推荐另外一种方案的实现,读Follower,并且从Leader复制此时已经提交过的日志,性能比前者要好一些,但多了一次与Leader的交互,该方案可以留到日后优化。

1.2 性能优化

每个请求的处理时间比预期高一些,后续打算采用成组提交机制来批量处理写操作。

2 小结

这个实验在6.824 Lab2D raft上实现一个single group的键值数据库,支持Get、Put、Append三种操作,能够保证客户端幂等性和线性读。本实验通过记录上下文和每个client的requestId来保证幂等性,以写请求的逻辑处理读请求来实现线性读。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、背景
    • 1 基本功能
      • 2 测试结果
      • 二、交互流程和详细设计
      • 三、代码实现
        • 1 client实现
          • 1.1 定义
          • 1.2 实现
        • 2 server实现
          • 2.1 定义
          • 2.2 PutAppend实现
          • 2.3 Get
          • 2.4 日志应用循环
        • 3 snapshot
          • 3.1 压缩字段
          • 3.2 apply快照
          • 3.3 主动触发压缩
      • 四、优化和小结
        • 1 优化
          • 1.1 线性读
          • 1.2 性能优化
        • 2 小结
        相关产品与服务
        数据库
        云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档