diff --git a/crates/arroyo-rpc/src/config.rs b/crates/arroyo-rpc/src/config.rs index 931405ab7..8a525434e 100644 --- a/crates/arroyo-rpc/src/config.rs +++ b/crates/arroyo-rpc/src/config.rs @@ -629,6 +629,10 @@ pub struct LogConfig { /// Set the log format #[serde(default)] pub format: LogFormat, + + /// Nonblocking logging may reduce tail latency at the cost of higher memory usage + #[serde(default)] + pub nonblocking: bool, } #[derive(Debug, Clone, Deserialize, Serialize, Default)] diff --git a/crates/arroyo-server-common/src/lib.rs b/crates/arroyo-server-common/src/lib.rs index 44394f1d4..3fceff0d2 100644 --- a/crates/arroyo-server-common/src/lib.rs +++ b/crates/arroyo-server-common/src/lib.rs @@ -38,8 +38,9 @@ use tracing_subscriber::EnvFilter; use tracing_subscriber::Registry; use arroyo_rpc::config::{config, LogFormat}; -use tracing_appender::non_blocking::WorkerGuard; +use tracing_appender::non_blocking::{NonBlockingBuilder, WorkerGuard}; use tracing_log::LogTracer; +use tracing_subscriber::fmt::FormatFields; use uuid::Uuid; pub const BUILD_TIMESTAMP: &str = env!("VERGEN_BUILD_TIMESTAMP"); @@ -48,7 +49,7 @@ pub const VERSION: &str = "0.12.0-dev"; static CLUSTER_ID: OnceCell = OnceCell::new(); -pub fn init_logging(name: &str) -> WorkerGuard { +pub fn init_logging(name: &str) -> Option { init_logging_with_filter( name, EnvFilter::builder() @@ -57,7 +58,24 @@ pub fn init_logging(name: &str) -> WorkerGuard { ) } -pub fn init_logging_with_filter(_name: &str, filter: EnvFilter) -> WorkerGuard { +macro_rules! register_log { + ($e: expr, $nonblocking: expr, $filter: expr) => {{ + let layer = $e; + if let Some(nonblocking) = $nonblocking { + tracing::subscriber::set_global_default( + Registry::default().with(layer.with_writer(nonblocking).with_filter($filter)), + ) + .expect("Unable to set global log subscriber") + } else { + tracing::subscriber::set_global_default( + Registry::default().with(layer.with_writer(std::io::stderr).with_filter($filter)), + ) + .expect("Unable to set global log subscriber") + } + }}; +} + +pub fn init_logging_with_filter(_name: &str, filter: EnvFilter) -> Option { if let Err(e) = LogTracer::init() { eprintln!("Failed to initialize log tracer {:?}", e); } @@ -66,44 +84,39 @@ pub fn init_logging_with_filter(_name: &str, filter: EnvFilter) -> WorkerGuard { .add_directive("refinery_core=warn".parse().unwrap()) .add_directive("aws_config::profile::credentials=warn".parse().unwrap()); - let (nonblocking, guard) = tracing_appender::non_blocking(std::io::stderr()); + let (nonblocking, guard) = if config().logging.nonblocking { + let (nonblocking, guard) = tracing_appender::non_blocking(std::io::stderr()); + (Some(nonblocking), Some(guard)) + } else { + (None, None) + }; match config().logging.format { LogFormat::Plaintext => { - tracing::subscriber::set_global_default( - Registry::default().with( - tracing_subscriber::fmt::layer() - .with_line_number(false) - .with_file(false) - .with_span_events(FmtSpan::NONE) - .with_writer(nonblocking) - .with_filter(filter), - ), + register_log!( + tracing_subscriber::fmt::layer() + .with_line_number(false) + .with_file(false) + .with_span_events(FmtSpan::NONE), + nonblocking, + filter ) - .expect("Unable to set global log subscriber"); } LogFormat::Logfmt => { - tracing::subscriber::set_global_default( - Registry::default().with( - tracing_subscriber::fmt::layer() - .event_format(tracing_logfmt::EventsFormatter) - .fmt_fields(tracing_logfmt::FieldsFormatter) - .with_writer(nonblocking) - .with_filter(filter), - ), + register_log!( + tracing_subscriber::fmt::layer() + .event_format(tracing_logfmt::EventsFormatter) + .fmt_fields(tracing_logfmt::FieldsFormatter), + nonblocking, + filter ) - .expect("Unable to set global log subscriber"); } LogFormat::Json => { - tracing::subscriber::set_global_default( - Registry::default().with( - tracing_subscriber::fmt::layer() - .event_format(Format::default().json()) - .with_writer(nonblocking) - .with_filter(filter), - ), + register_log!( + tracing_subscriber::fmt::layer().event_format(Format::default().json()), + nonblocking, + filter ) - .expect("Unable to set global log subscriber"); } }