0%

读写过程

写入过程

Leveldb的put/delete操作实际都被转成batch然后write。

1
2
3
WriteBatch batch;
batch.Put(kvp.first, kvp.second);
EXPECT_TRUE(db_->Write(WriteOptions(), &batch).ok());

在leveldb中,使用Write操作进行写入,流程如下:

  1. 创建writer对象并加入队列(同时只允许一个写入操作运行)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;

MutexLock l(&mutex_);
writers_.push_back(&w);
// 若当前 Writer 不是队列头部,则等待(锁被释放,进入阻塞)
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
return w.status;
}
...
}

  1. 为写操作分配空间
1
2
3
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
  1. 构建批次并执行写操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
if (status.ok() && updates != nullptr) {
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);

{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
RecordBackgroundError(status);
}
}

versions_->SetLastSequence(last_sequence);
}

  1. 清理并唤醒写操作
1
2
3
4
5
6
7
8
9
10
11
12
13
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}

读取过程

  1. 加锁,基于snapshot进行数据读取
1
2
3
4
5
6
7
8
9
10
11
12
13
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot =
static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}
......
}
  1. 引用mem、imm以及current version,避免读取过程中被销毁
1
2
3
4
5
6
MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != nullptr) imm->Ref();
current->Ref();
  1. 查询数据,释放锁避免占用cpu时间过长
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
  1. 压缩解引等操作
1
2
3
4
5
6
7
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
return s;

leveldb读取分为三步:

在memory db中查找指定的key,若搜索到符合条件的数据项,结束查找;

在冻结的memory db中查找指定的key,若搜索到符合条件的数据项,结束查找;

按低层至高层的顺序在level i层的sstable文件中查找指定的key,若搜索到符合条件的数据项,结束查找,否则返回Not Found错误,表示数据库中不存在指定的数据;

注解

注意leveldb在每一层sstable中查找数据时,都是按序依次查找sstable的。

0层的文件比较特殊。由于0层的文件中可能存在key重合的情况,因此在0层中,文件编号大的sstable优先查找。理由是文件编号较大的sstable中存储的总是最新的数据。

非0层文件,一层中所有文件之间的key不重合,因此leveldb可以借助sstable的元数据(一个文件中最小与最大的key值)进行快速定位,每一层只需要查找一个sstable文件的内容。