在上篇文章中,我们讲解了 Raft Propose 的 Commit 和 Apply 情景分析,相信大家对 TiKV 的 Raft 写流程有了大概了解。这篇文章将尝试向大家较为完整的介绍下 TiKV 中的 Raft 读流程的实现,特别是 read index 和 lease read(或称 local read)。关于 read index 和 lease read 的介绍和理论基础,请大家参阅 TiKV 功能介绍 - Lease Read 或者 Raft 论文第 6.4 节,不在这里赘述。
如何发起 Raft 读请求?
TiKV 的实现是分层的,不同模块负责不同事情,下图直观地介绍了 TiKV 的模块的层级关系。
TiKV 中所有 Raft 相关的逻辑都在 Raftstore 模块,如何发起 Raft 读请求就是说如何通过 Raftstore 发起读请求。Raftstore 对外(TXN/MVCC)提供接口叫做 RaftStoreRouter
,它提供了多方s法,但能供外面发起读写请求的只有一个,叫做 send_command
。
/// Routes messages to the raftstore.
pub trait RaftStoreRouter<E>: Send + Clone
where
E: KvEngine,
{
/// Sends RaftCmdRequest to local store.
fn send_command(&self, req: RaftCmdRequest, cb: Callback<E>) -> RaftStoreResult<()>;
// Other methods are elided.
}
所有的读写请求统一使用这个方法发起。当操作完成后,不管成功与否,都调用 cb: Callbck<E>
,并将回复传入。
这篇文章接下来的部分将围绕图中黄色部分展开。
读请求有哪些?
既然这么问,肯定意味着 TiKV 中有多个不同类型的读请求。这就需要了解下 RaftCmdRequest
的构成了。TiKV 对外的请求都是 Protocol buffer message,RaftCmdRequest
定义在 kvproto/raft_cmd.proto,它包含了所有 TiKV 支持的读写请求。
message Request {
CmdType cmd_type = 1;
GetRequest get = 2;
PutRequest put = 4;
DeleteRequest delete = 5;
SnapRequest snap = 6;
PrewriteRequest prewrite = 7;
DeleteRangeRequest delete_range = 8;
IngestSSTRequest ingest_sst = 9;
ReadIndexRequest read_index = 10;
}
上面代码中加粗的就是 TiKV 目前支持的几种读请求。
注意:不要把 ReadIndexRequst 和 Read Index 搞混。ReadIndexRequest 是一种读的请求,ReadIndex 是一种处理读请求的方式。
Raft 如何处理读请求?
我们以日常使用中最常见的 SnapRequest 为例,说一下 Read Index 和 Local read 的流程。
在 TXN/MVCC 层通过 send_command
发起一个读请求后,Raftstore 中对应的 PeerFsm (就是一个 Raft 状态机)会在 PeerFsm::handld_msgs
中收到该请求。
PeerFsm::propose_raft_command
fn propose_raft_command(&mut self, mut msg: RaftCmdRequest, cb: Callback<RocksEngine>) {
// Irrelevant code is elided.
match self.pre_propose_raft_command(&msg) {
Ok(Some(resp)) => {
cb.invoke_with_response(resp);
return;
}
Err(e) => {
cb.invoke_with_response(new_error(e));
return;
}
_ => (),
}
let mut resp = RaftCmdResponse::default();
if self.fsm.peer.propose(self.ctx, cb, msg, resp) {
self.fsm.has_ready = true;
}
}
PeerFsm 在会将该请求传入 PeerFsm::propose_raft_command
做进一步处理。为了突出重点,无关代码已被删去。
Peer::propose
pub fn propose<T: Transport, C>(
&mut self,
ctx: &mut PollContext<T, C>,
cb: Callback<RocksEngine>,
req: RaftCmdRequest,
mut err_resp: RaftCmdResponse,
) -> bool {
// Irrelevant code is elided.
let policy = self.inspect(&req);
let res = match policy {
Ok(RequestPolicy::ReadLocal) => {
self.read_local(ctx, req, cb);
return false;
}
Ok(RequestPolicy::ReadIndex) => return self.read_index(ctx, req, err_resp, cb),
Ok(RequestPolicy::ProposeTransferLeader)
| Ok(RequestPolicy::ProposeConfChange)
| Ok(RequestPolicy::ProposeNormal) => {
// Irrelevant code is elided.
}
Err(e) => Err(e),
};
}
由于 RaftCmdRequest 可能包含了多种请求,加上请求间的处理方式各有不同,所以我们需要判断下该如何处理。
Peer::inspect
fn inspect(&mut self, req: &RaftCmdRequest) -> Result<RequestPolicy> {
// Irrelevant code is elided.
if req.get_header().get_read_quorum() {
return Ok(RequestPolicy::ReadIndex);
}
if !self.has_applied_to_current_term() {
return Ok(RequestPolicy::ReadIndex);
}
match self.inspect_lease() {
LeaseState::Valid => Ok(RequestPolicy::ReadLocal),
LeaseState::Expired | LeaseState::Suspect => {
Ok(RequestPolicy::ReadIndex)
}
}
}
fn inspect_lease(&mut self) -> LeaseState {
if !self.raft_group.raft.in_lease() {
return LeaseState::Suspect;
}
// None means now.
let state = self.leader_lease.inspect(None);
if LeaseState::Expired == state {
self.leader_lease.expire();
}
state
}
inspect 方法也不复杂,我们住逐行看一下:
这判断总的来说就是,如果不确定能安全地读 RocksDB 就用 read index,否则大胆地使用 local read 处理。
多线程 local read
细心的读者可能已经发现,是否能 local read 关键在 leader 是否在 lease 内,而判断 lease 其实是不用经过 Raft 状态机的,所以我们能不能扩展下 lease,让它能在多线程间共享,特别是在 TXN/MVCC 层,这样读请求就能绕过 Raft 直接执行了。答案是可以的,而且 TiKV 已经实现了。话不多说,直接看代码。
impl<E> RaftStoreRouter<E> for ServerRaftStoreRouter<E> where E: KvEngine
{
fn send_command(&self, req: RaftCmdRequest, cb: Callback<E>) -> RaftStoreResult<()> {
let cmd = RaftCommand::new(req, cb);
if LocalReader::<RaftRouter<E>, E>::acceptable(&cmd.request) {
self.local_reader.execute_raft_command(cmd);
Ok(())
} else {
let region_id = cmd.request.get_header().get_region_id();
self.router
.send_raft_command(cmd)
.map_err(|e| handle_send_error(region_id, e))
}
}
}
这个实现的有些取巧,我们直接把它做到 raftstore 的入口处,也就是 RaftStoreRouter 中。这里的 LocalReader 其实就是一个 cache,缓存了现有 leader 处理读请求时的一些状态。
LocalReader::execute_raft_command
pub fn execute_raft_command(&self, cmd: RaftCommand<E>) {
// Irrelevant code is elided.
let region_id = cmd.request.get_header().get_region_id();
let mut executor = ReadExecutor::new(
self.kv_engine.clone(),
false, /* dont check region epoch */
true, /* we need snapshot time */
);
match self.pre_propose_raft_command(&cmd.request) {
Ok(Some(delegate)) => {
if let Some(resp) =
delegate.handle_read(&cmd.request, &mut executor, &mut *metrics) {
cmd.callback.invoke_read(resp);
} else { self.redirect(cmd) }
}
Ok(None) => {
if self.delegates.borrow().get(®ion_id).is_some() {
self.redirect(cmd);
}
let meta = self.store_meta.lock().unwrap();
match meta.readers.get(®ion_id).cloned() {
Some(reader) => {
self.delegates.borrow_mut().insert(region_id, Some(reader));
}
None => self.redirect(cmd),
}
}
Err(e) => {
let mut response = cmd_resp::new_error(e);
if let Some(Some(ref delegate)) = self.delegates.borrow().get(®ion_id) {
cmd_resp::bind_term(&mut response, delegate.term);
}
cmd.callback.invoke_read(ReadResponse { response, snapshot: None });
self.delegates.borrow_mut().remove(®ion_id);
}
}
}
上述代码就是 Localreader 中处理请求的关键逻辑。注意为了突出重点,我们对该函数做了适当精简,完整代码请参考 链接。
Localreader 中对 lease 的处理和 raftstore 略有不同,关键代码在 这里 和 这里,至于为什么可以这么写,在这就不说了,作为课后作业留给读者思考 :-p
最后
read index 和 local read 的源码阅读就到这结束了,希望读者看完后能了解并掌握 TiKV 处理读请求的逻辑。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。