前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >leveldb源码分析--写数据

leveldb源码分析--写数据

原创
作者头像
小林帮
修改2021-03-22 11:33:26
8180
修改2021-03-22 11:33:26
举报

原理

回想一下LSM数据写入的流程:

  • 写磁盘的WAL日志文件;
  • 更新内存中的MemTable数据;

写数据调用

代码语言:txt
复制
// 写数据对外接口
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

批量写入

WriteBatch封装了一个批量修改数据的原子化操作;

其主要完成数据的串行化拼接,拼接后的格式如下:

count | record | record... (kTypeValue+Key+Value;kKeyDeletion+Key)

代码语言:txt
复制
// 把字符串编码,拼接保存到rep_变量中
void WriteBatch::Put(const Slice& key, const Slice& value) {
  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
  rep_.push_back(static_cast<char>(kTypeValue));
  PutLengthPrefixedSlice(&rep_, key);
  PutLengthPrefixedSlice(&rep_, value);
}

写数据

由于WAL日志文件和MemTable内存结构是全局共享资源,在多线程同时写入数据时,需加互斥锁来保证操作的隔离性。考虑到写WAL涉及磁盘的写入操作,耗时较久,会影响数据写入的并发性能。

leveldb针对此问题,做了一个批量写入的优化:

把数据的写入操作拆分成两个阶段,来缩短锁等待的时间; 在准备阶段,写入时获取到锁后,把更改的数据加入到待写入的队列中;再检查自己是不是排在待写入队列的头部,如不是,则释放锁,进入等待中; 如检查到自己出于带写入队列的头部,则再次获取锁,并尽可能多的从待写入队列上读取数据,写入到WAL日志文件中。

代码语言:txt
复制
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  // 如数据未完成写入,且当前writer不在待写入队列的头部,则等待
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  // 如数据写入完成,则退出(其他线程已帮忙完成数据写入)
  if (w.done) {
    return w.status;
  }

  // 数据限流机制
  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(updates == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    // 尽可能多的从待写入队列中取出数据,拼接成写的WriteBatch结构
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);

    {
      // 释放锁,让其他线程可以读数据,或者把数据放入待写入队列中
      mutex_.Unlock();
      // 批量写入数据
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
      }
      // 把数据写入到MemTable中
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
    }
    if (write_batch == tmp_batch_) tmp_batch_->Clear();

    // 更新全局的sequence
    versions_->SetLastSequence(last_sequence);
  }

  // 从写入队列中剔除此次已完成的写入,更新其为写入完成状态,并唤醒其等待的线程
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // 如队列中还有数据,则唤醒其他线程继续写入数据
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}
代码语言:txt
复制

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 原理
  • 写数据调用
  • 批量写入
  • 写数据
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档