feat: support histograms for command latency statistics #2721

Merged · 21 commits · Jan 22, 2025
Changes from 13 commits
26 changes: 19 additions & 7 deletions kvrocks.conf
@@ -20,13 +20,13 @@ bind 127.0.0.1
# unixsocket /tmp/kvrocks.sock
# unixsocketperm 777

# Allows a parent process to open a socket and pass its FD down to kvrocks as a child
# process. Useful to reserve a port and prevent race conditions.
#
# PLEASE NOTE:
# If this is overridden to a value other than -1, the bind and tls* directives will be
# ignored.
#
# Default: -1 (not overridden, defer to creating a connection to the specified port)
socket-fd -1

@@ -369,10 +369,22 @@ json-storage-format json
# NOTE: This is an experimental feature. If you find errors, performance degradation,
# excessive memory usage, excessive disk I/O, etc. after enabling it, please try disabling it.
# At the same time, we welcome feedback on related issues to help iterative improvements.
#
# Default: no
txn-context-enabled no

# Define the histogram bucket boundaries.
#
# If set, command execution latencies are accumulated into the buckets whose upper
# bounds are listed below. The values must be numeric and sorted in ascending order.
# An implicit bucket (+Inf in Prometheus jargon) is added to track values beyond the
# largest boundary.
#
# NOTE: This is an experimental feature. Enabling it may add some performance overhead,
# so please be aware.
#
# Default: disabled
# histogram-bucket-boundaries 10,20,40,60,80,100,150,250,350,500,750,1000,1500,2000,4000,8000
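For example, with the boundaries above, a command whose recorded latency is 55 (presumably in microseconds, matching the existing usec_per_call statistic) is counted in the 60 bucket, while a latency of 9000 falls into the implicit +Inf bucket.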

################################## TLS ###################################

# By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0.
@@ -1031,7 +1043,7 @@ rocksdb.partition_filters yes
# Specifies the maximum size in bytes for a write batch in RocksDB.
# If set to 0, there is no size limit for write batches.
# This option can help control memory usage and manage large WriteBatch operations more effectively.
#
# Default: 0
# rocksdb.write_options.write_batch_max_bytes 0

20 changes: 20 additions & 0 deletions src/config/config.cc
@@ -240,6 +240,7 @@ Config::Config() {
new EnumField<JsonStorageFormat>(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)},
{"txn-context-enabled", true, new YesNoField(&txn_context_enabled, false)},
{"skip-block-cache-deallocation-on-close", false, new YesNoField(&skip_block_cache_deallocation_on_close, false)},
{"histogram-bucket-boundaries", true, new StringField(&histogram_bucket_boundaries_str_, "")},

/* rocksdb options */
{"rocksdb.compression", false,
@@ -755,6 +756,25 @@ void Config::initFieldCallback() {
{"tls-session-cache-size", set_tls_option},
{"tls-session-cache-timeout", set_tls_option},
#endif
{"histogram-bucket-boundaries",
[this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k, const std::string &v) -> Status {
std::vector<std::string> buckets = util::Split(v, ",");
histogram_bucket_boundaries.clear();
if (buckets.size() < 1) {
return Status::OK();
}
for (const auto &bucket_val : buckets) {
auto parse_result = ParseFloat<double>(bucket_val);
if (!parse_result) {
return {Status::NotOK, "The values in the bucket list must be double or integer."};
}
histogram_bucket_boundaries.push_back(*parse_result);
}
if (!std::is_sorted(histogram_bucket_boundaries.begin(), histogram_bucket_boundaries.end())) {
return {Status::NotOK, "The values for the histogram must be sorted."};
}
return Status::OK();
}},
};
for (const auto &iter : callbacks) {
auto field_iter = fields_.find(iter.first);
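As a usage note for the callback above (values hypothetical): histogram-bucket-boundaries 10,20,40 is accepted; 40,20,10 is rejected because the boundaries must be sorted in ascending order; and 10,abc is rejected because every entry must parse as a number. An empty value simply leaves histogram_bucket_boundaries empty, which keeps histogram collection disabled. The option is also listed among the immutable cases in config_test.cc below, so it appears to be configurable only at startup.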
3 changes: 3 additions & 0 deletions src/config/config.h
@@ -177,6 +177,8 @@ struct Config {

bool skip_block_cache_deallocation_on_close = false;

std::vector<double> histogram_bucket_boundaries;

struct RocksDB {
int block_size;
bool cache_index_and_filter_blocks;
@@ -260,6 +262,7 @@ struct Config {
std::string profiling_sample_commands_str_;
std::map<std::string, std::unique_ptr<ConfigField>> fields_;
std::vector<std::string> rename_command_;
std::string histogram_bucket_boundaries_str_;

void initFieldValidator();
void initFieldCallback();
34 changes: 33 additions & 1 deletion src/server/server.cc
@@ -52,17 +52,30 @@
#include "worker.h"

Server::Server(engine::Storage *storage, Config *config)
: storage(storage),
: stats(config->histogram_bucket_boundaries),
storage(storage),
indexer(storage),
index_mgr(&indexer, storage),
start_time_secs_(util::GetTimeStamp()),
config_(config),
namespace_(storage) {
// init commands stats here to prevent concurrent insert, and cause core
auto commands = redis::CommandTable::GetOriginal();

for (const auto &iter : *commands) {
stats.commands_stats[iter.first].calls = 0;
stats.commands_stats[iter.first].latency = 0;

if (stats.bucket_boundaries.size() > 0) {
// NB: Extra index for the last bucket (Inf)
for (std::size_t i{0}; i <= stats.bucket_boundaries.size(); ++i) {
auto bucket_ptr = std::make_shared<std::atomic<uint64_t>>(0);

stats.commands_histogram[iter.first].buckets.push_back(bucket_ptr);
}
stats.commands_histogram[iter.first].calls = 0;
stats.commands_histogram[iter.first].sum = 0;
}
}

// init cursor_dict_
@@ -1165,6 +1178,25 @@ void Server::GetCommandsStatsInfo(std::string *info) {
<< ",usec_per_call=" << static_cast<float>(latency / calls) << "\r\n";
}

for (const auto &cmd_hist : stats.commands_histogram) {
auto command_name = cmd_hist.first;
auto calls = stats.commands_histogram[command_name].calls.load();
if (calls == 0) continue;

auto sum = stats.commands_histogram[command_name].sum.load();
string_stream << "cmdstathist_" << command_name << ":";
for (std::size_t i{0}; i < stats.commands_histogram[command_name].buckets.size(); ++i) {
auto bucket_value = stats.commands_histogram[command_name].buckets[i]->load();
auto bucket_bound = std::numeric_limits<double>::infinity();
if (i < stats.bucket_boundaries.size()) {
bucket_bound = stats.bucket_boundaries[i];
}

string_stream << bucket_bound << "=" << bucket_value << ",";
}
string_stream << "sum=" << sum << ",count=" << calls << "\r\n";
}

*info = string_stream.str();
}

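For illustration, with boundaries 10,20,40 configured, the histogram portion emitted by GetCommandsStatsInfo would produce a line of the following shape (all counts hypothetical); the implicit last bucket is printed as inf, sum accumulates the raw latencies, and count matches the calls counter:

cmdstathist_get:10=5,20=3,40=1,inf=0,sum=180,count=9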
14 changes: 13 additions & 1 deletion src/stats/stats.cc
@@ -26,7 +26,7 @@
#include "fmt/format.h"
#include "time_util.h"

Stats::Stats() {
Stats::Stats(std::vector<double> bucket_boundaries) : bucket_boundaries(bucket_boundaries) {
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
InstMetric im;
im.last_sample_time_ms = 0;
@@ -86,10 +86,22 @@ int64_t Stats::GetMemoryRSS() {
void Stats::IncrCalls(const std::string &command_name) {
total_calls.fetch_add(1, std::memory_order_relaxed);
commands_stats[command_name].calls.fetch_add(1, std::memory_order_relaxed);

if (bucket_boundaries.size() > 0) {
commands_histogram[command_name].calls.fetch_add(1, std::memory_order_relaxed);
}
}

void Stats::IncrLatency(uint64_t latency, const std::string &command_name) {
commands_stats[command_name].latency.fetch_add(latency, std::memory_order_relaxed);

if (bucket_boundaries.size() > 0) {
commands_histogram[command_name].sum.fetch_add(latency, std::memory_order_relaxed);

const auto bucket_index = static_cast<std::size_t>(std::distance(
bucket_boundaries.begin(), std::lower_bound(bucket_boundaries.begin(), bucket_boundaries.end(), latency)));
commands_histogram[command_name].buckets[bucket_index]->fetch_add(1, std::memory_order_relaxed);
}
}

void Stats::TrackInstantaneousMetric(int metric, uint64_t current_reading) {
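To make the bucket-selection rule above concrete, here is a minimal, self-contained sketch (not part of the PR) that mirrors the std::lower_bound logic in IncrLatency, using a few hypothetical boundaries. Note that each latency increments exactly one bucket, so the exported values are per-bucket counts rather than Prometheus-style cumulative counts.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <iterator>
#include <vector>

int main() {
  // Hypothetical boundaries (same unit as the latency passed to IncrLatency).
  std::vector<double> boundaries = {10, 20, 40, 60};

  // One counter per boundary plus one for the implicit +Inf bucket.
  std::vector<uint64_t> buckets(boundaries.size() + 1, 0);

  auto bucket_index = [&](uint64_t latency) {
    // First boundary >= latency; latencies above the last boundary
    // map to index boundaries.size(), i.e. the +Inf bucket.
    return static_cast<std::size_t>(std::distance(
        boundaries.begin(),
        std::lower_bound(boundaries.begin(), boundaries.end(), latency)));
  };

  for (uint64_t latency : {5, 20, 35, 999}) {
    buckets[bucket_index(latency)]++;
  }

  // Prints: 1 1 1 0 1  -> 5 lands in "10", 20 in "20" (boundaries are inclusive),
  // 35 in "40", and 999 in the implicit +Inf bucket.
  for (uint64_t count : buckets) std::cout << count << " ";
  std::cout << "\n";
}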
17 changes: 16 additions & 1 deletion src/stats/stats.h
@@ -22,12 +22,15 @@

#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <map>
#include <shared_mutex>
#include <string>
#include <vector>

#include "config/config.h"

enum StatsMetricFlags {
STATS_METRIC_COMMAND = 0, // Number of commands executed
STATS_METRIC_NET_INPUT, // Bytes read to network
@@ -43,6 +46,13 @@ enum StatsMetricFlags {

constexpr int STATS_METRIC_SAMPLES = 16; // Number of samples per metric

// Experimental part to support histograms for cmd statistics
struct CommandHistogram {
std::vector<std::shared_ptr<std::atomic<uint64_t>>> buckets;
std::atomic<uint64_t> calls;
std::atomic<uint64_t> sum;
};

struct CommandStat {
std::atomic<uint64_t> calls;
std::atomic<uint64_t> latency;
@@ -69,7 +79,12 @@ class Stats {
std::atomic<uint64_t> psync_ok_count = {0};
std::map<std::string, CommandStat> commands_stats;

Stats();
using BucketBoundaries = std::vector<double>;
BucketBoundaries bucket_boundaries;
std::map<std::string, CommandHistogram> commands_histogram;

explicit Stats(std::vector<double> histogram_bucket_boundaries);

void IncrCalls(const std::string &command_name);
void IncrLatency(uint64_t latency, const std::string &command_name);
void IncrInboundBytes(uint64_t bytes) { in_bytes.fetch_add(bytes, std::memory_order_relaxed); }
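A note on the data layout in CommandHistogram: the buckets are held as std::shared_ptr<std::atomic<uint64_t>> rather than plain std::atomic<uint64_t> values, presumably because std::atomic is neither copyable nor movable and therefore cannot be grown inside a std::vector with push_back; the extra indirection keeps each counter at a stable address while the vector is filled. The calls and sum members can remain plain atomics because they are not stored in a container.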
2 changes: 2 additions & 0 deletions tests/cppunit/config_test.cc
@@ -130,6 +130,8 @@ TEST(Config, GetAndSet) {
{"rocksdb.rate_limiter_auto_tuned", "yes"},
{"rocksdb.compression_level", "32767"},
{"rocksdb.wal_compression", "no"},
{"histogram-bucket-boundaries", "10,100,1000,10000"},

};
for (const auto &iter : immutable_cases) {
s = config.Set(nullptr, iter.first, iter.second);
23 changes: 22 additions & 1 deletion tests/gocase/unit/info/info_test.go
@@ -23,6 +23,7 @@ import (
"context"
"fmt"
"strconv"
"strings"
"testing"
"time"

@@ -33,7 +34,10 @@ import (
)

func TestInfo(t *testing.T) {
srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
srv0 := util.StartServer(t, map[string]string{
"cluster-enabled": "yes",
"histogram-bucket-boundaries": "10,20,30,50",
})
defer func() { srv0.Close() }()
rdb0 := srv0.NewClient()
defer func() { require.NoError(t, rdb0.Close()) }()
@@ -102,6 +106,23 @@
t.Run("get cluster information by INFO - cluster enabled", func(t *testing.T) {
require.Equal(t, "1", util.FindInfoEntry(rdb0, "cluster_enabled", "cluster"))
})

t.Run("get command latencies via histogram INFO - histogram-bucket-boundaries", func(t *testing.T) {
output := util.FindInfoEntry(rdb0, "cmdstathist", "cmdstathist_info")
if len(output) == 0 {
t.SkipNow()
}

splitValues := strings.FieldsFunc(output, func(r rune) bool {
return r == '=' || r == ','
})

// expected: 10=..,20=..,30=..,50=..,inf=..,sum=...,count=..
require.GreaterOrEqual(t, len(splitValues), 15)
require.Contains(t, splitValues, "sum")
require.Contains(t, splitValues, "count")
require.Contains(t, splitValues, "info")
})
}

func TestKeyspaceHitMiss(t *testing.T) {