Skip to content

Commit

Permalink
feat: add proper listeners fetching support
Browse files Browse the repository at this point in the history
  • Loading branch information
ermineJose committed Feb 3, 2025
1 parent 65c07df commit 405c596
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 19 deletions.
5 changes: 5 additions & 0 deletions ant-networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ pub enum NetworkEvent {
KeysToFetchForReplication(Vec<(PeerId, RecordKey)>),
/// Started listening on a new address
NewListenAddr(Multiaddr),
/// stopped Listening from a address
ClosedListenAddr(Vec<Multiaddr>),
/// Report unverified record
UnverifiedRecord(Record),
/// Terminate Node on unrecoverable errors
Expand Down Expand Up @@ -180,6 +182,9 @@ impl Debug for NetworkEvent {
NetworkEvent::NewListenAddr(addr) => {
write!(f, "NetworkEvent::NewListenAddr({addr:?})")
}
NetworkEvent::ClosedListenAddr(addr ) => {
write!(f, "NetworkEvent::ClosedListenAddr({addr:?})")
}
NetworkEvent::UnverifiedRecord(record) => {
let pretty_key = PrettyPrintRecordKey::from(&record.key);
write!(f, "NetworkEvent::UnverifiedRecord({pretty_key:?})")
Expand Down
2 changes: 2 additions & 0 deletions ant-networking/src/event/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ impl SwarmDriver {
if let Some(relay_manager) = self.relay_manager.as_mut() {
relay_manager.on_listener_closed(&listener_id, &mut self.swarm);
}

self.send_event(NetworkEvent::ClosedListenAddr(addresses.clone()));
}
SwarmEvent::IncomingConnection {
connection_id,
Expand Down
5 changes: 4 additions & 1 deletion ant-node-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,9 @@ pub async fn status_report(
println!("Bin path: {}", daemon.daemon_path.to_string_lossy());
}




if let Some(faucet) = &node_registry.faucet {
print_banner(&format!(
"{} - {}",
Expand All @@ -459,7 +462,7 @@ pub async fn status_report(
.collect::<Vec<&NodeServiceData>>();
for node in nodes {
let peer_id = node.peer_id.map_or("-".to_string(), |p| p.to_string());
let connected_peers = node
let connected_peers = node
.connected_peers
.clone()
.map_or("-".to_string(), |p| p.len().to_string());
Expand Down
2 changes: 2 additions & 0 deletions ant-node/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,6 @@ pub enum Error {
InvalidRequest(String),
#[error("EVM Network error: {0}")]
EvmNetwork(String),
#[error("Not able to open the file for Listeners with option")]
InvalidListenerFileOperation,
}
115 changes: 97 additions & 18 deletions ant-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use super::{
error::Result, event::NodeEventsChannel, quote::quotes_verification, Marker, NodeEvent,
};
use crate::error::Error;
#[cfg(feature = "open-metrics")]
use crate::metrics::NodeMetricsRecorder;
use crate::RunningNode;
Expand All @@ -27,7 +28,7 @@ use ant_protocol::{
storage::ValidationType,
NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE,
};
use ant_service_management::metric::{write_network_metrics_to_file,NetworkInfoMetrics};
// use autonomi::client::address;
use bytes::Bytes;
use itertools::Itertools;
use libp2p::{identity::Keypair, Multiaddr, PeerId};
Expand All @@ -36,6 +37,7 @@ use rand::{
rngs::{OsRng, StdRng},
thread_rng, Rng, SeedableRng,
};
use std::io::{BufRead, Write};
use std::{
collections::HashMap,
net::SocketAddr,
Expand Down Expand Up @@ -69,7 +71,7 @@ const UNRELEVANT_RECORDS_CLEANUP_INTERVAL: Duration = Duration::from_secs(3600);
/// Highest score to achieve from each metric sub-sector during StorageChallenge.
const HIGHEST_SCORE: usize = 100;

/// Any nodes bearing a score below this shall be considered as bad.
/// Any n odes bearing a score below this shall be considered as bad.
/// Max is to be 100 * 100
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 5000;

Expand Down Expand Up @@ -198,6 +200,7 @@ impl NodeBuilder {
#[cfg(feature = "open-metrics")]
metrics_recorder,
evm_network: self.evm_network,
root_dir: self.root_dir.clone(),
};

let node = Node {
Expand All @@ -219,21 +222,6 @@ impl NodeBuilder {
};

// Run the node
let runing_node_metrics = running_node.clone();
let _return_value = tokio::spawn(async move {
sleep(Duration::from_millis(200)).await;
let state = runing_node_metrics.get_swarm_local_state().await.expect("Failed to get swarm local state");
let connected_peers = state.connected_peers.iter().map(|p| p.to_string()).collect();
let listeners = state.listeners.iter().map(|m| m.to_string()).collect();
let network_info = NetworkInfoMetrics::new(connected_peers, listeners);

let _ = write_network_metrics_to_file(
runing_node_metrics.root_dir_path.clone(),
network_info,
runing_node_metrics.network.peer_id().to_string()
);
});

Ok(running_node)
}
}
Expand All @@ -257,6 +245,7 @@ struct NodeInner {
metrics_recorder: Option<NodeMetricsRecorder>,
reward_address: RewardsAddress,
evm_network: EvmNetwork,
root_dir: PathBuf,
}

impl Node {
Expand All @@ -275,6 +264,10 @@ impl Node {
&self.inner.network
}

pub(crate) fn get_root_dir(&self) -> &PathBuf {
&self.inner.root_dir
}

#[cfg(feature = "open-metrics")]
/// Returns a reference to the NodeMetricsRecorder if the `open-metrics` feature flag is enabled
/// This is used to record various metrics for the node.
Expand Down Expand Up @@ -451,7 +444,7 @@ impl Node {
fn handle_network_event(&self, event: NetworkEvent, peers_connected: &Arc<AtomicUsize>) {
let start = Instant::now();
let event_string = format!("{event:?}");
let event_header;
let mut event_header = "UnknownEvent";
debug!("Handling NetworkEvent {event_string:?}");

match event {
Expand Down Expand Up @@ -492,14 +485,53 @@ impl Node {
event_header = "NewListenAddr";
let network = self.network().clone();
let peers = self.initial_peers().clone();
let peer_id = self.network().peer_id().clone();
let root_dir_nw_info = self
.get_root_dir()
.clone()
.join("network_info")
.join(format!("listeners_{}", peer_id));

let path = std::path::Path::new(&root_dir_nw_info);

if !path.exists() {
println!("File does not exist. Creating it now...");
match std::fs::File::create(&root_dir_nw_info) {
Ok(_) => println!("File created successfully: {:?}", root_dir_nw_info),
Err(e) => eprintln!("Failed to create file: {}", e),
}
}

let _handle = spawn(async move {
for addr in peers {
if !contains_string(&root_dir_nw_info, &addr.to_string()) {
_ = append_to_file(&root_dir_nw_info, &addr.clone().to_string());
}
if let Err(err) = network.dial(addr.clone()).await {
tracing::error!("Failed to dial {addr}: {err:?}");
};
}
});
}
NetworkEvent::ClosedListenAddr(address ) => {
let peer_id = self.network().peer_id().clone();
let root_dir_nw_info = self
.get_root_dir()
.clone()
.join("network_info")
.join(format!("listeners_{}", peer_id));
let path = std::path::Path::new(&root_dir_nw_info);

if path.exists() {
let _handle = spawn(async move {
for addr in address {
if contains_string(&root_dir_nw_info, &addr.to_string()) {
_ = remove_from_file(&root_dir_nw_info, &addr.clone().to_string());
}
}
});
}
}
NetworkEvent::ResponseReceived { res } => {
event_header = "ResponseReceived";
debug!("NetworkEvent::ResponseReceived {res:?}");
Expand Down Expand Up @@ -1110,6 +1142,53 @@ fn challenge_score_scheme(
)
}

fn contains_string(file_path: &PathBuf, search_str: &str) -> bool {
match std::fs::read_to_string(file_path) {
Ok(contents) => contents.contains(search_str),
Err(e) => {
eprintln!("Failed to read file: {}", e);
false
}
}
}

fn append_to_file(file_path: &PathBuf, new_str: &str) -> Result<()> {
let mut file = std::fs::OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(file_path)
.map_err(|_| Error::InvalidListenerFileOperation)?;
if let Err(e) = file.write_all(format!("{}\n", new_str).as_bytes()) {
eprintln!("Failed to write to file: {}", e);
}
Ok(())
}

fn remove_from_file(file_path: &PathBuf, new_str: &str) -> Result<()> {
// Read all lines from the file
let file = std::fs::File::open(file_path).map_err(|_| Error::InvalidListenerFileOperation)?;
let reader = std::io::BufReader::new(file);

let lines: Vec<String> = reader
.lines()
.filter_map(|line| line.ok()) // Handle errors while reading
.filter(|line| !line.contains(new_str)) // Remove lines containing the keyword
.collect();

// Write back the filtered content
let mut file = std::fs::OpenOptions::new()
.write(true)
.truncate(true)
.open(file_path).map_err(|_| Error::InvalidListenerFileOperation)?;

for line in lines {
writeln!(file, "{}", line).map_err(|_| Error::InvalidListenerFileOperation)?; // Write each line with a newline
}

Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 405c596

Please sign in to comment.