Skip to content

Commit

Permalink
Use rocksdb directly in journal, not through wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
gbitzes committed Nov 15, 2016
1 parent 916a8b1 commit 3ae3c6d
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 89 deletions.
35 changes: 3 additions & 32 deletions src/RocksDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
#include <rocksdb/merge_operator.h>
#include <rocksdb/utilities/checkpoint.h>

#define RETURN_ON_ERROR(st) if(!st.ok()) return st;
#define THROW_ON_ERROR(st) if(!st.ok()) qdb_throw(st.ToString())
#define ASSERT_OK_OR_NOTFOUND(st) if(!st.ok() && !st.IsNotFound()) qdb_throw(st.ToString())
#define RETURN_ON_ERROR(st) { rocksdb::Status st2 = st; if(!st2.ok()) return st2; }
#define THROW_ON_ERROR(st) { rocksdb::Status st2 = st; if(!st2.ok()) qdb_throw(st2.ToString()); }
#define ASSERT_OK_OR_NOTFOUND(st) { rocksdb::Status st2 = st; if(!st2.ok() && !st2.IsNotFound()) qdb_throw(st2.ToString()); }

using namespace quarkdb;

Expand Down Expand Up @@ -434,13 +434,6 @@ rocksdb::Status RocksDB::flushall(LogIndex index) {
return remove_all_with_prefix("", index);
}

void RocksDB::set_or_die(const std::string &key, const std::string &value) {
rocksdb::Status st = this->set(key, value);
if(!st.ok()) {
throw FatalException(SSTR("unable to set key " << key << " to " << value << ". Error: " << st.ToString()));
}
}

rocksdb::Status RocksDB::checkpoint(const std::string &path) {
rocksdb::Checkpoint *checkpoint = nullptr;
RETURN_ON_ERROR(rocksdb::Checkpoint::Create(db, &checkpoint));
Expand All @@ -451,28 +444,6 @@ rocksdb::Status RocksDB::checkpoint(const std::string &path) {
return st;
}


std::string RocksDB::get_or_die(const std::string &key) {
std::string tmp;
rocksdb::Status st = this->get(key, tmp);
if(!st.ok()) {
throw FatalException(SSTR("unable to get key " << key << ". Error: " << st.ToString()));
}
return tmp;
}

int64_t RocksDB::get_int_or_die(const std::string &key) {
std::string tmp = this->get_or_die(key);
return binaryStringToInt(tmp.c_str());
}

void RocksDB::set_int_or_die(const std::string &key, int64_t value) {
rocksdb::Status st = this->set(key, intToBinaryString(value));
if(!st.ok()) {
throw FatalException(SSTR("unable to set key " << key << ". Error: " << st.ToString()));
}
}

RocksDB::TransactionPtr RocksDB::startTransaction() {
return TransactionPtr(transactionDB->BeginTransaction(rocksdb::WriteOptions()));
}
Expand Down
10 changes: 0 additions & 10 deletions src/RocksDB.hh
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,6 @@ public:
// Checkpoint for online backups
//----------------------------------------------------------------------------
rocksdb::Status checkpoint(const std::string &path);

//----------------------------------------------------------------------------
// Convenience functions
//----------------------------------------------------------------------------

void set_or_die(const std::string &key, const std::string &value);
std::string get_or_die(const std::string &key);
int64_t get_int_or_die(const std::string &key);
void set_int_or_die(const std::string &key, int64_t value);

private:
LogIndex lastApplied;

Expand Down
138 changes: 94 additions & 44 deletions src/raft/RaftJournal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "../Common.hh"
#include "../Utils.hh"
#include "RaftState.hh"
#include <rocksdb/utilities/checkpoint.h>

using namespace quarkdb;

Expand Down Expand Up @@ -81,47 +82,45 @@ static std::string encodeEntryKey(LogIndex index) {
//------------------------------------------------------------------------------

void RaftJournal::ObliterateAndReinitializeJournal(const std::string &path, RaftClusterID clusterID, std::vector<RaftServer> nodes) {
RocksDB store(path);
return ObliterateAndReinitializeJournal(store, clusterID, nodes);
RaftJournal journal(path, clusterID, nodes);
}

void RaftJournal::ObliterateAndReinitializeJournal(RocksDB &store, RaftClusterID clusterID, std::vector<RaftServer> nodes) {
store.flushall();

store.set_int_or_die("RAFT_CURRENT_TERM", 0);
store.set_int_or_die("RAFT_LOG_SIZE", 1);
store.set_int_or_die("RAFT_LOG_START", 0);
store.set_or_die("RAFT_CLUSTER_ID", clusterID);
store.set_or_die("RAFT_VOTED_FOR", "");
store.set_int_or_die("RAFT_COMMIT_INDEX", 0);
void RaftJournal::obliterate(RaftClusterID newClusterID, const std::vector<RaftServer> &newNodes) {
IteratorPtr iter(db->NewIterator(rocksdb::ReadOptions()));
for(iter->SeekToFirst(); iter->Valid(); iter->Next()) {
db->Delete(rocksdb::WriteOptions(), iter->key().ToString());
}

RedisRequest req { "UPDATE_RAFT_NODES", serializeNodes(nodes) };
store.set_or_die(encodeEntryKey(0), serializeRedisRequest(0, req));
store.set_or_die("RAFT_NODES", serializeNodes(nodes));
store.set_or_die("RAFT_OBSERVERS", "");
}
this->set_int_or_die("RAFT_CURRENT_TERM", 0);
this->set_int_or_die("RAFT_LOG_SIZE", 1);
this->set_int_or_die("RAFT_LOG_START", 0);
this->set_or_die("RAFT_CLUSTER_ID", newClusterID);
this->set_or_die("RAFT_VOTED_FOR", "");
this->set_int_or_die("RAFT_COMMIT_INDEX", 0);

RedisRequest req { "UPDATE_RAFT_NODES", serializeNodes(newNodes) };
this->set_or_die(encodeEntryKey(0), serializeRedisRequest(0, req));
this->set_or_die("RAFT_NODES", serializeNodes(newNodes));
this->set_or_die("RAFT_OBSERVERS", "");

void RaftJournal::obliterate(RaftClusterID newClusterID, const std::vector<RaftServer> &newNodes) {
ObliterateAndReinitializeJournal(store, newClusterID, newNodes);
initialize();
}

void RaftJournal::initialize() {
currentTerm = store.get_int_or_die("RAFT_CURRENT_TERM");
logSize = store.get_int_or_die("RAFT_LOG_SIZE");
logStart = store.get_int_or_die("RAFT_LOG_START");
clusterID = store.get_or_die("RAFT_CLUSTER_ID");
commitIndex = store.get_int_or_die("RAFT_COMMIT_INDEX");
std::string vote = store.get_or_die("RAFT_VOTED_FOR");
currentTerm = this->get_int_or_die("RAFT_CURRENT_TERM");
logSize = this->get_int_or_die("RAFT_LOG_SIZE");
logStart = this->get_int_or_die("RAFT_LOG_START");
clusterID = this->get_or_die("RAFT_CLUSTER_ID");
commitIndex = this->get_int_or_die("RAFT_COMMIT_INDEX");
std::string vote = this->get_or_die("RAFT_VOTED_FOR");
this->fetch_or_die(logSize-1, termOfLastEntry);

std::string tmp = store.get_or_die("RAFT_NODES");
std::string tmp = this->get_or_die("RAFT_NODES");
if(!parseServers(tmp, nodes)) {
qdb_throw("journal corruption, cannot parse RAFT_NODES: " << tmp);
}

tmp = store.get_or_die("RAFT_OBSERVERS");
tmp = this->get_or_die("RAFT_OBSERVERS");
if(!tmp.empty() && !parseServers(tmp, observers)) {
qdb_throw("journal corruption, cannot parse RAFT_OBSERVERS: " << tmp);
}
Expand All @@ -131,13 +130,35 @@ void RaftJournal::initialize() {
}
}

RaftJournal::RaftJournal(const std::string &filename, RaftClusterID clusterID, const std::vector<RaftServer> &nodes)
: store(filename) {
ObliterateAndReinitializeJournal(store, clusterID, nodes);
initialize();
void RaftJournal::openDB(const std::string &path) {
qdb_info("Opening journal database " << quotes(path));
dbPath = path;

rocksdb::Options options;
options.create_if_missing = true;
rocksdb::Status status = rocksdb::TransactionDB::Open(options, rocksdb::TransactionDBOptions(), path, &transactionDB);
if(!status.ok()) qdb_throw("Error while opening journal in " << path << ":" << status.ToString());

db = transactionDB->GetBaseDB();
}

RaftJournal::RaftJournal(const std::string &filename, RaftClusterID clusterID, const std::vector<RaftServer> &nodes) {
openDB(filename);
obliterate(clusterID, nodes);
}

RaftJournal::~RaftJournal() {
qdb_info("Closing journal database " << quotes(dbPath));

if(transactionDB) {
delete transactionDB;
transactionDB = nullptr;
db = nullptr;
}
}

RaftJournal::RaftJournal(const std::string &filename) : store(filename) {
RaftJournal::RaftJournal(const std::string &filename) {
openDB(filename);
initialize();
}

Expand All @@ -164,9 +185,9 @@ bool RaftJournal::setCurrentTerm(RaftTerm term, RaftServer vote) {
// Just in case we crash in the middle, make sure votedFor becomes invalid first
//----------------------------------------------------------------------------

store.set_or_die("RAFT_VOTED_FOR", RaftState::BLOCKED_VOTE.toString());
store.set_int_or_die("RAFT_CURRENT_TERM", term);
store.set_or_die("RAFT_VOTED_FOR", vote.toString());
this->set_or_die("RAFT_VOTED_FOR", RaftState::BLOCKED_VOTE.toString());
this->set_int_or_die("RAFT_CURRENT_TERM", term);
this->set_or_die("RAFT_VOTED_FOR", vote.toString());

currentTerm = term;
votedFor = vote;
Expand All @@ -185,7 +206,7 @@ bool RaftJournal::setCommitIndex(LogIndex newIndex) {
}

if(commitIndex < newIndex) {
store.set_int_or_die("RAFT_COMMIT_INDEX", newIndex);
this->set_int_or_die("RAFT_COMMIT_INDEX", newIndex);
commitIndex = newIndex;
commitNotifier.notify_all();
}
Expand Down Expand Up @@ -238,24 +259,24 @@ void RaftJournal::trimUntil(LogIndex newLogStart) {

LogIndex prevLogStart = logStart;
logStart = newLogStart;
store.set_int_or_die("RAFT_LOG_START", logStart);
this->set_int_or_die("RAFT_LOG_START", logStart);

for(LogIndex i = prevLogStart; i < newLogStart; i++) {
rocksdb::Status st = store.del(encodeEntryKey(i));
rocksdb::Status st = db->Delete(rocksdb::WriteOptions(), encodeEntryKey(i));
if(!st.ok()) qdb_critical("Error when trimming journal, cannot delete entry " << i << ": " << st.ToString());
}
}

void RaftJournal::rawAppend(LogIndex index, RaftTerm term, const RedisRequest &cmd) {
store.set_or_die(encodeEntryKey(index), serializeRedisRequest(term, cmd));
this->set_or_die(encodeEntryKey(index), serializeRedisRequest(term, cmd));
}

void RaftJournal::setLogSize(LogIndex index) {
if(index <= commitIndex) {
throw FatalException(SSTR("Attempted to remove applied entry by setting logSize to " << index << " while commitIndex = " << commitIndex));
}

store.set_int_or_die("RAFT_LOG_SIZE", index);
this->set_int_or_die("RAFT_LOG_SIZE", index);
logSize = index;
}

Expand All @@ -272,14 +293,14 @@ std::vector<RaftServer> RaftJournal::getNodes() {
void RaftJournal::setNodes(const std::vector<RaftServer> &newNodes) {
std::lock_guard<std::mutex> lock(nodesMutex);

store.set_or_die("RAFT_NODES", serializeNodes(newNodes));
this->set_or_die("RAFT_NODES", serializeNodes(newNodes));
nodes = newNodes;
}

void RaftJournal::setObservers(const std::vector<RaftServer> &obs) {
std::lock_guard<std::mutex> lock(observersMutex);

store.set_or_die("RAFT_OBSERVERS", serializeNodes(obs));
this->set_or_die("RAFT_OBSERVERS", serializeNodes(obs));
observers = obs;
}

Expand Down Expand Up @@ -310,7 +331,7 @@ bool RaftJournal::removeEntries(LogIndex from) {
qdb_warn("Removing inconsistent log entries, [" << from << "," << logSize-1 << "]");

for(LogIndex i = from; i < logSize; i++) {
rocksdb::Status st = store.del(encodeEntryKey(i));
rocksdb::Status st = db->Delete(rocksdb::WriteOptions(), encodeEntryKey(i));
if(!st.ok()) qdb_critical("Error when deleting entry " << i << ": " << st.ToString());
}

Expand Down Expand Up @@ -369,7 +390,7 @@ rocksdb::Status RaftJournal::fetch(LogIndex index, RaftEntry &entry) {
// really contained in the journal

std::string data;
rocksdb::Status st = store.get(encodeEntryKey(index), data);
rocksdb::Status st = db->Get(rocksdb::ReadOptions(), encodeEntryKey(index), &data);
if(!st.ok()) return st;

deserializeRedisRequest(data, entry.term, entry.request);
Expand Down Expand Up @@ -397,10 +418,39 @@ void RaftJournal::fetch_or_die(LogIndex index, RaftTerm &term) {
}
}

void RaftJournal::set_or_die(const std::string &key, const std::string &value) {
rocksdb::Status st = db->Put(rocksdb::WriteOptions(), key, value);
if(!st.ok()) {
qdb_throw("unable to set journal key " << key << ". Error: " << st.ToString());
}
}

void RaftJournal::set_int_or_die(const std::string &key, int64_t value) {
this->set_or_die(key, intToBinaryString(value));
}

int64_t RaftJournal::get_int_or_die(const std::string &key) {
return binaryStringToInt(this->get_or_die(key).c_str());
}

std::string RaftJournal::get_or_die(const std::string &key) {
std::string tmp;
rocksdb::Status st = db->Get(rocksdb::ReadOptions(), key, &tmp);
if(!st.ok()) qdb_throw("error when getting journal key " << key << ": " << st.ToString());
return tmp;
}

//------------------------------------------------------------------------------
// Checkpoint for online backup
//------------------------------------------------------------------------------

rocksdb::Status RaftJournal::checkpoint(const std::string &path) {
return store.checkpoint(path);
rocksdb::Checkpoint *checkpoint = nullptr;
rocksdb::Status st = rocksdb::Checkpoint::Create(db, &checkpoint);
if(!st.ok()) return st;

st = checkpoint->CreateCheckpoint(path);
delete checkpoint;

return st;
}
21 changes: 18 additions & 3 deletions src/raft/RaftJournal.hh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
#ifndef __QUARKDB_RAFT_JOURNAL_H__
#define __QUARKDB_RAFT_JOURNAL_H__

#include "../RocksDB.hh"
#include <rocksdb/db.h>
#include <rocksdb/utilities/transaction_db.h>
#include <rocksdb/utilities/transaction.h>

#include <mutex>
#include <condition_variable>
#include "RaftCommon.hh"
Expand All @@ -34,13 +37,13 @@ namespace quarkdb {
class RaftJournal {
public:
static void ObliterateAndReinitializeJournal(const std::string &path, RaftClusterID clusterID, std::vector<RaftServer> nodes);
static void ObliterateAndReinitializeJournal(RocksDB &store, RaftClusterID clusterID, std::vector<RaftServer> nodes);

// opens an existing journal
RaftJournal(const std::string &path);

// re-initializes a journal, obliterates the contents of the old one, if it exists
RaftJournal(const std::string &path, RaftClusterID clusterID, const std::vector<RaftServer> &nodes);
~RaftJournal();

// should never have to be called during normal operation, only in the tests
// assumes there's no other concurrent access to the journal
Expand Down Expand Up @@ -78,7 +81,14 @@ public:
rocksdb::Status checkpoint(const std::string &path);
void trimUntil(LogIndex newLogStart);
private:
RocksDB store;
void openDB(const std::string &path);

rocksdb::TransactionDB* transactionDB = nullptr;
rocksdb::DB* db = nullptr;
std::string dbPath;

using IteratorPtr = std::shared_ptr<rocksdb::Iterator>;
using TransactionPtr = std::shared_ptr<rocksdb::Transaction>;

//----------------------------------------------------------------------------
// Cached values, always backed to stable storage
Expand Down Expand Up @@ -116,6 +126,11 @@ private:

void rawAppend(LogIndex index, RaftTerm term, const RedisRequest &cmd);
void setLogSize(LogIndex index);

void set_or_die(const std::string &key, const std::string &value);
void set_int_or_die(const std::string &key, int64_t value);
std::string get_or_die(const std::string &key);
int64_t get_int_or_die(const std::string &key);
};

}
Expand Down

0 comments on commit 3ae3c6d

Please sign in to comment.