diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e87cbb0f..7a4045031 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 them rather than incorrectly identifying the metric value. - Prometheus target metrics scraper will no longer panic if a metric has an invalid value (instead it will be logged) +- DogStatsD payloads try harder to satisfy the specified `num_contexts`. Before + this change, dogstatsd payloads may be significantly under-representing the + desired number of contexts. ## [0.23.3] ### Changed diff --git a/lading/src/bin/payloadtool.rs b/lading/src/bin/payloadtool.rs index 36255c703..b374b3a29 100644 --- a/lading/src/bin/payloadtool.rs +++ b/lading/src/bin/payloadtool.rs @@ -1,12 +1,14 @@ use std::io::Read; +use std::io::Write; use std::time::Instant; use std::{io, num::NonZeroU32}; use clap::Parser; use lading::generator::http::Method; -use lading_payload::block; +use lading_payload::block::{self, Block}; use rand::{rngs::StdRng, SeedableRng}; -use tracing::{debug, error, info, warn}; +use tracing::{error, info, warn}; +use tracing_subscriber::EnvFilter; use tracing_subscriber::{fmt::format::FmtSpan, util::SubscriberInitExt}; const UDP_PACKET_LIMIT_BYTES: u32 = 65_507; @@ -17,6 +19,10 @@ struct Args { /// Path to standard lading config file config_path: String, + /// Emit blocks line by line on stdout + #[clap(short, long)] + emit_to_stdout: bool, + /// Optionally only run a single generator's payload #[clap(short, long)] generator_id: Option, @@ -41,7 +47,7 @@ fn generate_and_check( seed: [u8; 32], total_bytes: NonZeroU32, max_block_size: byte_unit::Byte, -) -> Result<(), Error> { +) -> Result, Error> { let mut rng = StdRng::from_seed(seed); let start = Instant::now(); let blocks = @@ -49,13 +55,12 @@ fn generate_and_check( block::Cache::Fixed { blocks, idx: _ } => blocks, }; info!("Payload generation took {:?}", start.elapsed()); - debug!("Payload: {:#?}", blocks); let mut total_generated_bytes: u32 = 0; for block in blocks.iter() { total_generated_bytes += block.total_bytes.get(); } - if total_bytes.get() != total_generated_bytes { + if total_bytes.get() > total_generated_bytes { let total_requested_bytes = byte_unit::Byte::from_bytes(total_bytes.get().into()); let total_requested_bytes_str = total_requested_bytes .get_appropriate_unit(false) @@ -67,11 +72,11 @@ fn generate_and_check( warn!("Generator failed to generate {total_requested_bytes_str}, instead only found {total_generated_bytes_str} of data") } - Ok(()) + Ok(blocks) } -fn check_generator(config: &lading::generator::Config) -> Result<(), Error> { - match &config.inner { +fn check_generator(config: &lading::generator::Config, emit_to_stdout: bool) -> Result<(), Error> { + let blocks = match &config.inner { lading::generator::Inner::FileGen(_) => unimplemented!("FileGen not supported"), lading::generator::Inner::UnixDatagram(g) => { let max_block_size = @@ -80,13 +85,13 @@ fn check_generator(config: &lading::generator::Config) -> Result<(), Error> { let total_bytes = NonZeroU32::new(g.maximum_prebuild_cache_size_bytes.get_bytes() as u32) .expect("Non-zero max prebuild cache size"); - generate_and_check(&g.variant, g.seed, total_bytes, max_block_size)?; + generate_and_check(&g.variant, g.seed, total_bytes, max_block_size)? } lading::generator::Inner::Tcp(g) => { let total_bytes = NonZeroU32::new(g.maximum_prebuild_cache_size_bytes.get_bytes() as u32) .expect("Non-zero max prebuild cache size"); - generate_and_check(&g.variant, g.seed, total_bytes, g.maximum_block_size)?; + generate_and_check(&g.variant, g.seed, total_bytes, g.maximum_block_size)? } lading::generator::Inner::Udp(g) => { let total_bytes = @@ -95,7 +100,7 @@ fn check_generator(config: &lading::generator::Config) -> Result<(), Error> { let max_block_size = byte_unit::Byte::from_unit(UDP_PACKET_LIMIT_BYTES.into(), byte_unit::ByteUnit::B) .expect("valid bytes"); - generate_and_check(&g.variant, g.seed, total_bytes, max_block_size)?; + generate_and_check(&g.variant, g.seed, total_bytes, max_block_size)? } lading::generator::Inner::Http(g) => { let (variant, max_prebuild_cache_size_bytes) = match &g.method { @@ -107,7 +112,7 @@ fn check_generator(config: &lading::generator::Config) -> Result<(), Error> { }; let total_bytes = NonZeroU32::new(max_prebuild_cache_size_bytes.get_bytes() as u32) .expect("Non-zero max prebuild cache size"); - generate_and_check(variant, g.seed, total_bytes, g.maximum_block_size)?; + generate_and_check(variant, g.seed, total_bytes, g.maximum_block_size)? } lading::generator::Inner::SplunkHec(_) => unimplemented!("SplunkHec not supported"), lading::generator::Inner::FileTree(_) => unimplemented!("FileTree not supported"), @@ -115,24 +120,35 @@ fn check_generator(config: &lading::generator::Config) -> Result<(), Error> { let total_bytes = NonZeroU32::new(g.maximum_prebuild_cache_size_bytes.get_bytes() as u32) .expect("Non-zero max prebuild cache size"); - generate_and_check(&g.variant, g.seed, total_bytes, g.maximum_block_size)?; + generate_and_check(&g.variant, g.seed, total_bytes, g.maximum_block_size)? } lading::generator::Inner::UnixStream(g) => { let total_bytes = NonZeroU32::new(g.maximum_prebuild_cache_size_bytes.get_bytes() as u32) .expect("Non-zero max prebuild cache size"); - generate_and_check(&g.variant, g.seed, total_bytes, g.maximum_block_size)?; + generate_and_check(&g.variant, g.seed, total_bytes, g.maximum_block_size)? } lading::generator::Inner::PassthruFile(g) => { let total_bytes = NonZeroU32::new(g.maximum_prebuild_cache_size_bytes.get_bytes() as u32) .expect("Non-zero max prebuild cache size"); - generate_and_check(&g.variant, g.seed, total_bytes, g.maximum_block_size)?; + generate_and_check(&g.variant, g.seed, total_bytes, g.maximum_block_size)? } lading::generator::Inner::ProcessTree(_) => unimplemented!("ProcessTree not supported"), lading::generator::Inner::ProcFs(_) => unimplemented!("ProcFs not supported"), }; + if emit_to_stdout { + for block in blocks { + match std::io::stdout().write_all(&block.bytes) { + Ok(_) => {} + Err(e) => { + error!("Failed to write block to stdout: {}", e); + } + } + } + } + Ok(()) } @@ -141,6 +157,7 @@ async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_span_events(FmtSpan::CLOSE) .with_ansi(false) + .with_env_filter(EnvFilter::from_default_env()) .finish() .init(); @@ -180,10 +197,10 @@ async fn main() -> Result<(), Error> { error!("No generator found with id: {}", generator_id); Error::InvalidArgs })?; - check_generator(generator)?; + check_generator(generator, args.emit_to_stdout)?; } else { for generator in config.generator { - check_generator(&generator)?; + check_generator(&generator, args.emit_to_stdout)?; } } diff --git a/lading_payload/src/dogstatsd/metric.rs b/lading_payload/src/dogstatsd/metric.rs index cab974694..4f119bbb6 100644 --- a/lading_payload/src/dogstatsd/metric.rs +++ b/lading_payload/src/dogstatsd/metric.rs @@ -3,12 +3,12 @@ use std::fmt; use rand::{ distributions::{OpenClosed01, WeightedIndex}, - prelude::{Distribution, SliceRandom}, + prelude::Distribution, Rng, }; use crate::{common::strings, dogstatsd::metric::template::Template, Error, Generator}; -use tracing::debug; +use tracing::info; use super::{ choose_or_not_ref, @@ -27,6 +27,7 @@ pub(crate) struct MetricGenerator { pub(crate) sampling: ConfRange, pub(crate) sampling_probability: f32, pub(crate) num_value_generator: NumValueGenerator, + pub(crate) ctx_idx: std::cell::Cell, } impl MetricGenerator { @@ -51,7 +52,7 @@ impl MetricGenerator { { let mut templates = Vec::with_capacity(num_contexts); - debug!("Generating metric templates for {} contexts.", num_contexts); + info!("Generating metric templates for {} contexts.", num_contexts); for _ in 0..num_contexts { let tags = tags_generator.generate(&mut rng); let name_sz = name_length.sample(&mut rng) as usize; @@ -86,6 +87,7 @@ impl MetricGenerator { sampling, sampling_probability, num_value_generator: NumValueGenerator::new(value_conf), + ctx_idx: std::cell::Cell::new(0), }) } } @@ -102,8 +104,10 @@ impl<'a> Generator<'a> for MetricGenerator { // and the program should crash prior to this point. let template: &Template = self .templates - .choose(&mut rng) - .expect("failed to choose templates"); + .get(self.ctx_idx.get()) + .expect("failed to get template"); + self.ctx_idx + .set((self.ctx_idx.get() + 1) % self.templates.len()); let container_id = choose_or_not_ref(&mut rng, &self.container_ids).map(String::as_str); // https://docs.datadoghq.com/metrics/custom_metrics/dogstatsd_metrics_submission/#sample-rates