写入过程
Leveldb的put/delete操作实际都被转成batch然后write。
1 2 3
| WriteBatch batch; batch.Put(kvp.first, kvp.second); EXPECT_TRUE(db_->Write(WriteOptions(), &batch).ok());
|
在leveldb中,使用Write操作进行写入,流程如下:
- 创建writer对象并加入队列(同时只允许一个写入操作运行)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { Writer w(&mutex_); w.batch = updates; w.sync = options.sync; w.done = false;
MutexLock l(&mutex_); writers_.push_back(&w); while (!w.done && &w != writers_.front()) { w.cv.Wait(); } if (w.done) { return w.status; } ... }
|
- 为写操作分配空间
1 2 3
| Status status = MakeRoomForWrite(updates == nullptr); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w;
|
- 构建批次并执行写操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| if (status.ok() && updates != nullptr) { WriteBatch* write_batch = BuildBatchGroup(&last_writer); WriteBatchInternal::SetSequence(write_batch, last_sequence + 1); last_sequence += WriteBatchInternal::Count(write_batch);
{ mutex_.Unlock(); status = log_->AddRecord(WriteBatchInternal::Contents(write_batch)); bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync(); if (!status.ok()) { sync_error = true; } } if (status.ok()) { status = WriteBatchInternal::InsertInto(write_batch, mem_); } mutex_.Lock(); if (sync_error) { RecordBackgroundError(status); } }
versions_->SetLastSequence(last_sequence); }
|
- 清理并唤醒写操作
1 2 3 4 5 6 7 8 9 10 11 12 13
| while (true) { Writer* ready = writers_.front(); writers_.pop_front(); if (ready != &w) { ready->status = status; ready->done = true; ready->cv.Signal(); } if (ready == last_writer) break; } if (!writers_.empty()) { writers_.front()->cv.Signal(); }
|
读取过程
- 加锁,基于snapshot进行数据读取
1 2 3 4 5 6 7 8 9 10 11 12 13
| Status DBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) { Status s; MutexLock l(&mutex_); SequenceNumber snapshot; if (options.snapshot != nullptr) { snapshot = static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number(); } else { snapshot = versions_->LastSequence(); } ...... }
|
- 引用mem、imm以及current version,避免读取过程中被销毁
1 2 3 4 5 6
| MemTable* mem = mem_; MemTable* imm = imm_; Version* current = versions_->current(); mem->Ref(); if (imm != nullptr) imm->Ref(); current->Ref();
|
- 查询数据,释放锁避免占用cpu时间过长
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| { mutex_.Unlock(); LookupKey lkey(key, snapshot); if (mem->Get(lkey, value, &s)) { } else if (imm != nullptr && imm->Get(lkey, value, &s)) { } else { s = current->Get(options, lkey, value, &stats); have_stat_update = true; } mutex_.Lock(); }
|
- 压缩解引等操作
1 2 3 4 5 6 7
| if (have_stat_update && current->UpdateStats(stats)) { MaybeScheduleCompaction(); } mem->Unref(); if (imm != nullptr) imm->Unref(); current->Unref(); return s;
|
leveldb读取分为三步:
在memory db中查找指定的key,若搜索到符合条件的数据项,结束查找;
在冻结的memory db中查找指定的key,若搜索到符合条件的数据项,结束查找;
按低层至高层的顺序在level i层的sstable文件中查找指定的key,若搜索到符合条件的数据项,结束查找,否则返回Not Found错误,表示数据库中不存在指定的数据;
注解
注意leveldb在每一层sstable中查找数据时,都是按序依次查找sstable的。
0层的文件比较特殊。由于0层的文件中可能存在key重合的情况,因此在0层中,文件编号大的sstable优先查找。理由是文件编号较大的sstable中存储的总是最新的数据。
非0层文件,一层中所有文件之间的key不重合,因此leveldb可以借助sstable的元数据(一个文件中最小与最大的key值)进行快速定位,每一层只需要查找一个sstable文件的内容。