Skip to content

Commit

Permalink
refactor: process two ranges of missing chunks in stateless job (#10341)
Browse files Browse the repository at this point in the history
I've tried to make a switch from stateful to stateless jobs, but
unfortunately it turned out that the `get_stateless_validation_job`
skeleton is not enough.

TLDR - I need to process both ranges of missing chunks inside it -
between `prev_prev_chunk` and `prev_chunk` and between `prev_chunk` and
`chunk`.
The first one is needed for compatibility with the current protocol, where we
process missing chunks immediately on receiving a block, so missing
chunks are effectively processed before the executed chunk.
The second one is needed to generate post state root for latest
processed block, to make it pre state root for newly processed chunk.

However, it will be useful for switching to the "after-range" later,
especially when implementing a protocol version switch.

Nayduck https://nayduck.near.org/#/run/3327

---------

Co-authored-by: Longarithm <[email protected]>
  • Loading branch information
Longarithm and Looogarithm authored Dec 18, 2023
1 parent 5c79685 commit 6c4edb2
Show file tree
Hide file tree
Showing 2 changed files with 228 additions and 119 deletions.
264 changes: 165 additions & 99 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use crate::types::{
ChainConfig, RuntimeAdapter, RuntimeStorageConfig, StorageDataSource,
};
use crate::update_shard::{
process_shard_update, NewChunkData, NewChunkResult, OldChunkData, OldChunkResult,
ShardBlockUpdateResult, ShardContext, ShardUpdateReason, ShardUpdateResult, StateSplitData,
StateSplitResult, StorageContext,
apply_new_chunk, process_missing_chunks_range, process_shard_update, NewChunkData,
NewChunkResult, OldChunkData, OldChunkResult, ShardBlockUpdateResult, ShardContext,
ShardUpdateReason, ShardUpdateResult, StateSplitData, StateSplitResult, StorageContext,
};
use crate::validate::{
validate_challenge, validate_chunk_proofs, validate_chunk_with_chunk_extra,
Expand Down Expand Up @@ -4179,6 +4179,75 @@ impl Chain {
)))
}

/// Gets range of block and shard contexts for blocks between two
/// consecutive chunks for given `shard_id`.
/// One chunk is represented by `chunk_prev_hash`.
/// Its previous chunk is represented by `prev_chunk_height_included`.
/// Needed to process shard updates between these chunks.
/// If there are no missing chunks between them, resulting vector will be
/// empty.
/// Optionally returns `ChunkExtra` corresponding to block with previous
/// chunk.
fn get_blocks_range_with_missing_chunks(
&self,
me: &Option<AccountId>,
chunk_prev_hash: CryptoHash,
prev_chunk_height_included: BlockHeight,
shard_id: ShardId,
mode: ApplyChunksMode,
return_chunk_extra: bool,
) -> Result<(Vec<(ApplyTransactionsBlockContext, ShardContext)>, Option<ChunkExtra>), Error>
{
let blocks_to_execute =
self.get_blocks_until_height(chunk_prev_hash, prev_chunk_height_included, false)?;
let mut execution_contexts: Vec<(ApplyTransactionsBlockContext, ShardContext)> = vec![];
for block_hash in blocks_to_execute {
let block_header = self.get_block_header(&block_hash)?;
let prev_block_header = self.get_previous_header(&block_header)?;
let shard_context = self.get_shard_context(me, &block_header, shard_id, mode)?;
if shard_context.need_to_split_states {
return Err(Error::Other(String::from(
"State split occurred in blocks range, it is not supported yet",
)));
}
execution_contexts.push((
self.get_apply_transactions_block_context(
&block_header,
&prev_block_header,
false,
)?,
shard_context,
));
}
execution_contexts.reverse();

let chunk_extra = if return_chunk_extra {
let chunk_extra_block_hash = match execution_contexts.first() {
// If blocks range is non-trivial, block hash for chunk extra will
// be the **previous hash** for the first block in range of missing
// chunk.
Some((block_context, _)) => &block_context.prev_block_hash,
// Otherwise it is previous block for the chunk.
None => &chunk_prev_hash,
};
let shard_uid = self.epoch_manager.shard_id_to_uid(
shard_id,
&self.epoch_manager.get_epoch_id(chunk_extra_block_hash)?,
)?;
match self.get_chunk_extra(chunk_extra_block_hash, &shard_uid) {
Ok(chunk_extra) => Some(ChunkExtra::clone(chunk_extra.as_ref())),
Err(e) => {
debug!(target: "client", ?chunk_extra_block_hash, ?shard_uid, "Chunk extra is missing: {e}");
None
}
}
} else {
None
};

Ok((execution_contexts, chunk_extra))
}

/// Returns closure which should validate a chunk without state, if chunk is present.
/// TODO(logunov):
/// 1. Currently result of this job is not applied and used only to validate with
Expand Down Expand Up @@ -4275,127 +4344,124 @@ impl Chain {
)?;
let receipts = collect_receipts_from_response(receipts_response);

// Get sequence of blocks for which we will process shard update.
let blocks_to_execute = self.get_blocks_until_height(
prev_chunk_block_hash,
prev_prev_chunk_height_included,
false,
)?;
let mut execution_contexts: Vec<(ApplyTransactionsBlockContext, ShardContext)> = vec![];
for block_hash in blocks_to_execute {
let block_header = self.get_block_header(&block_hash)?;
// Get execution contexts for blocks with missing chunks between
// previous-previous chunk and previous chunk to execute.
// Also get starting chunk extra.
let (execution_contexts_before, maybe_chunk_extra) = self
.get_blocks_range_with_missing_chunks(
me,
prev_chunk_prev_hash,
prev_prev_chunk_height_included,
shard_id,
mode,
true,
)?;
let mut current_chunk_extra = match maybe_chunk_extra {
Some(c) => c,
None => {
debug!(target: "client", "Warning: chunk extra is missing");
return Ok(None);
}
};

// Get execution context for previous chunk.
let (prev_chunk_block_context, prev_chunk_shard_context) = {
let block_header = self.get_block_header(&prev_chunk_block_hash)?;
let prev_block_header = self.get_previous_header(&block_header)?;
let shard_context = self.get_shard_context(me, &block_header, shard_id, mode)?;
if shard_context.need_to_split_states {
return Ok(None);
}
execution_contexts.push((
self.get_apply_transactions_block_context(
&block_header,
&prev_block_header,
block_hash == prev_chunk_block_hash,
)?,
(
self.get_apply_transactions_block_context(&block_header, &prev_block_header, true)?,
shard_context,
));
}
execution_contexts.reverse();

// Create stateless validation job.
// Its initial state corresponds to the block at which `prev_prev_chunk` was created.
// Then, we process updates for missing chunks, until we find a block at which
// `prev_chunk` was created.
// And finally we process update for the `prev_chunk`.
let mut current_chunk_extra = match self.get_chunk_extra(
&execution_contexts[0].0.prev_block_hash,
&execution_contexts[0].1.shard_uid,
) {
Ok(c) => ChunkExtra::clone(c.as_ref()),
Err(e) => {
let block_height = block.header().height();
let block_hash = block.hash();
let requested_block_hash = execution_contexts[0].0.prev_block_hash;
let requested_shard_uid = execution_contexts[0].1.shard_uid;
debug!(target: "client", block_height, ?block_hash, ?requested_block_hash, ?requested_shard_uid, "Chunk extra is missing: {e}");
return Ok(None);
}
)
};
let (last_block_context, last_shard_context) = execution_contexts.pop().unwrap();
let prev_chunk = self.get_chunk_clone_from_header(&prev_chunk_header.clone())?;

// Get execution contexts for blocks with missing chunks between
// previous chunk and the current chunk.
let (execution_contexts_after, _) = self.get_blocks_range_with_missing_chunks(
me,
*prev_block.hash(),
prev_chunk_height_included,
shard_id,
mode,
false,
)?;

// Create stateless validation job.
// Its initial state corresponds to the block at which `prev_prev_chunk`
// was created, and then it:
// 1. processes updates for missing chunks until a block at which
// `prev_chunk` was created;
// 2. processes update for the `prev_chunk`;
// 3. processes updates for missing chunks until the last chunk
// is reached.
Ok(Some((
shard_id,
Box::new(move |parent_span| -> Result<ShardUpdateResult, Error> {
let mut result = vec![];
for (block_context, shard_context) in execution_contexts {
let block_result = process_shard_update(
parent_span,
runtime.as_ref(),
epoch_manager.as_ref(),
ShardUpdateReason::OldChunk(OldChunkData {
block: block_context.clone(),
split_state_roots: None,
prev_chunk_extra: current_chunk_extra.clone(),
storage_context: StorageContext {
storage_data_source: StorageDataSource::DbTrieOnly,
state_patch: Default::default(),
},
}),
shard_context,
)?;
if let ShardBlockUpdateResult::OldChunk(OldChunkResult {
shard_uid,
apply_result,
apply_split_result_or_state_changes: _,
}) = block_result
{
*current_chunk_extra.state_root_mut() = apply_result.new_root;
result.push((
block_context.block_hash,
shard_uid,
current_chunk_extra.clone(),
));
}
}
// TODO(logunov): use `validate_chunk_with_chunk_extra`
assert_eq!(current_chunk_extra.state_root(), &prev_chunk.prev_state_root());
let block_result = process_shard_update(
// Process missing chunks before previous chunk.
let mut result = process_missing_chunks_range(
parent_span,
current_chunk_extra.clone(),
runtime.as_ref(),
epoch_manager.as_ref(),
ShardUpdateReason::NewChunk(NewChunkData {
execution_contexts_before,
)?;
current_chunk_extra = match result.last() {
Some((_, _, chunk_extra)) => chunk_extra.clone(),
None => current_chunk_extra,
};
// TODO(logunov): use `validate_chunk_with_chunk_extra`
assert_eq!(current_chunk_extra.state_root(), &prev_chunk.prev_state_root());
// Process previous chunk.
let NewChunkResult {
gas_limit,
shard_uid,
apply_result,
apply_split_result_or_state_changes: _,
} = apply_new_chunk(
parent_span,
NewChunkData {
chunk: prev_chunk,
receipts,
split_state_roots: None,
block: last_block_context.clone(),
block: prev_chunk_block_context.clone(),
is_first_block_with_chunk_of_version: false,
storage_context: StorageContext {
storage_data_source: StorageDataSource::DbTrieOnly,
state_patch: Default::default(),
},
}),
last_shard_context,
},
prev_chunk_shard_context,
runtime.as_ref(),
epoch_manager.as_ref(),
)?;
if let ShardBlockUpdateResult::NewChunk(NewChunkResult {
let (outcome_root, _) =
ApplyTransactionResult::compute_outcomes_proof(&apply_result.outcomes);
current_chunk_extra = ChunkExtra::new(
&apply_result.new_root,
outcome_root,
apply_result.validator_proposals,
apply_result.total_gas_burnt,
gas_limit,
apply_result.total_balance_burnt,
);
result.push((
prev_chunk_block_context.block_hash,
shard_uid,
apply_result,
apply_split_result_or_state_changes: _,
}) = block_result
{
let (outcome_root, _) =
ApplyTransactionResult::compute_outcomes_proof(&apply_result.outcomes);
result.push((
last_block_context.block_hash,
shard_uid,
ChunkExtra::new(
&apply_result.new_root,
outcome_root,
apply_result.validator_proposals,
apply_result.total_gas_burnt,
gas_limit,
apply_result.total_balance_burnt,
),
));
}
current_chunk_extra.clone(),
));
// Process missing chunks after previous chunk.
let result_after = process_missing_chunks_range(
parent_span,
current_chunk_extra,
runtime.as_ref(),
epoch_manager.as_ref(),
execution_contexts_after,
)?;
result.extend(result_after.into_iter());
Ok(ShardUpdateResult::Stateless(result))
}),
)))
Expand Down
Loading

0 comments on commit 6c4edb2

Please sign in to comment.