Skip to content

Commit

Permalink
feat: add non-blocking tracing and static ROUTER with arc_swap
Browse files Browse the repository at this point in the history
  • Loading branch information
luizfonseca committed May 16, 2024
1 parent 737d95c commit 452f2e7
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 53 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
proksi.yaml
.zed/
.vscode/
/tmp
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ serde_json = "1.0.117"
tokio = { version = "1.37.0", features = ["rt-multi-thread"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
tracing-appender = "0.2.3"
arc-swap = "1.7.1"
once_cell = "1.19.0"
3 changes: 2 additions & 1 deletion src/docker/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{collections::HashMap, hash::Hash, thread::sleep, time::Duration};
use async_trait::async_trait;
use bollard::{service::ListServicesOptions, Docker};
use pingora::{server::ShutdownWatch, services::background::BackgroundService};
use tracing::info;

pub fn create_client() -> DockerClient {
let docker = Docker::connect_with_local_defaults();
Expand All @@ -20,7 +21,7 @@ impl DockerClient {
let mut default_filters = HashMap::new();
default_filters.insert("label", vec!["proksi.host", "proksi.port"]);

println!("Docker running");
info!("Docker running");
loop {
self.list_containers(default_filters.clone()).await;

Expand Down
39 changes: 24 additions & 15 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
use std::{collections::HashMap, sync::Arc};

use ::pingora::{server::Server, services::background::background_service};
use arc_swap::ArcSwap;
use config::load_proxy_config;
use instant_acme::KeyAuthorization;
use once_cell::sync::Lazy;
use pingora::listeners::TlsSettings;
use pingora_load_balancing::{health_check::TcpHealthCheck, LoadBalancer};
use pingora_proxy::http_proxy_service;
use stores::routes::RouteStore;

mod config;
mod docker;
mod proxy_server;
mod services;
mod stores;
mod tools;

#[derive(Debug)]
Expand All @@ -19,6 +23,10 @@ pub struct Storage {
certificates: HashMap<String, String>,
}

/// Static reference to the route store that can be shared across threads
pub static ROUTE_STORE: Lazy<ArcSwap<RouteStore>> =
Lazy::new(|| ArcSwap::new(Arc::new(RouteStore::new())));

pub type StorageArc = Arc<tokio::sync::Mutex<Storage>>;

impl Storage {
Expand Down Expand Up @@ -62,26 +70,26 @@ impl Default for Storage {
}
}

fn init_tracing_subscriber(config: &config::Config) {
tracing_subscriber::fmt()
.with_max_level(&config.logging.level)
.init()
}

fn main() -> Result<(), anyhow::Error> {
// Loads configuration from command-line, YAML or TOML sources
let proxy_config = load_proxy_config("/etc/proksi/configs")?;

// let file_appender = tracing_appender::rolling::hourly("./tmp", "proksi.log");
let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());

// Creates a tracing/logging subscriber based on the configuration provided
init_tracing_subscriber(&proxy_config);
tracing_subscriber::fmt()
.with_max_level(&proxy_config.logging.level)
.compact()
.with_writer(non_blocking)
.init();

// Pingora load balancer server
let mut pingora_server = Server::new(None).unwrap();
let mut pingora_server = Server::new(None)?;

// Request router:
// Given a host header, the router will return the corresponding upstreams
// The router will also handle health checks and failover in case of upstream failure
let mut router = proxy_server::https_proxy::Router::new();
let mut router_store = RouteStore::new();

// for each route, build a loadbalancer configuration with the corresponding upstreams
for route in proxy_config.routes {
Expand All @@ -92,16 +100,13 @@ fn main() -> Result<(), anyhow::Error> {
.map(|upstr| format!("{}:{}", upstr.ip, upstr.port));

let mut upstreams = LoadBalancer::try_from_iter(addr_upstreams)?;

let tcp_health_check = TcpHealthCheck::new();
upstreams.set_health_check(tcp_health_check);

let health_check_service = background_service(&route.host, upstreams);

let upstreams = health_check_service.task();

router.add_route(route.host, upstreams);

router_store.add_route(route.host, upstreams);
pingora_server.add_service(health_check_service);
}

Expand All @@ -120,7 +125,7 @@ fn main() -> Result<(), anyhow::Error> {

// Service: Lets Encrypt HTTP Challenge/Certificate renewal
let letsencrypt_http = services::letsencrypt::http01::HttpLetsencrypt::new(
&router.get_route_keys(),
&ROUTE_STORE.load().get_route_keys(),
"[email protected]",
storage.clone(),
);
Expand All @@ -137,6 +142,9 @@ fn main() -> Result<(), anyhow::Error> {
);

// Service: HTTPS Load Balancer (main service)
// The router will also handle health checks and failover in case of upstream failure
ROUTE_STORE.swap(Arc::new(router_store));
let router = proxy_server::https_proxy::Router {};
let mut https_service = http_proxy_service(&pingora_server.configuration, router);
http_service.add_tcp("0.0.0.0:80");

Expand All @@ -148,6 +156,7 @@ fn main() -> Result<(), anyhow::Error> {
pingora_server.add_service(https_service);
pingora_server.add_service(docker_service);
pingora_server.add_service(le_service);
// pingora_server.add_service(logger_service);

pingora_server.bootstrap();
pingora_server.run_forever();
Expand Down
14 changes: 6 additions & 8 deletions src/proxy_server/http_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,16 @@ impl ProxyHttp for HttpLB {
}
}

/// Retrieves the host from the request headers based on whether
/// the request is HTTP/1.1 or HTTP/2
fn get_host(session: &mut Session) -> String {
/// Retrieves the host from the request headers based on
/// whether the request is HTTP/1.1 or HTTP/2
fn get_host(session: &mut Session) -> &str {
if let Some(host) = session.get_header(http::header::HOST) {
if let Ok(host_str) = host.to_str() {
return host_str.to_string();
}
return host.to_str().unwrap_or("");
}

if let Some(host) = session.req_header().uri.host() {
return host.to_string();
return host;
}

"".to_string()
""
}
32 changes: 5 additions & 27 deletions src/proxy_server/https_proxy.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
time::Duration,
};
use std::{collections::BTreeMap, sync::Arc, time::Duration};

use async_trait::async_trait;
use pingora::{
Expand All @@ -15,6 +11,8 @@ use pingora_load_balancing::{selection::RoundRobin, LoadBalancer};
use pingora_proxy::{ProxyHttp, Session};
use tracing::info;

use crate::ROUTE_STORE;

/// Default peer options to be used on every upstream connection
pub const DEFAULT_PEER_OPTIONS: PeerOptions = PeerOptions {
verify_hostname: true,
Expand Down Expand Up @@ -45,27 +43,7 @@ pub const DEFAULT_PEER_OPTIONS: PeerOptions = PeerOptions {

type ArcedLB = Arc<LoadBalancer<RoundRobin>>;
/// Load balancer proxy struct
pub struct Router {
routes: HashMap<String, Arc<LoadBalancer<RoundRobin>>>,
}

impl Router {
pub fn new() -> Self {
Router {
routes: HashMap::new(),
}
}

/// Adds a new route using a hostname and a LoadBalancer instance wrapped in an `Arc`
pub fn add_route(&mut self, route: String, upstream: ArcedLB) {
self.routes.insert(route, upstream);
}

/// Returns all registered routes hosts
pub fn get_route_keys(&self) -> Vec<String> {
self.routes.keys().cloned().collect()
}
}
pub struct Router;

pub struct RouterContext {
pub host: Option<String>,
Expand Down Expand Up @@ -97,7 +75,7 @@ impl ProxyHttp for Router {
let host_without_port = req_host.split(':').collect::<Vec<&str>>()[0].to_string();

// If there's no host matching, returns a 404
let upstream_lb = self.routes.get(&host_without_port);
let upstream_lb = ROUTE_STORE.load().get_route(&host_without_port);
if upstream_lb.is_none() {
return Err(pingora::Error::new(HTTPStatus(404)));
}
Expand Down
3 changes: 1 addition & 2 deletions src/services/letsencrypt/http01.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use instant_acme::{AccountCredentials, ChallengeType, LetsEncrypt, Order};
use pingora::{server::ShutdownWatch, services::background::BackgroundService};
use rcgen::KeyPair;
use serde::{Deserialize, Serialize};
use tracing::{info, instrument};
use tracing::info;

use crate::StorageArc;

Expand Down Expand Up @@ -202,7 +202,6 @@ impl HttpLetsencrypt {

#[async_trait]
impl BackgroundService for HttpLetsencrypt {
#[instrument]
async fn start(&self, _shutdown: ShutdownWatch) -> () {
info!(service = "LetsEncrypt", "Background service started");

Expand Down
16 changes: 16 additions & 0 deletions src/services/logger/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use async_trait::async_trait;
use pingora::{server::ShutdownWatch, services::background::BackgroundService};

use crate::config::LogLevel;

pub struct ProxyLogger(pub LogLevel);

#[async_trait]
impl BackgroundService for ProxyLogger {
async fn start(&self, _shutdown: ShutdownWatch) {
tracing_subscriber::fmt()
.with_max_level(&self.0)
.with_writer(std::io::stdout)
.init()
}
}
1 change: 1 addition & 0 deletions src/services/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod letsencrypt;
pub mod logger;
1 change: 1 addition & 0 deletions src/stores/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod routes;
38 changes: 38 additions & 0 deletions src/stores/routes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use std::{collections::HashMap, sync::Arc};

use pingora_load_balancing::{selection::RoundRobin, LoadBalancer};

/// A store for routes that is updated in a background thread
#[derive(Clone)]
pub struct RouteStore {
routes: HashMap<String, Arc<LoadBalancer<RoundRobin>>>,
}

impl RouteStore {
pub fn new() -> Self {
RouteStore {
routes: HashMap::new(),
}
}

pub fn get_route_keys(&self) -> Vec<String> {
self.routes.keys().cloned().collect()
}

/// Adds a new route using a hostname and a LoadBalancer instance wrapped in an `Arc`
pub fn add_route(&mut self, route: String, upstream: Arc<LoadBalancer<RoundRobin>>) {
self.routes.insert(route, upstream);
}

/// Gets a route from the store
pub fn get_route(&self, route: &str) -> Option<Arc<LoadBalancer<RoundRobin>>> {
self.routes.get(route).cloned()
}

/// Optimistically removes a route from the store
pub fn _remove_route(&mut self, route: &str) -> bool {
self.routes.remove_entry(route);

true
}
}

0 comments on commit 452f2e7

Please sign in to comment.