Skip to content

Commit

Permalink
bench: send a batch early if no backoff is needed
Browse files Browse the repository at this point in the history
  • Loading branch information
nerdroychan committed Sep 16, 2024
1 parent c0726a7 commit 8912806
Showing 1 changed file with 52 additions and 34 deletions.
86 changes: 52 additions & 34 deletions src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ pub(crate) fn init(text: &str) -> (BenchKVMap, Vec<Arc<Benchmark>>) {
(map, phases)
}

fn bench_phase_should_break(
fn bench_repeat_should_break(
len: &Length,
count: u64,
start: Instant,
Expand Down Expand Up @@ -631,28 +631,21 @@ fn bench_stat_final(
/// throughput is achieved. Otherwise, it does nothing.
struct RateLimiter {
ops: u64,
start: Instant,
}

impl RateLimiter {
fn new(kops: u64, nr_threads: usize) -> Self {
let ops = match kops {
0 => 0,
r => {
let per = r * 1000 / u64::try_from(nr_threads).unwrap() + 1;
per
}
};
Self { ops }
fn new(kops: u64, nr_threads: usize, start: Instant) -> Self {
assert!(kops > 0);
let ops = kops * 1000 / u64::try_from(nr_threads).unwrap() + 1;
Self { ops, start }
}

/// Returns whether the backoff is done.
#[inline(always)]
fn try_backoff(&self, count: u64, start: Instant) -> bool {
if self.ops == 0 {
return true;
}
fn try_backoff(&self, count: u64) -> bool {
// self.kops is the target rate in kops, which is op/ms
let elapsed = u64::try_from(start.elapsed().as_nanos()).unwrap();
let elapsed = u64::try_from(self.start.elapsed().as_nanos()).unwrap();
let ops = count * 1_000_000_000 / elapsed;
if ops <= self.ops {
return true;
Expand All @@ -662,15 +655,9 @@ impl RateLimiter {

/// Blocking backoff.
#[inline(always)]
fn backoff(&self, count: u64, start: Instant) {
if self.ops == 0 {
return;
}
// self.kops is the target rate in kops, which is op/ms
fn backoff(&self, count: u64) {
loop {
let elapsed = u64::try_from(start.elapsed().as_nanos()).unwrap();
let ops = count * 1_000_000_000 / elapsed;
if ops <= self.ops {
if self.try_backoff(count) {
break;
}
}
Expand Down Expand Up @@ -701,17 +688,23 @@ fn bench_worker_regular(map: Arc<Box<dyn KVMap>>, context: WorkerContext) {
Some(_) => || Some(Instant::now()),
None => || None,
};
let rate_limiter = RateLimiter::new(benchmark.rate_limit, thread_info.1);

let mut handle = map.handle();
let mut rng = rand::thread_rng();
let mut workload = Workload::new(&benchmark.wopt, Some(thread_info));
let start = Instant::now(); // for thread 0
let start = Instant::now();

for i in 0..benchmark.repeat {
let counter = measurements[id].counters[i].reference();
// start the benchmark phase at roughly the same time
barrier.wait();
let start = Instant::now();

let rate_limiter = match benchmark.rate_limit {
0 => None,
r => Some(RateLimiter::new(r, thread_info.1, start)),
};

// start benchmark
loop {
let op = workload.next(&mut rng);
Expand All @@ -736,11 +729,12 @@ fn bench_worker_regular(map: Arc<Box<dyn KVMap>>, context: WorkerContext) {
}
*counter += 1;

// internally it will not do anything if no rate limiter is in place
rate_limiter.backoff(*counter, start);
if let Some(r) = &rate_limiter {
r.backoff(*counter);
}

// check if we need to break
if bench_phase_should_break(&benchmark.len, *counter, start, &mut workload) {
if bench_repeat_should_break(&benchmark.len, *counter, start, &mut workload) {
workload.reset();
break;
}
Expand Down Expand Up @@ -810,7 +804,6 @@ fn bench_worker_async(map: Arc<Box<dyn AsyncKVMap>>, context: WorkerContext) {
true => Some(measurements[id].latency.lock()),
false => None,
};
let rate_limiter = RateLimiter::new(benchmark.rate_limit, thread_info.1);

let responder = Rc::new(RefCell::new(Vec::<Response>::new()));
let mut handle = map.handle(responder.clone());
Expand All @@ -820,12 +813,19 @@ fn bench_worker_async(map: Arc<Box<dyn AsyncKVMap>>, context: WorkerContext) {
let mut pending = 0usize;
let mut requests = Vec::<Request>::with_capacity(benchmark.batch);
let mut rid = 0usize;
let start = Instant::now(); // for thread 0
let start = Instant::now();

for i in 0..benchmark.repeat {
let counter = measurements[id].counters[i].reference();
// start the benchmark phase at roughly the same time
barrier.wait();
let start = Instant::now();

let rate_limiter = match benchmark.rate_limit {
0 => None,
r => Some(RateLimiter::new(r, thread_info.1, start)),
};

// start benchmark
loop {
// first clear the requests vector
Expand All @@ -839,9 +839,17 @@ fn bench_worker_async(map: Arc<Box<dyn AsyncKVMap>>, context: WorkerContext) {
// otherwise the last check may fail because the time check is after a certain
// interval, but the mod is never 0
*counter += 1;
if bench_phase_should_break(&benchmark.len, *counter, start, &mut workload) {
// stop the batch generation if the repeat is done
if bench_repeat_should_break(&benchmark.len, *counter, start, &mut workload) {
break;
}
// if a rate limiter is in place and no further backoff is needed, break early to
// send the batch immediately
if let Some(r) = &rate_limiter {
if r.try_backoff(*counter) {
break;
}
}
}
// now we have a batch, send it all, whatever its size is
let len = requests.len();
Expand All @@ -854,7 +862,7 @@ fn bench_worker_async(map: Arc<Box<dyn AsyncKVMap>>, context: WorkerContext) {
}
}

if bench_phase_should_break(&benchmark.len, *counter, start, &mut workload) {
if bench_repeat_should_break(&benchmark.len, *counter, start, &mut workload) {
workload.reset();
break;
}
Expand All @@ -874,8 +882,18 @@ fn bench_worker_async(map: Arc<Box<dyn AsyncKVMap>>, context: WorkerContext) {
}
}
}
// if the pending queue is under depth (can be 0), and backoff is done
if pending <= benchmark.qd && rate_limiter.try_backoff(*counter, start) {
let backoff_free = match &rate_limiter {
None => true,
Some(r) => {
if r.try_backoff(*counter) {
true
} else {
false
}
}
};
// if the pending queue is under depth (can be 0) and no further backoff is needed
if pending <= benchmark.qd && backoff_free {
break;
}
}
Expand Down

0 comments on commit 8912806

Please sign in to comment.