Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop rpc replace metrics #2670

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Draft
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 47 additions & 2 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ pub struct NetworkBuilder {
metrics_server_port: Option<u16>,
request_timeout: Option<Duration>,
upnp: bool,
root_dir: Option<PathBuf>,
}

impl NetworkBuilder {
Expand All @@ -264,6 +265,7 @@ impl NetworkBuilder {
metrics_server_port: None,
request_timeout: None,
upnp: false,
root_dir: None,
}
}

Expand Down Expand Up @@ -304,6 +306,10 @@ impl NetworkBuilder {
self.upnp = upnp;
}

pub fn set_root_dir(&mut self, root_dir: PathBuf) {
self.root_dir = Some(root_dir);
}

/// Creates a new `SwarmDriver` instance, along with a `Network` handle
/// for sending commands and an `mpsc::Receiver<NetworkEvent>` for receiving
/// network events. It initializes the swarm, sets up the transport, and
Expand All @@ -318,7 +324,7 @@ impl NetworkBuilder {
///
/// Returns an error if there is a problem initializing the mDNS behaviour.
pub fn build_node(
self,
mut self,
root_dir: PathBuf,
) -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
let bootstrap_interval = rand::thread_rng().gen_range(
Expand Down Expand Up @@ -390,7 +396,7 @@ impl NetworkBuilder {

let listen_addr = self.listen_addr;
let upnp = self.upnp;

self.set_root_dir(root_dir.clone());
let (network, events_receiver, mut swarm_driver) =
self.build(kad_cfg, Some(store_cfg), false, ProtocolSupport::Full, upnp);

Expand Down Expand Up @@ -510,6 +516,45 @@ impl NetworkBuilder {
)]),
);

let metadata_extended_sub_reg = metrics_registries
.metadata_extended
.sub_registry_with_prefix("ant_networking");

metadata_extended_sub_reg.register(
"peer_id",
"Identifier of a peer of the network",
Info::new(vec![("peer_id".to_string(), peer_id.to_string())]),
);

metadata_extended_sub_reg.register(
"pid",
"id of the node process",
Info::new(vec![("pid".to_string(), std::process::id().to_string())]),
);

metadata_extended_sub_reg.register(
"bin_version",
"Package version of the node",
Info::new(vec![("bin_version".to_string(), env!("CARGO_PKG_VERSION").to_string())]),
);

if let Some(root_dir) = self.root_dir.clone() {
metadata_extended_sub_reg.register(
"data_dir",
"Root directory of the node",
Info::new(vec![("root_dir".to_string(), root_dir.clone().to_string_lossy().to_string())]),
);
}

if let Some(log_dir) = self.root_dir.clone() {
let log_dir = log_dir.join("logs");
metadata_extended_sub_reg.register(
"log_dir",
"Root directory of the node",
Info::new(vec![("log_dir".to_string(), log_dir.clone().to_string_lossy().to_string())]),
);
}

run_metrics_server(metrics_registries, port);
Some(metrics_recorder)
} else {
Expand Down
5 changes: 5 additions & 0 deletions ant-networking/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ pub enum NetworkEvent {
KeysToFetchForReplication(Vec<(PeerId, RecordKey)>),
/// Started listening on a new address
NewListenAddr(Multiaddr),
/// stopped Listening from a address
ClosedListenAddr(Vec<Multiaddr>),
/// Report unverified record
UnverifiedRecord(Record),
/// Terminate Node on unrecoverable errors
Expand Down Expand Up @@ -180,6 +182,9 @@ impl Debug for NetworkEvent {
NetworkEvent::NewListenAddr(addr) => {
write!(f, "NetworkEvent::NewListenAddr({addr:?})")
}
NetworkEvent::ClosedListenAddr(addr ) => {
write!(f, "NetworkEvent::ClosedListenAddr({addr:?})")
}
NetworkEvent::UnverifiedRecord(record) => {
let pretty_key = PrettyPrintRecordKey::from(&record.key);
write!(f, "NetworkEvent::UnverifiedRecord({pretty_key:?})")
Expand Down
2 changes: 2 additions & 0 deletions ant-networking/src/event/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ impl SwarmDriver {
if let Some(relay_manager) = self.relay_manager.as_mut() {
relay_manager.on_listener_closed(&listener_id, &mut self.swarm);
}

self.send_event(NetworkEvent::ClosedListenAddr(addresses.clone()));
}
SwarmEvent::IncomingConnection {
connection_id,
Expand Down
43 changes: 41 additions & 2 deletions ant-networking/src/metrics/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct MetricsRegistries {
pub standard_metrics: Registry,
pub extended_metrics: Registry,
pub metadata: Registry,
pub metadata_extended: Registry,
}

const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0";
Expand All @@ -36,7 +37,7 @@ pub(crate) fn run_metrics_server(registries: MetricsRegistries, port: u16) {
info!("Metrics server on http://{}/metrics", server.local_addr());
println!("Metrics server on http://{}/metrics", server.local_addr());

info!("Metrics server on http://{} Available endpoints: /metrics, /metrics_extended, /metadata", server.local_addr());
info!("Metrics server on http://{} Available endpoints: /metrics, /metrics_extended, /metadata, /metadata_extended", server.local_addr());
Dismissed Show dismissed Hide dismissed
// run the server forever
if let Err(e) = server.await {
error!("server error: {}", e);
Expand All @@ -50,6 +51,7 @@ pub(crate) struct MetricService {
standard_registry: SharedRegistry,
extended_registry: SharedRegistry,
metadata: SharedRegistry,
metadata_extended: SharedRegistry,
}

impl MetricService {
Expand All @@ -65,6 +67,11 @@ impl MetricService {
Arc::clone(&self.metadata)
}

fn get_metadata_extended_registry(&mut self) -> SharedRegistry {
Arc::clone(&self.metadata_extended)
}


fn respond_with_metrics(&mut self) -> Result<Response<String>> {
let mut response: Response<String> = Response::default();

Expand Down Expand Up @@ -152,6 +159,28 @@ impl MetricService {
Ok(response)
}

fn respond_with_metadata_extended(&mut self) -> Result<Response<String>> {
let mut response: Response<String> = Response::default();

response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
METRICS_CONTENT_TYPE
.try_into()
.map_err(|_| NetworkError::NetworkMetricError)?,
);

let reg = self.get_metadata_extended_registry();
let reg = reg.lock().map_err(|_| NetworkError::NetworkMetricError)?;
encode(&mut response.body_mut(), &reg).map_err(|err| {
error!("Failed to encode the metadata Registry {err:?}");
NetworkError::NetworkMetricError
})?;

*response.status_mut() = StatusCode::OK;

Ok(response)
}

fn respond_with_404_not_found(&mut self) -> Response<String> {
let mut resp = Response::default();
*resp.status_mut() = StatusCode::NOT_FOUND;
Expand Down Expand Up @@ -196,7 +225,13 @@ impl Service<Request<Body>> for MetricService {
Ok(resp) => resp,
Err(_) => self.respond_with_500_server_error(),
}
} else {
} else if req_method == Method::GET && req_path == "/metadata_extended" {
match self.respond_with_metadata_extended() {
Ok(resp) => resp,
Err(_) => self.respond_with_500_server_error(),
}
}
else {
self.respond_with_404_not_found()
};
Box::pin(async { Ok(resp) })
Expand All @@ -207,6 +242,7 @@ pub(crate) struct MakeMetricService {
standard_registry: SharedRegistry,
extended_registry: SharedRegistry,
metadata: SharedRegistry,
metadata_extended: SharedRegistry,
}

impl MakeMetricService {
Expand All @@ -215,6 +251,7 @@ impl MakeMetricService {
standard_registry: Arc::new(Mutex::new(registries.standard_metrics)),
extended_registry: Arc::new(Mutex::new(registries.extended_metrics)),
metadata: Arc::new(Mutex::new(registries.metadata)),
metadata_extended: Arc::new(Mutex::new(registries.metadata_extended))
}
}
}
Expand All @@ -232,12 +269,14 @@ impl<T> Service<T> for MakeMetricService {
let standard_registry = Arc::clone(&self.standard_registry);
let extended_registry = Arc::clone(&self.extended_registry);
let metadata = Arc::clone(&self.metadata);
let metadata_extended = Arc::clone(&self.metadata_extended);

let fut = async move {
Ok(MetricService {
standard_registry,
extended_registry,
metadata,
metadata_extended,
})
};
Box::pin(fut)
Expand Down
5 changes: 1 addition & 4 deletions ant-node-manager/src/add_services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,9 @@ pub async fn add_node(
};
let metrics_free_port = if let Some(port) = metrics_port {
Some(port)
} else if options.enable_metrics_server {
Some(service_control.get_available_port()?)
} else {
None
Some(service_control.get_available_port()?)
};

let rpc_socket_addr = if let Some(addr) = options.rpc_address {
SocketAddr::new(IpAddr::V4(addr), rpc_free_port)
} else {
Expand Down
4 changes: 2 additions & 2 deletions ant-node-manager/src/bin/cli/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ pub enum LocalSubCmd {
/// An interval applied between launching each node.
///
/// Units are milliseconds.
#[clap(long, default_value_t = 200)]
#[clap(long, default_value_t = 220)]
interval: u64,
/// Specify the logging format.
///
Expand Down Expand Up @@ -798,7 +798,7 @@ pub enum LocalSubCmd {
/// An interval applied between launching each node.
///
/// Units are milliseconds.
#[clap(long, default_value_t = 200)]
#[clap(long, default_value_t = 220)]
interval: u64,
/// Specify the logging format.
///
Expand Down
41 changes: 20 additions & 21 deletions ant-node-manager/src/cmd/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,16 @@
add_services::{
add_node,
config::{AddNodeServiceOptions, PortRange},
},
config::{self, is_running_as_root},
helpers::{download_and_extract_release, get_bin_version},
print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel,
}, config::{self, is_running_as_root}, error::Error, helpers::{download_and_extract_release, get_bin_version}, print_banner, refresh_node_registry, status_report, ServiceManager, VerbosityLevel
};
use ant_bootstrap::PeersArgs;
use ant_evm::{EvmNetwork, RewardsAddress};
use ant_logging::LogFormat;
use ant_releases::{AntReleaseRepoActions, ReleaseType};
use ant_service_management::{
control::{ServiceControl, ServiceController},
rpc::RpcClient,
NodeRegistry, NodeService, ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult,
};
metric::MetricClient, NodeRegistry, NodeService,
ServiceStateActions, ServiceStatus, UpgradeOptions, UpgradeResult};
use color_eyre::{eyre::eyre, Help, Result};
use colored::Colorize;
use libp2p_identity::PeerId;
Expand Down Expand Up @@ -181,10 +177,11 @@

for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
// TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
let metrics_port: u16 = node.metrics_port
.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(node, Box::new(metric_client)); // TODO: remove this as we have no way to know the reward balance of nodes since EVM payments!
Dismissed Show dismissed Hide dismissed
println!("{}: {}", service.service_data.service_name, 0,);
}
Ok(())
}
Expand Down Expand Up @@ -223,8 +220,10 @@
let mut failed_services = Vec::new();
for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?;

let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(node, Box::new(metric_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);
match service_manager.remove(keep_directories).await {
Expand Down Expand Up @@ -310,10 +309,9 @@
let mut failed_services = Vec::new();
for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);

let service = NodeService::new(node, Box::new(rpc_client));

let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(node, Box::new(metric_client));
// set dynamic startup delay if fixed_interval is not set
let service = if fixed_interval.is_none() {
service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
Expand Down Expand Up @@ -406,8 +404,9 @@
let mut failed_services = Vec::new();
for &index in &service_indices {
let node = &mut node_registry.nodes[index];
let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(node, Box::new(metric_client));
let mut service_manager =
ServiceManager::new(service, Box::new(ServiceController {}), verbosity);

Expand Down Expand Up @@ -518,9 +517,9 @@
target_version: target_version.clone(),
};
let service_name = node.service_name.clone();

let rpc_client = RpcClient::from_socket_addr(node.rpc_socket_addr);
let service = NodeService::new(node, Box::new(rpc_client));
let metrics_port = node.metrics_port.ok_or(Error::MetricPortEmpty)?;
let metric_client = MetricClient::new(metrics_port);
let service = NodeService::new(node, Box::new(metric_client));
// set dynamic startup delay if fixed_interval is not set
let service = if fixed_interval.is_none() {
service.with_connection_timeout(Duration::from_secs(connection_timeout_s))
Expand Down
2 changes: 2 additions & 0 deletions ant-node-manager/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub enum Error {
PidNotFoundAfterStarting,
#[error("The PID of the process was not set.")]
PidNotSet,
#[error("The metric port of the node is empty")]
MetricPortEmpty,
#[error(transparent)]
SemverError(#[from] semver::Error),
#[error("The service(s) is already running: {0:?}")]
Expand Down
Loading
Loading