diff --git a/src/workload.rs b/src/workload.rs index faf3fc1..10b7acd 100644 --- a/src/workload.rs +++ b/src/workload.rs @@ -4,6 +4,7 @@ use crate::Operation; use figment::providers::{Env, Format, Toml}; use figment::Figment; use rand::distributions::{Distribution, Uniform, WeightedIndex}; +use rand::prelude::SliceRandom; use rand::Rng; use serde::Deserialize; use zipf::ZipfDistribution; @@ -45,6 +46,7 @@ impl Mix { #[derive(Debug)] enum KeyDistribution { Increment, + Shuffle(Vec), // be careful about its size Uniform(Uniform), Zipfian(ZipfDistribution, usize), // File, @@ -58,19 +60,21 @@ struct KeyGenerator { min: usize, max: usize, keyspace: usize, - dist: KeyDistribution, serial: usize, + dist: KeyDistribution, } impl KeyGenerator { fn new(len: usize, min: usize, max: usize, dist: KeyDistribution) -> Self { + let keyspace = max - min; + let serial = 0; Self { len, min, max, - keyspace: max - min, + keyspace, + serial, dist, - serial: 0, } } @@ -79,13 +83,20 @@ impl KeyGenerator { Self::new(len, min, max, dist) } + fn new_shuffle(len: usize, min: usize, max: usize) -> Self { + let mut shuffle = (0..(max - min)).collect::>(); + shuffle.shuffle(&mut rand::thread_rng()); + let dist = KeyDistribution::Shuffle(shuffle); + Self::new(len, min, max, dist) + } + fn new_uniform(len: usize, min: usize, max: usize) -> Self { let dist = KeyDistribution::Uniform(Uniform::new(0, max - min)); Self::new(len, min, max, dist) } fn new_zipfian(len: usize, min: usize, max: usize, theta: f64, hotspot: f64) -> Self { - let hotspot = (hotspot * (max - min) as f64) as usize; // approx location for discrete keys + let hotspot = (hotspot * (max - min - 1) as f64) as usize; // approx location for discrete keys let dist = KeyDistribution::Zipfian(ZipfDistribution::new(max - min, theta).unwrap(), hotspot); Self::new(len, min, max, dist) @@ -94,10 +105,12 @@ impl KeyGenerator { fn next(&mut self, rng: &mut impl Rng) -> Box<[u8]> { let key = match self.dist { KeyDistribution::Increment => self.serial % self.keyspace, + KeyDistribution::Shuffle(ref shuffle) => shuffle[self.serial % self.keyspace], KeyDistribution::Uniform(dist) => dist.sample(rng), KeyDistribution::Zipfian(dist, hotspot) => { + // zipf starts at 1 (dist.sample(rng) + hotspot - 1) % self.keyspace - } // zipf starts at 1 + } } + self.min; self.serial += 1; assert!(key < self.max); @@ -141,7 +154,13 @@ pub struct WorkloadOpt { /// Key distribution. /// /// - "increment": sequentially incrementing from `kmin` to `kmax`. - /// - "incrementp": partitioned `increment`, where each thread takes a range of keys. + /// - "incrementp": partitioned `increment`, where each thread takes a range of keys. For + /// example, if there are two threads with `kmin` of 0 and `kmax` of 10, one thread will get an + /// "increment" distribution from 0 to 5, and another one will get 6 to 10. + /// - "shuffle": shuffled sequence from `kmin` to `kmax`. One key appears exactly once during + /// an iteration of the whole key space. This is useful to randomly prefill keys. + /// - "shufflep": partitioned `shuffle`, similar to "incrementp" but the keys are shuffled for + /// each range/thread. /// - "uniform": uniformly random keys from `kmin` to `kmax`. /// - "zipfian": random keys from `kmin` to `kmax` following Zipfian distribution. pub dist: String, @@ -188,21 +207,31 @@ impl Workload { assert!(klen > 0, "klen should be positive"); assert!(kmax > kmin, "kmax should be greater than kmin"); + let split_key_space = || { + let (thread_id, nr_threads) = thread_info.expect("parallel keygen expects thread_info"); + assert!(thread_id < nr_threads); + let nr_keys_per = (kmax - kmin) / nr_threads; + let kminp = kmin + thread_id * nr_keys_per; + let kmaxp = if thread_id == nr_threads - 1 { + kmax + } else { + kminp + nr_keys_per + }; + (kminp, kmaxp) + }; + let mix = Mix::new(opt.set_perc, opt.get_perc, opt.del_perc); let kgen = match opt.dist.as_str() { "increment" => KeyGenerator::new_increment(klen, kmin, kmax), "incrementp" => { - let (thread_id, nr_threads) = thread_info.expect("incrementp expects thread info"); - assert!(thread_id < nr_threads); - let nr_keys_per = (kmax - kmin) / nr_threads; - let kminp = kmin + thread_id * nr_keys_per; - let kmaxp = if thread_id == nr_threads - 1 { - kmax - } else { - kminp + nr_keys_per - }; + let (kminp, kmaxp) = split_key_space(); KeyGenerator::new_increment(klen, kminp, kmaxp) } + "shuffle" => KeyGenerator::new_shuffle(klen, kmin, kmax), + "shufflep" => { + let (kminp, kmaxp) = split_key_space(); + KeyGenerator::new_shuffle(klen, kminp, kmaxp) + } "uniform" => KeyGenerator::new_uniform(klen, kmin, kmax), "zipfian" => { let theta = opt.zipf_theta.unwrap_or(1.0f64); @@ -255,7 +284,7 @@ impl Workload { #[cfg(test)] mod tests { use super::*; - use hashbrown::HashMap; + use hashbrown::{HashMap, HashSet}; use quanta::Instant; #[test] @@ -309,6 +338,28 @@ mod tests { } } + #[test] + fn keygen_shuffle() { + let start = 117; + let end = 135423; + let mut rng = rand::thread_rng(); + let mut kgen = KeyGenerator::new_shuffle(8, start, end); + let mut dist: HashSet> = HashSet::new(); + for _ in start..end { + let key = kgen.next(&mut rng); + // the key is newly added, not repeated + assert!(dist.insert(key.clone())); + } + // each key exists exactly once + assert_eq!(dist.len(), end - start); + let min = dist.iter().min().clone().unwrap(); + let min_bytes = Box::from(start.to_be_bytes()); + assert_eq!(*min, min_bytes); + let max = dist.iter().max().clone().unwrap(); + let max_bytes = Box::from((end - 1).to_be_bytes()); + assert_eq!(*max, max_bytes); + } + #[test] fn keygen_uniform() { let mut rng = rand::thread_rng(); @@ -383,37 +434,67 @@ mod tests { let _ = kgen.next(&mut rng); } println!("increment time: {} ms", t.elapsed().as_millis()); + let mut kgen = KeyGenerator::new_shuffle(8, 0, 1000); + let t = Instant::now(); + for _ in 0..N { + let _ = kgen.next(&mut rng); + } + println!("shuffle time: {} ms", t.elapsed().as_millis()); } #[test] - fn workloadopt_toml_correct() { + fn workload_toml_correct() { let s = r#"set_perc = 70 get_perc = 20 del_perc = 10 klen = 4 vlen = 6 - dist = "zipfian" + dist = "uniform" kmin = 0 kmax = 12345"#; let w = Workload::new_from_toml_str(s, None); + assert_eq!(w.kgen.len, 4); + assert_eq!(w.vlen, 6); assert_eq!(w.kgen.min, 0); + assert_eq!(w.kgen.max, 12345); + assert!(matches!(w.kgen.dist, KeyDistribution::Uniform(_))); let s = r#"set_perc = 70 get_perc = 20 del_perc = 10 - klen = 4 - vlen = 6 - dist = "uniform" + klen = 40 + vlen = 60 + dist = "zipfian" kmin = 0 - kmax = 12345 - zipf_theta = 1.0"#; + kmax = 123450 + zipf_theta = 1.0 + zipf_hotspot = 1.0"#; let w = Workload::new_from_toml_str(s, None); + assert_eq!(w.kgen.len, 40); + assert_eq!(w.vlen, 60); assert_eq!(w.kgen.min, 0); + assert_eq!(w.kgen.max, 123450); + assert!(matches!(w.kgen.dist, KeyDistribution::Zipfian(_, 123449))); + + let s = r#"set_perc = 60 + get_perc = 25 + del_perc = 15 + klen = 14 + vlen = 16 + dist = "shuffle" + kmin = 10000 + kmax = 20000"#; + let w = Workload::new_from_toml_str(s, Some((1, 2))); + assert_eq!(w.kgen.len, 14); + assert_eq!(w.vlen, 16); + assert_eq!(w.kgen.min, 10000); + assert_eq!(w.kgen.max, 20000); + assert!(matches!(w.kgen.dist, KeyDistribution::Shuffle(_))); } #[test] #[should_panic(expected = "should be positive")] - fn workloadopt_toml_invalid_wrong_size() { + fn workload_toml_invalid_wrong_size() { let s = r#"set_perc = 60 get_perc = 40 del_perc = 0 @@ -427,7 +508,7 @@ mod tests { #[test] #[should_panic(expected = "should be specified")] - fn workloadopt_toml_invalid_missing_fields() { + fn workload_toml_invalid_missing_fields() { let s = r#"set_perc = 60 get_perc = 40 del_perc = 0 @@ -439,7 +520,7 @@ mod tests { #[test] #[should_panic(expected = "should be greater")] - fn workloadopt_toml_invalid_wrong_keyspace() { + fn workload_toml_invalid_wrong_keyspace() { let s = r#"set_perc = 60 get_perc = 40 del_perc = 0 @@ -453,7 +534,7 @@ mod tests { #[test] #[should_panic(expected = "should be 100")] - fn workloadopt_toml_invalid_wrong_mix() { + fn workload_toml_invalid_wrong_mix() { let s = r#"set_perc = 70 get_perc = 40 del_perc = 0 @@ -466,8 +547,22 @@ mod tests { } #[test] - fn workload_increment_parallel() { - let opt = WorkloadOpt { + #[should_panic(expected = "invalid key distribution")] + fn workload_toml_invalid_key_distribution() { + let s = r#"set_perc = 70 + get_perc = 30 + del_perc = 0 + klen = 4 + vlen = 6 + dist = "uniorm" + kmin = 0 + kmax = 12345"#; + let _ = Workload::new_from_toml_str(s, None); + } + + #[test] + fn workload_keygen_parallel() { + let mut opt = WorkloadOpt { set_perc: 100, get_perc: 0, del_perc: 0, @@ -479,18 +574,24 @@ mod tests { zipf_theta: None, zipf_hotspot: None, }; - - let workload = Workload::new(&opt, Some((0, 3))); - assert_eq!(workload.kgen.min, 10000); - assert_eq!(workload.kgen.max, 14115); - - let workload = Workload::new(&opt, Some((1, 3))); - assert_eq!(workload.kgen.min, 14115); - assert_eq!(workload.kgen.max, 18230); - - let workload = Workload::new(&opt, Some((2, 3))); - assert_eq!(workload.kgen.min, 18230); - assert_eq!(workload.kgen.max, 22347); // 2 more keys + let test = |opt: &WorkloadOpt| { + let workload = Workload::new(&opt, Some((0, 3))); + assert_eq!(workload.kgen.min, 10000); + assert_eq!(workload.kgen.max, 14115); + + let workload = Workload::new(&opt, Some((1, 3))); + assert_eq!(workload.kgen.min, 14115); + assert_eq!(workload.kgen.max, 18230); + + let workload = Workload::new(&opt, Some((2, 3))); + assert_eq!(workload.kgen.min, 18230); + assert_eq!(workload.kgen.max, 22347); // 2 more keys + }; + // incrementp + test(&opt); + // shufflep + opt.dist = "shufflep".to_string(); + test(&opt); } #[test]