-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathmemtable.cc
executable file
·159 lines (140 loc) · 6.34 KB
/
memtable.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/memtable.h"
#include "db/dbformat.h"
#include "leveldb/comparator.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "util/coding.h"
namespace leveldb {
// Decodes a length-prefixed string that starts at "data": a varint32 length
// followed by that many payload bytes. Returns a Slice built from the payload
// pointer and the decoded length.
static Slice GetLengthPrefixedSlice(const char* data) {
  uint32_t payload_len;
  // The decode limit is data + 5. The input is assumed not to be corrupted,
  // so the result of the decode is not checked.
  const char* payload = GetVarint32Ptr(data, data + 5, &payload_len);
  return Slice(payload, payload_len);
}
// Creates an empty memtable that orders its entries with "comparator".
// The reference count starts at zero, and the table is constructed with the
// stored comparator and a pointer to this memtable's arena.
MemTable::MemTable(const InternalKeyComparator& comparator)
    : comparator_(comparator),
      refs_(0),
      table_(comparator_, &arena_) {}
// A memtable may only be destroyed once its reference count is back to zero.
MemTable::~MemTable() {
  assert(refs_ == 0);
}
// Reports the memory usage of the arena held by this memtable.
size_t MemTable::ApproximateMemoryUsage() {
  return arena_.MemoryUsage();
}
// Compares two table entries. Each pointer refers to an internal key encoded
// as a length-prefixed string; both are decoded and handed to the wrapped
// comparator, whose result is returned unchanged.
int MemTable::KeyComparator::operator()(const char* aptr,
                                        const char* bptr) const {
  return comparator.Compare(GetLengthPrefixedSlice(aptr),
                            GetLengthPrefixedSlice(bptr));
}
// Builds the length-prefixed encoding of "target" (varint32 size followed by
// the raw bytes) inside *scratch, discarding whatever *scratch held before.
// The returned pointer refers to *scratch's buffer.
static const char* EncodeKey(std::string* scratch, const Slice& target) {
  const size_t target_len = target.size();
  scratch->clear();
  PutVarint32(scratch, target_len);
  scratch->append(target.data(), target_len);
  return scratch->data();
}
// Adapts the memtable's underlying table iterator to the Iterator interface.
// Table entries are length-prefixed; keys and values are decoded from the
// current entry each time they are requested.
class MemTableIterator : public Iterator {
 public:
  explicit MemTableIterator(MemTable::Table* table) : iter_(table) {}

  // Not copyable.
  MemTableIterator(const MemTableIterator&) = delete;
  MemTableIterator& operator=(const MemTableIterator&) = delete;

  ~MemTableIterator() override = default;

  bool Valid() const override { return iter_.Valid(); }

  // Positioning: all calls forward to the table iterator.
  void SeekToFirst() override { iter_.SeekToFirst(); }
  void SeekToLast() override { iter_.SeekToLast(); }
  void Next() override { iter_.Next(); }
  void Prev() override { iter_.Prev(); }

  // Seeks using the length-prefixed encoding of "k", which is built in
  // seek_buf_.
  void Seek(const Slice& k) override {
    const char* encoded_target = EncodeKey(&seek_buf_, k);
    iter_.Seek(encoded_target);
  }

  // The key is the first length-prefixed string of the current entry.
  Slice key() const override {
    const char* entry = iter_.key();
    return GetLengthPrefixedSlice(entry);
  }

  // The value is the length-prefixed string stored directly after the key.
  Slice value() const override {
    const Slice current_key = GetLengthPrefixedSlice(iter_.key());
    const char* value_start = current_key.data() + current_key.size();
    return GetLengthPrefixedSlice(value_start);
  }

  // This iterator always reports an OK status.
  Status status() const override { return Status::OK(); }

 private:
  MemTable::Table::Iterator iter_;
  std::string seek_buf_;  // Scratch space passed to EncodeKey by Seek()
};
// Returns a newly heap-allocated iterator over this memtable's table.
// NOTE(review): the caller presumably owns and must delete the result —
// confirm against the declaration in memtable.h.
Iterator* MemTable::NewIterator() {
  return new MemTableIterator(&table_);
}
// Encodes one entry for "key" -> "value", tagged with sequence number "s" and
// "type", into arena-allocated memory and inserts it into the table (a skip
// list).
void MemTable::Add(SequenceNumber s, ValueType type, const Slice& key,
                   const Slice& value) {
  // An entry is the concatenation of:
  //   internal_key_size : varint32 of (key.size() + 8)
  //   user key bytes    : char[key.size()]
  //   tag               : fixed64 holding (sequence << 8) | type
  //   value_size        : varint32 of value.size()
  //   value bytes       : char[value.size()]
  // The user key bytes followed by the tag form the internal key.
  const size_t user_key_len = key.size();
  const size_t value_len = value.size();
  // The internal key is the user key plus the 8-byte tag.
  const size_t internal_key_len = user_key_len + 8;
  // Total encoded size of the entry.
  const size_t entry_len = VarintLength(internal_key_len) + internal_key_len +
                           VarintLength(value_len) + value_len;

  char* const entry = arena_.Allocate(entry_len);

  // Internal key: length prefix, user key bytes, then the tag.
  char* cursor = EncodeVarint32(entry, internal_key_len);
  std::memcpy(cursor, key.data(), user_key_len);
  cursor += user_key_len;
  const uint64_t tag = (s << 8) | type;
  EncodeFixed64(cursor, tag);
  cursor += 8;

  // Value: length prefix, then the value bytes.
  cursor = EncodeVarint32(cursor, value_len);
  std::memcpy(cursor, value.data(), value_len);

  // The bytes written must exactly fill the allocation.
  assert(cursor + value_len == entry + entry_len);
  table_.Insert(entry);
}
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
Slice memkey = key.memtable_key(); // memtable_key 由 user_key + sequence 组成
Table::Iterator iter(&table_);
// seek 底层是 SkipList::FindGreaterOrEqual,即第一个 key >= memkey 的元素
// 由于 MemTable 的 key 首先根据 user_key 升序排列然后根据 sequence 降序排列
// 所以,所有 sequence 大于 memkey 的键值对都会被跳过
iter.Seek(memkey.data());
if (iter.Valid()) {
// entry format is:
// klength varint32
// userkey char[klength]
// tag uint64
// vlength varint32
// value char[vlength]
// Check that it belongs to same user key. We do not check the
// sequence number since the Seek() call above should have skipped
// all entries with overly large sequence numbers.
// 键值对的格式为:
// internal_key_size: varint32 编码的 internal_key 长度
// internal_key: 由 user_key + uint64((sequence << 8) | type) 组成
// tag: 包含 SequenceNumber 和 type: uint64((sequence << 8) | type)
// value_length: varint32 编码的 value 长度
// value 内容
const char* entry = iter.key();
uint32_t key_length;
// 注意这里 key_length 是 internal_key 的长度,key_ptr 指向 internal_key 的第一个字节
const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
// 由于 seek 找到的是第一个 key >= memkey,因此我们要检查一下 key 是否等于 target
// seek 已经跳过了 sequence 过大的元素,这里就不用再次检测了
// key_length - 8 跳过了 internal_key 尾部的 8 个字节,Slice(key_ptr, key_length - 8) 即为 user_key
if (comparator_.comparator.user_comparator()->Compare(
Slice(key_ptr, key_length - 8), key.user_key()) == 0) {
// 从中取出 value 和 type
// Correct user key
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
}
case kTypeDeletion:
*s = Status::NotFound(Slice());
return true;
}
}
}
return false;
}
} // namespace leveldb