diff --git a/Cargo.lock b/Cargo.lock index 583fb83..b891487 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1930,7 +1930,6 @@ dependencies = [ "serde_json", "tokio", "tracing", - "tracing-appender", "tracing-subscriber", ] @@ -2923,18 +2922,6 @@ dependencies = [ "tracing-core", ] -[[package]] -name = "tracing-appender" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" -dependencies = [ - "crossbeam-channel", - "thiserror", - "time", - "tracing-subscriber", -] - [[package]] name = "tracing-attributes" version = "0.1.27" diff --git a/Cargo.toml b/Cargo.toml index bb1694c..954d12a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ codegen-units = 4 [dependencies] anyhow = "1.0.83" +arc-swap = "1.7.1" async-trait = "0.1.80" bollard = "0.16.1" bollard-stubs = "=1.44.0-rc.2" @@ -34,6 +35,7 @@ clap = { version = "4.5.4", features = ["derive"] } figment = { version = "0.10.18", features = ["toml", "yaml", "env", "test"] } http = "1.1.0" instant-acme = "0.4.3" +once_cell = "1.19.0" pingora = "0.2.0" pingora-openssl = "0.2.0" pingora-http = "0.2.0" @@ -46,6 +48,3 @@ serde_json = "1.0.117" tokio = { version = "1.37.0", features = ["rt-multi-thread"] } tracing = "0.1.40" tracing-subscriber = "0.3.18" -tracing-appender = "0.2.3" -arc-swap = "1.7.1" -once_cell = "1.19.0" diff --git a/src/config/mod.rs b/src/config/mod.rs index 9d6a076..e2ae201 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -92,7 +92,7 @@ pub struct ConfigRoute { pub upstreams: Vec, } -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, ValueEnum)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Copy, ValueEnum)] pub enum LogLevel { Debug, Info, diff --git a/src/main.rs b/src/main.rs index 524ad1a..f16abdc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,9 @@ use once_cell::sync::Lazy; use pingora::listeners::TlsSettings; use pingora_load_balancing::{health_check::TcpHealthCheck, LoadBalancer}; use pingora_proxy::http_proxy_service; +use services::logger::{ProxyLogger, ProxyLoggerReceiver}; use stores::{certificates::CertificatesStore, routes::RouteStore}; +use tokio::sync::mpsc; mod config; mod docker; @@ -29,13 +31,14 @@ fn main() -> Result<(), anyhow::Error> { let proxy_config = load_proxy_config("/etc/proksi/configs")?; // let file_appender = tracing_appender::rolling::hourly("./tmp", "proksi.log"); - let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout()); + let (log_sender, log_receiver) = mpsc::unbounded_channel::>(); + let proxy_logger = ProxyLogger::new(log_sender); // Creates a tracing/logging subscriber based on the configuration provided tracing_subscriber::fmt() .with_max_level(&proxy_config.logging.level) .compact() - .with_writer(non_blocking) + .with_writer(proxy_logger) .init(); // Pingora load balancer server @@ -107,7 +110,7 @@ fn main() -> Result<(), anyhow::Error> { pingora_server.add_service(https_service); pingora_server.add_service(docker_service); pingora_server.add_service(le_service); - // pingora_server.add_service(logger_service); + pingora_server.add_service(ProxyLoggerReceiver(log_receiver)); pingora_server.bootstrap(); pingora_server.run_forever(); diff --git a/src/services/logger/mod.rs b/src/services/logger/mod.rs index 0b7fe86..b138f9e 100644 --- a/src/services/logger/mod.rs +++ b/src/services/logger/mod.rs @@ -1,16 +1,77 @@ +use std::io; + use async_trait::async_trait; -use pingora::{server::ShutdownWatch, services::background::BackgroundService}; +use pingora::{ + server::{ListenFds, ShutdownWatch}, + services::Service, +}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tracing_subscriber::fmt::MakeWriter; + +/// A io::Write implementation that sends logs to a background service +#[derive(Debug, Clone)] +pub struct StdoutLogger(UnboundedSender>); + +impl io::Write for StdoutLogger { + fn write(&mut self, buf: &[u8]) -> io::Result { + let buf_copy = buf.to_owned(); + if let Ok(()) = self.0.send(buf_copy) { + return Ok(buf.len()); + } + + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + todo!() + } +} + +/// A naive implementation of a logger that delegate sending logs to a background channel +#[derive(Debug)] +pub struct ProxyLogger { + stdout: StdoutLogger, +} -use crate::config::LogLevel; +impl ProxyLogger { + pub fn new(sender: UnboundedSender>) -> Self { + ProxyLogger { + // level, + stdout: StdoutLogger(sender.clone()), + } + } +} + +/// impl from tracing_subscriber::fmt::MakeWriter +impl<'a> MakeWriter<'a> for ProxyLogger { + type Writer = StdoutLogger; + + fn make_writer(&'a self) -> Self::Writer { + self.stdout.clone() + } +} -pub struct ProxyLogger(pub LogLevel); +/// A background service that receives logs from the main thread and writes them to stdout +/// TODO: implement log rotation/write to disk (or use an existing lightweight crate) +pub struct ProxyLoggerReceiver(pub UnboundedReceiver>); #[async_trait] -impl BackgroundService for ProxyLogger { - async fn start(&self, _shutdown: ShutdownWatch) { - tracing_subscriber::fmt() - .with_max_level(&self.0) - .with_writer(std::io::stdout) - .init() +impl Service for ProxyLoggerReceiver { + async fn start_service(&mut self, _fds: Option, _shutdown: ShutdownWatch) { + loop { + if let Some(buf) = self.0.recv().await { + let buf = std::str::from_utf8(&buf).unwrap(); + // TODO: flush/rotate logs to disk + print!("{}", buf); + } + } + } + + fn name(&self) -> &str { + "ProxyLogger" + } + + fn threads(&self) -> Option { + Some(1) } }