diff --git a/Cargo.lock b/Cargo.lock index 815dd063e..d8da8b4c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -195,7 +195,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes", "futures-util", @@ -211,7 +211,34 @@ dependencies = [ "rustversion", "serde", "sync_wrapper 0.1.2", - "tower", + "tower 0.4.13", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 1.0.1", + "tower 0.5.1", "tower-layer", "tower-service", ] @@ -233,6 +260,26 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.1", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -701,7 +748,7 @@ dependencies = [ "tokio", "tokio-stream", "tonic 0.9.2", - "tower", + "tower 0.4.13", "tracing", "tracing-subscriber", ] @@ -979,6 +1026,12 @@ version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.3.26" @@ -1255,11 +1308,24 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.4.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-util" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" dependencies = [ "bytes", "futures-channel", @@ -1452,6 +1518,9 @@ dependencies = [ "nix 0.29.0", "num_cpus", "once_cell", + "opentelemetry 0.26.0", + "opentelemetry-otlp", + "opentelemetry_sdk 0.26.0", "procfs", "proptest", "rand", @@ -1468,7 +1537,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tonic 0.9.2", - "tower", + "tower 0.4.13", "tracing", "tracing-subscriber", "uuid", @@ -1493,10 +1562,10 @@ dependencies = [ "byte-unit", "bytes", "criterion", - "opentelemetry-proto", + "opentelemetry-proto 0.1.0", "proptest", "proptest-derive", - "prost", + "prost 0.11.9", "rand", "rmp-serde", "rustc-hash", @@ -1825,7 +1894,53 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69d6c3d7288a106c0a363e4b0e8d308058d56902adefb16f4936f417ffef086e" dependencies = [ "opentelemetry_api", - "opentelemetry_sdk", + "opentelemetry_sdk 0.18.0", +] + +[[package]] +name = "opentelemetry" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry-http" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6351496aeaa49d7c267fb480678d85d1cd30c5edb20b497c48c56f62a8c14b99" +dependencies = [ + "async-trait", + "bytes", + "http 1.1.0", + "opentelemetry 0.26.0", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29e1f9c8b032d4f635c730c0efcf731d5e2530ea13fa8bef7939ddc8420696bd" +dependencies = [ + "async-trait", + "futures-core", + "http 1.1.0", + "opentelemetry 0.26.0", + "opentelemetry-http", + "opentelemetry-proto 0.26.1", + "opentelemetry_sdk 0.26.0", + "prost 0.13.3", + "serde_json", + "thiserror", + "tokio", + "tonic 0.12.3", ] [[package]] @@ -1836,12 +1951,26 @@ checksum = "d61a2f56df5574508dd86aaca016c917489e589ece4141df1b5e349af8d66c28" dependencies = [ "futures", "futures-util", - "opentelemetry", - "prost", + "opentelemetry 0.18.0", + "prost 0.11.9", "tonic 0.8.3", "tonic-build 0.8.4", ] +[[package]] +name = "opentelemetry-proto" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9d3968ce3aefdcca5c27e3c4ea4391b37547726a70893aab52d3de95d5f8b34" +dependencies = [ + "hex", + "opentelemetry 0.26.0", + "opentelemetry_sdk 0.26.0", + "prost 0.13.3", + "serde", + "tonic 0.12.3", +] + [[package]] name = "opentelemetry_api" version = "0.18.0" @@ -1878,6 +2007,27 @@ dependencies = [ "thiserror", ] +[[package]] +name = "opentelemetry_sdk" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry 0.26.0", + "percent-encoding", + "rand", + "serde_json", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "ordered-float" version = "4.3.0" @@ -2147,7 +2297,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.11.9", +] + +[[package]] +name = "prost" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +dependencies = [ + "bytes", + "prost-derive 0.13.3", ] [[package]] @@ -2164,7 +2324,7 @@ dependencies = [ "multimap", "petgraph", "prettyplease", - "prost", + "prost 0.11.9", "prost-types", "regex", "syn 1.0.109", @@ -2185,13 +2345,26 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prost-derive" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.85", +] + [[package]] name = "prost-types" version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" dependencies = [ - "prost", + "prost 0.11.9", ] [[package]] @@ -2651,7 +2824,7 @@ dependencies = [ name = "shared" version = "0.1.0" dependencies = [ - "prost", + "prost 0.11.9", "serde", "serde_json", "tonic 0.9.2", @@ -2668,7 +2841,7 @@ dependencies = [ "tempfile", "tokio", "tonic 0.9.2", - "tower", + "tower 0.4.13", "tracing", "tracing-subscriber", ] @@ -2971,7 +3144,7 @@ checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.13.1", "bytes", "futures-core", @@ -2980,15 +3153,15 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.30", - "hyper-timeout", + "hyper-timeout 0.4.1", "percent-encoding", "pin-project", - "prost", - "prost-derive", + "prost 0.11.9", + "prost-derive 0.11.9", "tokio", "tokio-stream", "tokio-util", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -3002,7 +3175,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" dependencies = [ "async-trait", - "axum", + "axum 0.6.20", "base64 0.21.7", "bytes", "futures-core", @@ -3011,13 +3184,43 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.30", - "hyper-timeout", + "hyper-timeout 0.4.1", + "percent-encoding", + "pin-project", + "prost 0.11.9", + "tokio", + "tokio-stream", + "tower 0.4.13", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +dependencies = [ + "async-stream", + "async-trait", + "axum 0.7.7", + "base64 0.22.1", + "bytes", + "h2 0.4.6", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-timeout 0.5.2", + "hyper-util", "percent-encoding", "pin-project", - "prost", + "prost 0.13.3", + "socket2 0.5.7", "tokio", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -3069,6 +3272,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.3" diff --git a/lading/Cargo.toml b/lading/Cargo.toml index f0ae67888..4af1ff59d 100644 --- a/lading/Cargo.toml +++ b/lading/Cargo.toml @@ -19,6 +19,9 @@ lading-capture = { version = "0.2", path = "../lading_capture" } lading-payload = { version = "0.1", path = "../lading_payload" } lading-throttle = { version = "0.1", path = "../lading_throttle" } lading-signal = { version = "0.1", path = "../lading_signal" } +opentelemetry = { version = "0.26", features = ["metrics", "otel_unstable"] } +opentelemetry_sdk = { version = "0.26", features = ["metrics", "rt-tokio"] } +opentelemetry-otlp = { version = "0.26.0", features = ["tonic", "metrics", "http-proto", "http-json"] } async-compression = { version = "0.4.17", features = ["tokio", "zstd"] } average = { version = "0.15", default-features = false, features = [] } diff --git a/lading/src/bin/payloadtool.rs b/lading/src/bin/payloadtool.rs index d9956c92a..d8f4795cd 100644 --- a/lading/src/bin/payloadtool.rs +++ b/lading/src/bin/payloadtool.rs @@ -109,6 +109,7 @@ fn check_generator(config: &lading::generator::Config) -> Result<(), Error> { .expect("Non-zero max prebuild cache size"); generate_and_check(variant, g.seed, total_bytes, g.maximum_block_size)?; } + lading::generator::Inner::Otlp(_g) => unimplemented!("Otlp not supported"), lading::generator::Inner::SplunkHec(_) => unimplemented!("SplunkHec not supported"), lading::generator::Inner::FileTree(_) => unimplemented!("FileTree not supported"), lading::generator::Inner::Grpc(g) => { diff --git a/lading/src/generator.rs b/lading/src/generator.rs index 7005928db..b635f9409 100644 --- a/lading/src/generator.rs +++ b/lading/src/generator.rs @@ -19,6 +19,7 @@ pub mod file_gen; pub mod file_tree; pub mod grpc; pub mod http; +pub mod otlp; pub mod passthru_file; pub mod process_tree; pub mod procfs; @@ -40,6 +41,9 @@ pub enum Error { /// See [`crate::generator::http::Error`] for details. #[error(transparent)] Http(#[from] http::Error), + /// See [`crate::generator::otlp::Error`] for details. + #[error(transparent)] + Otlp(#[from] otlp::Error), /// See [`crate::generator::splunk_hec::Error`] for details. #[error(transparent)] SplunkHec(#[from] splunk_hec::Error), @@ -102,6 +106,8 @@ pub enum Inner { Udp(udp::Config), /// See [`crate::generator::http::Config`] for details. Http(http::Config), + /// See [`crate::generator::otlp::Config`] for details. + Otlp(otlp::Config), /// See [`crate::generator::splunk_hec::Config`] for details. SplunkHec(splunk_hec::Config), /// See [`crate::generator::file_gen::Config`] for details. @@ -134,6 +140,8 @@ pub enum Server { Udp(udp::Udp), /// See [`crate::generator::http::Http`] for details. Http(http::Http), + /// See [`crate::generator::otlp::Otlp`] for details. + Otlp(otlp::Otlp), /// See [`crate::generator::splunk_hec::SplunkHec`] for details. SplunkHec(splunk_hec::SplunkHec), /// See [`crate::generator::file_gen::FileGen`] for details. @@ -169,6 +177,7 @@ impl Server { Inner::Tcp(conf) => Self::Tcp(tcp::Tcp::new(config.general, &conf, shutdown)?), Inner::Udp(conf) => Self::Udp(udp::Udp::new(config.general, &conf, shutdown)?), Inner::Http(conf) => Self::Http(http::Http::new(config.general, conf, shutdown)?), + Inner::Otlp(conf) => Self::Otlp(otlp::Otlp::new(config.general, conf, shutdown)?), Inner::SplunkHec(conf) => { Self::SplunkHec(splunk_hec::SplunkHec::new(config.general, conf, shutdown)?) } @@ -228,6 +237,7 @@ impl Server { Server::Tcp(inner) => inner.spin().await?, Server::Udp(inner) => inner.spin().await?, Server::Http(inner) => inner.spin().await?, + Server::Otlp(inner) => inner.spin().await?, Server::SplunkHec(inner) => inner.spin().await?, Server::FileGen(inner) => inner.spin().await?, Server::FileTree(inner) => inner.spin().await?, diff --git a/lading/src/generator/http.rs b/lading/src/generator/http.rs index 216c2f688..4afade591 100644 --- a/lading/src/generator/http.rs +++ b/lading/src/generator/http.rs @@ -92,6 +92,9 @@ pub enum Error { /// Wrapper around [`hyper::http::Error`]. #[error("HTTP error: {0}")] Http(#[from] hyper::http::Error), + // OTLP traffic + //#[error("OTLP error: {0}")] + //Otlp(#[from] OtlpError), /// Byte error #[error("Bytes must not be negative: {0}")] Byte(#[from] ByteError), diff --git a/lading/src/generator/otlp.rs b/lading/src/generator/otlp.rs new file mode 100644 index 000000000..e17e9239c --- /dev/null +++ b/lading/src/generator/otlp.rs @@ -0,0 +1,226 @@ +//! The OTLP protocol speaking generator. + +use super::General; +use hyper::Uri; +use opentelemetry::global; +use opentelemetry::KeyValue; +use opentelemetry_otlp; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; +use opentelemetry_sdk::{runtime, Resource}; +use serde::{Deserialize, Serialize}; +//use opentelemetry_sdk::metrics::data::Temporality; +use opentelemetry_sdk::metrics::reader::DefaultTemporalitySelector; +use std::vec; + +/// Config for [`OTLP`] +#[derive(Debug, Deserialize, Serialize, PartialEq)] +#[serde(deny_unknown_fields)] +/// Configuration of this generator. +pub struct Config { + /// The seed for random operations against this target + pub seed: [u8; 32], + /// The URI for the target, must be a valid URI + #[serde(with = "http_serde::uri")] + pub target_uri: Uri, + /// The bytes per second to send or receive from the target + pub bytes_per_second: byte_unit::Byte, + /// The maximum size in bytes of the largest block in the prebuild cache. + #[serde(default = "lading_payload::block::default_maximum_block_size")] + pub maximum_block_size: byte_unit::Byte, + /// The total number of parallel connections to maintain + pub parallel_connections: u16, + /// The load throttle configuration + #[serde(default)] + pub throttle: lading_throttle::Config, +} + +#[allow(missing_docs)] +#[derive(thiserror::Error, Copy, Clone, Debug)] +/// Errors produced by [`Otlp`]. +pub enum Error { + // Generic error for OTLP + #[error("OTLP Error")] + Generic, +} + +/// The OTLP generator. +/// +/// This generator is reposnsible for connecting to the target via OTLP. +#[derive(Clone, Debug)] +pub struct Otlp { + target_uri: Uri, + shutdown: lading_signal::Watcher, +} + +fn init_meter_provider(endpoint: &str) -> opentelemetry_sdk::metrics::SdkMeterProvider { + let exporter = opentelemetry_otlp::new_exporter() + .tonic() // create GRPC layer + .with_endpoint(endpoint) + .build_metrics_exporter(Box::new(DefaultTemporalitySelector::new())) + .unwrap(); + + let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); + let provider = SdkMeterProvider::builder() + .with_reader(reader) + .with_resource(Resource::new([KeyValue::new( + "service.name", + "metrics-basic-example", + )])) + .build(); + + global::set_meter_provider(provider.clone()); + provider +} + +impl Otlp { + /// Create a new [`Otlp`] instance + /// + /// # Errors + /// + /// Creation will fail if the underlying governor capacity exceeds u32. + /// + /// # Panics + /// + /// Function will panic if user has passed non-zero values for any byte + /// values. Sharp corners. + #[allow(clippy::cast_possible_truncation)] + pub fn new( + _general: General, + config: Config, + shutdown: lading_signal::Watcher, + ) -> Result { + Ok(Self { + target_uri: config.target_uri, + shutdown, + }) + } + + /// Run [`Otlp`] to completion or until a shutdown signal is received. + /// + /// # Errors + /// + /// TODO + /// + pub async fn spin(self) -> Result<(), Error> { + let meter_provider = init_meter_provider(&self.target_uri.to_string()); + + // Create a meter from the above MeterProvider. + let meter = global::meter("mylibraryname"); + + // Create a Counter Instrument. + let counter = meter.u64_counter("my_counter").init(); + + // Record measurements using the Counter instrument. + counter.add( + 10, + &[ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ], + ); + + // Create a ObservableCounter instrument and register a callback that reports the measurement. + let _observable_counter = meter + .u64_observable_counter("my_observable_counter") + .with_description("My observable counter example description") + .with_unit("myunit") + .with_callback(|observer| { + observer.observe( + 100, + &[ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ], + ) + }) + .init(); + + // Create a UpCounter Instrument. + let updown_counter = meter.i64_up_down_counter("my_updown_counter").init(); + + // Record measurements using the UpCounter instrument. + updown_counter.add( + -10, + &[ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ], + ); + + // Create a Observable UpDownCounter instrument and register a callback that reports the measurement. + let _observable_up_down_counter = meter + .i64_observable_up_down_counter("my_observable_updown_counter") + .with_description("My observable updown counter example description") + .with_unit("myunit") + .with_callback(|observer| { + observer.observe( + 100, + &[ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ], + ) + }) + .init(); + + // Create a Histogram Instrument. + let histogram = meter + .f64_histogram("my_histogram") + .with_description("My histogram example description") + // Setting boundaries is optional. By default, the boundaries are set to + // [0.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0, 250.0, 500.0, 750.0, 1000.0, 2500.0, 5000.0, 7500.0, 10000.0] + .with_boundaries(vec![0.0, 5.0, 10.0, 15.0, 20.0, 25.0]) + .init(); + + // Record measurements using the histogram instrument. + histogram.record( + 10.5, + &[ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ], + ); + + // Note that there is no ObservableHistogram instrument. + + // Create a Gauge Instrument. + let gauge = meter + .f64_gauge("my_gauge") + .with_description("A gauge set to 1.0") + .with_unit("myunit") + .init(); + + gauge.record( + 1.0, + &[ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ], + ); + + // Create a ObservableGauge instrument and register a callback that reports the measurement. + let _observable_gauge = meter + .f64_observable_gauge("my_observable_gauge") + .with_description("An observable gauge set to 1.0") + .with_unit("myunit") + .with_callback(|observer| { + observer.observe( + 1.0, + &[ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ], + ) + }) + .init(); + + // Metrics are exported by default every 30 seconds when using stdout exporter, + // however shutting down the MeterProvider here instantly flushes + // the metrics, instead of waiting for the 30 sec interval. + match meter_provider.shutdown() { + Ok(_) => return Ok(()), + Err(_err) => return Err(Error::Generic), + } + } +}