Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

one batch, one sequence number #464

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
10 changes: 10 additions & 0 deletions src/io/test/tablet_io_test.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "io/tablet_io.h"

#include <stdlib.h>
#include <iostream>

#include "gflags/gflags.h"
#include "glog/logging.h"
Expand Down Expand Up @@ -514,6 +515,15 @@ TEST_F(TabletIOTest, FindAverageKey) {
start = "";
end = std::string("\x0", 1);
ASSERT_FALSE(TabletIO::FindAverageKey(start, end, &ave));

start = "000000000000001480186993";
end = "000000000000002147352684";
std::string expect
= "000000000000001263264783_";
TabletIO::FindAverageKey(start, end, &ave);
std::cerr << start << "-" << end << "-" << expect << "-" << ave << std::endl;
//ASSERT_TRUE(expect != ave);
ASSERT_NE(expect, ave);
}
} // namespace io
} // namespace tera
Expand Down
4 changes: 2 additions & 2 deletions src/leveldb/db/corruption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,11 @@ class CorruptionTest {
uint64_t number;
FileType type;
std::string fname;
int picked_number = -1;
uint64_t picked_number = 0;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type) &&
type == filetype &&
int(number) > picked_number) { // Pick latest file
uint64_t(number) > picked_number) { // Pick latest file
fname = db_path + "/" + filenames[i];
picked_number = number;
}
Expand Down
18 changes: 7 additions & 11 deletions src/leveldb/db/db_impl.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -735,16 +735,9 @@ Status DBImpl::RecoverInsertMem(WriteBatch* batch, VersionEdit* edit) {
recover_mem_ = NewMemTable();
recover_mem_->Ref();
}
uint64_t log_sequence = WriteBatchInternal::Sequence(batch);
uint64_t last_sequence = log_sequence + WriteBatchInternal::Count(batch) - 1;

// if duplicate record, ignore
if (log_sequence <= recover_mem_->GetLastSequence()) {
assert (last_sequence <= recover_mem_->GetLastSequence());
Log(options_.info_log, "[%s] duplicate record, ignore %lu ~ %lu",
dbname_.c_str(), log_sequence, last_sequence);
return Status::OK();
}

// checked by db_table
assert(WriteBatchInternal::Sequence(batch) >= recover_mem_->GetLastSequence());

Status status = WriteBatchInternal::InsertInto(batch, recover_mem_);
MaybeIgnoreError(&status);
Expand Down Expand Up @@ -1365,7 +1358,7 @@ Status DBImpl::Get(const ReadOptions& options,
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
LookupKey lkey(key, snapshot, mem->GetLastInternalSeq());
if (mem->Get(lkey, value, options.rollbacks, &s)) {
// Done
} else if (imm != NULL && imm->Get(lkey, value, options.rollbacks, &s)) {
Expand Down Expand Up @@ -1772,10 +1765,13 @@ uint64_t DBImpl::GetLastSequence(bool is_locked) {
}
uint64_t retval;
if (mem_->GetLastSequence() > 0) {
Log(options_.info_log, "[%s] LL: mem seq=%lu", dbname_.c_str(), mem_->GetLastSequence());
retval = mem_->GetLastSequence();
} else if (imm_ != NULL && imm_->GetLastSequence()) {
Log(options_.info_log, "[%s] LL: imm seq=%lu", dbname_.c_str(), imm_->GetLastSequence());
retval = imm_->GetLastSequence();
} else {
Log(options_.info_log, "[%s] LL: version seq=%lu", dbname_.c_str(), versions_->LastSequence());
retval = versions_->LastSequence();
}
if (is_locked) {
Expand Down
82 changes: 59 additions & 23 deletions src/leveldb/db/db_table.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "leveldb/table_utils.h"
#include "table/merger.h"
#include "util/string_ext.h"
#include "../common/timer.h"

namespace leveldb {

Expand Down Expand Up @@ -250,6 +251,7 @@ Status DBTable::Init() {
if (last_sequence_ < last_seq) {
last_sequence_ = last_seq;
}
last_timestamp_ = last_sequence_;
} else {
Log(options_.info_log, "[%s] fail to recover lg %d", dbname_.c_str(), i);
break;
Expand Down Expand Up @@ -317,7 +319,7 @@ Status DBTable::Init() {
}

if (s.ok() && !options_.disable_wal) {
std::string log_file_name = LogHexFileName(dbname_, last_sequence_ + 1);
std::string log_file_name = LogHexFileName(dbname_, GetNewSequenceNumber());
s = options_.env->NewWritableFile(log_file_name, &logfile_);
if (s.ok()) {
//Log(options_.info_log, "[%s] open logfile %s",
Expand Down Expand Up @@ -402,9 +404,11 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) {

RecordWriter* last_writer = &w;
WriteBatch* updates = NULL;
SequenceNumber batch_seq = GetNewSequenceNumber();
if (s.ok()) {
updates = GroupWriteBatch(&last_writer);
WriteBatchInternal::SetSequence(updates, last_sequence_ + 1);
Log(options_.info_log, "[%s] LL: set sequence to: %lu", dbname_.c_str(), batch_seq);
WriteBatchInternal::SetSequence(updates, batch_seq);
}

if (s.ok() && !options_.disable_wal && !options.disable_wal) {
Expand All @@ -414,6 +418,8 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) {
s = Status::IOError(dbname_ + ": fail to open log: ", s.ToString());
} else {
force_switch_log_ = false;
// after SwitchLog, set current batch's seq to log number
WriteBatchInternal::SetSequence(updates, last_sequence_);
}
mutex_.Lock();
}
Expand All @@ -435,6 +441,8 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) {
dbname_.c_str(), current_log_size_, slice.size(), wait_sec);
int ret = SwitchLog(true);
if (ret == 0) {
// after SwitchLog, set current batch's seq to log number
WriteBatchInternal::SetSequence(updates, last_sequence_);
continue;
} else if (ret == 1) {
s = log_->WaitDone(-1);
Expand Down Expand Up @@ -519,7 +527,7 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) {
for (uint32_t i = 0; i < lg_list_.size(); ++i) {
lg_list_[i]->ReleaseSnapshot(commit_snapshot_);
}
commit_snapshot_ = last_sequence_ + WriteBatchInternal::Count(updates);
commit_snapshot_ = batch_seq;
}

if (created_new_wb) {
Expand All @@ -534,7 +542,10 @@ Status DBTable::Write(const WriteOptions& options, WriteBatch* my_batch) {

// Update last_sequence
if (updates) {
last_sequence_ += WriteBatchInternal::Count(updates);
if (WriteBatchInternal::Count(updates) != 0) {
last_sequence_ = batch_seq;
}
Log(options_.info_log, "[%s] LL: set last_sequence_->%lu", dbname_.c_str(), last_sequence_);
current_log_size_ += WriteBatchInternal::ByteSize(updates);
}
if (updates == tmp_batch_) tmp_batch_->Clear();
Expand Down Expand Up @@ -763,6 +774,7 @@ void DBTable::CompactRange(const Slice* begin, const Slice* end, int lg_no) {
// @begin_num: the first record (sequence number) that should be recovered
Status DBTable::GatherLogFile(uint64_t begin_num,
std::vector<uint64_t>* logfiles) {
Log(options_.info_log, "[%s] LL: GatherLogFile begin_num=%lu", dbname_.c_str(), begin_num);
std::vector<std::string> files;
Status s = env_->GetChildren(dbname_, &files);
if (!s.ok()) {
Expand All @@ -778,9 +790,13 @@ Status DBTable::GatherLogFile(uint64_t begin_num,
if (ParseFileName(files[i], &number, &type)
&& type == kLogFile
&& number >= begin_num) {
Log(options_.info_log, "[%s] LL: GatherLogFile push file=%lu", dbname_.c_str(), number);
logfiles->push_back(number);
} else if (type == kLogFile && number > last_number) {
last_number = number;
Log(options_.info_log, "[%s] LL: GatherLogFile set last_number=%lu", dbname_.c_str(), number);
} else {
Log(options_.info_log, "[%s] LL: GatherLogFile other", dbname_.c_str());
}
}
std::sort(logfiles->begin(), logfiles->end());
Expand Down Expand Up @@ -854,18 +870,18 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
continue;
}
WriteBatchInternal::SetContents(&batch, record);
uint64_t first_seq = WriteBatchInternal::Sequence(&batch);
uint64_t last_seq = first_seq + WriteBatchInternal::Count(&batch) - 1;
//Log(options_.info_log, "[%s] batch_seq= %lu, last_seq= %lu, count=%d",
// dbname_.c_str(), batch_seq, last_sequence_, WriteBatchInternal::Count(&batch));
if (last_seq >= recover_limit) {
Log(options_.info_log, "[%s] exceed limit %lu, ignore %lu ~ %lu",
dbname_.c_str(), recover_limit, first_seq, last_seq);
continue;
uint64_t batch_seq = WriteBatchInternal::Sequence(&batch);
Log(options_.info_log, "[%s] recover batch_seq= %lu, count=%d",
dbname_.c_str(), batch_seq, WriteBatchInternal::Count(&batch));

assert(batch_seq <= recover_limit);
if (batch_seq == recover_limit) {
Log(options_.info_log, "[%s] on limit, batch_seq=%lu recover_limit=%lu",
dbname_.c_str(), batch_seq, recover_limit);
}

if (last_seq > last_sequence_) {
last_sequence_ = last_seq;
if (batch_seq > last_sequence_) {
last_sequence_ = batch_seq;
}

std::vector<WriteBatch*> lg_updates;
Expand All @@ -888,18 +904,24 @@ Status DBTable::RecoverLogFile(uint64_t log_number, uint64_t recover_limit,
if (lg_updates[i] == NULL) {
continue;
}
if (last_seq <= lg_list_[i]->GetLastSequence()) {
if (batch_seq < lg_list_[i]->GetLastSequence() &&
recover_limit != kMaxSequenceNumber) {
Log(options_.info_log, "[%s] LL: lg %d seq gone back!! batch_seq=%lu lg_last=%lu recover_limit=%lu now=%lu",
dbname_.c_str(), i, batch_seq, lg_list_[i]->GetLastSequence(), recover_limit, common::timer::get_micros());
// assert(0);
continue;
} else {
Log(options_.info_log, "[%s] LL: lg %d batch_seq=%lu lg_last=%lu recover_limit=%lu now=%lu",
dbname_.c_str(), i, batch_seq, lg_list_[i]->GetLastSequence(), recover_limit, common::timer::get_micros());
}
uint64_t first = WriteBatchInternal::Sequence(lg_updates[i]);
uint64_t last = first + WriteBatchInternal::Count(lg_updates[i]) - 1;
// Log(options_.info_log, "[%s] recover log batch first= %lu, last= %lu\n",
// dbname_.c_str(), first, last);
uint64_t lg_batch_seq = WriteBatchInternal::Sequence(lg_updates[i]);
Log(options_.info_log, "[%s] LL: recover log lg = %d batch = %lu, count = %d\n",
dbname_.c_str(), i, lg_batch_seq, WriteBatchInternal::Count(lg_updates[i]));

Status lg_s = lg_list_[i]->RecoverInsertMem(lg_updates[i], (*edit_list)[i]);
if (!lg_s.ok()) {
Log(options_.info_log, "[%s] recover log fail batch first= %lu, last= %lu\n",
dbname_.c_str(), first, last);
Log(options_.info_log, "[%s] LL: recover log lg = %d fail batch = %lu, count = %d\n",
dbname_.c_str(), i, lg_batch_seq, WriteBatchInternal::Count(lg_updates[i]));
status = lg_s;
}
}
Expand Down Expand Up @@ -1008,6 +1030,18 @@ void DBTable::ArchiveFile(const std::string& fname) {
fname.c_str(), s.ToString().c_str());
}

// Returns a fresh sequence number derived from the current reading of
// common::timer::get_micros().
//
// last_timestamp_ remembers the most recent value handed out, so the result
// is always strictly greater than last_timestamp_ as it was on entry:
//   - if the timer reading is ahead of last_timestamp_, the reading itself is
//     recorded and returned;
//   - otherwise (same reading as before, or the timer went backwards) the
//     previous value plus one is recorded and returned.
//
// No locking is performed here.
// NOTE(review): callers presumably serialize access to last_timestamp_ —
// confirm against the call sites.
uint64_t DBTable::GetNewSequenceNumber() {
  const uint64_t now = static_cast<uint64_t>(common::timer::get_micros());
  if (now <= last_timestamp_) {
    last_timestamp_ += 1;
    Log(options_.info_log, "[%s] LL: Got %lu, set last_timestamp_ to %lu",
        dbname_.c_str(), now, last_timestamp_);
    return last_timestamp_;
  }
  // Record the value being handed out. Without this, a second call that
  // observes the same timer reading would return the same number again.
  last_timestamp_ = now;
  Log(options_.info_log, "[%s] LL: GetNewSequenceNumber %lu", dbname_.c_str(), now);
  return now;
}

// tera-specific
bool DBTable::FindSplitKey(double ratio,
std::string* split_key) {
Expand Down Expand Up @@ -1114,10 +1148,12 @@ int DBTable::SwitchLog(bool blocked_switch) {
if (!blocked_switch ||
log::AsyncWriter::BlockLogNum() < options_.max_block_log_number) {
if (current_log_size_ == 0) {
last_sequence_++;
last_sequence_ = GetNewSequenceNumber();
Log(options_.info_log, "[%s] LL: new log name = %lu", dbname_.c_str(), last_sequence_);
}
WritableFile* logfile = NULL;
std::string log_file_name = LogHexFileName(dbname_, last_sequence_ + 1);
Log(options_.info_log, "[%s] LL: log name = %lu", dbname_.c_str(), last_sequence_);
std::string log_file_name = LogHexFileName(dbname_, last_sequence_);
Status s = env_->NewWritableFile(log_file_name, &logfile);
if (s.ok()) {
log_->Stop(blocked_switch);
Expand Down
2 changes: 2 additions & 0 deletions src/leveldb/db/db_table.h
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ class DBTable : public DB {
Status DeleteLogFile(const std::vector<uint64_t>& log_numbers);
void DeleteObsoleteFiles(uint64_t seq_no = -1U);
void ArchiveFile(const std::string& filepath);
uint64_t GetNewSequenceNumber();

// return 0: switch log succeeded
// return 1: cannot switch log right now
Expand Down Expand Up @@ -189,6 +190,7 @@ class DBTable : public DB {
log::AsyncWriter* log_;
bool force_switch_log_;
uint64_t last_sequence_;
uint64_t last_timestamp_;
size_t current_log_size_;

std::deque<RecordWriter*> writers_;
Expand Down
36 changes: 33 additions & 3 deletions src/leveldb/db/dbformat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,34 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
return r;
}

// Three-way comparison of two keys that carry a 16-byte trailer: a fixed64
// whose upper 56 bits are the sequence number (the low byte is discarded by
// the >> 8 shift), followed by a fixed64 internal sequence number.
//
// Ordering:
//   1. increasing user key (according to the user-supplied comparator),
//      where the user key is everything before the 16-byte trailer
//   2. decreasing sequence number
//   3. decreasing internal sequence number
//
// Returns a negative value if akey sorts first, a positive value if bkey
// sorts first, and 0 if all three components are equal.
int InternalKeyComparator::CompareWithInternalSeq(const Slice& akey, const Slice& bkey) const {
  const int user_order = user_comparator_->Compare(ExtractUserKeyWithInternalSeq(akey),
                                                   ExtractUserKeyWithInternalSeq(bkey));
  if (user_order != 0) {
    return user_order;
  }

  // Same user key: the larger sequence number sorts first.
  const uint64_t a_seq = DecodeFixed64(akey.data() + akey.size() - 16) >> 8;
  const uint64_t b_seq = DecodeFixed64(bkey.data() + bkey.size() - 16) >> 8;
  if (a_seq != b_seq) {
    return (a_seq > b_seq) ? -1 : +1;
  }

  // Same sequence number: the larger internal sequence number sorts first.
  const uint64_t a_inner = DecodeFixed64(akey.data() + akey.size() - 8);
  const uint64_t b_inner = DecodeFixed64(bkey.data() + bkey.size() - 8);
  if (a_inner != b_inner) {
    return (a_inner > b_inner) ? -1 : +1;
  }
  return 0;
}

void InternalKeyComparator::FindShortestSeparator(
std::string* start,
const Slice& limit) const {
Expand Down Expand Up @@ -122,22 +150,24 @@ bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const {
return user_policy_->KeyMayMatch(ExtractUserKey(key), f);
}

LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
LookupKey::LookupKey(const Slice& user_key, SequenceNumber s, SequenceNumber internal_seq) {
size_t usize = user_key.size();
size_t needed = usize + 13; // A conservative estimate
size_t needed = usize + 13 + 8; // A conservative estimate
char* dst;
if (needed <= sizeof(space_)) {
dst = space_;
} else {
dst = new char[needed];
}
start_ = dst;
dst = EncodeVarint32(dst, usize + 8);
dst = EncodeVarint32(dst, usize + 8 + 8);
kstart_ = dst;
memcpy(dst, user_key.data(), usize);
dst += usize;
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
dst += 8;
EncodeFixed64(dst, internal_seq);
dst += 8;
end_ = dst;
}

Expand Down
12 changes: 9 additions & 3 deletions src/leveldb/db/dbformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ inline Slice ExtractUserKey(const Slice& internal_key) {
return Slice(internal_key.data(), internal_key.size() - 8);
}

// Returns the leading portion of internal_key with its last 16 bytes removed.
// The returned Slice points into internal_key's buffer; no data is copied.
// Requires internal_key.size() >= 16 (checked with assert).
inline Slice ExtractUserKeyWithInternalSeq(const Slice& internal_key) {
  const size_t total_size = internal_key.size();
  assert(total_size >= 16);
  return Slice(internal_key.data(), total_size - 16);
}

inline ValueType ExtractValueType(const Slice& internal_key) {
assert(internal_key.size() >= 8);
const size_t n = internal_key.size();
Expand All @@ -115,6 +120,7 @@ class InternalKeyComparator : public Comparator {
explicit InternalKeyComparator(const Comparator* c) : user_comparator_(c) { }
virtual const char* Name() const;
virtual int Compare(const Slice& a, const Slice& b) const;
virtual int CompareWithInternalSeq(const Slice& akey, const Slice& bkey) const;
virtual void FindShortestSeparator(
std::string* start,
const Slice& limit) const;
Expand Down Expand Up @@ -198,18 +204,18 @@ class LookupKey {
public:
// Initialize *this for looking up user_key at a snapshot with
// the specified sequence number.
LookupKey(const Slice& user_key, SequenceNumber sequence);
LookupKey(const Slice& user_key, SequenceNumber sequence, SequenceNumber internal_seq);

~LookupKey();

// Return a key suitable for lookup in a MemTable.
Slice memtable_key() const { return Slice(start_, end_ - start_); }

// Return an internal key (suitable for passing to an internal iterator)
Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }
Slice internal_key() const { return Slice(kstart_, end_ - kstart_ - 8); }

// Return the user key
Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }
Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8 - 8); }

private:
// We construct a char array of the form:
Expand Down
Loading