From 452f2e73e8a462c238404865d51c07f9ff0353cf Mon Sep 17 00:00:00 2001 From: Luiz Fonseca Date: Thu, 16 May 2024 02:01:49 +0200 Subject: [PATCH] feat: add non-blocking tracing and static ROUTER with arc_swap --- .gitignore | 1 + Cargo.lock | 15 ++++++++++++ Cargo.toml | 3 +++ src/docker/client.rs | 3 ++- src/main.rs | 39 ++++++++++++++++++------------ src/proxy_server/http_proxy.rs | 14 +++++------ src/proxy_server/https_proxy.rs | 32 ++++-------------------- src/services/letsencrypt/http01.rs | 3 +-- src/services/logger/mod.rs | 16 ++++++++++++ src/services/mod.rs | 1 + src/stores/mod.rs | 1 + src/stores/routes.rs | 38 +++++++++++++++++++++++++++++ 12 files changed, 113 insertions(+), 53 deletions(-) create mode 100644 src/services/logger/mod.rs create mode 100644 src/stores/mod.rs create mode 100644 src/stores/routes.rs diff --git a/.gitignore b/.gitignore index 48dc4f8..156f8bd 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ proksi.yaml .zed/ .vscode/ +/tmp diff --git a/Cargo.lock b/Cargo.lock index 374b6bf..583fb83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1909,6 +1909,7 @@ name = "proksi" version = "0.1.2" dependencies = [ "anyhow", + "arc-swap", "async-trait", "bollard", "bollard-stubs", @@ -1917,6 +1918,7 @@ dependencies = [ "figment", "http 1.1.0", "instant-acme", + "once_cell", "pingora", "pingora-http", "pingora-load-balancing", @@ -1928,6 +1930,7 @@ dependencies = [ "serde_json", "tokio", "tracing", + "tracing-appender", "tracing-subscriber", ] @@ -2920,6 +2923,18 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3566e8ce28cc0a3fe42519fc80e6b4c943cc4c8cef275620eb8dac2d3d4e06cf" +dependencies = [ + "crossbeam-channel", + "thiserror", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.27" diff --git a/Cargo.toml b/Cargo.toml index be1b603..08cc804 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,3 +45,6 @@ serde_json = "1.0.117" tokio = { version = "1.37.0", features = ["rt-multi-thread"] } tracing = "0.1.40" tracing-subscriber = "0.3.18" +tracing-appender = "0.2.3" +arc-swap = "1.7.1" +once_cell = "1.19.0" diff --git a/src/docker/client.rs b/src/docker/client.rs index abc18d9..c98f153 100644 --- a/src/docker/client.rs +++ b/src/docker/client.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, hash::Hash, thread::sleep, time::Duration}; use async_trait::async_trait; use bollard::{service::ListServicesOptions, Docker}; use pingora::{server::ShutdownWatch, services::background::BackgroundService}; +use tracing::info; pub fn create_client() -> DockerClient { let docker = Docker::connect_with_local_defaults(); @@ -20,7 +21,7 @@ impl DockerClient { let mut default_filters = HashMap::new(); default_filters.insert("label", vec!["proksi.host", "proksi.port"]); - println!("Docker running"); + info!("Docker running"); loop { self.list_containers(default_filters.clone()).await; diff --git a/src/main.rs b/src/main.rs index 6168d1e..ac8b8e8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,16 +1,20 @@ use std::{collections::HashMap, sync::Arc}; use ::pingora::{server::Server, services::background::background_service}; +use arc_swap::ArcSwap; use config::load_proxy_config; use instant_acme::KeyAuthorization; +use once_cell::sync::Lazy; use pingora::listeners::TlsSettings; use pingora_load_balancing::{health_check::TcpHealthCheck, LoadBalancer}; use pingora_proxy::http_proxy_service; +use stores::routes::RouteStore; mod config; mod docker; mod proxy_server; mod services; +mod stores; mod tools; #[derive(Debug)] @@ -19,6 +23,10 @@ pub struct Storage { certificates: HashMap, } +/// Static reference to the route store that can be shared across threads +pub static ROUTE_STORE: Lazy> = + Lazy::new(|| ArcSwap::new(Arc::new(RouteStore::new()))); + pub type StorageArc = Arc>; impl Storage { @@ -62,26 +70,26 @@ impl Default for Storage { } } -fn init_tracing_subscriber(config: &config::Config) { - tracing_subscriber::fmt() - .with_max_level(&config.logging.level) - .init() -} - fn main() -> Result<(), anyhow::Error> { // Loads configuration from command-line, YAML or TOML sources let proxy_config = load_proxy_config("/etc/proksi/configs")?; + // let file_appender = tracing_appender::rolling::hourly("./tmp", "proksi.log"); + let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout()); + // Creates a tracing/logging subscriber based on the configuration provided - init_tracing_subscriber(&proxy_config); + tracing_subscriber::fmt() + .with_max_level(&proxy_config.logging.level) + .compact() + .with_writer(non_blocking) + .init(); // Pingora load balancer server - let mut pingora_server = Server::new(None).unwrap(); + let mut pingora_server = Server::new(None)?; // Request router: // Given a host header, the router will return the corresponding upstreams - // The router will also handle health checks and failover in case of upstream failure - let mut router = proxy_server::https_proxy::Router::new(); + let mut router_store = RouteStore::new(); // for each route, build a loadbalancer configuration with the corresponding upstreams for route in proxy_config.routes { @@ -92,16 +100,13 @@ fn main() -> Result<(), anyhow::Error> { .map(|upstr| format!("{}:{}", upstr.ip, upstr.port)); let mut upstreams = LoadBalancer::try_from_iter(addr_upstreams)?; - let tcp_health_check = TcpHealthCheck::new(); upstreams.set_health_check(tcp_health_check); let health_check_service = background_service(&route.host, upstreams); - let upstreams = health_check_service.task(); - router.add_route(route.host, upstreams); - + router_store.add_route(route.host, upstreams); pingora_server.add_service(health_check_service); } @@ -120,7 +125,7 @@ fn main() -> Result<(), anyhow::Error> { // Service: Lets Encrypt HTTP Challenge/Certificate renewal let letsencrypt_http = services::letsencrypt::http01::HttpLetsencrypt::new( - &router.get_route_keys(), + &ROUTE_STORE.load().get_route_keys(), "youremail@example.com", storage.clone(), ); @@ -137,6 +142,9 @@ fn main() -> Result<(), anyhow::Error> { ); // Service: HTTPS Load Balancer (main service) + // The router will also handle health checks and failover in case of upstream failure + ROUTE_STORE.swap(Arc::new(router_store)); + let router = proxy_server::https_proxy::Router {}; let mut https_service = http_proxy_service(&pingora_server.configuration, router); http_service.add_tcp("0.0.0.0:80"); @@ -148,6 +156,7 @@ fn main() -> Result<(), anyhow::Error> { pingora_server.add_service(https_service); pingora_server.add_service(docker_service); pingora_server.add_service(le_service); + // pingora_server.add_service(logger_service); pingora_server.bootstrap(); pingora_server.run_forever(); diff --git a/src/proxy_server/http_proxy.rs b/src/proxy_server/http_proxy.rs index 5871a8c..37ac5b5 100644 --- a/src/proxy_server/http_proxy.rs +++ b/src/proxy_server/http_proxy.rs @@ -127,18 +127,16 @@ impl ProxyHttp for HttpLB { } } -/// Retrieves the host from the request headers based on whether -/// the request is HTTP/1.1 or HTTP/2 -fn get_host(session: &mut Session) -> String { +/// Retrieves the host from the request headers based on +/// whether the request is HTTP/1.1 or HTTP/2 +fn get_host(session: &mut Session) -> &str { if let Some(host) = session.get_header(http::header::HOST) { - if let Ok(host_str) = host.to_str() { - return host_str.to_string(); - } + return host.to_str().unwrap_or(""); } if let Some(host) = session.req_header().uri.host() { - return host.to_string(); + return host; } - "".to_string() + "" } diff --git a/src/proxy_server/https_proxy.rs b/src/proxy_server/https_proxy.rs index 4cf602b..14b1bcc 100644 --- a/src/proxy_server/https_proxy.rs +++ b/src/proxy_server/https_proxy.rs @@ -1,8 +1,4 @@ -use std::{ - collections::{BTreeMap, HashMap}, - sync::Arc, - time::Duration, -}; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; use async_trait::async_trait; use pingora::{ @@ -15,6 +11,8 @@ use pingora_load_balancing::{selection::RoundRobin, LoadBalancer}; use pingora_proxy::{ProxyHttp, Session}; use tracing::info; +use crate::ROUTE_STORE; + /// Default peer options to be used on every upstream connection pub const DEFAULT_PEER_OPTIONS: PeerOptions = PeerOptions { verify_hostname: true, @@ -45,27 +43,7 @@ pub const DEFAULT_PEER_OPTIONS: PeerOptions = PeerOptions { type ArcedLB = Arc>; /// Load balancer proxy struct -pub struct Router { - routes: HashMap>>, -} - -impl Router { - pub fn new() -> Self { - Router { - routes: HashMap::new(), - } - } - - /// Adds a new route using a hostname and a LoadBalancer instance wrapped in an `Arc` - pub fn add_route(&mut self, route: String, upstream: ArcedLB) { - self.routes.insert(route, upstream); - } - - /// Returns all registered routes hosts - pub fn get_route_keys(&self) -> Vec { - self.routes.keys().cloned().collect() - } -} +pub struct Router; pub struct RouterContext { pub host: Option, @@ -97,7 +75,7 @@ impl ProxyHttp for Router { let host_without_port = req_host.split(':').collect::>()[0].to_string(); // If there's no host matching, returns a 404 - let upstream_lb = self.routes.get(&host_without_port); + let upstream_lb = ROUTE_STORE.load().get_route(&host_without_port); if upstream_lb.is_none() { return Err(pingora::Error::new(HTTPStatus(404))); } diff --git a/src/services/letsencrypt/http01.rs b/src/services/letsencrypt/http01.rs index 1f68139..edde014 100644 --- a/src/services/letsencrypt/http01.rs +++ b/src/services/letsencrypt/http01.rs @@ -10,7 +10,7 @@ use instant_acme::{AccountCredentials, ChallengeType, LetsEncrypt, Order}; use pingora::{server::ShutdownWatch, services::background::BackgroundService}; use rcgen::KeyPair; use serde::{Deserialize, Serialize}; -use tracing::{info, instrument}; +use tracing::info; use crate::StorageArc; @@ -202,7 +202,6 @@ impl HttpLetsencrypt { #[async_trait] impl BackgroundService for HttpLetsencrypt { - #[instrument] async fn start(&self, _shutdown: ShutdownWatch) -> () { info!(service = "LetsEncrypt", "Background service started"); diff --git a/src/services/logger/mod.rs b/src/services/logger/mod.rs new file mode 100644 index 0000000..0b7fe86 --- /dev/null +++ b/src/services/logger/mod.rs @@ -0,0 +1,16 @@ +use async_trait::async_trait; +use pingora::{server::ShutdownWatch, services::background::BackgroundService}; + +use crate::config::LogLevel; + +pub struct ProxyLogger(pub LogLevel); + +#[async_trait] +impl BackgroundService for ProxyLogger { + async fn start(&self, _shutdown: ShutdownWatch) { + tracing_subscriber::fmt() + .with_max_level(&self.0) + .with_writer(std::io::stdout) + .init() + } +} diff --git a/src/services/mod.rs b/src/services/mod.rs index 3773d56..8c13f71 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -1 +1,2 @@ pub mod letsencrypt; +pub mod logger; diff --git a/src/stores/mod.rs b/src/stores/mod.rs new file mode 100644 index 0000000..6a664ab --- /dev/null +++ b/src/stores/mod.rs @@ -0,0 +1 @@ +pub mod routes; diff --git a/src/stores/routes.rs b/src/stores/routes.rs new file mode 100644 index 0000000..2bd1640 --- /dev/null +++ b/src/stores/routes.rs @@ -0,0 +1,38 @@ +use std::{collections::HashMap, sync::Arc}; + +use pingora_load_balancing::{selection::RoundRobin, LoadBalancer}; + +/// A store for routes that is updated in a background thread +#[derive(Clone)] +pub struct RouteStore { + routes: HashMap>>, +} + +impl RouteStore { + pub fn new() -> Self { + RouteStore { + routes: HashMap::new(), + } + } + + pub fn get_route_keys(&self) -> Vec { + self.routes.keys().cloned().collect() + } + + /// Adds a new route using a hostname and a LoadBalancer instance wrapped in an `Arc` + pub fn add_route(&mut self, route: String, upstream: Arc>) { + self.routes.insert(route, upstream); + } + + /// Gets a route from the store + pub fn get_route(&self, route: &str) -> Option>> { + self.routes.get(route).cloned() + } + + /// Optimistically removes a route from the store + pub fn _remove_route(&mut self, route: &str) -> bool { + self.routes.remove_entry(route); + + true + } +}