encapsulates shred's payload type (#4690)
A wrapper type would allow hiding the implementation from the rest of the code-base and make it easier to roll out changes.
behzadnouri authored Jan 29, 2025
1 parent 278b447 commit 7d8cdd9
Showing 17 changed files with 242 additions and 97 deletions.
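
The Payload type itself is defined in a new ledger/src/shred/payload.rs module, which is not among the hunks reproduced below. The following is a rough sketch only, inferred from how the rest of the diff uses the type (Payload::from, AsRef<[u8]>, dereferencing to bytes, Payload::unwrap_or_clone); the actual definition may well differ, for instance by holding shared, reference-counted bytes, as the unwrap_or_clone name suggests.

// Hypothetical sketch of the wrapper; the real payload.rs added by this
// commit is not shown on this page and may differ.
use std::ops::Deref;

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Payload(Vec<u8>);

impl Payload {
    // Hands the owned bytes back out (used by ledger-tool below); a
    // shared-buffer implementation would clone only when other owners exist.
    pub fn unwrap_or_clone(payload: Self) -> Vec<u8> {
        payload.0
    }
}

impl From<Vec<u8>> for Payload {
    fn from(bytes: Vec<u8>) -> Self {
        Self(bytes)
    }
}

impl AsRef<[u8]> for Payload {
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

impl Deref for Payload {
    type Target = [u8];
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
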
5 changes: 3 additions & 2 deletions core/src/repair/repair_response.rs
@@ -23,18 +23,19 @@ pub fn repair_response_packet(
}

pub fn repair_response_packet_from_bytes(
bytes: Vec<u8>,
bytes: impl AsRef<[u8]>,
dest: &SocketAddr,
nonce: Nonce,
) -> Option<Packet> {
let bytes = bytes.as_ref();
let mut packet = Packet::default();
let size = bytes.len() + SIZE_OF_NONCE;
if size > packet.buffer_mut().len() {
return None;
}
packet.meta_mut().size = size;
packet.meta_mut().set_socket_addr(dest);
packet.buffer_mut()[..bytes.len()].copy_from_slice(&bytes);
packet.buffer_mut()[..bytes.len()].copy_from_slice(bytes);
let mut wr = io::Cursor::new(&mut packet.buffer_mut()[bytes.len()..]);
bincode::serialize_into(&mut wr, &nonce).expect("Buffer not large enough to fit nonce");
Some(packet)
9 changes: 4 additions & 5 deletions core/src/window_service.rs
@@ -50,7 +50,6 @@ use {
tokio::sync::mpsc::Sender as AsyncSender,
};

type ShredPayload = Vec<u8>;
type DuplicateSlotSender = Sender<Slot>;
pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;

@@ -202,7 +201,7 @@ fn run_check_duplicate(
existing_shred_payload.clone(),
shred.clone().into_payload(),
)?;
(shred, existing_shred_payload)
(shred, shred::Payload::from(existing_shred_payload))
}
};

@@ -261,7 +260,7 @@ fn run_insert<F>(
metrics: &mut BlockstoreInsertionMetrics,
ws_metrics: &mut WindowServiceMetrics,
completed_data_sets_sender: Option<&CompletedDataSetsSender>,
retransmit_sender: &Sender<Vec<ShredPayload>>,
retransmit_sender: &Sender<Vec<shred::Payload>>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
reed_solomon_cache: &ReedSolomonCache,
accept_repairs_only: bool,
@@ -354,7 +353,7 @@ impl WindowService {
pub(crate) fn new(
blockstore: Arc<Blockstore>,
verified_receiver: Receiver<Vec<PacketBatch>>,
retransmit_sender: Sender<Vec<ShredPayload>>,
retransmit_sender: Sender<Vec<shred::Payload>>,
repair_socket: Arc<UdpSocket>,
ancestor_hashes_socket: Arc<UdpSocket>,
repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
@@ -462,7 +461,7 @@ impl WindowService {
verified_receiver: Receiver<Vec<PacketBatch>>,
check_duplicate_sender: Sender<PossibleDuplicateShred>,
completed_data_sets_sender: Option<CompletedDataSetsSender>,
retransmit_sender: Sender<Vec<ShredPayload>>,
retransmit_sender: Sender<Vec<shred::Payload>>,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
accept_repairs_only: bool,
) -> JoinHandle<()> {
7 changes: 4 additions & 3 deletions gossip/src/duplicate_shred.rs
@@ -201,19 +201,20 @@ where
Err(Error::InvalidErasureMetaConflict)
}

pub(crate) fn from_shred<F>(
pub(crate) fn from_shred<T: AsRef<[u8]>, F>(
shred: Shred,
self_pubkey: Pubkey, // Pubkey of my node broadcasting crds value.
other_payload: Vec<u8>,
other_payload: T,
leader_schedule: Option<F>,
wallclock: u64,
max_size: usize, // Maximum serialized size of each DuplicateShred.
shred_version: u16,
) -> Result<impl Iterator<Item = DuplicateShred>, Error>
where
F: FnOnce(Slot) -> Option<Pubkey>,
shred::Payload: From<T>,
{
if shred.payload() == &other_payload {
if shred.payload().as_ref() == other_payload.as_ref() {
return Err(Error::InvalidDuplicateShreds);
}
let other_shred = Shred::new_from_serialized_shred(other_payload)?;
4 changes: 2 additions & 2 deletions ledger-tool/src/output.rs
@@ -16,7 +16,7 @@ use {
solana_ledger::{
blockstore::{Blockstore, BlockstoreError},
blockstore_meta::{DuplicateSlotProof, ErasureMeta},
shred::{Shred, ShredType},
shred::{self, Shred, ShredType},
},
solana_runtime::bank::{Bank, TotalAccountsStats},
solana_sdk::{
@@ -408,7 +408,7 @@ impl From<Shred> for CliDuplicateShred {
merkle_root: shred.merkle_root().ok(),
chained_merkle_root: shred.chained_merkle_root().ok(),
last_in_slot: shred.last_in_slot(),
payload: shred.payload().clone(),
payload: shred::Payload::unwrap_or_clone(shred.payload().clone()),
}
}
}
57 changes: 39 additions & 18 deletions ledger/src/blockstore.rs
@@ -139,10 +139,26 @@ enum InsertDataShredError {
#[derive(Eq, PartialEq, Debug, Clone)]
pub enum PossibleDuplicateShred {
Exists(Shred), // Blockstore has another shred in its spot
LastIndexConflict(/* original */ Shred, /* conflict */ Vec<u8>), // The index of this shred conflicts with `slot_meta.last_index`
ErasureConflict(/* original */ Shred, /* conflict */ Vec<u8>), // The coding shred has a conflict in the erasure_meta
MerkleRootConflict(/* original */ Shred, /* conflict */ Vec<u8>), // Merkle root conflict in the same fec set
ChainedMerkleRootConflict(/* original */ Shred, /* conflict */ Vec<u8>), // Merkle root chaining conflict with previous fec set
// The index of this shred conflicts with `slot_meta.last_index`
LastIndexConflict(
Shred, // original
shred::Payload, // conflict
),
// The coding shred has a conflict in the erasure_meta
ErasureConflict(
Shred, // original
shred::Payload, // conflict
),
// Merkle root conflict in the same fec set
MerkleRootConflict(
Shred, // original
shred::Payload, // conflict
),
// Merkle root chaining conflict with previous fec set
ChainedMerkleRootConflict(
Shred, // original
shred::Payload, // conflict
),
}

impl PossibleDuplicateShred {
@@ -976,7 +992,7 @@ impl Blockstore {
leader_schedule: Option<&LeaderScheduleCache>,
reed_solomon_cache: &ReedSolomonCache,
shred_insertion_tracker: &mut ShredInsertionTracker,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
retransmit_sender: &Sender<Vec<shred::Payload>>,
is_trusted: bool,
metrics: &mut BlockstoreInsertionMetrics,
) {
@@ -1219,7 +1235,7 @@ impl Blockstore {
// recovered shreds.
should_recover_shreds: Option<(
&ReedSolomonCache,
&Sender<Vec</*shred:*/ Vec<u8>>>, // retransmit_sender
&Sender<Vec<shred::Payload>>, // retransmit_sender
)>,
metrics: &mut BlockstoreInsertionMetrics,
) -> Result<InsertResults> {
@@ -1296,7 +1312,7 @@ impl Blockstore {
shreds: impl ExactSizeIterator<Item = (Shred, /*is_repaired:*/ bool)>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
retransmit_sender: &Sender<Vec<shred::Payload>>,
handle_duplicate: &F,
reed_solomon_cache: &ReedSolomonCache,
metrics: &mut BlockstoreInsertionMetrics,
@@ -1559,7 +1575,7 @@ impl Blockstore {
slot: Slot,
erasure_meta: &ErasureMeta,
just_received_shreds: &'a HashMap<ShredId, Shred>,
) -> Option<Cow<'a, Vec<u8>>> {
) -> Option<Cow<'a, shred::Payload>> {
// Search for the shred which set the initial erasure config, either inserted,
// or in the current batch in just_received_shreds.
let index = erasure_meta.first_received_coding_shred_index()?;
@@ -1784,7 +1800,7 @@ impl Blockstore {
&'a self,
just_inserted_shreds: &'a HashMap<ShredId, Shred>,
shred_id: ShredId,
) -> Option<Cow<'a, Vec<u8>>> {
) -> Option<Cow<'a, shred::Payload>> {
let (slot, index, shred_type) = shred_id.unpack();
match (just_inserted_shreds.get(&shred_id), shred_type) {
(Some(shred), _) => Some(Cow::Borrowed(shred.payload())),
@@ -1793,10 +1809,12 @@ impl Blockstore {
(_, ShredType::Data) => self
.get_data_shred(slot, u64::from(index))
.unwrap()
.map(shred::Payload::from)
.map(Cow::Owned),
(_, ShredType::Code) => self
.get_coding_shred(slot, u64::from(index))
.unwrap()
.map(shred::Payload::from)
.map(Cow::Owned),
}
}
@@ -4057,7 +4075,10 @@ impl Blockstore {
.map(|(slot, proof_bytes)| (slot, deserialize(&proof_bytes).unwrap()))
}

pub fn store_duplicate_slot(&self, slot: Slot, shred1: Vec<u8>, shred2: Vec<u8>) -> Result<()> {
pub fn store_duplicate_slot<S, T>(&self, slot: Slot, shred1: S, shred2: T) -> Result<()>
where
shred::Payload: From<S> + From<T>,
{
let duplicate_slot_proof = DuplicateSlotProof::new(shred1, shred2);
self.duplicate_slots_cf.put(slot, &duplicate_slot_proof)
}
@@ -4086,7 +4107,7 @@ impl Blockstore {
error!("set retransmitter signature failed: {err:?}");
}
}
(&other != shred.payload()).then_some(other)
(other != **shred.payload()).then_some(other)
}

pub fn has_duplicate_shreds_in_slot(&self, slot: Slot) -> bool {
@@ -10251,7 +10272,7 @@ pub mod tests {
.get_data_shred(s.slot(), s.index() as u64)
.unwrap()
.unwrap(),
buf
buf.as_ref(),
);
}

@@ -10551,8 +10572,8 @@ pub mod tests {

// Check if shreds are duplicated
assert_eq!(
blockstore.is_shred_duplicate(&duplicate_shred),
Some(shred.payload().clone())
blockstore.is_shred_duplicate(&duplicate_shred).as_deref(),
Some(shred.payload().as_ref())
);
assert!(blockstore
.is_shred_duplicate(&non_duplicate_shred)
@@ -11036,7 +11057,7 @@ pub mod tests {
if i <= smaller_last_shred_index as u64 {
assert_eq!(
blockstore.get_data_shred(slot, i).unwrap().unwrap(),
*shreds[i as usize].payload()
shreds[i as usize].payload().as_ref(),
);
} else {
assert!(blockstore.get_data_shred(slot, i).unwrap().is_none());
@@ -11068,7 +11089,7 @@ pub mod tests {
if i <= smaller_last_shred_index as u64 {
assert_eq!(
blockstore.get_data_shred(slot, i).unwrap().unwrap(),
*shreds[i as usize].payload()
shreds[i as usize].payload().as_ref(),
);
} else {
assert!(blockstore.get_data_shred(slot, i).unwrap().is_none());
@@ -11104,7 +11125,7 @@ pub mod tests {
.get_data_shred(slot, shred_index)
.unwrap()
.unwrap(),
*shred_to_check.payload()
shred_to_check.payload().as_ref(),
);
} else {
assert!(blockstore
@@ -11135,7 +11156,7 @@ pub mod tests {
.get_data_shred(slot, shred_index)
.unwrap()
.unwrap(),
*shred_to_check.payload()
shred_to_check.payload().as_ref(),
);
} else {
assert!(blockstore
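
The shred::Payload: From<S> + From<T> bounds on store_duplicate_slot above are what keep existing call sites compiling while new code hands over the wrapper directly. A standalone illustration of the pattern (not part of the commit), reusing the hypothetical Payload sketch from the top of this page:

// Conversion happens once at the call boundary; callers never name the
// wrapper's internals.
fn duplicate_proof_parts<S, T>(shred1: S, shred2: T) -> (Payload, Payload)
where
    Payload: From<S> + From<T>,
{
    (Payload::from(shred1), Payload::from(shred2))
}

fn demo() {
    let old_style: Vec<u8> = vec![0u8; 8]; // existing caller still holding raw bytes
    let new_style = Payload::from(vec![1u8; 8]); // new caller already holding a Payload
    let _parts = duplicate_proof_parts(old_style, new_style);
}
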
20 changes: 13 additions & 7 deletions ledger/src/blockstore_meta.rs
@@ -1,7 +1,7 @@
use {
crate::{
blockstore::MAX_DATA_SHREDS_PER_SLOT,
shred::{Shred, ShredType},
shred::{self, Shred, ShredType},
},
bitflags::bitflags,
serde::{Deserialize, Deserializer, Serialize, Serializer},
@@ -224,10 +224,10 @@ pub struct MerkleRootMeta

#[derive(Deserialize, Serialize)]
pub struct DuplicateSlotProof {
#[serde(with = "serde_bytes")]
pub shred1: Vec<u8>,
#[serde(with = "serde_bytes")]
pub shred2: Vec<u8>,
#[serde(with = "shred::serde_bytes_payload")]
pub shred1: shred::Payload,
#[serde(with = "shred::serde_bytes_payload")]
pub shred2: shred::Payload,
}

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)]
@@ -774,8 +774,14 @@ impl MerkleRootMeta {
}

impl DuplicateSlotProof {
pub(crate) fn new(shred1: Vec<u8>, shred2: Vec<u8>) -> Self {
DuplicateSlotProof { shred1, shred2 }
pub(crate) fn new<S, T>(shred1: S, shred2: T) -> Self
where
shred::Payload: From<S> + From<T>,
{
DuplicateSlotProof {
shred1: shred::Payload::from(shred1),
shred2: shred::Payload::from(shred2),
}
}
}

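
The shred::serde_bytes_payload helper named in the #[serde(with = ...)] attributes above is added elsewhere in this commit and is not shown in these hunks. A plausible sketch, assuming Payload converts from Vec<u8> and exposes its bytes via AsRef, forwards to serde_bytes so that DuplicateSlotProof presumably keeps the same on-disk encoding as the old Vec<u8> fields:

// Plausible sketch only; the real module ships with this commit but is not
// reproduced here. Requires the serde and serde_bytes crates.
pub(crate) mod serde_bytes_payload {
    use {
        super::Payload,
        serde::{Deserializer, Serializer},
    };

    pub(crate) fn serialize<S: Serializer>(payload: &Payload, serializer: S) -> Result<S::Ok, S::Error> {
        // Forward to serde_bytes so the encoding matches the old Vec<u8> fields.
        serde_bytes::serialize(payload.as_ref(), serializer)
    }

    pub(crate) fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Payload, D::Error> {
        serde_bytes::deserialize(deserializer).map(|bytes: Vec<u8>| Payload::from(bytes))
    }
}
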
30 changes: 18 additions & 12 deletions ledger/src/shred.rs
@@ -49,9 +49,17 @@
//! So, given a) - c), we must restrict data shred's payload length such that the entire coding
//! payload can fit into one coding shred / packet.
pub(crate) use self::merkle::SIZE_OF_MERKLE_ROOT;
#[cfg(test)]
pub(crate) use self::shred_code::MAX_CODE_SHREDS_PER_SLOT;
pub(crate) use self::{merkle::SIZE_OF_MERKLE_ROOT, payload::serde_bytes_payload};
pub use {
self::{
payload::Payload,
shred_data::ShredData,
stats::{ProcessShredsStats, ShredFetchStats},
},
crate::shredder::{ReedSolomonCache, Shredder},
};
use {
self::{shred_code::ShredCode, traits::Shred as _},
crate::blockstore::{self, MAX_DATA_SHREDS_PER_SLOT},
@@ -72,17 +80,11 @@ use {
std::{fmt::Debug, time::Instant},
thiserror::Error,
};
pub use {
self::{
shred_data::ShredData,
stats::{ProcessShredsStats, ShredFetchStats},
},
crate::shredder::{ReedSolomonCache, Shredder},
};

mod common;
mod legacy;
mod merkle;
mod payload;
pub mod shred_code;
mod shred_data;
mod stats;
@@ -369,9 +371,9 @@ impl Shred {
dispatch!(pub(crate) fn erasure_shard_index(&self) -> Result<usize, Error>);
dispatch!(pub(crate) fn retransmitter_signature(&self) -> Result<Signature, Error>);

dispatch!(pub fn into_payload(self) -> Vec<u8>);
dispatch!(pub fn into_payload(self) -> Payload);
dispatch!(pub fn merkle_root(&self) -> Result<Hash, Error>);
dispatch!(pub fn payload(&self) -> &Vec<u8>);
dispatch!(pub fn payload(&self) -> &Payload);
dispatch!(pub fn sanitize(&self) -> Result<(), Error>);

// Only for tests.
@@ -408,8 +410,12 @@ impl Shred {
))
}

pub fn new_from_serialized_shred(shred: Vec<u8>) -> Result<Self, Error> {
Ok(match layout::get_shred_variant(&shred)? {
pub fn new_from_serialized_shred<T>(shred: T) -> Result<Self, Error>
where
T: AsRef<[u8]> + Into<Payload>,
Payload: From<T>,
{
Ok(match layout::get_shred_variant(shred.as_ref())? {
ShredVariant::LegacyCode => {
let shred = legacy::ShredCode::from_payload(shred)?;
Self::from(ShredCode::from(shred))
4 changes: 2 additions & 2 deletions ledger/src/shred/common.rs
@@ -36,12 +36,12 @@ macro_rules! impl_shred_common {
}

#[inline]
fn payload(&self) -> &Vec<u8> {
fn payload(&self) -> &Payload {
&self.payload
}

#[inline]
fn into_payload(self) -> Vec<u8> {
fn into_payload(self) -> Payload {
self.payload
}

(The remaining changed files in this commit are not shown here.)
