Skip to content

Commit

Permalink
workload: add shuffle(p) key distribution
Browse files Browse the repository at this point in the history
  • Loading branch information
nerdroychan committed Jul 30, 2024
1 parent 7a7abd3 commit 9ef4cda
Showing 1 changed file with 142 additions and 41 deletions.
183 changes: 142 additions & 41 deletions src/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::Operation;
use figment::providers::{Env, Format, Toml};
use figment::Figment;
use rand::distributions::{Distribution, Uniform, WeightedIndex};
use rand::prelude::SliceRandom;
use rand::Rng;
use serde::Deserialize;
use zipf::ZipfDistribution;
Expand Down Expand Up @@ -45,6 +46,7 @@ impl Mix {
#[derive(Debug)]
enum KeyDistribution {
Increment,
Shuffle(Vec<usize>), // be careful about its size
Uniform(Uniform<usize>),
Zipfian(ZipfDistribution, usize),
// File,
Expand All @@ -58,19 +60,21 @@ struct KeyGenerator {
min: usize,
max: usize,
keyspace: usize,
dist: KeyDistribution,
serial: usize,
dist: KeyDistribution,
}

impl KeyGenerator {
fn new(len: usize, min: usize, max: usize, dist: KeyDistribution) -> Self {
let keyspace = max - min;
let serial = 0;
Self {
len,
min,
max,
keyspace: max - min,
keyspace,
serial,
dist,
serial: 0,
}
}

Expand All @@ -79,13 +83,20 @@ impl KeyGenerator {
Self::new(len, min, max, dist)
}

fn new_shuffle(len: usize, min: usize, max: usize) -> Self {
let mut shuffle = (0..(max - min)).collect::<Vec<usize>>();
shuffle.shuffle(&mut rand::thread_rng());
let dist = KeyDistribution::Shuffle(shuffle);
Self::new(len, min, max, dist)
}

fn new_uniform(len: usize, min: usize, max: usize) -> Self {
let dist = KeyDistribution::Uniform(Uniform::new(0, max - min));
Self::new(len, min, max, dist)
}

fn new_zipfian(len: usize, min: usize, max: usize, theta: f64, hotspot: f64) -> Self {
let hotspot = (hotspot * (max - min) as f64) as usize; // approx location for discrete keys
let hotspot = (hotspot * (max - min - 1) as f64) as usize; // approx location for discrete keys
let dist =
KeyDistribution::Zipfian(ZipfDistribution::new(max - min, theta).unwrap(), hotspot);
Self::new(len, min, max, dist)
Expand All @@ -94,10 +105,12 @@ impl KeyGenerator {
fn next(&mut self, rng: &mut impl Rng) -> Box<[u8]> {
let key = match self.dist {
KeyDistribution::Increment => self.serial % self.keyspace,
KeyDistribution::Shuffle(ref shuffle) => shuffle[self.serial % self.keyspace],
KeyDistribution::Uniform(dist) => dist.sample(rng),
KeyDistribution::Zipfian(dist, hotspot) => {
// zipf starts at 1
(dist.sample(rng) + hotspot - 1) % self.keyspace
} // zipf starts at 1
}
} + self.min;
self.serial += 1;
assert!(key < self.max);
Expand Down Expand Up @@ -141,7 +154,13 @@ pub struct WorkloadOpt {
/// Key distribution.
///
/// - "increment": sequentially incrementing from `kmin` to `kmax`.
/// - "incrementp": partitioned `increment`, where each thread takes a range of keys.
/// - "incrementp": partitioned `increment`, where each thread takes a range of keys. For
/// example, if there are two threads with `kmin` of 0 and `kmax` of 10, one thread will get an
/// "increment" distribution from 0 to 5, and another one will get 6 to 10.
/// - "shuffle": shuffled sequence from `kmin` to `kmax`. One key appears exactly once during
/// an iteration of the whole key space. This is useful to randomly prefill keys.
/// - "shufflep": partitioned `shuffle`, similar to "incrementp" but the keys are shuffled for
/// each range/thread.
/// - "uniform": uniformly random keys from `kmin` to `kmax`.
/// - "zipfian": random keys from `kmin` to `kmax` following Zipfian distribution.
pub dist: String,
Expand Down Expand Up @@ -188,21 +207,31 @@ impl Workload {
assert!(klen > 0, "klen should be positive");
assert!(kmax > kmin, "kmax should be greater than kmin");

let split_key_space = || {
let (thread_id, nr_threads) = thread_info.expect("parallel keygen expects thread_info");
assert!(thread_id < nr_threads);
let nr_keys_per = (kmax - kmin) / nr_threads;
let kminp = kmin + thread_id * nr_keys_per;
let kmaxp = if thread_id == nr_threads - 1 {
kmax
} else {
kminp + nr_keys_per
};
(kminp, kmaxp)
};

let mix = Mix::new(opt.set_perc, opt.get_perc, opt.del_perc);
let kgen = match opt.dist.as_str() {
"increment" => KeyGenerator::new_increment(klen, kmin, kmax),
"incrementp" => {
let (thread_id, nr_threads) = thread_info.expect("incrementp expects thread info");
assert!(thread_id < nr_threads);
let nr_keys_per = (kmax - kmin) / nr_threads;
let kminp = kmin + thread_id * nr_keys_per;
let kmaxp = if thread_id == nr_threads - 1 {
kmax
} else {
kminp + nr_keys_per
};
let (kminp, kmaxp) = split_key_space();
KeyGenerator::new_increment(klen, kminp, kmaxp)
}
"shuffle" => KeyGenerator::new_shuffle(klen, kmin, kmax),
"shufflep" => {
let (kminp, kmaxp) = split_key_space();
KeyGenerator::new_shuffle(klen, kminp, kmaxp)
}
"uniform" => KeyGenerator::new_uniform(klen, kmin, kmax),
"zipfian" => {
let theta = opt.zipf_theta.unwrap_or(1.0f64);
Expand Down Expand Up @@ -255,7 +284,7 @@ impl Workload {
#[cfg(test)]
mod tests {
use super::*;
use hashbrown::HashMap;
use hashbrown::{HashMap, HashSet};
use quanta::Instant;

#[test]
Expand Down Expand Up @@ -309,6 +338,28 @@ mod tests {
}
}

#[test]
fn keygen_shuffle() {
let start = 117;
let end = 135423;
let mut rng = rand::thread_rng();
let mut kgen = KeyGenerator::new_shuffle(8, start, end);
let mut dist: HashSet<Box<[u8]>> = HashSet::new();
for _ in start..end {
let key = kgen.next(&mut rng);
// the key is newly added, not repeated
assert!(dist.insert(key.clone()));
}
// each key exists exactly once
assert_eq!(dist.len(), end - start);
let min = dist.iter().min().clone().unwrap();
let min_bytes = Box::from(start.to_be_bytes());
assert_eq!(*min, min_bytes);
let max = dist.iter().max().clone().unwrap();
let max_bytes = Box::from((end - 1).to_be_bytes());
assert_eq!(*max, max_bytes);
}

#[test]
fn keygen_uniform() {
let mut rng = rand::thread_rng();
Expand Down Expand Up @@ -383,37 +434,67 @@ mod tests {
let _ = kgen.next(&mut rng);
}
println!("increment time: {} ms", t.elapsed().as_millis());
let mut kgen = KeyGenerator::new_shuffle(8, 0, 1000);
let t = Instant::now();
for _ in 0..N {
let _ = kgen.next(&mut rng);
}
println!("shuffle time: {} ms", t.elapsed().as_millis());
}

#[test]
fn workloadopt_toml_correct() {
fn workload_toml_correct() {
let s = r#"set_perc = 70
get_perc = 20
del_perc = 10
klen = 4
vlen = 6
dist = "zipfian"
dist = "uniform"
kmin = 0
kmax = 12345"#;
let w = Workload::new_from_toml_str(s, None);
assert_eq!(w.kgen.len, 4);
assert_eq!(w.vlen, 6);
assert_eq!(w.kgen.min, 0);
assert_eq!(w.kgen.max, 12345);
assert!(matches!(w.kgen.dist, KeyDistribution::Uniform(_)));

let s = r#"set_perc = 70
get_perc = 20
del_perc = 10
klen = 4
vlen = 6
dist = "uniform"
klen = 40
vlen = 60
dist = "zipfian"
kmin = 0
kmax = 12345
zipf_theta = 1.0"#;
kmax = 123450
zipf_theta = 1.0
zipf_hotspot = 1.0"#;
let w = Workload::new_from_toml_str(s, None);
assert_eq!(w.kgen.len, 40);
assert_eq!(w.vlen, 60);
assert_eq!(w.kgen.min, 0);
assert_eq!(w.kgen.max, 123450);
assert!(matches!(w.kgen.dist, KeyDistribution::Zipfian(_, 123449)));

let s = r#"set_perc = 60
get_perc = 25
del_perc = 15
klen = 14
vlen = 16
dist = "shuffle"
kmin = 10000
kmax = 20000"#;
let w = Workload::new_from_toml_str(s, Some((1, 2)));
assert_eq!(w.kgen.len, 14);
assert_eq!(w.vlen, 16);
assert_eq!(w.kgen.min, 10000);
assert_eq!(w.kgen.max, 20000);
assert!(matches!(w.kgen.dist, KeyDistribution::Shuffle(_)));
}

#[test]
#[should_panic(expected = "should be positive")]
fn workloadopt_toml_invalid_wrong_size() {
fn workload_toml_invalid_wrong_size() {
let s = r#"set_perc = 60
get_perc = 40
del_perc = 0
Expand All @@ -427,7 +508,7 @@ mod tests {

#[test]
#[should_panic(expected = "should be specified")]
fn workloadopt_toml_invalid_missing_fields() {
fn workload_toml_invalid_missing_fields() {
let s = r#"set_perc = 60
get_perc = 40
del_perc = 0
Expand All @@ -439,7 +520,7 @@ mod tests {

#[test]
#[should_panic(expected = "should be greater")]
fn workloadopt_toml_invalid_wrong_keyspace() {
fn workload_toml_invalid_wrong_keyspace() {
let s = r#"set_perc = 60
get_perc = 40
del_perc = 0
Expand All @@ -453,7 +534,7 @@ mod tests {

#[test]
#[should_panic(expected = "should be 100")]
fn workloadopt_toml_invalid_wrong_mix() {
fn workload_toml_invalid_wrong_mix() {
let s = r#"set_perc = 70
get_perc = 40
del_perc = 0
Expand All @@ -466,8 +547,22 @@ mod tests {
}

#[test]
fn workload_increment_parallel() {
let opt = WorkloadOpt {
#[should_panic(expected = "invalid key distribution")]
fn workload_toml_invalid_key_distribution() {
let s = r#"set_perc = 70
get_perc = 30
del_perc = 0
klen = 4
vlen = 6
dist = "uniorm"
kmin = 0
kmax = 12345"#;
let _ = Workload::new_from_toml_str(s, None);
}

#[test]
fn workload_keygen_parallel() {
let mut opt = WorkloadOpt {
set_perc: 100,
get_perc: 0,
del_perc: 0,
Expand All @@ -479,18 +574,24 @@ mod tests {
zipf_theta: None,
zipf_hotspot: None,
};

let workload = Workload::new(&opt, Some((0, 3)));
assert_eq!(workload.kgen.min, 10000);
assert_eq!(workload.kgen.max, 14115);

let workload = Workload::new(&opt, Some((1, 3)));
assert_eq!(workload.kgen.min, 14115);
assert_eq!(workload.kgen.max, 18230);

let workload = Workload::new(&opt, Some((2, 3)));
assert_eq!(workload.kgen.min, 18230);
assert_eq!(workload.kgen.max, 22347); // 2 more keys
let test = |opt: &WorkloadOpt| {
let workload = Workload::new(&opt, Some((0, 3)));
assert_eq!(workload.kgen.min, 10000);
assert_eq!(workload.kgen.max, 14115);

let workload = Workload::new(&opt, Some((1, 3)));
assert_eq!(workload.kgen.min, 14115);
assert_eq!(workload.kgen.max, 18230);

let workload = Workload::new(&opt, Some((2, 3)));
assert_eq!(workload.kgen.min, 18230);
assert_eq!(workload.kgen.max, 22347); // 2 more keys
};
// incrementp
test(&opt);
// shufflep
opt.dist = "shufflep".to_string();
test(&opt);
}

#[test]
Expand Down

0 comments on commit 9ef4cda

Please sign in to comment.