diff --git a/run.sh b/run.sh new file mode 100644 index 0000000..dd97118 --- /dev/null +++ b/run.sh @@ -0,0 +1,25 @@ +#!/bin/bash +set -e +set -x + +while true; do +for w in 1 2 100; do + for s in 1 2 100; do + for z in 0 1 2; do + for q in 1 8 32; do + rm -rf /tmp/db + mkdir /tmp/db + bin/zlog_bench \ + --backend lmdb \ + --db-path /tmp/db \ + --runtime 10 \ + --verify \ + --width $w \ + --slots $s \ + --size $z \ + --qdepth $q + done + done + done +done +done diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c1eb92e..5782a5f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -53,7 +53,13 @@ endif(WITH_JNI) add_executable(zlog_bench bench.cc) target_link_libraries(zlog_bench libzlog - zlog_backend_ram + ${Boost_PROGRAM_OPTIONS_LIBRARY} + ${Boost_SYSTEM_LIBRARY} +) + +add_executable(zlog zlog.cc) +target_link_libraries(zlog + libzlog ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_SYSTEM_LIBRARY} ) diff --git a/src/bench.cc b/src/bench.cc index 2cce542..948d447 100644 --- a/src/bench.cc +++ b/src/bench.cc @@ -1,46 +1,231 @@ +#include +#include #include +#include +#include +#include +#include +#include #include +#include #include "zlog/backend/ram.h" #include "zlog/options.h" #include "zlog/log.h" +namespace po = boost::program_options; + +class rand_data_gen { + public: + rand_data_gen(size_t buf_size, size_t samp_size) : + buf_size_(buf_size), + dist_(0, buf_size_ - samp_size - 1) + {} + + void generate() { + std::uniform_int_distribution d( + std::numeric_limits::min(), + std::numeric_limits::max()); + buf_.reserve(buf_size_); + while (buf_.size() < buf_size_) { + uint64_t val = d(gen_); + buf_.append((const char *)&val, sizeof(val)); + } + if (buf_.size() > buf_size_) + buf_.resize(buf_size_); + } + + inline const char *sample() { + assert(!buf_.empty()); + return buf_.c_str() + dist_(gen_); + } + + private: + const size_t buf_size_; + std::string buf_; + std::default_random_engine gen_; + std::uniform_int_distribution dist_; +}; + +static inline uint64_t __getns(clockid_t clock) +{ + struct timespec ts; + clock_gettime(clock, &ts); + return (((uint64_t)ts.tv_sec) * 1000000000ULL) + ts.tv_nsec; +} + +static inline uint64_t getus() +{ + return __getns(CLOCK_MONOTONIC) / 1000; +} + +static std::atomic shutdown; +static std::atomic op_count; + +static std::mutex lock; +static std::condition_variable cond; + +static void sig_handler(int sig) +{ + shutdown = true; +} + +static void stats_entry() +{ + while (true) { + auto start_ops_count = op_count.load(); + auto start_us = getus(); + + std::unique_lock lk(lock); + cond.wait_for(lk, std::chrono::seconds(1), + [&] { return shutdown.load(); }); + if (shutdown) { + break; + } + + auto end_us = getus(); + auto end_ops_count = op_count.load(); + + auto elapsed_us = end_us - start_us; + + auto iops = (double)((end_ops_count - start_ops_count) * + 1000000ULL) / (double)elapsed_us; + + std::cout << iops << std::endl; + } +} + int main(int argc, char **argv) { - auto backend = std::unique_ptr( - new zlog::storage::ram::RAMBackend()); + std::string log_name; + uint32_t width; + uint32_t slots; + size_t entry_size; + int qdepth; + bool excl_open; + bool verify; + int runtime; + std::string backend; + std::string pool; + std::string db_path; + bool blackhole; + + po::options_description opts("Benchmark options"); + opts.add_options() + ("help", "show help message") + ("name", po::value(&log_name)->default_value("bench"), "log name") + ("width", po::value(&width)->default_value(10), "stripe width") + ("slots", po::value(&slots)->default_value(10), "object slots") + ("size", po::value(&entry_size)->default_value(1024), "entry size") + ("qdepth", po::value(&qdepth)->default_value(1), "queue depth") + ("excl", po::bool_switch(&excl_open), "exclusive open") + ("verify", po::bool_switch(&verify), "verify writes") + ("runtime", po::value(&runtime)->default_value(0), "runtime") + + ("backend", po::value(&backend)->required(), "backend") + ("pool", po::value(&pool)->default_value("zlog"), "pool (ceph)") + ("db-path", po::value(&db_path)->default_value("/tmp/zlog.bench.db"), "db path (lmdb)") + ("blackhole", po::bool_switch(&blackhole), "black hole (ram)") + ; + + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, opts), vm); + + if (vm.count("help")) { + std::cout << opts << std::endl; + return 1; + } + + po::notify(vm); + + runtime = std::max(runtime, 0); zlog::Options options; - options.backend = std::move(backend); + options.backend_name = backend; + + if (backend == "ceph") { + options.backend_options["pool"] = pool; + // zero-length string here causes default path search + options.backend_options["conf_file"] = ""; + } + + if (backend == "lmdb") { + options.backend_options["path"] = db_path; + } + + if (backend == "ram") { + if (blackhole) { + options.backend_options["blackhole"] = "true"; + } + } + options.create_if_missing = true; - options.error_if_exists = true; - options.init_stripe_on_create = true; + options.error_if_exists = excl_open; + + options.stripe_width = width; + options.stripe_slots = slots; + options.max_inflight_ops = qdepth; zlog::Log *log; - int ret = zlog::Log::Open(options, "mylog", &log); - assert(ret == 0); - -#if 0 - for (int i = 0; i < 100000; i++) { - uint64_t pos; - int ret = log->Append("data", &pos); - assert(ret == 0); - std::cout << pos << std::endl; - } -#else - std::mutex lock; - for (int i = 0; i < 50000; i++) { - int ret = log->appendAsync("data", [&](int ret, uint64_t pos) { - std::lock_guard lk(lock); - std::cout << pos << " " << ret << std::endl; + int ret = zlog::Log::Open(options, log_name, &log); + if (ret) { + std::cerr << "log::open failed: " << strerror(-ret) << std::endl; + return -1; + } + + signal(SIGINT, sig_handler); + signal(SIGALRM, sig_handler); + alarm(runtime); + + rand_data_gen dgen(1ULL << 22, entry_size); + dgen.generate(); + + // TODO: by always logging the same entry data we may trigger low-level + // compression to take affect, if such a thing exists. something to be aware + // of and watch out for. + const auto entry_data = std::string(dgen.sample(), entry_size); + + std::thread stats_thread(stats_entry); + + op_count = 0; + while (!shutdown) { + int ret = log->appendAsync(entry_data, [&](int ret, uint64_t pos) { + if (ret && ret != -ESHUTDOWN) { + std::cerr << "appendAsync cb failed: " << strerror(-ret) << std::endl; + assert(0); + return; + } + op_count++; }); - assert(ret == 0); + if (ret) { + std::cerr << "appendAsync failed: " << strerror(-ret) << std::endl; + assert(0); + break; + } + } + + shutdown = true; + cond.notify_one(); + stats_thread.join(); + + if (verify) { + uint64_t tail; + auto ret = log->CheckTail(&tail); + if (ret) { + std::cerr << "checktail failed: " << strerror(-ret) << std::endl; + } else { + for (uint64_t pos = 0; pos < tail; pos++) { + std::string data; + ret = log->Read(pos, &data); + if (ret) { + std::cerr << "read failed at pos " << pos << ": " << strerror(-ret) << std::endl; + } else if (data != entry_data) { + std::cerr << "verify failed at pos " << pos << std::endl; + assert(0); + } + } + } } -#endif - // will wait for async ops to run callbacks - std::cout << "done looping" << std::endl; - sleep(3); - std::cout << "done sleeping" << std::endl; delete log; return 0; diff --git a/src/include/zlog/backend/ram.h b/src/include/zlog/backend/ram.h index da1f3af..23c6acd 100644 --- a/src/include/zlog/backend/ram.h +++ b/src/include/zlog/backend/ram.h @@ -15,6 +15,7 @@ namespace ram { class RAMBackend : public Backend { public: RAMBackend() : + blackhole_(false), options_{{"scheme", "ram"}} {} @@ -97,8 +98,9 @@ class RAMBackend : public Backend { private: mutable std::mutex lock_; + bool blackhole_; std::map options_; - std::map> objects_; }; diff --git a/src/include/zlog/options.h b/src/include/zlog/options.h index 7bdfdc9..14c3d6c 100644 --- a/src/include/zlog/options.h +++ b/src/include/zlog/options.h @@ -46,9 +46,6 @@ struct Options { // advanced int max_refresh_views_read = 20; - int width = 10; - int entries_per_object = 200; - Statistics* statistics = nullptr; std::vector http; diff --git a/src/libzlog/striper.cc b/src/libzlog/striper.cc index b9295b3..c612c20 100644 --- a/src/libzlog/striper.cc +++ b/src/libzlog/striper.cc @@ -343,6 +343,10 @@ int Striper::propose_sequencer() auto& stripe = it->second; int ret = seal_stripe(stripe, next_epoch, &max_pos, &empty); if (ret < 0) { + if (ret == -ESPIPE) { + update_current_view(v.epoch()); + return 0; + } return ret; } @@ -363,6 +367,10 @@ int Striper::propose_sequencer() auto& stripe = it->second; int ret = seal_stripe(stripe, next_epoch, nullptr, nullptr); if (ret < 0) { + if (ret == -ESPIPE) { + update_current_view(v.epoch()); + return 0; + } return ret; } } diff --git a/src/storage/ram/ram.cc b/src/storage/ram/ram.cc index dd79b9a..00a5220 100644 --- a/src/storage/ram/ram.cc +++ b/src/storage/ram/ram.cc @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -17,6 +18,11 @@ RAMBackend::~RAMBackend() int RAMBackend::Initialize( const std::map& opts) { + auto it = opts.find("blackhole"); + if (it != opts.end()) { + blackhole_ = boost::iequals(it->second, "yes") || + boost::iequals(it->second, "true"); + } return 0; } @@ -306,7 +312,9 @@ int RAMBackend::Write(const std::string& oid, const std::string& data, auto it = lobj->entries.find(position); if (it == lobj->entries.end()) { LogEntry entry; - entry.data = data; + if (!blackhole_) { + entry.data = data; + } lobj->entries.emplace(position, entry); lobj->maxpos = std::max(lobj->maxpos, position); return 0; diff --git a/src/storage/test_backend.cc b/src/storage/test_backend.cc index 90d47c1..8ae12f7 100644 --- a/src/storage/test_backend.cc +++ b/src/storage/test_backend.cc @@ -744,6 +744,8 @@ TEST_F(BackendTest, ListHeads) { std::vector output; ASSERT_EQ(backend->ListHeads(output), 0); + std::sort(output.begin(), output.end()); + ASSERT_EQ(output, expected); } @@ -759,8 +761,11 @@ TEST_F(BackendTest, ListLinks) { ASSERT_EQ(backend->CreateLog("lastOne", "", nullptr, nullptr), 0); std::vector expected = { "head.another_log", "head.lastOne", "head.log1" }; + std::sort(expected.begin(), expected.end()); std::vector output; ASSERT_EQ(backend->ListLinks(output), 0); + std::sort(output.begin(), output.end()); + ASSERT_EQ(output, expected); } diff --git a/src/zlog.cc b/src/zlog.cc new file mode 100644 index 0000000..9a6dff0 --- /dev/null +++ b/src/zlog.cc @@ -0,0 +1,100 @@ +#include +#include +#include +#include +#include "zlog/backend.h" +#include "zlog/options.h" +#include "libzlog/striper.h" +#include "proto/zlog.pb.h" + +namespace po = boost::program_options; + +int main(int argc, char **argv) +{ + std::string log_name; + std::string backend_name; + std::string pool; + std::string db_path; + + po::options_description opts("Benchmark options"); + opts.add_options() + ("help", "show help message") + ("name", po::value(&log_name)->default_value("bench"), "log name") + ("backend", po::value(&backend_name)->required(), "backend") + ("pool", po::value(&pool)->default_value("zlog"), "pool (ceph)") + ("db-path", po::value(&db_path)->default_value("/tmp/zlog.bench.db"), "db path (lmdb)") + ; + + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, opts), vm); + + if (vm.count("help")) { + std::cout << opts << std::endl; + return 1; + } + + po::notify(vm); + + zlog::Options options; + options.backend_name = backend_name; + + if (backend_name == "ceph") { + options.backend_options["pool"] = pool; + // zero-length string here causes default path search + options.backend_options["conf_file"] = ""; + } else if (backend_name == "lmdb") { + options.backend_options["path"] = db_path; + } + + std::shared_ptr backend; + int ret = zlog::Backend::Load(options.backend_name, + options.backend_options, backend); + if (ret) { + std::cerr << "backend::load " << ret << std::endl; + return ret; + } + + std::string hoid; + std::string prefix; + ret = backend->OpenLog(log_name, &hoid, &prefix); + if (ret) { + std::cerr << "backend::openlog " << ret << std::endl; + return ret; + } + + uint64_t epoch = 1; + while (true) { + std::map views; + ret = backend->ReadViews(hoid, epoch, 1, &views); + if (ret) { + std::cerr << "read views error " << ret << std::endl; + return ret; + } + + if (views.empty()) { + break; + } + + assert(views.size() == 1u); + auto it = views.find(epoch); + assert(it != views.end()); + + zlog_proto::View view_src; + if (!view_src.ParseFromString(it->second)) { + assert(0); + exit(1); + } + + auto view = std::make_shared(prefix, it->first, view_src); + + std::cout << "view@" << view->epoch() << std::endl; + for (auto it : view->object_map.stripes()) { + std::cout << " stripe@" << it.second.id() << " [" << it.first + << ", " << it.second.max_position() << "]" << std::endl; + } + + epoch++; + } + + return 0; +}