Skip to content

Commit

Permalink
Move to synchronous logging to reduce memory usage
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde committed Aug 14, 2024
1 parent 8d2c357 commit 5915c84
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 31 deletions.
4 changes: 4 additions & 0 deletions crates/arroyo-rpc/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,10 @@ pub struct LogConfig {
/// Set the log format
#[serde(default)]
pub format: LogFormat,

/// Nonblocking logging may reduce tail latency at the cost of higher memory usage
#[serde(default)]
pub nonblocking: bool,
}

#[derive(Debug, Clone, Deserialize, Serialize, Default)]
Expand Down
75 changes: 44 additions & 31 deletions crates/arroyo-server-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ use tracing_subscriber::EnvFilter;
use tracing_subscriber::Registry;

use arroyo_rpc::config::{config, LogFormat};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_appender::non_blocking::{NonBlockingBuilder, WorkerGuard};
use tracing_log::LogTracer;
use tracing_subscriber::fmt::FormatFields;
use uuid::Uuid;

pub const BUILD_TIMESTAMP: &str = env!("VERGEN_BUILD_TIMESTAMP");
Expand All @@ -48,7 +49,7 @@ pub const VERSION: &str = "0.12.0-dev";

static CLUSTER_ID: OnceCell<String> = OnceCell::new();

pub fn init_logging(name: &str) -> WorkerGuard {
pub fn init_logging(name: &str) -> Option<WorkerGuard> {
init_logging_with_filter(
name,
EnvFilter::builder()
Expand All @@ -57,7 +58,24 @@ pub fn init_logging(name: &str) -> WorkerGuard {
)
}

pub fn init_logging_with_filter(_name: &str, filter: EnvFilter) -> WorkerGuard {
macro_rules! register_log {
($e: expr, $nonblocking: expr, $filter: expr) => {{
let layer = $e;
if let Some(nonblocking) = $nonblocking {
tracing::subscriber::set_global_default(
Registry::default().with(layer.with_writer(nonblocking).with_filter($filter)),
)
.expect("Unable to set global log subscriber")
} else {
tracing::subscriber::set_global_default(
Registry::default().with(layer.with_writer(std::io::stderr).with_filter($filter)),
)
.expect("Unable to set global log subscriber")
}
}};
}

pub fn init_logging_with_filter(_name: &str, filter: EnvFilter) -> Option<WorkerGuard> {
if let Err(e) = LogTracer::init() {
eprintln!("Failed to initialize log tracer {:?}", e);
}
Expand All @@ -66,44 +84,39 @@ pub fn init_logging_with_filter(_name: &str, filter: EnvFilter) -> WorkerGuard {
.add_directive("refinery_core=warn".parse().unwrap())
.add_directive("aws_config::profile::credentials=warn".parse().unwrap());

let (nonblocking, guard) = tracing_appender::non_blocking(std::io::stderr());
let (nonblocking, guard) = if config().logging.nonblocking {
let (nonblocking, guard) = tracing_appender::non_blocking(std::io::stderr());
(Some(nonblocking), Some(guard))
} else {
(None, None)
};

match config().logging.format {
LogFormat::Plaintext => {
tracing::subscriber::set_global_default(
Registry::default().with(
tracing_subscriber::fmt::layer()
.with_line_number(false)
.with_file(false)
.with_span_events(FmtSpan::NONE)
.with_writer(nonblocking)
.with_filter(filter),
),
register_log!(
tracing_subscriber::fmt::layer()
.with_line_number(false)
.with_file(false)
.with_span_events(FmtSpan::NONE),
nonblocking,
filter
)
.expect("Unable to set global log subscriber");
}
LogFormat::Logfmt => {
tracing::subscriber::set_global_default(
Registry::default().with(
tracing_subscriber::fmt::layer()
.event_format(tracing_logfmt::EventsFormatter)
.fmt_fields(tracing_logfmt::FieldsFormatter)
.with_writer(nonblocking)
.with_filter(filter),
),
register_log!(
tracing_subscriber::fmt::layer()
.event_format(tracing_logfmt::EventsFormatter)
.fmt_fields(tracing_logfmt::FieldsFormatter),
nonblocking,
filter
)
.expect("Unable to set global log subscriber");
}
LogFormat::Json => {
tracing::subscriber::set_global_default(
Registry::default().with(
tracing_subscriber::fmt::layer()
.event_format(Format::default().json())
.with_writer(nonblocking)
.with_filter(filter),
),
register_log!(
tracing_subscriber::fmt::layer().event_format(Format::default().json()),
nonblocking,
filter
)
.expect("Unable to set global log subscriber");
}
}

Expand Down

0 comments on commit 5915c84

Please sign in to comment.