copIterator 其实就是一个迭代器,获取数据是通过调用 copIterator 的 Next 方法获取:
Copyfunc (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
var (
resp *copResponse
ok bool
closed bool
)
...
// 如果数据不需要排序,那么直接从 respChan 中获取数据
if it.respChan != nil {
// Get next fetched resp from chan
resp, ok, closed = it.recvFromRespCh(ctx, it.respChan)
if !ok || closed {
it.actionOnExceed.close()
return nil, nil
}
// 表示读到 respChan 最后一个数据
if resp == finCopResp {
it.actionOnExceed.destroyTokenIfNeeded(func() {
it.sendRate.PutToken()
})
return it.Next(ctx)
}
} else {
for {
if it.curr >= len(it.tasks) {
// Resp will be nil if iterator is finishCh.
it.actionOnExceed.close()
return nil, nil
}
// 如果数据是有序的,那么从 task 的 respChan 获取数据
task := it.tasks[it.curr]
resp, ok, closed = it.recvFromRespCh(ctx, task.respChan)
if closed {
return nil, nil
}
if ok {
break
}
it.actionOnExceed.destroyTokenIfNeeded(func() {
it.sendRate.PutToken()
})
it.tasks[it.curr] = nil
it.curr++
}
}
if resp.err != nil {
return nil, errors.Trace(resp.err)
}
err := it.store.CheckVisibility(it.req.StartTs)
if err != nil {
return nil, errors.Trace(err)
}
return resp, nil
}
获取数据根据是否有序也是分为两种,无序的数据直接从 copIterator 的 respChan 中获取数据,如果是有序的,那么需要获取到 task 里面的 respChan 来获取数据。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。