[resharding] Handle resharding of delayed receipts (#12513)
This PR is the core of delayed receipt handling for resharding V3.

## Writeup from NEP

The delayed receipts queue contains all incoming receipts that could not
be executed as part of a block due to resource constraints such as compute
cost and gas limits. The entries in the delayed receipt queue can belong to
any of the accounts that are part of the shard. During a resharding event,
we ideally need to split the delayed receipts across the two child shards
according to the account_id associated with each receipt.

The singleton trie key DelayedReceiptIndices holds the start_index and
end_index associated with the delayed receipt entries for the shard. The
trie key DelayedReceipt { index } contains the actual delayed receipt
associated with some account_id. These entries are processed in FIFO order
during chunk execution.
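
As a rough illustration, the queue layout described above looks something like
the sketch below. The names and fields follow the prose in this writeup, not
necessarily the exact definitions in nearcore.

```rust
// Illustrative sketch only; the real nearcore `TrieKey` has many more variants
// and may use different field names for the queue bounds.
enum TrieKey {
    /// Singleton key holding the queue bounds for the shard.
    DelayedReceiptIndices,
    /// One entry per queued receipt; note there is no account_id prefix in the key.
    DelayedReceipt { index: u64 },
}

struct DelayedReceiptIndices {
    start_index: u64, // front of the FIFO queue: next receipt to execute
    end_index: u64,   // back of the queue: one past the last queued receipt
}
```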

Note that the delayed receipt trie keys do not have an account_id prefix.
In ReshardingV2, we used the straightforward solution of iterating through
all the delayed receipt queue entries and assigning each one to the
appropriate child shard. Due to the state witness size limit and the
requirement of instant resharding, this approach is no longer feasible
for ReshardingV3.

For ReshardingV3, we instead handle resharding by duplicating the entries
of the delayed receipt queue across both child shards. This is great from
the perspective of state witness size and instant resharding, as we only
need to access the delayed receipt queue root entry in the trie. However,
it breaks the assumption that all delayed receipts in a shard belong to
accounts within that shard.

To resolve this, starting with the new protocol version, the runtime
discards (skips executing) delayed receipts whose account_id does not
belong to the shard processing the queue.

Note that no delayed receipts are lost during resharding: every receipt is
executed exactly once, on whichever child shard the associated account_id
belongs to.
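
As a sanity check on this invariant, here is a minimal, self-contained sketch
(not nearcore code; the split rule and account names are made up) showing that
when both children start from the same duplicated queue, each receipt passes
the shard filter on exactly one child:

```rust
use std::collections::{HashSet, VecDeque};

// Illustrative split rule; not how nearcore maps accounts to shards.
fn shard_of(account: &str) -> usize {
    if account < "m" { 0 } else { 1 }
}

fn main() {
    // Parent shard's delayed receipt queue, identified here just by receiver account.
    let parent_queue: VecDeque<&str> = ["alice", "nadia", "bob", "zoe"].into();

    // Resharding duplicates the whole queue into both child shards.
    let children = [parent_queue.clone(), parent_queue.clone()];

    let mut executed = Vec::new();
    for (child_shard, queue) in children.into_iter().enumerate() {
        for receipt in queue {
            // Each child skips receipts whose receiver lives on the sibling shard.
            if shard_of(receipt) == child_shard {
                executed.push(receipt);
            }
        }
    }

    // Every receipt from the parent queue is executed exactly once across both children.
    assert_eq!(executed.len(), parent_queue.len());
    let unique: HashSet<_> = executed.iter().copied().collect();
    assert_eq!(unique.len(), parent_queue.len());
}
```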

Source: the NEP's [receipt handling section](near/NEPs#578).

## Changes in this PR

- We change the implementation of `DelayedReceiptQueueWrapper` to add a
new filter function that filters out all delayed receipts that do not
belong to the current shard.
- The `peek_iter` function is updated to apply the same filter, so that it
stays compatible with pipelining.
- The main loop in `process_delayed_receipts` is updated from
`while processing_state.delayed_receipts.len() > 0` to the equivalent of
`while let Some(receipt) = queue.pop(...)`, since we no longer have a
precise count of the number of delayed receipts to process (see the sketch
after this list).
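
A minimal sketch of that control-flow change, using a toy queue type rather
than the real `DelayedReceiptQueueWrapper` API (gas/bytes accounting and trie
access are omitted):

```rust
use std::collections::VecDeque;

// Illustrative split rule; not how nearcore maps accounts to shards.
fn shard_of(account: &str) -> u8 {
    if account < "m" { 0 } else { 1 }
}

// Toy stand-in for the post-resharding delayed receipt queue; the real wrapper
// also tracks gas and bytes and reads receipts from the trie.
struct Queue {
    items: VecDeque<String>, // may still contain receipts that belong to the sibling shard
    my_shard: u8,
}

impl Queue {
    // Only an upper bound: the duplicated queue still counts sibling-shard receipts.
    fn upper_bound_len(&self) -> usize {
        self.items.len()
    }

    // Pop skips receipts whose receiver lives on the sibling shard.
    fn pop(&mut self) -> Option<String> {
        while let Some(receipt) = self.items.pop_front() {
            if shard_of(receipt.as_str()) == self.my_shard {
                return Some(receipt);
            }
        }
        None
    }
}

fn main() {
    let mut queue = Queue {
        items: VecDeque::from(vec!["alice".to_string(), "zoe".to_string()]),
        my_shard: 0,
    };

    // Old shape: `while queue.upper_bound_len() > 0 { ... }` assumed every queued
    // entry would be executed here; that count is no longer precise after resharding.
    // New shape: keep popping until the filtered queue is exhausted.
    while let Some(receipt) = queue.pop() {
        println!("executing receipt for {receipt} on shard {}", queue.my_shard);
    }
    assert_eq!(queue.upper_bound_len(), 0);
}
```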

## Next steps

- Change the memtrie split function to include delayed receipts in the child
shards.
- Add a testloop test.
shreyan-gupta authored Nov 28, 2024
1 parent 137ab8c commit 6eeabc3
Showing 8 changed files with 164 additions and 90 deletions.
14 changes: 11 additions & 3 deletions core/primitives/src/test_utils.rs
@@ -993,11 +993,19 @@ impl Block {
#[derive(Default)]
pub struct MockEpochInfoProvider {
pub validators: HashMap<AccountId, Balance>,
pub account_id_to_shard_id_map: HashMap<AccountId, ShardId>,
}

impl MockEpochInfoProvider {
pub fn new(validators: impl Iterator<Item = (AccountId, Balance)>) -> Self {
MockEpochInfoProvider { validators: validators.collect() }
MockEpochInfoProvider {
validators: validators.collect(),
account_id_to_shard_id_map: HashMap::new(),
}
}

pub fn set_shard_id_for_account_id(&mut self, account_id: &AccountId, shard_id: ShardId) {
self.account_id_to_shard_id_map.insert(account_id.clone(), shard_id);
}
}

@@ -1029,10 +1037,10 @@ impl EpochInfoProvider for MockEpochInfoProvider {

fn account_id_to_shard_id(
&self,
_account_id: &AccountId,
account_id: &AccountId,
_epoch_id: &EpochId,
) -> Result<ShardId, EpochError> {
Ok(ShardId::new(0))
Ok(self.account_id_to_shard_id_map.get(account_id).cloned().unwrap_or(ShardId::new(0)))
}
}

19 changes: 14 additions & 5 deletions core/store/src/trie/receipts_column_helper.rs
@@ -8,7 +8,10 @@ use near_primitives::trie_key::TrieKey;
use near_primitives::types::ShardId;

/// Read-only iterator over items stored in a TrieQueue.
pub struct TrieQueueIterator<'a, Queue: TrieQueue> {
struct TrieQueueIterator<'a, Queue>
where
Queue: TrieQueue,
{
indices: std::ops::Range<u64>,
trie_queue: &'a Queue,
trie: &'a dyn TrieAccess,
@@ -217,7 +220,7 @@ pub trait TrieQueue {
&'a self,
trie: &'a dyn TrieAccess,
side_effects: bool,
) -> TrieQueueIterator<'a, Self>
) -> impl DoubleEndedIterator<Item = Result<Self::Item<'static>, StorageError>> + 'a
where
Self: Sized,
{
@@ -330,8 +333,11 @@ impl TrieQueue for OutgoingReceiptBuffer<'_> {
}
}

impl<'a, Queue: TrieQueue> Iterator for TrieQueueIterator<'a, Queue> {
type Item = Result<Queue::Item<'static>, StorageError>;
impl<Q> Iterator for TrieQueueIterator<'_, Q>
where
Q: TrieQueue,
{
type Item = Result<Q::Item<'static>, StorageError>;

fn next(&mut self) -> Option<Self::Item> {
let index = self.indices.next()?;
@@ -349,7 +355,10 @@ impl<'a, Queue: TrieQueue> Iterator for TrieQueueIterator<'a, Queue> {
}
}

impl<'a, Queue: TrieQueue> DoubleEndedIterator for TrieQueueIterator<'a, Queue> {
impl<Q> DoubleEndedIterator for TrieQueueIterator<'_, Q>
where
Q: TrieQueue,
{
fn next_back(&mut self) -> Option<Self::Item> {
let index = self.indices.next_back()?;
let key = self.trie_queue.trie_key(index);
2 changes: 1 addition & 1 deletion runtime/runtime/src/actions.rs
@@ -171,7 +171,7 @@ pub(crate) fn action_function_call(
code_hash: CryptoHash,
config: &RuntimeConfig,
is_last_action: bool,
epoch_info_provider: &(dyn EpochInfoProvider),
epoch_info_provider: &dyn EpochInfoProvider,
contract: Box<dyn PreparedContract>,
) -> Result<(), RuntimeError> {
if account.amount().checked_add(function_call.deposit).is_none() {
113 changes: 80 additions & 33 deletions runtime/runtime/src/congestion_control.rs
@@ -14,11 +14,11 @@ use near_primitives::receipt::{
Receipt, ReceiptEnum, ReceiptOrStateStoredReceipt, StateStoredReceipt,
StateStoredReceiptMetadata,
};
use near_primitives::types::{EpochInfoProvider, Gas, ShardId};
use near_primitives::types::{EpochId, EpochInfoProvider, Gas, ShardId};
use near_primitives::version::ProtocolFeature;
use near_store::trie::outgoing_metadata::{OutgoingMetadatas, ReceiptGroupsConfig};
use near_store::trie::receipts_column_helper::{
DelayedReceiptQueue, ShardsOutgoingReceiptBuffer, TrieQueue, TrieQueueIterator,
DelayedReceiptQueue, ShardsOutgoingReceiptBuffer, TrieQueue,
};
use near_store::{StorageError, TrieAccess, TrieUpdate};
use near_vm_runner::logic::ProtocolVersion;
@@ -72,25 +72,6 @@ enum ReceiptForwarding {
NotForwarded(Receipt),
}

/// A wrapper around `DelayedReceiptQueue` to accumulate changes in gas and
/// bytes.
///
/// This struct exists for two reasons. One, to encapsulate the accounting of
/// gas and bytes in functions that can be called in all necessary places. Two,
/// to accumulate changes and only apply them to `CongestionInfo` in the end,
/// which avoids problems with multiple mutable borrows with the closure
/// approach we currently have in receipt processing code.
///
/// We use positive added and removed values to avoid integer conversions with
/// the associated additional overflow conditions.
pub(crate) struct DelayedReceiptQueueWrapper {
queue: DelayedReceiptQueue,
new_delayed_gas: Gas,
new_delayed_bytes: u64,
removed_delayed_gas: Gas,
removed_delayed_bytes: u64,
}

impl ReceiptSink {
pub(crate) fn new(
protocol_version: ProtocolVersion,
@@ -464,7 +445,7 @@ impl ReceiptSinkV2 {
// version where metadata was not enabled. Metadata doesn't contain information about them.
// We can't make a proper request in this case, so we make a basic request while we wait for
// metadata to become fully initialized. The basic request requests just `max_receipt_size`. This is enough to
// ensure liveness, as all receipts are smaller than `max_receipt_size`. The resulting behaviour is similar
// ensure liveness, as all receipts are smaller than `max_receipt_size`. The resulting behavior is similar
// to the previous approach where the `allowed_shard` was assigned most of the bandwidth.
// Over time these old receipts will be removed from the outgoing buffer and eventually metadata will contain
// information about every receipt in the buffer. From that point on we will be able to make
@@ -610,10 +591,46 @@ pub fn bootstrap_congestion_info(
}))
}

impl DelayedReceiptQueueWrapper {
pub fn new(queue: DelayedReceiptQueue) -> Self {
/// A wrapper around `DelayedReceiptQueue` to accumulate changes in gas and
/// bytes.
///
/// This struct exists for two reasons. One, to encapsulate the accounting of
/// gas and bytes in functions that can be called in all necessary places. Two,
/// to accumulate changes and only apply them to `CongestionInfo` in the end,
/// which avoids problems with multiple mutable borrows with the closure
/// approach we currently have in receipt processing code.
///
/// We use positive added and removed values to avoid integer conversions with
/// the associated additional overflow conditions.
pub(crate) struct DelayedReceiptQueueWrapper<'a> {
// The delayed receipt queue.
queue: DelayedReceiptQueue,

// Epoch_info_provider, shard_id, and epoch_id are used to determine
// if a receipt belongs to the current shard or not after a resharding event.
epoch_info_provider: &'a dyn EpochInfoProvider,
shard_id: ShardId,
epoch_id: EpochId,

// Accumulated changes in gas and bytes for congestion info calculations.
new_delayed_gas: Gas,
new_delayed_bytes: u64,
removed_delayed_gas: Gas,
removed_delayed_bytes: u64,
}

impl<'a> DelayedReceiptQueueWrapper<'a> {
pub fn new(
queue: DelayedReceiptQueue,
epoch_info_provider: &'a dyn EpochInfoProvider,
shard_id: ShardId,
epoch_id: EpochId,
) -> Self {
Self {
queue,
epoch_info_provider,
shard_id,
epoch_id,
new_delayed_gas: 0,
new_delayed_bytes: 0,
removed_delayed_gas: 0,
@@ -654,29 +671,59 @@ impl DelayedReceiptQueueWrapper {
Ok(())
}

// With ReshardingV3, it's possible for a chunk to have delayed receipts that technically
// belong to the sibling shard before a resharding event.
// Here, we filter all the receipts that don't belong to the current shard_id.
//
// The function follows the guidelines of standard iterator filter function
// We return true if we should retain the receipt and false if we should filter it.
fn receipt_filter_fn(&self, receipt: &ReceiptOrStateStoredReceipt) -> bool {
let receiver_id = receipt.get_receipt().receiver_id();
let receipt_shard_id = self
.epoch_info_provider
.account_id_to_shard_id(receiver_id, &self.epoch_id)
.expect("account_id_to_shard_id should never fail");
receipt_shard_id == self.shard_id
}

pub(crate) fn pop(
&mut self,
trie_update: &mut TrieUpdate,
config: &RuntimeConfig,
) -> Result<Option<ReceiptOrStateStoredReceipt>, RuntimeError> {
let receipt = self.queue.pop_front(trie_update)?;
if let Some(receipt) = &receipt {
let delayed_gas = receipt_congestion_gas(receipt, &config)?;
let delayed_bytes = receipt_size(receipt)? as u64;
// While processing receipts, we need to keep track of the gas and bytes
// even for receipts that may be filtered out due to a resharding event
while let Some(receipt) = self.queue.pop_front(trie_update)? {
let delayed_gas = receipt_congestion_gas(&receipt, &config)?;
let delayed_bytes = receipt_size(&receipt)? as u64;
self.removed_delayed_gas = safe_add_gas(self.removed_delayed_gas, delayed_gas)?;
self.removed_delayed_bytes = safe_add_gas(self.removed_delayed_bytes, delayed_bytes)?;

// TODO(resharding): The filter function check here is bypassing the limit check for state witness.
// Track gas and bytes for receipt above and return only receipt that belong to the shard.
if self.receipt_filter_fn(&receipt) {
return Ok(Some(receipt));
}
}
Ok(receipt)
Ok(None)
}

pub(crate) fn peek_iter<'a>(
pub(crate) fn peek_iter(
&'a self,
trie_update: &'a TrieUpdate,
) -> TrieQueueIterator<'a, DelayedReceiptQueue> {
self.queue.iter(trie_update, false)
) -> impl Iterator<Item = ReceiptOrStateStoredReceipt<'static>> + 'a {
self.queue
.iter(trie_update, false)
.map_while(Result::ok)
.filter(|receipt| self.receipt_filter_fn(receipt))
}

pub(crate) fn len(&self) -> u64 {
/// This function returns the maximum length of the delayed receipt queue.
/// The only time the real number of delayed receipts differ from the returned value is right
/// after a resharding event. During resharding, we duplicate the delayed receipt queue across
/// both child shards, which means it's possible that the child shards contain delayed receipts
/// that don't belong to them.
pub(crate) fn upper_bound_len(&self) -> u64 {
self.queue.len()
}

4 changes: 2 additions & 2 deletions runtime/runtime/src/ext.rs
@@ -28,7 +28,7 @@ pub struct RuntimeExt<'a> {
epoch_id: EpochId,
prev_block_hash: CryptoHash,
last_block_hash: CryptoHash,
epoch_info_provider: &'a (dyn EpochInfoProvider),
epoch_info_provider: &'a dyn EpochInfoProvider,
current_protocol_version: ProtocolVersion,
}

@@ -70,7 +70,7 @@ impl<'a> RuntimeExt<'a> {
epoch_id: EpochId,
prev_block_hash: CryptoHash,
last_block_hash: CryptoHash,
epoch_info_provider: &'a (dyn EpochInfoProvider),
epoch_info_provider: &'a dyn EpochInfoProvider,
current_protocol_version: ProtocolVersion,
) -> Self {
RuntimeExt {