diff --git a/src/bench.rs b/src/bench.rs
index dbac8a6..b36f087 100644
--- a/src/bench.rs
+++ b/src/bench.rs
@@ -380,7 +380,7 @@ pub(crate) fn init(text: &str) -> (BenchKVMap, Vec>) {
     (map, phases)
 }
 
-fn bench_phase_should_break(
+fn bench_repeat_should_break(
     len: &Length,
     count: u64,
     start: Instant,
@@ -631,28 +631,21 @@ fn bench_stat_final(
 /// throughput is achieved. Otherwise, it does nothing.
 struct RateLimiter {
     ops: u64,
+    start: Instant,
 }
 
 impl RateLimiter {
-    fn new(kops: u64, nr_threads: usize) -> Self {
-        let ops = match kops {
-            0 => 0,
-            r => {
-                let per = r * 1000 / u64::try_from(nr_threads).unwrap() + 1;
-                per
-            }
-        };
-        Self { ops }
+    fn new(kops: u64, nr_threads: usize, start: Instant) -> Self {
+        assert!(kops > 0);
+        let ops = kops * 1000 / u64::try_from(nr_threads).unwrap() + 1;
+        Self { ops, start }
     }
 
     /// Returns whether the backoff is done.
     #[inline(always)]
-    fn try_backoff(&self, count: u64, start: Instant) -> bool {
-        if self.ops == 0 {
-            return true;
-        }
+    fn try_backoff(&self, count: u64) -> bool {
         // self.kops is the target rate in kops, which is op/ms
-        let elapsed = u64::try_from(start.elapsed().as_nanos()).unwrap();
+        let elapsed = u64::try_from(self.start.elapsed().as_nanos()).unwrap();
         let ops = count * 1_000_000_000 / elapsed;
         if ops <= self.ops {
             return true;
@@ -662,15 +655,9 @@ impl RateLimiter {
 
     /// Blocking backoff.
     #[inline(always)]
-    fn backoff(&self, count: u64, start: Instant) {
-        if self.ops == 0 {
-            return;
-        }
-        // self.kops is the target rate in kops, which is op/ms
+    fn backoff(&self, count: u64) {
         loop {
-            let elapsed = u64::try_from(start.elapsed().as_nanos()).unwrap();
-            let ops = count * 1_000_000_000 / elapsed;
-            if ops <= self.ops {
+            if self.try_backoff(count) {
                 break;
             }
         }
@@ -701,17 +688,23 @@ fn bench_worker_regular(map: Arc>, context: WorkerContext) {
         Some(_) => || Some(Instant::now()),
         None => || None,
     };
-    let rate_limiter = RateLimiter::new(benchmark.rate_limit, thread_info.1);
 
     let mut handle = map.handle();
     let mut rng = rand::thread_rng();
     let mut workload = Workload::new(&benchmark.wopt, Some(thread_info));
-    let start = Instant::now(); // for thread 0
+    let start = Instant::now();
+
     for i in 0..benchmark.repeat {
         let counter = measurements[id].counters[i].reference();
         // start the benchmark phase at roughly the same time
         barrier.wait();
         let start = Instant::now();
+
+        let rate_limiter = match benchmark.rate_limit {
+            0 => None,
+            r => Some(RateLimiter::new(r, thread_info.1, start)),
+        };
+
         // start benchmark
         loop {
             let op = workload.next(&mut rng);
@@ -736,11 +729,12 @@ fn bench_worker_regular(map: Arc>, context: WorkerContext) {
             }
             *counter += 1;
 
-            // internally it will not do anything if no rate limiter is in place
-            rate_limiter.backoff(*counter, start);
+            if let Some(r) = &rate_limiter {
+                r.backoff(*counter);
+            }
 
             // check if we need to break
-            if bench_phase_should_break(&benchmark.len, *counter, start, &mut workload) {
+            if bench_repeat_should_break(&benchmark.len, *counter, start, &mut workload) {
                 workload.reset();
                 break;
             }
@@ -810,7 +804,6 @@ fn bench_worker_async(map: Arc>, context: WorkerContext) {
         true => Some(measurements[id].latency.lock()),
         false => None,
     };
-    let rate_limiter = RateLimiter::new(benchmark.rate_limit, thread_info.1);
 
     let responder = Rc::new(RefCell::new(Vec::::new()));
     let mut handle = map.handle(responder.clone());
@@ -820,12 +813,19 @@ fn bench_worker_async(map: Arc>, context: WorkerContext) {
     let mut pending = 0usize;
     let mut requests = Vec::::with_capacity(benchmark.batch);
     let mut rid = 0usize;
-    let start = Instant::now(); // for thread 0
+    let start = Instant::now();
+
     for i in 0..benchmark.repeat {
         let counter = measurements[id].counters[i].reference();
         // start the benchmark phase at roughly the same time
         barrier.wait();
         let start = Instant::now();
+
+        let rate_limiter = match benchmark.rate_limit {
+            0 => None,
+            r => Some(RateLimiter::new(r, thread_info.1, start)),
+        };
+
         // start benchmark
         loop {
             // first clear the requests vector
@@ -839,9 +839,17 @@ fn bench_worker_async(map: Arc>, context: WorkerContext) {
                 // otherwise the last check may fail because the time check is after a certain
                 // interval, but the mod is never 0
                 *counter += 1;
-                if bench_phase_should_break(&benchmark.len, *counter, start, &mut workload) {
+                // stop the batch generation if the repeat is done
+                if bench_repeat_should_break(&benchmark.len, *counter, start, &mut workload) {
                     break;
                 }
+                // if a rate limiter is in place and no further backoff is needed, break early to
+                // send the batch immediately
+                if let Some(r) = &rate_limiter {
+                    if r.try_backoff(*counter) {
+                        break;
+                    }
+                }
             }
             // now we have a batch, send it all, whatever its size is
             let len = requests.len();
@@ -854,7 +862,7 @@ fn bench_worker_async(map: Arc>, context: WorkerContext) {
                 }
             }
 
-            if bench_phase_should_break(&benchmark.len, *counter, start, &mut workload) {
+            if bench_repeat_should_break(&benchmark.len, *counter, start, &mut workload) {
                 workload.reset();
                 break;
            }
@@ -874,8 +882,18 @@ fn bench_worker_async(map: Arc>, context: WorkerContext) {
                         }
                     }
                }
-                // if the pending queue is under depth (can be 0), and backoff is done
-                if pending <= benchmark.qd && rate_limiter.try_backoff(*counter, start) {
+                let backoff_free = match &rate_limiter {
+                    None => true,
+                    Some(r) => {
+                        if r.try_backoff(*counter) {
+                            true
+                        } else {
+                            false
+                        }
+                    }
+                };
+                // if the pending queue is under depth (can be 0) and no further backoff is needed
+                if pending <= benchmark.qd && backoff_free {
                     break;
                }
            }
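
For context, below is a minimal, self-contained sketch (not part of the patch) of how the reworked RateLimiter is meant to be driven by a worker after this change: the limiter is only constructed when benchmark.rate_limit is non-zero, it carries the repeat's start Instant as a field, and backoff() simply spins on try_backoff() until the observed per-thread rate falls back to the target. The rate_limit_kops and nr_threads values are hypothetical stand-ins for the parsed Benchmark options, and the .max(1) guard on the elapsed nanoseconds is an addition for the sketch only.

use std::time::{Duration, Instant};

/// The RateLimiter as it looks after the patch: the repeat's start time is a
/// field, and the struct is only ever built with a non-zero target rate.
struct RateLimiter {
    /// Per-thread target throughput in operations per second.
    ops: u64,
    /// Start of the current repeat; all rate checks measure elapsed time from here.
    start: Instant,
}

impl RateLimiter {
    fn new(kops: u64, nr_threads: usize, start: Instant) -> Self {
        assert!(kops > 0);
        // kops is the global target in kilo-ops/s; split it evenly across threads.
        let ops = kops * 1000 / u64::try_from(nr_threads).unwrap() + 1;
        Self { ops, start }
    }

    /// Returns true when the observed rate is at or below the target,
    /// i.e. no further backoff is needed.
    #[inline(always)]
    fn try_backoff(&self, count: u64) -> bool {
        // .max(1) guards against a zero nanosecond reading; it is an addition
        // for this sketch, not part of the patch.
        let elapsed = u64::try_from(self.start.elapsed().as_nanos()).unwrap().max(1);
        let ops = count * 1_000_000_000 / elapsed;
        ops <= self.ops
    }

    /// Spins until the observed rate drops back to the target.
    #[inline(always)]
    fn backoff(&self, count: u64) {
        loop {
            if self.try_backoff(count) {
                break;
            }
        }
    }
}

fn main() {
    // Made-up stand-ins for benchmark.rate_limit and the thread count; in
    // bench.rs these come from the parsed Benchmark options.
    let rate_limit_kops: u64 = 100; // 100 kops/s target across all threads
    let nr_threads: usize = 4;

    let start = Instant::now();
    // Build a limiter only when a rate limit is configured, mirroring the
    // `match benchmark.rate_limit` introduced by the patch.
    let rate_limiter = match rate_limit_kops {
        0 => None,
        r => Some(RateLimiter::new(r, nr_threads, start)),
    };

    let mut counter: u64 = 0;
    while start.elapsed() < Duration::from_millis(50) {
        // ... one benchmark operation would run here ...
        counter += 1;
        if let Some(r) = &rate_limiter {
            r.backoff(counter);
        }
    }
    let kops = counter as u128 * 1_000_000 / start.elapsed().as_nanos().max(1);
    println!("issued {} ops in {:?} (~{} kops/s)", counter, start.elapsed(), kops);
}

Moving the start Instant into the struct and constructing the limiter once per repeat removes the kops == 0 special case from the hot path and re-bases the rate calculation at each barrier.wait(), which is what the per-repeat `let rate_limiter = match benchmark.rate_limit { ... }` in both workers achieves.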