Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some sync/backfill format nits #6861

Merged
merged 2 commits into from
Jan 30, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 65 additions & 79 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,67 +388,59 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
blocks: Vec<RpcBlock<T::EthSpec>>,
) -> Result<ProcessResult, BackFillError> {
// check if we have this batch
let batch = match self.batches.get_mut(&batch_id) {
None => {
if !matches!(self.state(), BackFillState::Failed) {
// A batch might get removed when the chain advances, so this is non fatal.
debug!(self.log, "Received a block for unknown batch"; "epoch" => batch_id);
}
return Ok(ProcessResult::Successful);
}
Some(batch) => {
// A batch could be retried without the peer failing the request (disconnecting/
// sending an error /timeout) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer, and that the
// request_id matches
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
return Ok(ProcessResult::Successful);
}
batch
let Some(batch) = self.batches.get_mut(&batch_id) else {
if !matches!(self.state(), BackFillState::Failed) {
// A batch might get removed when the chain advances, so this is non fatal.
debug!(self.log, "Received a block for unknown batch"; "epoch" => batch_id);
}
return Ok(ProcessResult::Successful);
};

{
// A stream termination has been sent. This batch has ended. Process a completed batch.
// Remove the request from the peer's active batches
self.active_requests
.get_mut(peer_id)
.map(|active_requests| active_requests.remove(&batch_id));

match batch.download_completed(blocks) {
Ok(received) => {
let awaiting_batches =
self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH;
debug!(self.log, "Completed batch received"; "epoch" => batch_id, "blocks" => received, "awaiting_batches" => awaiting_batches);

// pre-emptively request more blocks from peers whilst we process current blocks,
self.request_batches(network)?;
self.process_completed_batches(network)
}
Err(result) => {
let (expected_boundary, received_boundary, outcome) = match result {
Err(e) => {
return self
.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))
.map(|_| ProcessResult::Successful);
}
Ok(v) => v,
};
warn!(self.log, "Batch received out of range blocks"; "expected_boundary" => expected_boundary, "received_boundary" => received_boundary,
"peer_id" => %peer_id, batch);
// A batch could be retried without the peer failing the request (disconnecting /
// sending an error / timing out) if the peer is removed from the chain for other
// reasons. Check that this block belongs to the expected peer, and that the
// request_id matches.
// TODO(das): removed peer_id matching as the node may request a different peer for data
// columns.
if !batch.is_expecting_block(&request_id) {
return Ok(ProcessResult::Successful);
}

if let BatchOperationOutcome::Failed { blacklist: _ } = outcome {
error!(self.log, "Backfill failed"; "epoch" => batch_id, "received_boundary" => received_boundary, "expected_boundary" => expected_boundary);
return self
.fail_sync(BackFillError::BatchDownloadFailed(batch_id))
.map(|_| ProcessResult::Successful);
// A stream termination has been sent. This batch has ended. Process a completed batch.
// Remove the request from the peer's active batches
self.active_requests
.get_mut(peer_id)
.map(|active_requests| active_requests.remove(&batch_id));

match batch.download_completed(blocks) {
Ok(received) => {
let awaiting_batches =
self.processing_target.saturating_sub(batch_id) / BACKFILL_EPOCHS_PER_BATCH;
debug!(self.log, "Completed batch received"; "epoch" => batch_id, "blocks" => received, "awaiting_batches" => awaiting_batches);

// Pre-emptively request more blocks from peers whilst we process the current blocks.
self.request_batches(network)?;
self.process_completed_batches(network)
}
Err(result) => {
let (expected_boundary, received_boundary, outcome) = match result {
Err(e) => {
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
return Ok(ProcessResult::Successful);
}
// this batch can't be used, so we need to request it again.
self.retry_batch_download(network, batch_id)
.map(|_| ProcessResult::Successful)
Ok(v) => v,
};
warn!(self.log, "Batch received out of range blocks"; "expected_boundary" => expected_boundary, "received_boundary" => received_boundary,
"peer_id" => %peer_id, batch);

if let BatchOperationOutcome::Failed { blacklist: _ } = outcome {
error!(self.log, "Backfill failed"; "epoch" => batch_id, "received_boundary" => received_boundary, "expected_boundary" => expected_boundary);
self.fail_sync(BackFillError::BatchDownloadFailed(batch_id))?;
return Ok(ProcessResult::Successful);
}
// this batch can't be used, so we need to request it again.
self.retry_batch_download(network, batch_id)?;
Ok(ProcessResult::Successful)
}
}
}
Expand Down Expand Up @@ -582,20 +574,16 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
}
};

let peer = match batch.current_peer() {
Some(v) => *v,
None => {
return self
.fail_sync(BackFillError::BatchInvalidState(
batch_id,
String::from("Peer does not exist"),
))
.map(|_| ProcessResult::Successful)
}
let Some(peer) = batch.current_peer() else {
self.fail_sync(BackFillError::BatchInvalidState(
batch_id,
String::from("Peer does not exist"),
))?;
return Ok(ProcessResult::Successful);
};

debug!(self.log, "Backfill batch processed"; "result" => ?result, &batch,
"batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(&peer));
"batch_epoch" => batch_id, "peer" => %peer, "client" => %network.client_type(peer));

match result {
BatchProcessResult::Success {
Expand Down Expand Up @@ -679,8 +667,8 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
{
self.fail_sync(BackFillError::BatchInvalidState(batch_id, e.0))?;
}
self.retry_batch_download(network, batch_id)
.map(|_| ProcessResult::Successful)
self.retry_batch_download(network, batch_id)?;
Ok(ProcessResult::Successful)
}
}
}
Expand Down Expand Up @@ -712,11 +700,10 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
// - AwaitingDownload -> A recoverable failed batch should have been
// re-requested.
// - Processing -> `self.current_processing_batch` is None
return self
.fail_sync(BackFillError::InvalidSyncState(String::from(
"Invalid expected batch state",
)))
.map(|_| ProcessResult::Successful);
self.fail_sync(BackFillError::InvalidSyncState(String::from(
"Invalid expected batch state",
)))?;
return Ok(ProcessResult::Successful);
}
BatchState::AwaitingValidation(_) => {
// TODO: I don't think this state is possible, log a CRIT just in case.
Expand All @@ -731,12 +718,11 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
}
}
} else {
return self
.fail_sync(BackFillError::InvalidSyncState(format!(
"Batch not found for current processing target {}",
self.processing_target
)))
.map(|_| ProcessResult::Successful);
self.fail_sync(BackFillError::InvalidSyncState(format!(
"Batch not found for current processing target {}",
self.processing_target
)))?;
return Ok(ProcessResult::Successful);
}
Ok(ProcessResult::Successful)
}
Expand Down