Skip to content

Commit

Permalink
adds shared variant to shred payload type (#4696)
Browse files Browse the repository at this point in the history
After signature verification shreds payloads are concurrently sent to
window-service to be deserialized and inserted into blockstore, while
their payload is also sent to retransmit-stage to be further propagated:
https://github.com/anza-xyz/agave/blob/7d8cdd9d3/turbine/src/sigverify_shreds.rs#L242-L243

Similarly, when shreds are recovered from the erasure codes, they are
concurrently inserted into blockstore, while their payload is sent to
retransmit-stage:
https://github.com/anza-xyz/agave/blob/7d8cdd9d3/ledger/src/blockstore.rs#L1024-L1035

Having a shred::Payload variant which allows to share the payload
between the two concurrent paths will allow to reduce allocations and
memcopies.
  • Loading branch information
behzadnouri authored Jan 30, 2025
1 parent 5b7ebfb commit bdece2e
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 24 deletions.
10 changes: 3 additions & 7 deletions ledger/src/shred/legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ impl<'a> Shred<'a> for ShredData {
if payload.len() < Self::SIZE_OF_HEADERS {
return Err(Error::InvalidPayloadSize(payload.len()));
}
if payload.len() != Self::SIZE_OF_PAYLOAD {
Payload::make_mut(&mut payload).resize(Self::SIZE_OF_PAYLOAD, 0u8);
}
payload.resize(Self::SIZE_OF_PAYLOAD, 0u8);
let shred = Self {
common_header,
data_header,
Expand Down Expand Up @@ -146,9 +144,7 @@ impl<'a> Shred<'a> for ShredCode {
let coding_header = deserialize_from_with_limit(&mut cursor)?;
// Repair packets have nonce at the end of packet payload:
// https://github.com/solana-labs/solana/pull/10109
if payload.len() > Self::SIZE_OF_PAYLOAD {
Payload::make_mut(&mut payload).truncate(Self::SIZE_OF_PAYLOAD);
}
payload.truncate(Self::SIZE_OF_PAYLOAD);
let shred = Self {
common_header,
coding_header,
Expand Down Expand Up @@ -368,7 +364,7 @@ mod test {
// Corrupt shred by making it too large
{
let mut shred = shred.clone();
Payload::make_mut(&mut shred.payload).push(10u8);
shred.payload.push(10u8);
assert_matches!(shred.sanitize(), Err(Error::InvalidPayloadSize(1229)));
}
{
Expand Down
8 changes: 2 additions & 6 deletions ledger/src/shred/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,9 +516,7 @@ impl<'a> ShredTrait<'a> for ShredData {
if payload.len() < Self::SIZE_OF_PAYLOAD {
return Err(Error::InvalidPayloadSize(payload.len()));
}
if payload.len() > Self::SIZE_OF_PAYLOAD {
Payload::make_mut(&mut payload).truncate(Self::SIZE_OF_PAYLOAD);
}
payload.truncate(Self::SIZE_OF_PAYLOAD);
let (common_header, data_header): (ShredCommonHeader, _) =
deserialize_from_with_limit(&payload[..])?;
if !matches!(common_header.shred_variant, ShredVariant::MerkleData { .. }) {
Expand Down Expand Up @@ -583,9 +581,7 @@ impl<'a> ShredTrait<'a> for ShredCode {
if payload.len() < Self::SIZE_OF_PAYLOAD {
return Err(Error::InvalidPayloadSize(payload.len()));
}
if payload.len() > Self::SIZE_OF_PAYLOAD {
Payload::make_mut(&mut payload).truncate(Self::SIZE_OF_PAYLOAD);
}
payload.truncate(Self::SIZE_OF_PAYLOAD);
let shred = Self {
common_header,
coding_header,
Expand Down
66 changes: 55 additions & 11 deletions ledger/src/shred/payload.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,71 @@
use std::ops::{Deref, DerefMut};
use std::{
ops::{Deref, DerefMut},
sync::Arc,
};

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Payload(Vec<u8>);
pub enum Payload {
Shared(Arc<Vec<u8>>),
Unique(Vec<u8>),
}

macro_rules! make_mut {
($self:ident) => {
match $self {
Self::Shared(bytes) => Arc::make_mut(bytes),
Self::Unique(bytes) => bytes,
}
};
}

macro_rules! dispatch {
($vis:vis fn $name:ident(&self $(, $arg:ident : $ty:ty)?) $(-> $out:ty)?) => {
#[inline]
$vis fn $name(&self $(, $arg:$ty)?) $(-> $out)? {
self.0.$name($($arg, )?)
match self {
Self::Shared(bytes) => bytes.$name($($arg, )?),
Self::Unique(bytes) => bytes.$name($($arg, )?),
}
}
};
($vis:vis fn $name:ident(&mut self $(, $arg:ident : $ty:ty)*) $(-> $out:ty)?) => {
#[inline]
$vis fn $name(&mut self $(, $arg:$ty)*) $(-> $out)? {
make_mut!(self).$name($($arg, )*)
}
}
}

impl Payload {
#[cfg(test)]
dispatch!(pub(crate) fn push(&mut self, byte: u8));

#[inline]
pub(crate) fn resize(&mut self, size: usize, byte: u8) {
if self.len() != size {
make_mut!(self).resize(size, byte);
}
}

#[inline]
pub(super) fn make_mut(this: &mut Self) -> &mut Vec<u8> {
&mut this.0
pub(crate) fn truncate(&mut self, size: usize) {
if self.len() > size {
make_mut!(self).truncate(size);
}
}

#[inline]
pub fn unwrap_or_clone(this: Self) -> Vec<u8> {
this.0
match this {
Self::Shared(bytes) => Arc::unwrap_or_clone(bytes),
Self::Unique(bytes) => bytes,
}
}
}

pub(crate) mod serde_bytes_payload {
use {
super::*,
super::Payload,
serde::{Deserialize, Deserializer, Serializer},
serde_bytes::ByteBuf,
};
Expand All @@ -51,7 +90,14 @@ pub(crate) mod serde_bytes_payload {
impl From<Vec<u8>> for Payload {
#[inline]
fn from(bytes: Vec<u8>) -> Self {
Self(bytes)
Self::Unique(bytes)
}
}

impl From<Arc<Vec<u8>>> for Payload {
#[inline]
fn from(bytes: Arc<Vec<u8>>) -> Self {
Self::Shared(bytes)
}
}

Expand All @@ -65,7 +111,5 @@ impl Deref for Payload {
}

impl DerefMut for Payload {
fn deref_mut(&mut self) -> &mut Self::Target {
Payload::make_mut(self)
}
dispatch!(fn deref_mut(&mut self) -> &mut Self::Target);
}

0 comments on commit bdece2e

Please sign in to comment.