Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add trace logs to swbus-core #41

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/sonic-common/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tracing_subscriber::{
use std::path::PathBuf;

#[cfg(debug_assertions)]
const DEFAULT_LOG_LEVEL: &str = "trace";
const DEFAULT_LOG_LEVEL: &str = "debug";

#[cfg(not(debug_assertions))]
const DEFAULT_LOG_LEVEL: &str = "info";
Expand All @@ -20,7 +20,7 @@ pub fn init(program_name: &'static str) -> Result<()> {
"RUST_LOG",
std::env::var("RUST_LOG")
.or_else(|_| std::env::var(log_level_env_var))
.unwrap_or_else(|_| format!("{}={}", program_name, DEFAULT_LOG_LEVEL)),
.unwrap_or_else(|_| DEFAULT_LOG_LEVEL.to_string()),
);

let file_subscriber = new_file_subscriber(program_name).wrap_err("Unable to create file subscriber.")?;
Expand Down
2 changes: 1 addition & 1 deletion crates/swbus-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ tabled.workspace = true

# Log and error handling
tracing.workspace = true

tracing-subscriber.workspace = true
# Internal dependencies
swbus-edge.workspace = true
swbus-core.workspace = true
Expand Down
33 changes: 33 additions & 0 deletions crates/swbus-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::time::{self, Duration, Instant};
use tracing::info;
use tracing_subscriber::{fmt, prelude::*, Layer};

#[derive(Parser, Debug)]
#[command(name = "swbuscli")]
Expand Down Expand Up @@ -90,10 +91,42 @@ pub(crate) async fn wait_for_response(
ResponseResult::from_code(SwbusErrorCode::Timeout as i32, "request timeout".to_string(), None)
}

fn init_logger(debug: bool) {
let stdout_level = if debug {
tracing::level_filters::LevelFilter::DEBUG
} else {
tracing::level_filters::LevelFilter::INFO
};

// Create a stdout logger for `info!` and lower severity levels
let stdout_layer = fmt::layer()
.with_writer(std::io::stdout)
.without_time()
.with_target(false)
.with_level(false)
.with_filter(stdout_level);

// Create a stderr logger for `error!` and higher severity levels
let stderr_layer = fmt::layer()
.with_writer(std::io::stderr)
.without_time()
.with_target(false)
.with_level(false)
.with_filter(tracing::level_filters::LevelFilter::ERROR);

// Combine the layers and set them as the global subscriber
tracing_subscriber::registry()
.with(stdout_layer)
.with(stderr_layer)
.init();
}

#[tokio::main]
async fn main() {
let args = Command::parse();

init_logger(args.debug);

let runtime = Arc::new(Mutex::new(SwbusEdgeRuntime::new(
format!("http://{}", args.address),
args.service_path.clone(),
Expand Down
4 changes: 2 additions & 2 deletions crates/swbus-core/src/mux/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio_util::sync::CancellationToken;
use tonic::metadata::MetadataValue;
use tonic::transport::{Channel, Endpoint};
use tonic::{Request, Status, Streaming};
use tracing::error;
use tracing::*;

#[derive(Debug)]
pub struct SwbusConn {
Expand Down Expand Up @@ -76,7 +76,7 @@ impl SwbusConn {
let channel = match endpoint.connect().await {
Ok(c) => c,
Err(e) => {
error!("Failed to connect: {}.", e);
debug!("Failed to connect: {}.", e);
return Err(SwbusError::connection(
SwbusErrorCode::ConnectionError,
io::Error::new(io::ErrorKind::ConnectionReset, e.to_string()),
Expand Down
35 changes: 20 additions & 15 deletions crates/swbus-core/src/mux/conn_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use dashmap::{DashMap, DashSet};
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tracing::{error, info};
use tracing::*;

#[derive(Debug)]
enum ConnTracker {
Expand All @@ -30,30 +30,35 @@ impl SwbusConnStore {
}
}

#[instrument(skip(self, conn_info), fields(conn_id=conn_info.id()))]
fn start_connect_task(self: &Arc<SwbusConnStore>, conn_info: Arc<SwbusConnInfo>, reconnect: bool) {
let conn_info_clone = conn_info.clone();

info!("Starting connection task to the peer");
let retry_interval = match reconnect {
true => Duration::from_millis(1),
false => Duration::from_secs(1),
};
let mux_clone = self.mux.clone();
let conn_store = self.clone();
let retry_task: JoinHandle<()> = tokio::spawn(async move {
loop {
match SwbusConn::connect(conn_info.clone(), mux_clone.clone(), conn_store.clone()).await {
Ok(conn) => {
info!("Successfully connect to peer {}", conn_info.id());
// register the new connection and update the route table
conn_store.conn_established(conn);
break;
}
Err(_) => {
tokio::time::sleep(retry_interval).await;
}
let current_span = Span::current();
let retry_task: JoinHandle<()> = tokio::spawn(
async move {
loop {
match SwbusConn::connect(conn_info.clone(), mux_clone.clone(), conn_store.clone()).await {
Ok(conn) => {
info!("Successfully connect to the peer");
// register the new connection and update the route table
conn_store.conn_established(conn);
return;
}
Err(_) => {
tokio::time::sleep(retry_interval).await;
}
};
}
}
});
.instrument(current_span.clone()),
);
self.connections.insert(conn_info_clone, ConnTracker::Task(retry_task));
}

Expand Down
8 changes: 7 additions & 1 deletion crates/swbus-core/src/mux/conn_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use swbus_proto::swbus::*;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use tonic::Status;
use tracing::{error, info};
use tracing::*;

pub struct SwbusConnWorker<T>
where
Expand Down Expand Up @@ -49,12 +49,16 @@ where
self.shutdown_ct.cancel();
}

#[instrument(name="ConnWorker", skip(self), fields(conn_id=self.info.id()))]
pub async fn run(&mut self) -> Result<()> {
info!("Starting connection worker");
self.register_to_mux()?;
let result = self.run_worker_loop().await;
// unregister from mux
info!("Unregistering from mux.");
self.unregister_from_mux()?;
if result.is_err() {
info!("Reporting connection lost.");
self.conn_store.conn_lost(self.info.clone());
}
result
Expand Down Expand Up @@ -115,7 +119,9 @@ where
Ok(())
}

#[instrument(name="receive_msg", level="debug", skip_all, fields(message.id=message.header.as_ref().unwrap().id))]
async fn process_data_message(&mut self, message: SwbusMessage) -> Result<()> {
debug!("{:?}", &message);
self.validate_message_common(&message)?;
match message.body {
Some(swbus_message::Body::TraceRouteRequest(_)) => {
Expand Down
19 changes: 18 additions & 1 deletion crates/swbus-core/src/mux/multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;
use swbus_proto::message_id_generator::MessageIdGenerator;
use swbus_proto::result::*;
use swbus_proto::swbus::*;
use tracing::*;

enum RouteStage {
Global,
Expand Down Expand Up @@ -68,13 +69,17 @@ impl SwbusMultiplexer {
self.routes.remove(&route_key);
}

#[instrument(name = "update_route", level = "info", skip(self, nexthop), fields(nh_type=?nexthop.nh_type(), hop_count=nexthop.hop_count(), conn_info=nexthop.conn_info().as_ref().map(|x| x.id()).unwrap_or(&"None".to_string())))]
pub(crate) fn update_route(&self, route_key: String, nexthop: SwbusNextHop) {
// If route entry doesn't exist, we insert the next hop as a new one.
info!("Update route entry");
match self.routes.entry(route_key) {
Entry::Occupied(mut existing) => {
let route_entry = existing.get();
if route_entry.hop_count() > nexthop.hop_count() {
existing.insert(nexthop);
} else {
info!("Route entry already exists with smaller hop count");
}
}
Entry::Vacant(entry) => {
Expand Down Expand Up @@ -116,8 +121,19 @@ impl SwbusMultiplexer {
.key
.clone()
}

#[instrument(name="route_message", parent=None, level="debug", skip_all, fields(message_id=?message.header.as_ref().unwrap().id))]
pub async fn route_message(&self, message: SwbusMessage) -> Result<()> {
debug!(
destination = message
.header
.as_ref()
.unwrap()
.destination
.as_ref()
.unwrap()
.to_longest_path(),
"Routing message"
);
let header = match message.header {
Some(ref header) => header,
None => {
Expand Down Expand Up @@ -162,6 +178,7 @@ impl SwbusMultiplexer {
return Ok(());
}

info!("No route found for destination: {}", destination.to_longest_path());
let response = SwbusMessage::new_response(
&message,
Some(&self.get_my_service_path()),
Expand Down
39 changes: 26 additions & 13 deletions crates/swbus-core/src/mux/nexthop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use swbus_proto::result::*;
use swbus_proto::swbus::*;
use swbus_proto::swbus::{swbus_message, SwbusMessage};
use tracing::info;
use tracing::*;

#[derive(Debug, Copy, Clone, PartialEq)]
pub(crate) enum NextHopType {
Expand Down Expand Up @@ -58,19 +58,26 @@ impl SwbusNextHop {
hop_count: 0,
}
}

#[instrument(name="queue_message", parent=None, level="debug", skip_all, fields(nh_type=?self.nh_type, conn_info=self.conn_info.as_ref().map(|x| x.id()).unwrap_or(&"None".to_string()), message.id=?message.header.as_ref().unwrap().id))]
pub async fn queue_message(
&self,
mux: &SwbusMultiplexer,
mut message: SwbusMessage,
) -> Result<Option<SwbusMessage>> {
let current_span = tracing::Span::current();
debug!("Queue message");
match self.nh_type {
NextHopType::Drop => self.drop_message(message).await,
NextHopType::Local => self.process_local_message(mux, message).await,
NextHopType::Drop => self.drop_message(message).instrument(current_span.clone()).await,
NextHopType::Local => {
self.process_local_message(mux, message)
.instrument(current_span.clone())
.await
}
NextHopType::Remote => {
let header: &mut SwbusMessageHeader = message.header.as_mut().expect("missing header"); //should not happen otherwise it won't reach here
let header: &mut SwbusMessageHeader = message.header.as_mut().expect("missing header"); // should not happen otherwise it won't reach here
header.ttl -= 1;
if header.ttl == 0 {
debug!("TTL expired");
let response = SwbusMessage::new_response(
&message,
Some(&mux.get_my_service_path()),
Expand All @@ -81,6 +88,7 @@ impl SwbusNextHop {
);
return Ok(Some(response));
}
debug!("Sending to the remote endpoint");
self.conn_proxy
.as_ref()
.expect("conn_proxy shouldn't be None in remote nexthop")
Expand All @@ -97,14 +105,14 @@ impl SwbusNextHop {
mux: &SwbusMultiplexer,
message: SwbusMessage,
) -> Result<Option<SwbusMessage>> {
// @todo: move to trace
// process message locally
let response = match message.body.as_ref() {
Some(swbus_message::Body::PingRequest(_)) => self.process_ping_request(mux, message).unwrap(),
Some(swbus_message::Body::ManagementRequest(mgmt_request)) => {
self.process_mgmt_request(mux, &message, mgmt_request).unwrap()
}
_ => {
debug!("Invalid message type to a local endpoint");
return Err(SwbusError::input(
SwbusErrorCode::ServiceNotFound,
format!("Invalid message type to a local endpoint: {:?}", message),
Expand All @@ -115,8 +123,7 @@ impl SwbusNextHop {
}

fn process_ping_request(&self, mux: &SwbusMultiplexer, message: SwbusMessage) -> Result<SwbusMessage> {
// @todo: move to trace
// info!("Received ping request: {:?}", message);
debug!("Received ping request");
let id = mux.generate_message_id();
Ok(SwbusMessage::new_response(
&message,
Expand All @@ -136,6 +143,7 @@ impl SwbusNextHop {
) -> Result<SwbusMessage> {
match mgmt_request.request.as_str() {
"show_route" => {
debug!("Received show_route request");
let routes = mux.export_routes(None);
let response_msg = SwbusMessage::new_response(
message,
Expand All @@ -154,9 +162,8 @@ impl SwbusNextHop {
}
}

async fn drop_message(&self, message: SwbusMessage) -> Result<Option<SwbusMessage>> {
// todo: change to trace
info!("Drop message: {:?}", message);
async fn drop_message(&self, _: SwbusMessage) -> Result<Option<SwbusMessage>> {
debug!("Drop message");
// todo: increment drop counter
Ok(None)
}
Expand Down Expand Up @@ -213,8 +220,14 @@ mod tests {
async fn test_queue_message_drop() {
let nexthop = SwbusNextHop::new_drop();
let mux = SwbusMultiplexer::default();
let message = SwbusMessage::default();

let message = SwbusMessage {
header: Some(SwbusMessageHeader::new(
ServicePath::from_string("region-a.cluster-a.10.0.0.1-dpu0").unwrap(),
ServicePath::from_string("region-a.cluster-a.10.0.0.2-dpu0").unwrap(),
1,
)),
body: None,
};
let result = nexthop.queue_message(&mux, message).await.unwrap();
assert!(result.is_none());
}
Expand Down
Loading