diff --git a/Cargo.lock b/Cargo.lock index 25f5004..6ee3d12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1447,6 +1447,7 @@ dependencies = [ "tokio-stream", "tonic", "tracing", + "tracing-subscriber", ] [[package]] @@ -1514,10 +1515,12 @@ name = "swbusd" version = "0.1.0" dependencies = [ "clap", + "sonic-common", "swbus-core", "tokio", "tokio-stream", "tonic", + "tracing", ] [[package]] diff --git a/crates/sonic-common/src/log.rs b/crates/sonic-common/src/log.rs index 2735f4d..00fa477 100644 --- a/crates/sonic-common/src/log.rs +++ b/crates/sonic-common/src/log.rs @@ -8,7 +8,7 @@ use tracing_subscriber::{ use std::path::PathBuf; #[cfg(debug_assertions)] -const DEFAULT_LOG_LEVEL: &str = "trace"; +const DEFAULT_LOG_LEVEL: &str = "debug"; #[cfg(not(debug_assertions))] const DEFAULT_LOG_LEVEL: &str = "info"; @@ -20,7 +20,7 @@ pub fn init(program_name: &'static str) -> Result<()> { "RUST_LOG", std::env::var("RUST_LOG") .or_else(|_| std::env::var(log_level_env_var)) - .unwrap_or_else(|_| format!("{}={}", program_name, DEFAULT_LOG_LEVEL)), + .unwrap_or_else(|_| DEFAULT_LOG_LEVEL.to_string()), ); let file_subscriber = new_file_subscriber(program_name).wrap_err("Unable to create file subscriber.")?; diff --git a/crates/swbus-cli/Cargo.toml b/crates/swbus-cli/Cargo.toml index 3fd396f..85a5556 100644 --- a/crates/swbus-cli/Cargo.toml +++ b/crates/swbus-cli/Cargo.toml @@ -22,7 +22,7 @@ tabled.workspace = true # Log and error handling tracing.workspace = true - +tracing-subscriber.workspace = true # Internal dependencies swbus-edge.workspace = true swbus-core.workspace = true diff --git a/crates/swbus-cli/src/main.rs b/crates/swbus-cli/src/main.rs index d32377e..a1de3a4 100644 --- a/crates/swbus-cli/src/main.rs +++ b/crates/swbus-cli/src/main.rs @@ -9,6 +9,7 @@ use tokio::sync::mpsc; use tokio::sync::Mutex; use tokio::time::{self, Duration, Instant}; use tracing::info; +use tracing_subscriber::{fmt, prelude::*, Layer}; #[derive(Parser, Debug)] #[command(name = "swbuscli")] @@ -90,10 +91,42 @@ pub(crate) async fn wait_for_response( ResponseResult::from_code(SwbusErrorCode::Timeout as i32, "request timeout".to_string(), None) } +fn init_logger(debug: bool) { + let stdout_level = if debug { + tracing::level_filters::LevelFilter::DEBUG + } else { + tracing::level_filters::LevelFilter::INFO + }; + + // Create a stdout logger for `info!` and lower severity levels + let stdout_layer = fmt::layer() + .with_writer(std::io::stdout) + .without_time() + .with_target(false) + .with_level(false) + .with_filter(stdout_level); + + // Create a stderr logger for `error!` and higher severity levels + let stderr_layer = fmt::layer() + .with_writer(std::io::stderr) + .without_time() + .with_target(false) + .with_level(false) + .with_filter(tracing::level_filters::LevelFilter::ERROR); + + // Combine the layers and set them as the global subscriber + tracing_subscriber::registry() + .with(stdout_layer) + .with(stderr_layer) + .init(); +} + #[tokio::main] async fn main() { let args = Command::parse(); + init_logger(args.debug); + let runtime = Arc::new(Mutex::new(SwbusEdgeRuntime::new( format!("http://{}", args.address), args.service_path.clone(), diff --git a/crates/swbus-core/src/mux/conn.rs b/crates/swbus-core/src/mux/conn.rs index 47e6ee4..e70853a 100644 --- a/crates/swbus-core/src/mux/conn.rs +++ b/crates/swbus-core/src/mux/conn.rs @@ -16,7 +16,7 @@ use tokio_util::sync::CancellationToken; use tonic::metadata::MetadataValue; use tonic::transport::{Channel, Endpoint}; use tonic::{Request, Status, Streaming}; -use tracing::error; +use tracing::*; #[derive(Debug)] pub struct SwbusConn { @@ -76,7 +76,7 @@ impl SwbusConn { let channel = match endpoint.connect().await { Ok(c) => c, Err(e) => { - error!("Failed to connect: {}.", e); + debug!("Failed to connect: {}.", e); return Err(SwbusError::connection( SwbusErrorCode::ConnectionError, io::Error::new(io::ErrorKind::ConnectionReset, e.to_string()), diff --git a/crates/swbus-core/src/mux/conn_store.rs b/crates/swbus-core/src/mux/conn_store.rs index b7c623d..87f2abf 100644 --- a/crates/swbus-core/src/mux/conn_store.rs +++ b/crates/swbus-core/src/mux/conn_store.rs @@ -7,7 +7,7 @@ use dashmap::{DashMap, DashSet}; use std::sync::Arc; use tokio::task::JoinHandle; use tokio::time::Duration; -use tracing::{error, info}; +use tracing::*; #[derive(Debug)] enum ConnTracker { @@ -30,30 +30,35 @@ impl SwbusConnStore { } } + #[instrument(skip(self, conn_info), fields(conn_id=conn_info.id()))] fn start_connect_task(self: &Arc, conn_info: Arc, reconnect: bool) { let conn_info_clone = conn_info.clone(); - + info!("Starting connection task to the peer"); let retry_interval = match reconnect { true => Duration::from_millis(1), false => Duration::from_secs(1), }; let mux_clone = self.mux.clone(); let conn_store = self.clone(); - let retry_task: JoinHandle<()> = tokio::spawn(async move { - loop { - match SwbusConn::connect(conn_info.clone(), mux_clone.clone(), conn_store.clone()).await { - Ok(conn) => { - info!("Successfully connect to peer {}", conn_info.id()); - // register the new connection and update the route table - conn_store.conn_established(conn); - break; - } - Err(_) => { - tokio::time::sleep(retry_interval).await; - } + let current_span = Span::current(); + let retry_task: JoinHandle<()> = tokio::spawn( + async move { + loop { + match SwbusConn::connect(conn_info.clone(), mux_clone.clone(), conn_store.clone()).await { + Ok(conn) => { + info!("Successfully connect to the peer"); + // register the new connection and update the route table + conn_store.conn_established(conn); + return; + } + Err(_) => { + tokio::time::sleep(retry_interval).await; + } + }; } } - }); + .instrument(current_span.clone()), + ); self.connections.insert(conn_info_clone, ConnTracker::Task(retry_task)); } diff --git a/crates/swbus-core/src/mux/conn_worker.rs b/crates/swbus-core/src/mux/conn_worker.rs index c85f0ee..afe86f6 100644 --- a/crates/swbus-core/src/mux/conn_worker.rs +++ b/crates/swbus-core/src/mux/conn_worker.rs @@ -10,7 +10,7 @@ use swbus_proto::swbus::*; use tokio_stream::StreamExt; use tokio_util::sync::CancellationToken; use tonic::Status; -use tracing::{error, info}; +use tracing::*; pub struct SwbusConnWorker where @@ -49,12 +49,16 @@ where self.shutdown_ct.cancel(); } + #[instrument(name="ConnWorker", skip(self), fields(conn_id=self.info.id()))] pub async fn run(&mut self) -> Result<()> { + info!("Starting connection worker"); self.register_to_mux()?; let result = self.run_worker_loop().await; // unregister from mux + info!("Unregistering from mux."); self.unregister_from_mux()?; if result.is_err() { + info!("Reporting connection lost."); self.conn_store.conn_lost(self.info.clone()); } result @@ -115,7 +119,9 @@ where Ok(()) } + #[instrument(name="receive_msg", level="debug", skip_all, fields(message.id=message.header.as_ref().unwrap().id))] async fn process_data_message(&mut self, message: SwbusMessage) -> Result<()> { + debug!("{:?}", &message); self.validate_message_common(&message)?; match message.body { Some(swbus_message::Body::TraceRouteRequest(_)) => { diff --git a/crates/swbus-core/src/mux/multiplexer.rs b/crates/swbus-core/src/mux/multiplexer.rs index 0240841..0979098 100644 --- a/crates/swbus-core/src/mux/multiplexer.rs +++ b/crates/swbus-core/src/mux/multiplexer.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use swbus_proto::message_id_generator::MessageIdGenerator; use swbus_proto::result::*; use swbus_proto::swbus::*; +use tracing::*; enum RouteStage { Global, @@ -68,13 +69,17 @@ impl SwbusMultiplexer { self.routes.remove(&route_key); } + #[instrument(name = "update_route", level = "info", skip(self, nexthop), fields(nh_type=?nexthop.nh_type(), hop_count=nexthop.hop_count(), conn_info=nexthop.conn_info().as_ref().map(|x| x.id()).unwrap_or(&"None".to_string())))] pub(crate) fn update_route(&self, route_key: String, nexthop: SwbusNextHop) { // If route entry doesn't exist, we insert the next hop as a new one. + info!("Update route entry"); match self.routes.entry(route_key) { Entry::Occupied(mut existing) => { let route_entry = existing.get(); if route_entry.hop_count() > nexthop.hop_count() { existing.insert(nexthop); + } else { + info!("Route entry already exists with smaller hop count"); } } Entry::Vacant(entry) => { @@ -116,8 +121,19 @@ impl SwbusMultiplexer { .key .clone() } - + #[instrument(name="route_message", parent=None, level="debug", skip_all, fields(message_id=?message.header.as_ref().unwrap().id))] pub async fn route_message(&self, message: SwbusMessage) -> Result<()> { + debug!( + destination = message + .header + .as_ref() + .unwrap() + .destination + .as_ref() + .unwrap() + .to_longest_path(), + "Routing message" + ); let header = match message.header { Some(ref header) => header, None => { @@ -162,6 +178,7 @@ impl SwbusMultiplexer { return Ok(()); } + info!("No route found for destination: {}", destination.to_longest_path()); let response = SwbusMessage::new_response( &message, Some(&self.get_my_service_path()), diff --git a/crates/swbus-core/src/mux/nexthop.rs b/crates/swbus-core/src/mux/nexthop.rs index f08f05c..c0d77c2 100644 --- a/crates/swbus-core/src/mux/nexthop.rs +++ b/crates/swbus-core/src/mux/nexthop.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use swbus_proto::result::*; use swbus_proto::swbus::*; use swbus_proto::swbus::{swbus_message, SwbusMessage}; -use tracing::info; +use tracing::*; #[derive(Debug, Copy, Clone, PartialEq)] pub(crate) enum NextHopType { @@ -58,19 +58,26 @@ impl SwbusNextHop { hop_count: 0, } } - + #[instrument(name="queue_message", parent=None, level="debug", skip_all, fields(nh_type=?self.nh_type, conn_info=self.conn_info.as_ref().map(|x| x.id()).unwrap_or(&"None".to_string()), message.id=?message.header.as_ref().unwrap().id))] pub async fn queue_message( &self, mux: &SwbusMultiplexer, mut message: SwbusMessage, ) -> Result> { + let current_span = tracing::Span::current(); + debug!("Queue message"); match self.nh_type { - NextHopType::Drop => self.drop_message(message).await, - NextHopType::Local => self.process_local_message(mux, message).await, + NextHopType::Drop => self.drop_message(message).instrument(current_span.clone()).await, + NextHopType::Local => { + self.process_local_message(mux, message) + .instrument(current_span.clone()) + .await + } NextHopType::Remote => { - let header: &mut SwbusMessageHeader = message.header.as_mut().expect("missing header"); //should not happen otherwise it won't reach here + let header: &mut SwbusMessageHeader = message.header.as_mut().expect("missing header"); // should not happen otherwise it won't reach here header.ttl -= 1; if header.ttl == 0 { + debug!("TTL expired"); let response = SwbusMessage::new_response( &message, Some(&mux.get_my_service_path()), @@ -81,6 +88,7 @@ impl SwbusNextHop { ); return Ok(Some(response)); } + debug!("Sending to the remote endpoint"); self.conn_proxy .as_ref() .expect("conn_proxy shouldn't be None in remote nexthop") @@ -97,7 +105,6 @@ impl SwbusNextHop { mux: &SwbusMultiplexer, message: SwbusMessage, ) -> Result> { - // @todo: move to trace // process message locally let response = match message.body.as_ref() { Some(swbus_message::Body::PingRequest(_)) => self.process_ping_request(mux, message).unwrap(), @@ -105,6 +112,7 @@ impl SwbusNextHop { self.process_mgmt_request(mux, &message, mgmt_request).unwrap() } _ => { + debug!("Invalid message type to a local endpoint"); return Err(SwbusError::input( SwbusErrorCode::ServiceNotFound, format!("Invalid message type to a local endpoint: {:?}", message), @@ -115,8 +123,7 @@ impl SwbusNextHop { } fn process_ping_request(&self, mux: &SwbusMultiplexer, message: SwbusMessage) -> Result { - // @todo: move to trace - // info!("Received ping request: {:?}", message); + debug!("Received ping request"); let id = mux.generate_message_id(); Ok(SwbusMessage::new_response( &message, @@ -136,6 +143,7 @@ impl SwbusNextHop { ) -> Result { match mgmt_request.request.as_str() { "show_route" => { + debug!("Received show_route request"); let routes = mux.export_routes(None); let response_msg = SwbusMessage::new_response( message, @@ -154,9 +162,8 @@ impl SwbusNextHop { } } - async fn drop_message(&self, message: SwbusMessage) -> Result> { - // todo: change to trace - info!("Drop message: {:?}", message); + async fn drop_message(&self, _: SwbusMessage) -> Result> { + debug!("Drop message"); // todo: increment drop counter Ok(None) } @@ -213,8 +220,14 @@ mod tests { async fn test_queue_message_drop() { let nexthop = SwbusNextHop::new_drop(); let mux = SwbusMultiplexer::default(); - let message = SwbusMessage::default(); - + let message = SwbusMessage { + header: Some(SwbusMessageHeader::new( + ServicePath::from_string("region-a.cluster-a.10.0.0.1-dpu0").unwrap(), + ServicePath::from_string("region-a.cluster-a.10.0.0.2-dpu0").unwrap(), + 1, + )), + body: None, + }; let result = nexthop.queue_message(&mux, message).await.unwrap(); assert!(result.is_none()); } diff --git a/crates/swbus-core/src/mux/service.rs b/crates/swbus-core/src/mux/service.rs index 1500686..868aceb 100644 --- a/crates/swbus-core/src/mux/service.rs +++ b/crates/swbus-core/src/mux/service.rs @@ -13,9 +13,7 @@ use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::Stream; use tonic::{transport::Server, Request, Response, Status, Streaming}; -use tracing::error; -use tracing::info; - +use tracing::*; pub struct SwbusServiceHost { swbus_server_addr: String, mux: Arc, @@ -80,6 +78,7 @@ impl SwbusServiceHost { impl SwbusService for SwbusServiceHost { type StreamMessagesStream = SwbusMessageStream; + #[instrument(name="connection_received", level="info", skip_all, fields(addr=%request.remote_addr().unwrap()))] async fn stream_messages( &self, request: Request>, @@ -118,6 +117,11 @@ impl SwbusService for SwbusServiceHost { }; let in_stream = request.into_inner(); + info!( + conn_type = conn_type as i32, + service_path = service_path.to_longest_path(), + "Creating SwbusConn" + ); // outgoing message queue let (out_tx, out_rx) = mpsc::channel(16); diff --git a/crates/swbus-core/tests/common/test_executor.rs b/crates/swbus-core/tests/common/test_executor.rs index 2f8187a..3936db3 100644 --- a/crates/swbus-core/tests/common/test_executor.rs +++ b/crates/swbus-core/tests/common/test_executor.rs @@ -13,7 +13,7 @@ use tokio::task::JoinHandle; use tokio::time::{self, Duration, Instant}; use tracing::{error, info}; -//3 seconds receive timeout +// 3 seconds receive timeout pub const RECEIVE_TIMEOUT: u32 = 3; /// The Topo struct contains the server jobs and clients' TX and RX of its message queues. diff --git a/crates/swbusd/Cargo.toml b/crates/swbusd/Cargo.toml index efd059d..b9e0830 100644 --- a/crates/swbusd/Cargo.toml +++ b/crates/swbusd/Cargo.toml @@ -17,6 +17,8 @@ tokio.workspace = true tokio-stream.workspace = true tonic.workspace = true swbus-core.workspace = true +sonic-common.workspace = true +tracing.workspace = true clap = { version = "4.0", features = ["derive"] } [lints] diff --git a/crates/swbusd/src/main.rs b/crates/swbusd/src/main.rs index c486904..c613de8 100644 --- a/crates/swbusd/src/main.rs +++ b/crates/swbusd/src/main.rs @@ -1,7 +1,8 @@ use clap::Parser; +use sonic_common::log; use swbus_core::mux::route_config::RoutesConfig; use swbus_core::mux::service::SwbusServiceHost; - +use tracing::info; #[derive(Parser, Debug)] #[command(name = "swbusd")] struct Args { @@ -16,6 +17,11 @@ struct Args { #[tokio::main] async fn main() { let args = Args::parse(); + + if let Err(e) = log::init("swbusd") { + eprintln!("Failed to initialize logging: {}", e); + } + info!("Starting swbusd"); let route_config = RoutesConfig::load_from_yaml(args.route_config).unwrap(); let server = SwbusServiceHost::new(args.address); server.start(route_config).await.unwrap();