Skip to content

Commit

Permalink
fix: Find peers that can provide state headers (#10054)
Browse files Browse the repository at this point in the history
### The problem:

If a node is configured as a chunk-only producer (technically, if it
doesn't have `tracked_shards = [0]` in `config.json`), then it can't
provide state headers or state parts to other nodes.

### The root cause:

The nodes use archaic logic to set `tracked_shards` in `Handshake`. That
logic sets `tracked_shards` to either `[]` or `[0,1,2,3]`.

### Solution:

Ask a random peer for state sync header without taking their
`tracked_shards` into account.
  • Loading branch information
nikurt authored Nov 2, 2023
2 parents 07f69b1 + d7546f9 commit 904db01
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 50 deletions.
18 changes: 18 additions & 0 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,24 @@ pub(crate) static STATE_SYNC_RETRY_PART: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Counts state-sync header requests that came back with an error,
/// labeled per shard so failures can be broken down on dashboards.
pub(crate) static STATE_SYNC_HEADER_ERROR: Lazy<IntCounterVec> = Lazy::new(|| {
    let counter = try_create_int_counter_vec(
        "near_state_sync_header_error_total",
        "Number of state sync header requests resulting in an error",
        &["shard_id"],
    );
    // Metric registration only fails on a duplicate/invalid name, which is a bug.
    counter.unwrap()
});

/// Counts state-sync header requests that exceeded the sync timeout,
/// labeled per shard so stalls can be attributed to a specific shard.
pub(crate) static STATE_SYNC_HEADER_TIMEOUT: Lazy<IntCounterVec> = Lazy::new(|| {
    let counter = try_create_int_counter_vec(
        "near_state_sync_header_timeout_total",
        "Number of state sync header requests timing out",
        &["shard_id"],
    );
    // Metric registration only fails on a duplicate/invalid name, which is a bug.
    counter.unwrap()
});

pub(crate) static STATE_SYNC_PARTS_DONE: Lazy<IntGaugeVec> = Lazy::new(|| {
try_create_int_gauge_vec(
"near_state_sync_parts_done",
Expand Down
71 changes: 28 additions & 43 deletions chain/client/src/sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,44 +502,22 @@ impl StateSync {
}
}

/// Find possible targets to download state from.
/// Candidates are peers at highest height.
/// Only select candidates that we have no pending request currently ongoing.
fn possible_targets(
&mut self,
shard_id: ShardId,
highest_height_peers: &[HighestHeightPeerInfo],
) -> Result<Vec<PeerId>, near_chain::Error> {
let peers = highest_height_peers
.iter()
.filter_map(|peer| {
// Select peers that are high enough (if they are syncing themselves, they might not have the data that we want)
// and that are tracking the shard.
// TODO: possible optimization - simply select peers that have height greater than the epoch start that we're asking for.
if peer.tracked_shards.contains(&shard_id) {
Some(peer.peer_info.id.clone())
} else {
None
}
})
.collect();
Ok(self.select_peers(peers, shard_id)?)
}

/// Avoids peers that already have outstanding requests for parts.
fn select_peers(
&mut self,
peers: Vec<PeerId>,
highest_height_peers: &[HighestHeightPeerInfo],
shard_id: ShardId,
) -> Result<Vec<PeerId>, near_chain::Error> {
let peers: Vec<PeerId> =
highest_height_peers.iter().map(|peer| peer.peer_info.id.clone()).collect();
let res = match &mut self.inner {
StateSyncInner::Peers { last_part_id_requested, .. } => {
last_part_id_requested.retain(|_, request| !request.expired());
peers
.into_iter()
.filter(|candidate| {
.filter(|peer| {
// If we still have a pending request from this node - don't add another one.
!last_part_id_requested.contains_key(&(candidate.clone(), shard_id))
!last_part_id_requested.contains_key(&(peer.clone(), shard_id))
})
.collect::<Vec<_>>()
}
Expand All @@ -559,9 +537,10 @@ impl StateSync {
runtime_adapter: Arc<dyn RuntimeAdapter>,
state_parts_arbiter_handle: &ArbiterHandle,
) -> Result<(), near_chain::Error> {
let possible_targets = self.possible_targets(shard_id, highest_height_peers)?;
let possible_targets = self.select_peers(highest_height_peers, shard_id)?;

if possible_targets.is_empty() {
tracing::debug!(target: "sync", "Can't request a state header: No possible targets");
// In most cases it means that all the targets are currently busy (that we have a pending request with them).
return Ok(());
}
Expand Down Expand Up @@ -602,6 +581,7 @@ impl StateSync {
new_shard_sync_download: &mut ShardSyncDownload,
) {
let peer_id = possible_targets.choose(&mut thread_rng()).cloned().unwrap();
tracing::debug!(target: "sync", ?peer_id, shard_id, ?sync_hash, ?possible_targets, "request_shard_header");
assert!(new_shard_sync_download.downloads[0].run_me.load(Ordering::SeqCst));
new_shard_sync_download.downloads[0].run_me.store(false, Ordering::SeqCst);
new_shard_sync_download.downloads[0].state_requests_count += 1;
Expand Down Expand Up @@ -857,32 +837,37 @@ impl StateSync {
chain: &Chain,
now: DateTime<Utc>,
) -> Result<(bool, bool), near_chain::Error> {
let mut download_timeout = false;
let mut run_shard_state_download = false;
let download = &mut shard_sync_download.downloads[0];
// StateDownloadHeader is the first step. We want to fetch the basic information about the state (its size, hash etc).
if shard_sync_download.downloads[0].done {
if download.done {
let shard_state_header = chain.get_state_header(shard_id, sync_hash)?;
let state_num_parts = shard_state_header.num_state_parts();
// If the header was downloaded successfully - move to phase 2 (downloading parts).
// Create the vector with entry for each part.
*shard_sync_download =
ShardSyncDownload::new_download_state_parts(now, state_num_parts);
run_shard_state_download = true;
Ok((false, true))
} else {
let prev = shard_sync_download.downloads[0].prev_update_time;
let error = shard_sync_download.downloads[0].error;
download_timeout = now - prev > self.timeout;
// Retry in case of timeout or failure.
if download_timeout || error {
shard_sync_download.downloads[0].run_me.store(true, Ordering::SeqCst);
shard_sync_download.downloads[0].error = false;
shard_sync_download.downloads[0].prev_update_time = now;
let download_timeout = now - download.prev_update_time > self.timeout;
if download_timeout {
tracing::debug!(target: "sync", last_target = ?download.last_target, start_time = ?download.start_time, prev_update_time = ?download.prev_update_time, state_requests_count = download.state_requests_count, "header request timed out");
metrics::STATE_SYNC_HEADER_TIMEOUT
.with_label_values(&[&shard_id.to_string()])
.inc();
}
if shard_sync_download.downloads[0].run_me.load(Ordering::SeqCst) {
run_shard_state_download = true;
if download.error {
tracing::debug!(target: "sync", last_target = ?download.last_target, start_time = ?download.start_time, prev_update_time = ?download.prev_update_time, state_requests_count = download.state_requests_count, "header request error");
metrics::STATE_SYNC_HEADER_ERROR.with_label_values(&[&shard_id.to_string()]).inc();
}
// Retry in case of timeout or failure.
if download_timeout || download.error {
download.run_me.store(true, Ordering::SeqCst);
download.error = false;
download.prev_update_time = now;
}
let run_me = download.run_me.load(Ordering::SeqCst);
Ok((download_timeout, run_me))
}
Ok((download_timeout, run_shard_state_download))
}

/// Checks if the parts are downloaded.
Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/sync/sync_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl SyncActor {
debug!(target: "sync", shard_id = ?self.shard_uid.shard_id, "Sync already running.");
return;
}
info!(target: "sync", shard_id = ?self.shard_uid.shard_id, "Startgin sync on shard");
info!(target: "sync", shard_id = ?self.shard_uid.shard_id, "Starting sync on shard");
// TODO: Add logic to commence state sync.
self.sync_hash = sync_hash;
}
Expand Down
24 changes: 18 additions & 6 deletions chain/epoch-manager/src/shard_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,17 @@ impl ShardTracker {
.epoch_manager
.cares_about_shard_from_prev_block(parent_hash, account_id, shard_id)
.unwrap_or(false);
if !is_me {
return account_cares_about_shard;
} else if account_cares_about_shard {
if account_cares_about_shard {
// An account has to track this shard because of its validation duties.
return true;
}
if !is_me {
// We don't know how another node is configured.
// It may track all shards, it may track no additional shards.
return false;
} else {
// We have access to the node config. Use the config to find a definite answer.
}
}
match self.tracked_config {
TrackedConfig::AllShards => {
Expand Down Expand Up @@ -163,11 +169,17 @@ impl ShardTracker {
.cares_about_shard_next_epoch_from_prev_block(parent_hash, account_id, shard_id)
.unwrap_or(false)
};
if !is_me {
return account_cares_about_shard;
} else if account_cares_about_shard {
if account_cares_about_shard {
// An account has to track this shard because of its validation duties.
return true;
}
if !is_me {
// We don't know how another node is configured.
// It may track all shards, it may track no additional shards.
return false;
} else {
// We have access to the node config. Use the config to find a definite answer.
}
}
match self.tracked_config {
TrackedConfig::AllShards => {
Expand Down

0 comments on commit 904db01

Please sign in to comment.