From 63557438c307f420baf948f874c768a81d864405 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Sun, 26 Jan 2025 15:04:08 -0600 Subject: [PATCH] moves repair nonce verification from window-service to shred-fetch-stage Because repair nonce is a randomly generated u32 for each repair request, it can be verified before sigverify so that shreds with invalid repair nonce do not waste sigverify resources. --- core/src/repair/outstanding_requests.rs | 14 +-- core/src/repair/repair_response.rs | 9 -- core/src/repair/request_response.rs | 2 +- core/src/repair/serve_repair.rs | 78 ++++++++++------- core/src/shred_fetch_stage.rs | 93 +++++++++++++++----- core/src/tvu.rs | 1 + core/src/window_service.rs | 110 ++---------------------- ledger/src/blockstore.rs | 4 +- ledger/src/shred.rs | 14 ++- 9 files changed, 149 insertions(+), 176 deletions(-) diff --git a/core/src/repair/outstanding_requests.rs b/core/src/repair/outstanding_requests.rs index aeeb6a7c30ee40..d093bf7e07f087 100644 --- a/core/src/repair/outstanding_requests.rs +++ b/core/src/repair/outstanding_requests.rs @@ -11,7 +11,7 @@ pub struct OutstandingRequests { requests: LruCache>, } -impl OutstandingRequests +impl OutstandingRequests where T: RequestResponse, { @@ -118,7 +118,7 @@ pub(crate) mod tests { .expire_timestamp; assert!(outstanding_requests - .register_response(nonce, &shred, expire_timestamp + 1, |_| ()) + .register_response(nonce, shred.payload(), expire_timestamp + 1, |_| ()) .is_none()); assert!(outstanding_requests.requests.get(&nonce).is_none()); } @@ -144,7 +144,7 @@ pub(crate) mod tests { // Response that passes all checks should decrease num_expected_responses assert!(outstanding_requests - .register_response(nonce, &shred, expire_timestamp - 1, |_| ()) + .register_response(nonce, shred.payload(), expire_timestamp - 1, |_| ()) .is_some()); num_expected_responses -= 1; assert_eq!( @@ -158,10 +158,10 @@ pub(crate) mod tests { // Response with incorrect nonce is ignored assert!(outstanding_requests - .register_response(nonce + 1, &shred, expire_timestamp - 1, |_| ()) + .register_response(nonce + 1, shred.payload(), expire_timestamp - 1, |_| ()) .is_none()); assert!(outstanding_requests - .register_response(nonce + 1, &shred, expire_timestamp, |_| ()) + .register_response(nonce + 1, shred.payload(), expire_timestamp, |_| ()) .is_none()); assert_eq!( outstanding_requests @@ -175,7 +175,7 @@ pub(crate) mod tests { // Response with timestamp over limit should remove status, preventing late // responses from being accepted assert!(outstanding_requests - .register_response(nonce, &shred, expire_timestamp, |_| ()) + .register_response(nonce, shred.payload(), expire_timestamp, |_| ()) .is_none()); assert!(outstanding_requests.requests.get(&nonce).is_none()); @@ -195,7 +195,7 @@ pub(crate) mod tests { for _ in 0..num_expected_responses { assert!(outstanding_requests.requests.get(&nonce).is_some()); assert!(outstanding_requests - .register_response(nonce, &shred, expire_timestamp - 1, |_| ()) + .register_response(nonce, shred.payload(), expire_timestamp - 1, |_| ()) .is_some()); } assert!(outstanding_requests.requests.get(&nonce).is_none()); diff --git a/core/src/repair/repair_response.rs b/core/src/repair/repair_response.rs index 0a82935e89892c..27baff67fad39e 100644 --- a/core/src/repair/repair_response.rs +++ b/core/src/repair/repair_response.rs @@ -40,15 +40,6 @@ pub fn repair_response_packet_from_bytes( Some(packet) } -pub(crate) fn nonce(packet: &Packet) -> Option { - // Nonces are attached to both repair and ancestor hashes 
responses. - let data = packet.data(..)?; - let offset = data.len().checked_sub(SIZE_OF_NONCE)?; - <[u8; SIZE_OF_NONCE]>::try_from(&data[offset..]) - .map(Nonce::from_le_bytes) - .ok() -} - #[cfg(test)] mod test { use { diff --git a/core/src/repair/request_response.rs b/core/src/repair/request_response.rs index 429e49b0a87635..6fbb0f2634ffc3 100644 --- a/core/src/repair/request_response.rs +++ b/core/src/repair/request_response.rs @@ -1,5 +1,5 @@ pub trait RequestResponse { - type Response; + type Response: ?Sized; fn num_expected_responses(&self) -> u32; fn verify_response(&self, response: &Self::Response) -> bool; } diff --git a/core/src/repair/serve_repair.rs b/core/src/repair/serve_repair.rs index 9970941323f74e..5c65d1b021d158 100644 --- a/core/src/repair/serve_repair.rs +++ b/core/src/repair/serve_repair.rs @@ -27,7 +27,7 @@ use { solana_ledger::{ ancestor_iterator::{AncestorIterator, AncestorIteratorWithHash}, blockstore::Blockstore, - shred::{Nonce, Shred, ShredFetchStats, SIZE_OF_NONCE}, + shred::{self, Nonce, ShredFetchStats, SIZE_OF_NONCE}, }, solana_perf::{ data_budget::DataBudget, @@ -113,21 +113,28 @@ impl ShredRepairType { } impl RequestResponse for ShredRepairType { - type Response = Shred; + type Response = [u8]; // shred's payload fn num_expected_responses(&self) -> u32 { match self { ShredRepairType::Orphan(_) => MAX_ORPHAN_REPAIR_RESPONSES as u32, ShredRepairType::Shred(_, _) | ShredRepairType::HighestShred(_, _) => 1, } } - fn verify_response(&self, response_shred: &Shred) -> bool { + fn verify_response(&self, shred: &Self::Response) -> bool { + #[inline] + fn get_shred_index(shred: &[u8]) -> Option { + shred::layout::get_index(shred).map(u64::from) + } + let Some(shred_slot) = shred::layout::get_slot(shred) else { + return false; + }; match self { - ShredRepairType::Orphan(slot) => response_shred.slot() <= *slot, + ShredRepairType::Orphan(slot) => shred_slot <= *slot, ShredRepairType::HighestShred(slot, index) => { - response_shred.slot() == *slot && response_shred.index() as u64 >= *index + shred_slot == *slot && get_shred_index(shred) >= Some(*index) } ShredRepairType::Shred(slot, index) => { - response_shred.slot() == *slot && response_shred.index() as u64 == *index + shred_slot == *slot && get_shred_index(shred) == Some(*index) } } } @@ -1450,7 +1457,7 @@ mod tests { get_tmp_ledger_path_auto_delete, shred::{max_ticks_per_n_shreds, Shred, ShredFlags}, }, - solana_perf::packet::{deserialize_from_with_limit, Packet}, + solana_perf::packet::{deserialize_from_with_limit, Packet, PacketFlags}, solana_runtime::bank::Bank, solana_sdk::{hash::Hash, pubkey::Pubkey, signature::Keypair, timing::timestamp}, solana_streamer::socket::SocketAddrSpace, @@ -1897,7 +1904,7 @@ mod tests { ); let index = 1; - let rv = ServeRepair::run_highest_window_request( + let mut rv = ServeRepair::run_highest_window_request( &recycler, &socketaddr_any!(), &blockstore, @@ -1910,10 +1917,13 @@ mod tests { verify_responses(&request, rv.iter()); let rv: Vec = rv - .into_iter() - .filter_map(|p| { - assert_eq!(repair_response::nonce(p).unwrap(), nonce); - Shred::new_from_serialized_shred(p.data(..).unwrap().to_vec()).ok() + .iter_mut() + .map(|packet| { + packet.meta_mut().flags |= PacketFlags::REPAIR; + let (shred, repair_nonce) = + shred::layout::get_shred_and_repair_nonce(packet).unwrap(); + assert_eq!(repair_nonce.unwrap(), nonce); + Shred::new_from_serialized_shred(shred.to_vec()).unwrap() }) .collect(); assert!(!rv.is_empty()); @@ -1959,7 +1969,7 @@ mod tests { .expect("Expect successful ledger 
write"); let index = 1; - let rv = ServeRepair::run_window_request( + let mut rv = ServeRepair::run_window_request( &recycler, &socketaddr_any!(), &blockstore, @@ -1971,10 +1981,13 @@ mod tests { let request = ShredRepairType::Shred(slot, index); verify_responses(&request, rv.iter()); let rv: Vec = rv - .into_iter() - .filter_map(|p| { - assert_eq!(repair_response::nonce(p).unwrap(), nonce); - Shred::new_from_serialized_shred(p.data(..).unwrap().to_vec()).ok() + .iter_mut() + .map(|packet| { + packet.meta_mut().flags |= PacketFlags::REPAIR; + let (shred, repair_nonce) = + shred::layout::get_shred_and_repair_nonce(packet).unwrap(); + assert_eq!(repair_nonce.unwrap(), nonce); + Shred::new_from_serialized_shred(shred.to_vec()).unwrap() }) .collect(); assert_eq!(rv[0].index(), 1); @@ -2108,7 +2121,7 @@ mod tests { let ledger_path = get_tmp_ledger_path_auto_delete!(); let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); let rv = - ServeRepair::run_orphan(&recycler, &socketaddr_any!(), &blockstore, slot, 0, nonce); + ServeRepair::run_orphan(&recycler, &socketaddr_any!(), &blockstore, slot, 5, nonce); assert!(rv.is_none()); // Create slots [slot, slot + num_slots) with 5 shreds apiece @@ -2145,7 +2158,7 @@ mod tests { .collect(); // Verify responses - let request = ShredRepairType::Orphan(slot); + let request = ShredRepairType::Orphan(slot + num_slots - 1); verify_responses(&request, rv.iter()); let expected: Vec<_> = (slot..slot + num_slots) @@ -2422,40 +2435,39 @@ mod tests { // Orphan let shred = new_test_data_shred(slot, 0); let request = ShredRepairType::Orphan(slot); - assert!(request.verify_response(&shred)); + assert!(request.verify_response(shred.payload())); let shred = new_test_data_shred(slot - 1, 0); - assert!(request.verify_response(&shred)); + assert!(request.verify_response(shred.payload())); let shred = new_test_data_shred(slot + 1, 0); - assert!(!request.verify_response(&shred)); + assert!(!request.verify_response(shred.payload())); // HighestShred let shred = new_test_data_shred(slot, index); let request = ShredRepairType::HighestShred(slot, index as u64); - assert!(request.verify_response(&shred)); + assert!(request.verify_response(shred.payload())); let shred = new_test_data_shred(slot, index + 1); - assert!(request.verify_response(&shred)); + assert!(request.verify_response(shred.payload())); let shred = new_test_data_shred(slot, index - 1); - assert!(!request.verify_response(&shred)); + assert!(!request.verify_response(shred.payload())); let shred = new_test_data_shred(slot - 1, index); - assert!(!request.verify_response(&shred)); + assert!(!request.verify_response(shred.payload())); let shred = new_test_data_shred(slot + 1, index); - assert!(!request.verify_response(&shred)); + assert!(!request.verify_response(shred.payload())); // Shred let shred = new_test_data_shred(slot, index); let request = ShredRepairType::Shred(slot, index as u64); - assert!(request.verify_response(&shred)); + assert!(request.verify_response(shred.payload())); let shred = new_test_data_shred(slot, index + 1); - assert!(!request.verify_response(&shred)); + assert!(!request.verify_response(shred.payload())); let shred = new_test_data_shred(slot + 1, index); - assert!(!request.verify_response(&shred)); + assert!(!request.verify_response(shred.payload())); } fn verify_responses<'a>(request: &ShredRepairType, packets: impl Iterator) { for packet in packets { - let shred_payload = packet.data(..).unwrap().to_vec(); - let shred = 
Shred::new_from_serialized_shred(shred_payload).unwrap(); - request.verify_response(&shred); + let shred = shred::layout::get_shred(packet).unwrap(); + assert!(request.verify_response(shred)); } } diff --git a/core/src/shred_fetch_stage.rs b/core/src/shred_fetch_stage.rs index 6f3e8889423bf4..160c81b0eb1914 100644 --- a/core/src/shred_fetch_stage.rs +++ b/core/src/shred_fetch_stage.rs @@ -1,14 +1,16 @@ //! The `shred_fetch_stage` pulls shreds from UDP sockets and sends it to a channel. use { - crate::repair::serve_repair::ServeRepair, + crate::repair::{repair_service::OutstandingShredRepairs, serve_repair::ServeRepair}, bytes::Bytes, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}, itertools::Itertools, solana_feature_set::{self as feature_set, FeatureSet}, solana_gossip::cluster_info::ClusterInfo, - solana_ledger::shred::{should_discard_shred, ShredFetchStats}, - solana_perf::packet::{PacketBatch, PacketBatchRecycler, PacketFlags, PACKETS_PER_BATCH}, + solana_ledger::shred::{self, should_discard_shred, ShredFetchStats}, + solana_perf::packet::{ + Packet, PacketBatch, PacketBatchRecycler, PacketFlags, PACKETS_PER_BATCH, + }, solana_runtime::bank_forks::BankForks, solana_sdk::{ clock::{Slot, DEFAULT_MS_PER_SLOT}, @@ -16,6 +18,7 @@ use { genesis_config::ClusterType, packet::{Meta, PACKET_DATA_SIZE}, pubkey::Pubkey, + signature::Keypair, }, solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats}, std::{ @@ -41,6 +44,13 @@ pub(crate) struct ShredFetchStage { thread_hdls: Vec>, } +#[derive(Clone)] +struct RepairContext { + repair_socket: Arc, + cluster_info: Arc, + outstanding_repair_requests: Arc>, +} + impl ShredFetchStage { // updates packets received on a channel and sends them on another channel fn modify_packets( @@ -50,15 +60,17 @@ impl ShredFetchStage { shred_version: u16, name: &'static str, flags: PacketFlags, - repair_context: Option<(&UdpSocket, &ClusterInfo)>, + repair_context: Option<&RepairContext>, turbine_disabled: Arc, ) { + // Only repair shreds need repair context. + debug_assert_eq!( + flags.contains(PacketFlags::REPAIR), + repair_context.is_some() + ); const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1); let mut last_updated = Instant::now(); - let mut keypair = repair_context - .as_ref() - .map(|(_, cluster_info)| cluster_info.keypair().clone()); - + let mut keypair = repair_context.as_ref().copied().map(RepairContext::keypair); let ( mut last_root, mut slots_per_epoch, @@ -94,23 +106,36 @@ impl ShredFetchStage { epoch_schedule = root_bank.epoch_schedule().clone(); last_root = root_bank.slot(); slots_per_epoch = root_bank.get_slots_in_epoch(root_bank.epoch()); - keypair = repair_context - .as_ref() - .map(|(_, cluster_info)| cluster_info.keypair().clone()); + keypair = repair_context.as_ref().copied().map(RepairContext::keypair); } stats.shred_count += packet_batch.len(); - if let Some((udp_socket, _)) = repair_context { + if let Some(repair_context) = repair_context { debug_assert_eq!(flags, PacketFlags::REPAIR); debug_assert!(keypair.is_some()); if let Some(ref keypair) = keypair { ServeRepair::handle_repair_response_pings( - udp_socket, + &repair_context.repair_socket, keypair, &mut packet_batch, &mut stats, ); } + // Discard packets if repair nonce does not verify. 
+ let now = solana_sdk::timing::timestamp(); + let mut outstanding_repair_requests = + repair_context.outstanding_repair_requests.write().unwrap(); + packet_batch + .iter_mut() + .filter(|packet| !packet.meta().discard()) + .for_each(|packet| { + // Have to set repair flag here so that the nonce is + // taken off the shred's payload. + packet.meta_mut().flags |= PacketFlags::REPAIR; + if !verify_repair_nonce(packet, now, &mut outstanding_repair_requests) { + packet.meta_mut().set_discard(true); + } + }); } // Filter out shreds that are way too far in the future to avoid the @@ -161,7 +186,7 @@ impl ShredFetchStage { shred_version: u16, name: &'static str, flags: PacketFlags, - repair_context: Option<(Arc, Arc)>, + repair_context: Option, turbine_disabled: Arc, ) -> (Vec>, JoinHandle<()>) { let (packet_sender, packet_receiver) = unbounded(); @@ -186,9 +211,6 @@ impl ShredFetchStage { let modifier_hdl = Builder::new() .name(modifier_thread_name.to_string()) .spawn(move || { - let repair_context = repair_context - .as_ref() - .map(|(socket, cluster_info)| (socket.as_ref(), cluster_info.as_ref())); Self::modify_packets( packet_receiver, sender, @@ -196,7 +218,7 @@ impl ShredFetchStage { shred_version, name, flags, - repair_context, + repair_context.as_ref(), turbine_disabled, ) }) @@ -214,10 +236,16 @@ impl ShredFetchStage { shred_version: u16, bank_forks: Arc>, cluster_info: Arc, + outstanding_repair_requests: Arc>, turbine_disabled: Arc, exit: Arc, ) -> Self { let recycler = PacketBatchRecycler::warmed(100, 1024); + let repair_context = RepairContext { + repair_socket: repair_socket.clone(), + cluster_info, + outstanding_repair_requests, + }; let (mut tvu_threads, tvu_filter) = Self::packet_modifier( "solRcvrShred", @@ -237,7 +265,7 @@ impl ShredFetchStage { let (repair_receiver, repair_handler) = Self::packet_modifier( "solRcvrShredRep", "solTvuRepPktMod", - vec![repair_socket.clone()], + vec![repair_socket], exit.clone(), sender.clone(), recycler.clone(), @@ -245,7 +273,7 @@ impl ShredFetchStage { shred_version, "shred_fetch_repair", PacketFlags::REPAIR, - Some((repair_socket, cluster_info)), + Some(repair_context.clone()), turbine_disabled.clone(), ); @@ -283,7 +311,8 @@ impl ShredFetchStage { shred_version, "shred_fetch_repair_quic", PacketFlags::REPAIR, - None, // repair_context; no ping packets! + // No ping packets but need to verify repair nonce. + Some(&repair_context), turbine_disabled, ) }) @@ -334,6 +363,28 @@ impl ShredFetchStage { } } +impl RepairContext { + fn keypair(&self) -> Arc { + self.cluster_info.keypair().clone() + } +} + +// Returns false if repair nonce is invalid and packet should be discarded. 
+#[must_use] +fn verify_repair_nonce( + packet: &Packet, + now: u64, // solana_sdk::timing::timestamp() + outstanding_repair_requests: &mut OutstandingShredRepairs, +) -> bool { + debug_assert!(packet.meta().flags.contains(PacketFlags::REPAIR)); + let Some((shred, Some(nonce))) = shred::layout::get_shred_and_repair_nonce(packet) else { + return false; + }; + outstanding_repair_requests + .register_response(nonce, shred, now, |_| ()) + .is_some() +} + pub(crate) fn receive_quic_datagrams( quic_datagrams_receiver: Receiver<(Pubkey, SocketAddr, Bytes)>, flags: PacketFlags, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 846a03636f764c..07761b07f5bec0 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -185,6 +185,7 @@ impl Tvu { tvu_config.shred_version, bank_forks.clone(), cluster_info.clone(), + outstanding_repair_requests.clone(), turbine_disabled, exit.clone(), ); diff --git a/core/src/window_service.rs b/core/src/window_service.rs index b8749976a4a54b..b46f9610e8f5d8 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -8,7 +8,6 @@ use { completed_data_sets_service::CompletedDataSetsSender, repair::{ ancestor_hashes_service::AncestorHashesReplayUpdateReceiver, - repair_response, repair_service::{ DumpedSlotsReceiver, OutstandingShredRepairs, PopularPrunedForksSender, RepairInfo, RepairService, @@ -24,7 +23,7 @@ use { solana_ledger::{ blockstore::{Blockstore, BlockstoreInsertionMetrics, PossibleDuplicateShred}, leader_schedule_cache::LeaderScheduleCache, - shred::{self, Nonce, ReedSolomonCache, Shred}, + shred::{self, ReedSolomonCache, Shred}, }, solana_measure::measure::Measure, solana_metrics::inc_new_counter_error, @@ -219,38 +218,6 @@ fn run_check_duplicate( .try_for_each(check_duplicate) } -fn verify_repair( - outstanding_requests: &mut OutstandingShredRepairs, - shred: &Shred, - repair_meta: &Option, -) -> bool { - repair_meta - .as_ref() - .map(|repair_meta| { - outstanding_requests - .register_response( - repair_meta.nonce, - shred, - solana_sdk::timing::timestamp(), - |_| (), - ) - .is_some() - }) - .unwrap_or(true) -} - -fn prune_shreds_by_repair_status( - shreds: &mut Vec<(Shred, Option)>, - outstanding_requests: &RwLock, - accept_repairs_only: bool, -) { - let mut outstanding_requests = outstanding_requests.write().unwrap(); - shreds.retain(|(shred, repair_meta)| { - (!accept_repairs_only || repair_meta.is_some()) - && verify_repair(&mut outstanding_requests, shred, repair_meta) - }); -} - #[allow(clippy::too_many_arguments)] fn run_insert( thread_pool: &ThreadPool, @@ -262,7 +229,6 @@ fn run_insert( ws_metrics: &mut WindowServiceMetrics, completed_data_sets_sender: Option<&CompletedDataSetsSender>, retransmit_sender: &Sender>, - outstanding_requests: &RwLock, reed_solomon_cache: &ReedSolomonCache, accept_repairs_only: bool, ) -> Result<()> @@ -280,20 +246,16 @@ where if packet.meta().discard() { return None; } + let repair = packet.meta().repair(); + if accept_repairs_only && !repair { + return None; + } let shred = shred::layout::get_shred(packet)?; let shred = Shred::new_from_serialized_shred(shred.to_vec()).ok()?; - if packet.meta().repair() { - let repair_info = RepairMeta { - // If can't parse the nonce, dump the packet. 
- nonce: repair_response::nonce(packet)?, - }; - Some((shred, Some(repair_info))) - } else { - Some((shred, None)) - } + Some((shred, repair)) }; let now = Instant::now(); - let mut shreds: Vec<_> = thread_pool.install(|| { + let shreds: Vec<_> = thread_pool.install(|| { packets .par_iter() .with_min_len(32) @@ -302,10 +264,7 @@ where }); ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64; ws_metrics.num_packets += packets.iter().map(PacketBatch::len).sum::(); - ws_metrics.num_repairs += shreds - .iter() - .filter(|(_, repair_meta)| repair_meta.is_some()) - .count(); + ws_metrics.num_repairs += shreds.iter().filter(|&&(_, repair)| repair).count(); ws_metrics.num_shreds_received += shreds.len(); for packet in packets.iter().flat_map(PacketBatch::iter) { let addr = packet.meta().socket_addr(); @@ -314,14 +273,10 @@ where let mut prune_shreds_elapsed = Measure::start("prune_shreds_elapsed"); let num_shreds = shreds.len(); - prune_shreds_by_repair_status(&mut shreds, outstanding_requests, accept_repairs_only); ws_metrics.num_shreds_pruned_invalid_repair = num_shreds - shreds.len(); prune_shreds_elapsed.stop(); ws_metrics.prune_shreds_elapsed_us += prune_shreds_elapsed.as_us(); - let shreds = shreds - .into_iter() - .map(|(shred, repair_meta)| (shred, repair_meta.is_some())); let completed_data_sets = blockstore.insert_shreds_handle_duplicate( shreds, Some(leader_schedule_cache), @@ -339,10 +294,6 @@ where Ok(()) } -struct RepairMeta { - nonce: Nonce, -} - pub(crate) struct WindowService { t_insert: JoinHandle<()>, t_check_duplicate: JoinHandle<()>, @@ -413,7 +364,6 @@ impl WindowService { duplicate_sender, completed_data_sets_sender, retransmit_sender, - outstanding_repair_requests, accept_repairs_only, ); @@ -463,7 +413,6 @@ impl WindowService { check_duplicate_sender: Sender, completed_data_sets_sender: Option, retransmit_sender: Sender>, - outstanding_requests: Arc>, accept_repairs_only: bool, ) -> JoinHandle<()> { let handle_error = || { @@ -499,7 +448,6 @@ impl WindowService { &mut ws_metrics, completed_data_sets_sender.as_ref(), &retransmit_sender, - &outstanding_requests, &reed_solomon_cache, accept_repairs_only, ) { @@ -548,7 +496,6 @@ impl WindowService { mod test { use { super::*, - crate::repair::serve_repair::ShredRepairType, rand::Rng, solana_entry::entry::{create_ticks, Entry}, solana_gossip::contact_info::ContactInfo, @@ -729,45 +676,4 @@ mod test { exit.store(true, Ordering::Relaxed); t_check_duplicate.join().unwrap(); } - - #[test] - fn test_prune_shreds() { - solana_logger::setup(); - let shred = Shred::new_from_parity_shard( - 5, // slot - 5, // index - &[], // parity_shard - 5, // fec_set_index - 6, // num_data_shreds - 6, // num_coding_shreds - 4, // position - 0, // version - ); - let shreds = [shred.clone(), shred.clone(), shred.clone()]; - let repair_meta = RepairMeta { nonce: 0 }; - let outstanding_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default())); - let repair_type = ShredRepairType::Orphan(9); - let nonce = outstanding_requests - .write() - .unwrap() - .add_request(repair_type, timestamp()); - let repair_meta1 = RepairMeta { nonce }; - let repair_meta = [None, Some(repair_meta), Some(repair_meta1)]; - let mut shreds = shreds.into_iter().zip(repair_meta).collect(); - prune_shreds_by_repair_status(&mut shreds, &outstanding_requests, false); - assert_eq!(shreds.len(), 2); - assert!(shreds[0].1.is_none()); - assert_eq!(shreds[1].1.as_ref().unwrap().nonce, nonce); - - let shreds = [shred.clone(), shred.clone(), shred]; - let 
repair_meta2 = RepairMeta { nonce: 0 };
-        let repair_meta3 = RepairMeta { nonce };
-        let repair_meta = [None, Some(repair_meta2), Some(repair_meta3)];
-        let mut shreds = shreds.into_iter().zip(repair_meta).collect();
-        // In wen_restart, we discard all Turbine shreds and only keep valid repair shreds.
-        prune_shreds_by_repair_status(&mut shreds, &outstanding_requests, true);
-        assert_eq!(shreds.len(), 1);
-        assert!(shreds[0].1.is_some());
-        assert_eq!(shreds[0].1.as_ref().unwrap().nonce, nonce);
-    }
 }
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index fca12bd825ebc6..04ce5c837a30a1 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -1293,7 +1293,7 @@ impl Blockstore {
     // Blockstore::insert_shreds when inserting own shreds during leader slots.
     pub fn insert_shreds_handle_duplicate(
         &self,
-        shreds: impl ExactSizeIterator<Item = (Shred, /*is_repaired:*/ bool)>,
+        shreds: impl IntoIterator<Item = (Shred, /*is_repaired:*/ bool)>,
         leader_schedule: Option<&LeaderScheduleCache>,
         is_trusted: bool,
         retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
@@ -1308,7 +1308,7 @@ impl Blockstore {
             completed_data_set_infos,
             duplicate_shreds,
         } = self.do_insert_shreds(
-            shreds,
+            shreds.into_iter(),
             leader_schedule,
             is_trusted,
             Some((reed_solomon_cache, retransmit_sender)),
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index 08802d86df0003..5bffc273268bf0 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -646,6 +646,18 @@ pub mod layout {
         packet.buffer_mut().get_mut(..size)
     }
 
+    #[inline]
+    pub fn get_shred_and_repair_nonce(packet: &Packet) -> Option<(&[u8], Option<Nonce>)> {
+        let data = packet.data(..)?;
+        if !packet.meta().repair() {
+            return Some((data, None));
+        }
+        let offset = data.len().checked_sub(4)?;
+        let (shred, nonce) = data.split_at(offset);
+        let nonce = u32::from_le_bytes(<[u8; 4]>::try_from(nonce).unwrap());
+        Some((shred, Some(nonce)))
+    }
+
     #[inline]
     pub fn get_common_header_bytes(shred: &[u8]) -> Option<&[u8]> {
         shred.get(..SIZE_OF_COMMON_SHRED_HEADER)
@@ -681,7 +693,7 @@ pub mod layout {
     }
 
     #[inline]
-    pub(crate) fn get_index(shred: &[u8]) -> Option<u32> {
+    pub fn get_index(shred: &[u8]) -> Option<u32> {
         let bytes = <[u8; 4]>::try_from(shred.get(73..73 + 4)?).unwrap();
         Some(u32::from_le_bytes(bytes))
     }
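
For reference, the trailing-nonce layout that shred::layout::get_shred_and_repair_nonce relies on can be sketched in isolation: a repair response is the shred payload followed by a little-endian u32 nonce in the last SIZE_OF_NONCE (4) bytes of the packet. The helper below is a minimal standalone sketch, not part of the patch, and the name split_repair_response is made up for illustration:

    /// Splits a repair response buffer into (shred bytes, repair nonce).
    /// Mirrors the layout assumed above: the last four bytes carry the nonce
    /// as a little-endian u32.
    fn split_repair_response(bytes: &[u8]) -> Option<(&[u8], u32)> {
        let offset = bytes.len().checked_sub(4)?;
        let (shred, nonce) = bytes.split_at(offset);
        let nonce = u32::from_le_bytes(nonce.try_into().ok()?);
        Some((shred, nonce))
    }

    fn main() {
        // Five bytes of fake shred payload followed by the nonce 0xDEAD_BEEF.
        let mut packet = vec![1u8, 2, 3, 4, 5];
        packet.extend_from_slice(&0xDEAD_BEEFu32.to_le_bytes());
        let (shred, nonce) = split_repair_response(&packet).unwrap();
        assert_eq!(shred, [1u8, 2, 3, 4, 5].as_slice());
        assert_eq!(nonce, 0xDEAD_BEEF);
    }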
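
The ordering argument in the commit message can also be shown with a toy model: the randomly generated nonce is matched against the table of outstanding repair requests first, so the expensive signature check only runs for responses the node actually asked for. Everything below is a simplified sketch; OutstandingRequests here is a bare HashMap stand-in for core/src/repair/outstanding_requests.rs (which also tracks the request type and the number of expected responses), and sigverify is a stub:

    use std::collections::HashMap;

    type Nonce = u32;

    struct OutstandingRequests {
        // nonce -> expire timestamp (ms)
        requests: HashMap<Nonce, u64>,
    }

    impl OutstandingRequests {
        // Simplified: the real table keeps an entry alive until all expected
        // responses arrive; here a single response consumes the request.
        fn register_response(&mut self, nonce: Nonce, now: u64) -> bool {
            matches!(self.requests.remove(&nonce), Some(expire) if now < expire)
        }
    }

    // Stand-in for the expensive Ed25519 signature verification.
    fn sigverify(_shred: &[u8]) -> bool {
        true
    }

    // Cheap nonce check first; sigverify only runs for shreds we asked for.
    fn accept_repair_response(
        outstanding: &mut OutstandingRequests,
        shred: &[u8],
        nonce: Nonce,
        now: u64,
    ) -> bool {
        outstanding.register_response(nonce, now) && sigverify(shred)
    }

    fn main() {
        let mut outstanding = OutstandingRequests {
            requests: HashMap::from([(42, /*expire:*/ 1_000)]),
        };
        // Unknown nonce: discarded before sigverify is ever reached.
        assert!(!accept_repair_response(&mut outstanding, b"shred", 7, 500));
        // Matching, unexpired nonce: proceeds to sigverify.
        assert!(accept_repair_response(&mut outstanding, b"shred", 42, 500));
    }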