Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: upgrade libp2p to 0.55.0 #2683

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,004 changes: 569 additions & 435 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion ant-bootstrap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.2.1", features = ["derive", "env"] }
dirs-next = "~2.0.0"
futures = "0.3.30"
libp2p = { version = "0.54.1", features = ["serde"] }
libp2p = { version = "0.55.0", features = ["serde"] }
reqwest = { version = "0.12.2", default-features = false, features = [
"rustls-tls-manual-roots",
] }
Expand Down
2 changes: 1 addition & 1 deletion ant-evm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ custom_debug = "~0.6.1"
evmlib = { path = "../evmlib", version = "0.1.8" }
hex = "~0.4.3"
lazy_static = "1.4.0"
libp2p = { version = "0.54.1", features = ["identify", "kad"] }
libp2p = { version = "0.55.0", features = ["identify", "kad"] }
rand = { version = "~0.8.5", features = ["small_rng"] }
ring = "0.17.8"
rmp-serde = "1.1.1"
Expand Down
4 changes: 2 additions & 2 deletions ant-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ hyper = { version = "0.14", features = [
"http1",
], optional = true }
itertools = "~0.12.1"
libp2p = { version = "0.54.1", features = [
libp2p = { version = "0.55.0", features = [
"tokio",
"dns",
"upnp",
Expand Down Expand Up @@ -82,4 +82,4 @@ uuid = { version = "1.5.0", features = ["v4"] }
workspace = true

[lib]
crate-type = ["cdylib", "rlib"]
crate-type = ["cdylib", "rlib"]
9 changes: 3 additions & 6 deletions ant-networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ use crate::{
log_markers::Marker,
multiaddr_pop_p2p, GetRecordCfg, GetRecordError, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE,
};
use ant_evm::{PaymentQuote, QuotingMetrics, U256};
use ant_evm::{PaymentQuote, QuotingMetrics};
use ant_protocol::{
convert_distance_to_u256,
messages::{Cmd, Request, Response},
storage::{DataTypes, RecordHeader, RecordKind, ValidationType},
NetworkAddress, PrettyPrintRecordKey,
Expand Down Expand Up @@ -1202,13 +1201,11 @@ impl SwarmDriver {
}

/// Returns the nodes that within the defined distance.
fn get_peers_in_range(peers: &[PeerId], address: &NetworkAddress, range: U256) -> Vec<PeerId> {
fn get_peers_in_range(peers: &[PeerId], address: &NetworkAddress, range: Distance) -> Vec<PeerId> {
peers
.iter()
.filter_map(|peer_id| {
let distance =
convert_distance_to_u256(&address.distance(&NetworkAddress::from_peer(*peer_id)));
if distance <= range {
if address.distance(&NetworkAddress::from_peer(*peer_id)) <= range {
Some(*peer_id)
} else {
None
Expand Down
10 changes: 4 additions & 6 deletions ant-networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ use crate::{
metrics::service::run_metrics_server, metrics::NetworkMetricsRecorder, MetricsRegistries,
};
use ant_bootstrap::BootstrapCacheStore;
use ant_evm::{PaymentQuote, U256};
use ant_evm::PaymentQuote;
use ant_protocol::{
convert_distance_to_u256,
messages::{ChunkProof, Nonce, Request, Response},
storage::RetryStrategy,
version::{
Expand All @@ -46,7 +45,7 @@ use futures::StreamExt;
use libp2p::{core::muxing::StreamMuxerBox, relay, swarm::behaviour::toggle::Toggle};
use libp2p::{
identity::Keypair,
kad::{self, QueryId, Quorum, Record, RecordKey, K_VALUE},
kad::{self, KBucketDistance as Distance, QueryId, Quorum, Record, RecordKey, K_VALUE, U256},
multiaddr::Protocol,
request_response::{self, Config as RequestResponseConfig, OutboundRequestId, ProtocolSupport},
swarm::{
Expand Down Expand Up @@ -939,9 +938,8 @@ impl SwarmDriver {
// Note: self is included
let self_addr = NetworkAddress::from_peer(self.self_peer_id);
let close_peers_distance = self_addr.distance(&NetworkAddress::from_peer(closest_k_peers[CLOSE_GROUP_SIZE + 1]));
let close_peers_u256 = convert_distance_to_u256(&close_peers_distance);

let distance = std::cmp::max(density_distance, close_peers_u256);
let distance = std::cmp::max(Distance(density_distance), close_peers_distance);

// The sampling approach has severe impact to the node side performance
// Hence suggested to be only used by client side.
Expand All @@ -960,7 +958,7 @@ impl SwarmDriver {
// self_addr.distance(&NetworkAddress::from_peer(closest_k_peers[CLOSE_GROUP_SIZE]))
// };

info!("Set responsible range to {distance:?}({:?})", distance.log2());
info!("Set responsible range to {distance:?}({:?})", distance.ilog2());

// set any new distance to farthest record in the store
self.swarm.behaviour_mut().kademlia.store_mut().set_distance_range(distance);
Expand Down
8 changes: 6 additions & 2 deletions ant-networking/src/event/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl SwarmDriver {
event: request_response::Event<Request, Response>,
) -> Result<(), NetworkError> {
match event {
request_response::Event::Message { message, peer } => match message {
request_response::Event::Message { message, peer, .. } => match message {
Message::Request {
request,
channel,
Expand Down Expand Up @@ -139,6 +139,7 @@ impl SwarmDriver {
request_id,
error,
peer,
..
} => {
if let Some(sender) = self.pending_requests.remove(&request_id) {
match sender {
Expand All @@ -161,10 +162,13 @@ impl SwarmDriver {
peer,
request_id,
error,
..
} => {
warn!("RequestResponse: InboundFailure for request_id: {request_id:?} and peer: {peer:?}, with error: {error:?}");
}
request_response::Event::ResponseSent { peer, request_id } => {
request_response::Event::ResponseSent {
peer, request_id, ..
} => {
debug!("ResponseSent for request_id: {request_id:?} and peer: {peer:?}");
}
}
Expand Down
33 changes: 15 additions & 18 deletions ant-networking/src/record_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ use aes_gcm_siv::{
aead::{Aead, KeyInit},
Aes256GcmSiv, Key as AesKey, Nonce,
};
use ant_evm::{QuotingMetrics, U256};
use ant_evm::QuotingMetrics;
use ant_protocol::{
convert_distance_to_u256,
storage::{DataTypes, RecordHeader, RecordKind, ValidationType},
NetworkAddress, PrettyPrintRecordKey,
};
Expand Down Expand Up @@ -140,7 +139,7 @@ pub struct NodeRecordStore {
/// Main records store remains unchanged for compatibility
records: HashMap<Key, (NetworkAddress, ValidationType, DataTypes)>,
/// Additional index organizing records by distance
records_by_distance: BTreeMap<U256, Key>,
records_by_distance: BTreeMap<Distance, Key>,
/// FIFO simple cache of records to reduce read times
records_cache: RecordCache,
/// Send network events to the node layer.
Expand All @@ -150,7 +149,7 @@ pub struct NodeRecordStore {
/// ilog2 distance range of responsible records
/// AKA: how many buckets of data do we consider "close"
/// None means accept all records.
responsible_distance_range: Option<U256>,
responsible_distance_range: Option<Distance>,
#[cfg(feature = "open-metrics")]
/// Used to report the number of records held by the store to the metrics server.
record_count_metric: Option<Gauge>,
Expand Down Expand Up @@ -373,10 +372,10 @@ impl NodeRecordStore {
let local_address = NetworkAddress::from_peer(local_id);

// Initialize records_by_distance
let mut records_by_distance: BTreeMap<U256, Key> = BTreeMap::new();
let mut records_by_distance: BTreeMap<Distance, Key> = BTreeMap::new();
for (key, (addr, _record_type, _data_type)) in records.iter() {
let distance = convert_distance_to_u256(&local_address.distance(addr));
let _ = records_by_distance.insert(distance, key.clone());
let distance = &local_address.distance(addr);
let _ = records_by_distance.insert(*distance, key.clone());
}

let cache_size = config.records_cache_size;
Expand Down Expand Up @@ -412,7 +411,7 @@ impl NodeRecordStore {
}

/// Returns the current distance ilog2 (aka bucket) range of CLOSE_GROUP nodes.
pub fn get_responsible_distance_range(&self) -> Option<U256> {
pub fn get_responsible_distance_range(&self) -> Option<Distance> {
self.responsible_distance_range
}

Expand Down Expand Up @@ -616,14 +615,13 @@ impl NodeRecordStore {
) {
let addr = NetworkAddress::from_record_key(&key);
let distance = self.local_address.distance(&addr);
let distance_u256 = convert_distance_to_u256(&distance);

// Update main records store
self.records
.insert(key.clone(), (addr.clone(), validate_type, data_type));

// Update bucket index
let _ = self.records_by_distance.insert(distance_u256, key.clone());
let _ = self.records_by_distance.insert(distance, key.clone());

// Update farthest record if needed (unchanged)
if let Some((_farthest_record, farthest_record_distance)) = self.farthest_record.clone() {
Expand Down Expand Up @@ -769,7 +767,7 @@ impl NodeRecordStore {
let relevant_records = self.get_records_within_distance_range(distance_range);

// The `responsible_range` is the network density
quoting_metrics.network_density = Some(distance_range.to_be_bytes());
quoting_metrics.network_density = Some(distance_range.0.to_big_endian());

quoting_metrics.close_records_stored = relevant_records;
} else {
Expand All @@ -792,7 +790,7 @@ impl NodeRecordStore {
}

/// Calculate how many records are stored within a distance range
pub fn get_records_within_distance_range(&self, range: U256) -> usize {
pub fn get_records_within_distance_range(&self, range: Distance) -> usize {
let within_range = self
.records_by_distance
.range(..range)
Expand All @@ -805,7 +803,7 @@ impl NodeRecordStore {
}

/// Setup the distance range.
pub(crate) fn set_responsible_distance_range(&mut self, responsible_distance: U256) {
pub(crate) fn set_responsible_distance_range(&mut self, responsible_distance: Distance) {
self.responsible_distance_range = Some(responsible_distance);
}

Expand Down Expand Up @@ -914,7 +912,7 @@ impl RecordStore for NodeRecordStore {
fn remove(&mut self, k: &Key) {
// Remove from main store
if let Some((addr, _, _)) = self.records.remove(k) {
let distance = convert_distance_to_u256(&self.local_address.distance(&addr));
let distance = self.local_address.distance(&addr);
let _ = self.records_by_distance.remove(&distance);
}

Expand Down Expand Up @@ -1015,7 +1013,6 @@ mod tests {
use bls::SecretKey;
use xor_name::XorName;

use ant_protocol::convert_distance_to_u256;
use ant_protocol::storage::{
try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, DataTypes, Scratchpad,
};
Expand Down Expand Up @@ -1594,12 +1591,12 @@ mod tests {
.wrap_err("Could not parse record store key")?,
);
// get the distance to this record from our local key
let distance = convert_distance_to_u256(&self_address.distance(&halfway_record_address));
let distance = &self_address.distance(&halfway_record_address);

// must be plus one bucket from the halfway record
store.set_responsible_distance_range(distance);
store.set_responsible_distance_range(*distance);

let records_in_range = store.get_records_within_distance_range(distance);
let records_in_range = store.get_records_within_distance_range(*distance);

// check that the number of records returned is larger than half our records
// (ie, that we cover _at least_ all the records within our distance range)
Expand Down
10 changes: 6 additions & 4 deletions ant-networking/src/record_store_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@

use crate::error::{NetworkError, Result};
use crate::record_store::{ClientRecordStore, NodeRecordStore};
use ant_evm::{QuotingMetrics, U256};
use ant_evm::QuotingMetrics;
use ant_protocol::{
storage::{DataTypes, ValidationType},
NetworkAddress,
};
use libp2p::kad::{store::RecordStore, ProviderRecord, Record, RecordKey};
use libp2p::kad::{
store::RecordStore, KBucketDistance as Distance, ProviderRecord, Record, RecordKey,
};
use std::{borrow::Cow, collections::HashMap};

pub enum UnifiedRecordStore {
Expand Down Expand Up @@ -157,7 +159,7 @@ impl UnifiedRecordStore {
}
}

pub(crate) fn get_farthest_replication_distance(&self) -> Result<Option<U256>> {
pub(crate) fn get_farthest_replication_distance(&self) -> Result<Option<Distance>> {
match self {
Self::Client(_) => {
error!(
Expand All @@ -169,7 +171,7 @@ impl UnifiedRecordStore {
}
}

pub(crate) fn set_distance_range(&mut self, distance: U256) {
pub(crate) fn set_distance_range(&mut self, distance: Distance) {
match self {
Self::Client(_) => {
error!("Calling set_distance_range at Client. This should not happen");
Expand Down
20 changes: 8 additions & 12 deletions ant-networking/src/replication_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@

use crate::time::spawn;
use crate::{event::NetworkEvent, time::Instant, CLOSE_GROUP_SIZE};
use ant_evm::U256;
use ant_protocol::{
convert_distance_to_u256,
storage::{DataTypes, ValidationType},
NetworkAddress, PrettyPrintRecordKey,
};
Expand Down Expand Up @@ -47,7 +45,7 @@ pub(crate) struct ReplicationFetcher {
on_going_fetches: HashMap<(RecordKey, ValidationType), (PeerId, ReplicationTimeout)>,
event_sender: mpsc::Sender<NetworkEvent>,
/// Distance range that the incoming key shall be fetched
distance_range: Option<U256>,
distance_range: Option<Distance>,
/// Restrict fetch range to closer than this value
/// used when the node is full, but we still have "close" data coming in
/// that is _not_ closer than our farthest max record
Expand Down Expand Up @@ -77,7 +75,7 @@ impl ReplicationFetcher {
}

/// Set the distance range.
pub(crate) fn set_replication_distance_range(&mut self, distance_range: U256) {
pub(crate) fn set_replication_distance_range(&mut self, distance_range: Distance) {
self.distance_range = Some(distance_range);
}

Expand Down Expand Up @@ -463,14 +461,14 @@ impl ReplicationFetcher {
// Filter out those out_of_range ones among the incoming_keys.
if let Some(ref distance_range) = self.distance_range {
new_incoming_keys.retain(|(addr, _record_type)| {
let distance = convert_distance_to_u256(&self_address.distance(addr));
let distance = &self_address.distance(addr);
debug!(
"Distance to target {addr:?} is {distance:?}, against range {distance_range:?}"
);
let mut is_in_range = distance <= *distance_range;
let mut is_in_range = distance <= distance_range;
// For middle-range records, they could be farther than distance_range,
// but still supposed to be held by the closest group to us.
if !is_in_range && distance - *distance_range < *distance_range {
if !is_in_range && distance.0 - distance_range.0 < distance_range.0 {
closest_k_peers.sort_by_key(|key| key.distance(addr));
let closest_group: HashSet<_> = closest_k_peers.iter().take(CLOSE_GROUP_SIZE).collect();
if closest_group.contains(&self_address) {
Expand Down Expand Up @@ -599,7 +597,7 @@ impl ReplicationFetcher {
mod tests {
use super::{ReplicationFetcher, FETCH_TIMEOUT, MAX_PARALLEL_FETCH};
use crate::CLOSE_GROUP_SIZE;
use ant_protocol::{convert_distance_to_u256, storage::ValidationType, NetworkAddress};
use ant_protocol::{storage::ValidationType, NetworkAddress};
use eyre::Result;
use libp2p::{kad::RecordKey, PeerId};
use std::{
Expand Down Expand Up @@ -692,8 +690,7 @@ mod tests {
// Set distance range
let distance_target = NetworkAddress::from_peer(PeerId::random());
let distance_range = self_address.distance(&distance_target);
let distance_range_256 = convert_distance_to_u256(&distance_range);
replication_fetcher.set_replication_distance_range(distance_range_256);
replication_fetcher.set_replication_distance_range(distance_range);

let mut closest_k_peers = vec![];
(0..19).for_each(|_| {
Expand All @@ -709,10 +706,9 @@ mod tests {
let key = NetworkAddress::from_record_key(&RecordKey::from(random_data));

let distance = key.distance(&self_address);
let distance_256 = convert_distance_to_u256(&distance);
if distance <= distance_range {
in_range_keys += 1;
} else if distance_256 - distance_range_256 < distance_range_256 {
} else if distance.0 - distance_range.0 < distance_range.0 {
closest_k_peers_include_self.sort_by_key(|addr| key.distance(addr));
let closest_group: HashSet<_> = closest_k_peers_include_self
.iter()
Expand Down
2 changes: 1 addition & 1 deletion ant-node-manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ colored = "2.0.4"
color-eyre = "0.6.3"
dirs-next = "2.0.0"
indicatif = { version = "0.17.5", features = ["tokio"] }
libp2p = { version = "0.54.1", features = [] }
libp2p = { version = "0.55.0", features = [] }
libp2p-identity = { version = "0.2.7", features = ["rand"] }
prost = { version = "0.9" }
rand = "0.8.5"
Expand Down
4 changes: 2 additions & 2 deletions ant-node-rpc-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ bls = { package = "blsttc", version = "8.0.1" }
clap = { version = "4.2.1", features = ["derive"] }
color-eyre = "0.6.3"
hex = "~0.4.3"
libp2p = { version = "0.54.1", features = ["kad"] }
libp2p-identity = { version = "0.2.7", features = ["rand"] }
libp2p = { version = "0.55.0", features = ["kad"]}
libp2p-identity = { version="0.2.7", features = ["rand"] }
thiserror = "1.0.23"
# # watch out updating this, protoc compiler needs to be installed on all build systems
# # arm builds + musl are very problematic
Expand Down
Loading
Loading