Skip to content

Commit

Permalink
refactor(consensus): simplify IngressPayload implementation (#3444)
Browse files Browse the repository at this point in the history
Currently the payload is one big bytes buffer which contains all ingress
messages in a packed representation. This makes it a big hard to work
with.
In this PR we simplify the structure by replacing the buffer with a map
from `IngressMessageId`s to a _serialized_ ingress messages.
The following two important properties are preserved:
1. The individual ingress messages deserialization is delayed until it's
actually needed
2. We preserve the original byte representation of the ingress messages

I've ran multiple benchmarks and didn't notice a _big_ change:
1. `consensus-performance` system test showed the same throughput /
block rates with both implementations
2. the [serialization/deserialization
](https://github.com/dfinity/ic/blob/master/rs/consensus/benches/validate_payload.rs#L374-L404)
of ingress payload gets about 2x slower, but it's still in sub 1ms
territory

---------

Co-authored-by: Leon Tan <[email protected]>
  • Loading branch information
kpop-dfinity and Sawchord authored Jan 15, 2025
1 parent 2828131 commit f491f84
Show file tree
Hide file tree
Showing 13 changed files with 174 additions and 180 deletions.
4 changes: 3 additions & 1 deletion rs/consensus/src/consensus/malicious_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ fn maliciously_propose_blocks(
.get_block_maker_rank(height, &beacon, my_node_id)
{
Ok(Some(rank)) => Some(rank),
Ok(None) => Some(Rank(0)),

This comment has been minimized.

Copy link
@timk11

timk11 Jan 20, 2025

Comment in line 83 may need tweaking.

// TODO: introduce a malicious flag which will instruct a malicious node to propose a block
// when it's not elected a block maker; implement a system test which uses the flag.
Ok(None) => None,
Err(_) => None,
};

Expand Down
2 changes: 1 addition & 1 deletion rs/consensus/src/consensus/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl BatchStats {
self.ingress_message_bytes_delivered += payload.ingress.count_bytes();
self.xnet_bytes_delivered += payload.xnet.size_bytes();
self.ingress_ids
.extend_from_slice(&payload.ingress.message_ids());
.extend(payload.ingress.message_ids().cloned());
}
}

Expand Down
4 changes: 2 additions & 2 deletions rs/consensus/src/consensus/purger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,10 +905,10 @@ mod tests {
non_finalized_notarization_2
)),
ChangeAction::RemoveFromValidated(ConsensusMessage::BlockProposal(
non_finalized_block_proposal_2_1
non_finalized_block_proposal_2_0
)),
ChangeAction::RemoveFromValidated(ConsensusMessage::BlockProposal(
non_finalized_block_proposal_2_0
non_finalized_block_proposal_2_1
)),
]
);
Expand Down
36 changes: 26 additions & 10 deletions rs/ingress_manager/src/ingress_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,29 @@ impl IngressSelector for IngressManager {

// Tracks the sum of cycles needed per canister.
let mut cycles_needed: BTreeMap<CanisterId, Cycles> = BTreeMap::new();
for i in 0..payload.message_count() {
let (ingress_id, ingress) = payload
.get(i)
.map_err(InvalidIngressPayloadReason::IngressPayloadError)?;

// Validate each ingress message in the payload
for (ingress_id, maybe_ingress) in payload.iter() {
let ingress = match maybe_ingress {
Ok(ingress) => ingress,
Err(deserialization_error) => {
return Err(ValidationError::InvalidArtifact(
InvalidIngressPayloadReason::IngressMessageDeserializationFailure(
ingress_id.clone(),
deserialization_error.to_string(),
),
));
}
};

if IngressMessageId::from(&ingress) != *ingress_id {
return Err(ValidationError::InvalidArtifact(
InvalidIngressPayloadReason::MismatchedMessageId {
expected: ingress_id.clone(),
computed: IngressMessageId::from(&ingress),
},
));
}

self.validate_ingress(
ingress_id.clone(),
Expand Down Expand Up @@ -373,7 +392,7 @@ impl IngressSelector for IngressManager {
let ingress = ingress_payload_cache
.entry((*height, payload_hash.clone()))
.or_insert_with(|| {
Arc::new(batch.ingress.message_ids().into_iter().collect())
Arc::new(batch.ingress.message_ids().cloned().collect())
});
Some(ingress.clone())
}
Expand Down Expand Up @@ -1046,11 +1065,8 @@ mod tests {
assert_eq!(first_ingress_payload.message_count(), 1);

// we should not get it again because it is part of past payloads
let mut hash_set = HashSet::new();
for i in 0..first_ingress_payload.message_count() {
let (id, _) = first_ingress_payload.get(i).unwrap();
hash_set.insert(id);
}
let hash_set = HashSet::from_iter(first_ingress_payload.message_ids().cloned());

let second_ingress_payload = ingress_manager.get_ingress_payload(
&hash_set,
&validation_context,
Expand Down
10 changes: 8 additions & 2 deletions rs/interfaces/src/ingress_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
use ic_interfaces_state_manager::StateManagerError;
use ic_types::{
artifact::IngressMessageId,
batch::{IngressPayload, IngressPayloadError, ValidationContext},
batch::{IngressPayload, ValidationContext},
consensus::Payload,
ingress::IngressSets,
messages::MessageId,
Expand Down Expand Up @@ -52,8 +52,14 @@ impl IngressSetQuery for IngressSets {
/// Reasons for why an ingress payload might be invalid.
#[derive(Eq, PartialEq, Debug)]
pub enum InvalidIngressPayloadReason {
/// An [`IngressMessageId`] inside the payload doesn't match the referenced [`SignedIngress`].
MismatchedMessageId {
expected: IngressMessageId,
computed: IngressMessageId,
},
/// Failed to deserialize an ingress message.
IngressMessageDeserializationFailure(IngressMessageId, String),
IngressValidationError(MessageId, String),
IngressPayloadError(IngressPayloadError),
IngressExpired(MessageId, String),
IngressMessageTooBig(usize, usize),
IngressPayloadTooManyMessages(usize, usize),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ impl Pools {
};

match data_payload.batch.ingress.get_by_id(ingress_message_id) {
Some(ingress_message) => {
Ok(Some(ingress_message)) => {
self.metrics.ingress_messages_in_block.inc();
Ok(ingress_message)
}
None => {
_ => {
self.metrics.ingress_messages_not_found.inc();
Err(PoolsAccessError::IngressMessageNotFound)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl Strippable for &IngressPayload {

fn strip(self) -> Self::Output {
Self::Output {
ingress_messages: self.message_ids(),
ingress_messages: self.message_ids().cloned().collect(),
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions rs/protobuf/def/types/v1/consensus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,15 @@ message IngressIdOffset {
uint64 offset = 3;
}

message IngressMessage {
bytes message_id = 1;
uint64 expiry = 2;
bytes signed_request_bytes = 3;
}

message IngressPayload {
repeated IngressIdOffset id_and_pos = 1;
bytes buffer = 2;
reserved 1, 2;
repeated IngressMessage ingress_messages = 3;
}

// Stripped consensus artifacts messages below
Expand Down
15 changes: 11 additions & 4 deletions rs/protobuf/src/gen/types/types.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1528,11 +1528,18 @@ pub struct IngressIdOffset {
pub offset: u64,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct IngressMessage {
#[prost(bytes = "vec", tag = "1")]
pub message_id: ::prost::alloc::vec::Vec<u8>,
#[prost(uint64, tag = "2")]
pub expiry: u64,
#[prost(bytes = "vec", tag = "3")]
pub signed_request_bytes: ::prost::alloc::vec::Vec<u8>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct IngressPayload {
#[prost(message, repeated, tag = "1")]
pub id_and_pos: ::prost::alloc::vec::Vec<IngressIdOffset>,
#[prost(bytes = "vec", tag = "2")]
pub buffer: ::prost::alloc::vec::Vec<u8>,
#[prost(message, repeated, tag = "3")]
pub ingress_messages: ::prost::alloc::vec::Vec<IngressMessage>,
}
/// Stripped consensus artifacts messages below
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
4 changes: 1 addition & 3 deletions rs/state_machine_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1370,9 +1370,7 @@ impl StateMachine {
// used by the function `Self::execute_payload` of the `StateMachine`.
let xnet_payload = batch_payload.xnet.clone();
let ingress = &batch_payload.ingress;
let ingress_messages = (0..ingress.message_count())
.map(|i| ingress.get(i).unwrap().1)
.collect();
let ingress_messages = ingress.clone().try_into().unwrap();
let (http_responses, _) =
CanisterHttpPayloadBuilderImpl::into_messages(&batch_payload.canister_http);
let inducted: Vec<_> = http_responses
Expand Down
Loading

0 comments on commit f491f84

Please sign in to comment.