Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

encapsulates shred's payload type #4690

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/src/repair/repair_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,19 @@ pub fn repair_response_packet(
}

pub fn repair_response_packet_from_bytes(
bytes: Vec<u8>,
bytes: impl AsRef<[u8]>,
dest: &SocketAddr,
nonce: Nonce,
) -> Option<Packet> {
let bytes = bytes.as_ref();
let mut packet = Packet::default();
let size = bytes.len() + SIZE_OF_NONCE;
if size > packet.buffer_mut().len() {
return None;
}
packet.meta_mut().size = size;
packet.meta_mut().set_socket_addr(dest);
packet.buffer_mut()[..bytes.len()].copy_from_slice(&bytes);
packet.buffer_mut()[..bytes.len()].copy_from_slice(bytes);
let mut wr = io::Cursor::new(&mut packet.buffer_mut()[bytes.len()..]);
bincode::serialize_into(&mut wr, &nonce).expect("Buffer not large enough to fit nonce");
Some(packet)
Expand Down
9 changes: 4 additions & 5 deletions core/src/window_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ use {
tokio::sync::mpsc::Sender as AsyncSender,
};

type ShredPayload = Vec<u8>;
type DuplicateSlotSender = Sender<Slot>;
pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;

Expand Down Expand Up @@ -202,7 +201,7 @@ fn run_check_duplicate(
existing_shred_payload.clone(),
shred.clone().into_payload(),
)?;
(shred, existing_shred_payload)
(shred, shred::Payload::from(existing_shred_payload))
}
};

Expand Down Expand Up @@ -261,7 +260,7 @@ fn run_insert<F>(
metrics: &mut BlockstoreInsertionMetrics,
ws_metrics: &mut WindowServiceMetrics,
completed_data_sets_sender: Option<&CompletedDataSetsSender>,
retransmit_sender: &Sender<Vec<ShredPayload>>,
retransmit_sender: &Sender<Vec<shred::Payload>>,
outstanding_requests: &RwLock<OutstandingShredRepairs>,
reed_solomon_cache: &ReedSolomonCache,
accept_repairs_only: bool,
Expand Down Expand Up @@ -354,7 +353,7 @@ impl WindowService {
pub(crate) fn new(
blockstore: Arc<Blockstore>,
verified_receiver: Receiver<Vec<PacketBatch>>,
retransmit_sender: Sender<Vec<ShredPayload>>,
retransmit_sender: Sender<Vec<shred::Payload>>,
repair_socket: Arc<UdpSocket>,
ancestor_hashes_socket: Arc<UdpSocket>,
repair_request_quic_sender: AsyncSender<(SocketAddr, Bytes)>,
Expand Down Expand Up @@ -462,7 +461,7 @@ impl WindowService {
verified_receiver: Receiver<Vec<PacketBatch>>,
check_duplicate_sender: Sender<PossibleDuplicateShred>,
completed_data_sets_sender: Option<CompletedDataSetsSender>,
retransmit_sender: Sender<Vec<ShredPayload>>,
retransmit_sender: Sender<Vec<shred::Payload>>,
outstanding_requests: Arc<RwLock<OutstandingShredRepairs>>,
accept_repairs_only: bool,
) -> JoinHandle<()> {
Expand Down
7 changes: 4 additions & 3 deletions gossip/src/duplicate_shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,19 +201,20 @@ where
Err(Error::InvalidErasureMetaConflict)
}

pub(crate) fn from_shred<F>(
pub(crate) fn from_shred<T: AsRef<[u8]>, F>(
shred: Shred,
self_pubkey: Pubkey, // Pubkey of my node broadcasting crds value.
other_payload: Vec<u8>,
other_payload: T,
leader_schedule: Option<F>,
wallclock: u64,
max_size: usize, // Maximum serialized size of each DuplicateShred.
shred_version: u16,
) -> Result<impl Iterator<Item = DuplicateShred>, Error>
where
F: FnOnce(Slot) -> Option<Pubkey>,
shred::Payload: From<T>,
{
if shred.payload() == &other_payload {
if shred.payload().as_ref() == other_payload.as_ref() {
return Err(Error::InvalidDuplicateShreds);
}
let other_shred = Shred::new_from_serialized_shred(other_payload)?;
Expand Down
4 changes: 2 additions & 2 deletions ledger-tool/src/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use {
solana_ledger::{
blockstore::{Blockstore, BlockstoreError},
blockstore_meta::{DuplicateSlotProof, ErasureMeta},
shred::{Shred, ShredType},
shred::{self, Shred, ShredType},
},
solana_runtime::bank::{Bank, TotalAccountsStats},
solana_sdk::{
Expand Down Expand Up @@ -408,7 +408,7 @@ impl From<Shred> for CliDuplicateShred {
merkle_root: shred.merkle_root().ok(),
chained_merkle_root: shred.chained_merkle_root().ok(),
last_in_slot: shred.last_in_slot(),
payload: shred.payload().clone(),
payload: shred::Payload::unwrap_or_clone(shred.payload().clone()),
}
}
}
Expand Down
57 changes: 39 additions & 18 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,26 @@ enum InsertDataShredError {
#[derive(Eq, PartialEq, Debug, Clone)]
pub enum PossibleDuplicateShred {
Exists(Shred), // Blockstore has another shred in its spot
LastIndexConflict(/* original */ Shred, /* conflict */ Vec<u8>), // The index of this shred conflicts with `slot_meta.last_index`
ErasureConflict(/* original */ Shred, /* conflict */ Vec<u8>), // The coding shred has a conflict in the erasure_meta
MerkleRootConflict(/* original */ Shred, /* conflict */ Vec<u8>), // Merkle root conflict in the same fec set
ChainedMerkleRootConflict(/* original */ Shred, /* conflict */ Vec<u8>), // Merkle root chaining conflict with previous fec set
// The index of this shred conflicts with `slot_meta.last_index`
LastIndexConflict(
Shred, // original
shred::Payload, // conflict
),
// The coding shred has a conflict in the erasure_meta
ErasureConflict(
Shred, // original
shred::Payload, // conflict
),
// Merkle root conflict in the same fec set
MerkleRootConflict(
Shred, // original
shred::Payload, // conflict
),
// Merkle root chaining conflict with previous fec set
ChainedMerkleRootConflict(
Shred, // original
shred::Payload, // conflict
),
}

impl PossibleDuplicateShred {
Expand Down Expand Up @@ -976,7 +992,7 @@ impl Blockstore {
leader_schedule: Option<&LeaderScheduleCache>,
reed_solomon_cache: &ReedSolomonCache,
shred_insertion_tracker: &mut ShredInsertionTracker,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
retransmit_sender: &Sender<Vec<shred::Payload>>,
is_trusted: bool,
metrics: &mut BlockstoreInsertionMetrics,
) {
Expand Down Expand Up @@ -1219,7 +1235,7 @@ impl Blockstore {
// recovered shreds.
should_recover_shreds: Option<(
&ReedSolomonCache,
&Sender<Vec</*shred:*/ Vec<u8>>>, // retransmit_sender
&Sender<Vec<shred::Payload>>, // retransmit_sender
)>,
metrics: &mut BlockstoreInsertionMetrics,
) -> Result<InsertResults> {
Expand Down Expand Up @@ -1296,7 +1312,7 @@ impl Blockstore {
shreds: impl ExactSizeIterator<Item = (Shred, /*is_repaired:*/ bool)>,
leader_schedule: Option<&LeaderScheduleCache>,
is_trusted: bool,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
retransmit_sender: &Sender<Vec<shred::Payload>>,
handle_duplicate: &F,
reed_solomon_cache: &ReedSolomonCache,
metrics: &mut BlockstoreInsertionMetrics,
Expand Down Expand Up @@ -1559,7 +1575,7 @@ impl Blockstore {
slot: Slot,
erasure_meta: &ErasureMeta,
just_received_shreds: &'a HashMap<ShredId, Shred>,
) -> Option<Cow<'a, Vec<u8>>> {
) -> Option<Cow<'a, shred::Payload>> {
// Search for the shred which set the initial erasure config, either inserted,
// or in the current batch in just_received_shreds.
let index = erasure_meta.first_received_coding_shred_index()?;
Expand Down Expand Up @@ -1784,7 +1800,7 @@ impl Blockstore {
&'a self,
just_inserted_shreds: &'a HashMap<ShredId, Shred>,
shred_id: ShredId,
) -> Option<Cow<'a, Vec<u8>>> {
) -> Option<Cow<'a, shred::Payload>> {
let (slot, index, shred_type) = shred_id.unpack();
match (just_inserted_shreds.get(&shred_id), shred_type) {
(Some(shred), _) => Some(Cow::Borrowed(shred.payload())),
Expand All @@ -1793,10 +1809,12 @@ impl Blockstore {
(_, ShredType::Data) => self
.get_data_shred(slot, u64::from(index))
.unwrap()
.map(shred::Payload::from)
.map(Cow::Owned),
(_, ShredType::Code) => self
.get_coding_shred(slot, u64::from(index))
.unwrap()
.map(shred::Payload::from)
.map(Cow::Owned),
}
}
Expand Down Expand Up @@ -4057,7 +4075,10 @@ impl Blockstore {
.map(|(slot, proof_bytes)| (slot, deserialize(&proof_bytes).unwrap()))
}

pub fn store_duplicate_slot(&self, slot: Slot, shred1: Vec<u8>, shred2: Vec<u8>) -> Result<()> {
pub fn store_duplicate_slot<S, T>(&self, slot: Slot, shred1: S, shred2: T) -> Result<()>
where
shred::Payload: From<S> + From<T>,
{
let duplicate_slot_proof = DuplicateSlotProof::new(shred1, shred2);
self.duplicate_slots_cf.put(slot, &duplicate_slot_proof)
}
Expand Down Expand Up @@ -4086,7 +4107,7 @@ impl Blockstore {
error!("set retransmitter signature failed: {err:?}");
}
}
(&other != shred.payload()).then_some(other)
(other != **shred.payload()).then_some(other)
}

pub fn has_duplicate_shreds_in_slot(&self, slot: Slot) -> bool {
Expand Down Expand Up @@ -10251,7 +10272,7 @@ pub mod tests {
.get_data_shred(s.slot(), s.index() as u64)
.unwrap()
.unwrap(),
buf
buf.as_ref(),
);
}

Expand Down Expand Up @@ -10551,8 +10572,8 @@ pub mod tests {

// Check if shreds are duplicated
assert_eq!(
blockstore.is_shred_duplicate(&duplicate_shred),
Some(shred.payload().clone())
blockstore.is_shred_duplicate(&duplicate_shred).as_deref(),
Some(shred.payload().as_ref())
);
assert!(blockstore
.is_shred_duplicate(&non_duplicate_shred)
Expand Down Expand Up @@ -11036,7 +11057,7 @@ pub mod tests {
if i <= smaller_last_shred_index as u64 {
assert_eq!(
blockstore.get_data_shred(slot, i).unwrap().unwrap(),
*shreds[i as usize].payload()
shreds[i as usize].payload().as_ref(),
);
} else {
assert!(blockstore.get_data_shred(slot, i).unwrap().is_none());
Expand Down Expand Up @@ -11068,7 +11089,7 @@ pub mod tests {
if i <= smaller_last_shred_index as u64 {
assert_eq!(
blockstore.get_data_shred(slot, i).unwrap().unwrap(),
*shreds[i as usize].payload()
shreds[i as usize].payload().as_ref(),
);
} else {
assert!(blockstore.get_data_shred(slot, i).unwrap().is_none());
Expand Down Expand Up @@ -11104,7 +11125,7 @@ pub mod tests {
.get_data_shred(slot, shred_index)
.unwrap()
.unwrap(),
*shred_to_check.payload()
shred_to_check.payload().as_ref(),
);
} else {
assert!(blockstore
Expand Down Expand Up @@ -11135,7 +11156,7 @@ pub mod tests {
.get_data_shred(slot, shred_index)
.unwrap()
.unwrap(),
*shred_to_check.payload()
shred_to_check.payload().as_ref(),
);
} else {
assert!(blockstore
Expand Down
20 changes: 13 additions & 7 deletions ledger/src/blockstore_meta.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use {
crate::{
blockstore::MAX_DATA_SHREDS_PER_SLOT,
shred::{Shred, ShredType},
shred::{self, Shred, ShredType},
},
bitflags::bitflags,
serde::{Deserialize, Deserializer, Serialize, Serializer},
Expand Down Expand Up @@ -224,10 +224,10 @@ pub struct MerkleRootMeta {

#[derive(Deserialize, Serialize)]
pub struct DuplicateSlotProof {
#[serde(with = "serde_bytes")]
pub shred1: Vec<u8>,
#[serde(with = "serde_bytes")]
pub shred2: Vec<u8>,
#[serde(with = "shred::serde_bytes_payload")]
pub shred1: shred::Payload,
#[serde(with = "shred::serde_bytes_payload")]
pub shred2: shred::Payload,
}

#[derive(Deserialize, Serialize, Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -774,8 +774,14 @@ impl MerkleRootMeta {
}

impl DuplicateSlotProof {
pub(crate) fn new(shred1: Vec<u8>, shred2: Vec<u8>) -> Self {
DuplicateSlotProof { shred1, shred2 }
pub(crate) fn new<S, T>(shred1: S, shred2: T) -> Self
where
shred::Payload: From<S> + From<T>,
{
DuplicateSlotProof {
shred1: shred::Payload::from(shred1),
shred2: shred::Payload::from(shred2),
}
}
}

Expand Down
30 changes: 18 additions & 12 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,17 @@
//! So, given a) - c), we must restrict data shred's payload length such that the entire coding
//! payload can fit into one coding shred / packet.

pub(crate) use self::merkle::SIZE_OF_MERKLE_ROOT;
#[cfg(test)]
pub(crate) use self::shred_code::MAX_CODE_SHREDS_PER_SLOT;
pub(crate) use self::{merkle::SIZE_OF_MERKLE_ROOT, payload::serde_bytes_payload};
pub use {
self::{
payload::Payload,
shred_data::ShredData,
stats::{ProcessShredsStats, ShredFetchStats},
},
crate::shredder::{ReedSolomonCache, Shredder},
};
use {
self::{shred_code::ShredCode, traits::Shred as _},
crate::blockstore::{self, MAX_DATA_SHREDS_PER_SLOT},
Expand All @@ -72,17 +80,11 @@ use {
std::{fmt::Debug, time::Instant},
thiserror::Error,
};
pub use {
self::{
shred_data::ShredData,
stats::{ProcessShredsStats, ShredFetchStats},
},
crate::shredder::{ReedSolomonCache, Shredder},
};

mod common;
mod legacy;
mod merkle;
mod payload;
pub mod shred_code;
mod shred_data;
mod stats;
Expand Down Expand Up @@ -369,9 +371,9 @@ impl Shred {
dispatch!(pub(crate) fn erasure_shard_index(&self) -> Result<usize, Error>);
dispatch!(pub(crate) fn retransmitter_signature(&self) -> Result<Signature, Error>);

dispatch!(pub fn into_payload(self) -> Vec<u8>);
dispatch!(pub fn into_payload(self) -> Payload);
dispatch!(pub fn merkle_root(&self) -> Result<Hash, Error>);
dispatch!(pub fn payload(&self) -> &Vec<u8>);
dispatch!(pub fn payload(&self) -> &Payload);
dispatch!(pub fn sanitize(&self) -> Result<(), Error>);

// Only for tests.
Expand Down Expand Up @@ -408,8 +410,12 @@ impl Shred {
))
}

pub fn new_from_serialized_shred(shred: Vec<u8>) -> Result<Self, Error> {
Ok(match layout::get_shred_variant(&shred)? {
pub fn new_from_serialized_shred<T>(shred: T) -> Result<Self, Error>
where
T: AsRef<[u8]> + Into<Payload>,
Payload: From<T>,
{
Ok(match layout::get_shred_variant(shred.as_ref())? {
ShredVariant::LegacyCode => {
let shred = legacy::ShredCode::from_payload(shred)?;
Self::from(ShredCode::from(shred))
Expand Down
4 changes: 2 additions & 2 deletions ledger/src/shred/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ macro_rules! impl_shred_common {
}

#[inline]
fn payload(&self) -> &Vec<u8> {
fn payload(&self) -> &Payload {
&self.payload
}

#[inline]
fn into_payload(self) -> Vec<u8> {
fn into_payload(self) -> Payload {
self.payload
}

Expand Down
Loading
Loading