From 6c4edb2ead0353caa1810c39d06f4877162ab2c7 Mon Sep 17 00:00:00 2001
From: Aleksandr Logunov
Date: Mon, 18 Dec 2023 17:38:58 -0500
Subject: [PATCH] refactor: process two ranges of missing chunks in stateless job (#10341)

I tried to switch from stateful to stateless jobs, but unfortunately it
turned out that the `get_stateless_validation_job` skeleton is not enough.
TL;DR: I need to process both ranges of missing chunks inside it: between
`prev_prev_chunk` and `prev_chunk`, and between `prev_chunk` and `chunk`.

The first range is needed for compatibility with the current protocol, where
we process missing chunks immediately on receiving a block, so missing chunks
are effectively processed before the executed chunk. The second range is
needed to generate the post state root for the latest processed block, which
becomes the pre state root for the newly processed chunk. It will also be
useful for switching to the "after-range" later, especially when implementing
a protocol version switch.

Nayduck: https://nayduck.near.org/#/run/3327

---------

Co-authored-by: Longarithm
---
 chain/chain/src/chain.rs        | 264 ++++++++++++++++++++------------
 chain/chain/src/update_shard.rs |  83 +++++++---
 2 files changed, 228 insertions(+), 119 deletions(-)

diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index d2d2a915d1c..c09555852d5 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -17,9 +17,9 @@ use crate::types::{
     ChainConfig, RuntimeAdapter, RuntimeStorageConfig, StorageDataSource,
 };
 use crate::update_shard::{
-    process_shard_update, NewChunkData, NewChunkResult, OldChunkData, OldChunkResult,
-    ShardBlockUpdateResult, ShardContext, ShardUpdateReason, ShardUpdateResult, StateSplitData,
-    StateSplitResult, StorageContext,
+    apply_new_chunk, process_missing_chunks_range, process_shard_update, NewChunkData,
+    NewChunkResult, OldChunkData, OldChunkResult, ShardBlockUpdateResult, ShardContext,
+    ShardUpdateReason, ShardUpdateResult, StateSplitData, StateSplitResult, StorageContext,
 };
 use crate::validate::{
     validate_challenge, validate_chunk_proofs, validate_chunk_with_chunk_extra,
@@ -4179,6 +4179,75 @@ impl Chain {
         )))
     }
 
+    /// Gets the range of block and shard contexts for blocks between two
+    /// consecutive chunks for the given `shard_id`.
+    /// One chunk is represented by `chunk_prev_hash`.
+    /// Its previous chunk is represented by `prev_chunk_height_included`.
+    /// Needed to process shard updates between these chunks.
+    /// If there are no missing chunks between them, the resulting vector
+    /// will be empty.
+    /// Optionally returns the `ChunkExtra` corresponding to the block with
+    /// the previous chunk.
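+    /// Illustrative example (heights are hypothetical): if
+    /// `prev_chunk_height_included` is 10 and the block `chunk_prev_hash`
+    /// has height 13, the returned contexts cover the chunkless blocks at
+    /// heights 11, 12 and 13, in ascending order; if `chunk_prev_hash`
+    /// itself is the block at height 10, the range is empty.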
+    fn get_blocks_range_with_missing_chunks(
+        &self,
+        me: &Option<AccountId>,
+        chunk_prev_hash: CryptoHash,
+        prev_chunk_height_included: BlockHeight,
+        shard_id: ShardId,
+        mode: ApplyChunksMode,
+        return_chunk_extra: bool,
+    ) -> Result<(Vec<(ApplyTransactionsBlockContext, ShardContext)>, Option<ChunkExtra>), Error>
+    {
+        let blocks_to_execute =
+            self.get_blocks_until_height(chunk_prev_hash, prev_chunk_height_included, false)?;
+        let mut execution_contexts: Vec<(ApplyTransactionsBlockContext, ShardContext)> = vec![];
+        for block_hash in blocks_to_execute {
+            let block_header = self.get_block_header(&block_hash)?;
+            let prev_block_header = self.get_previous_header(&block_header)?;
+            let shard_context = self.get_shard_context(me, &block_header, shard_id, mode)?;
+            if shard_context.need_to_split_states {
+                return Err(Error::Other(String::from(
+                    "State split occurred in blocks range, it is not supported yet",
+                )));
+            }
+            execution_contexts.push((
+                self.get_apply_transactions_block_context(
+                    &block_header,
+                    &prev_block_header,
+                    false,
+                )?,
+                shard_context,
+            ));
+        }
+        execution_contexts.reverse();
+
+        let chunk_extra = if return_chunk_extra {
+            let chunk_extra_block_hash = match execution_contexts.first() {
+                // If the blocks range is non-trivial, the block hash for the
+                // chunk extra will be the **previous hash** of the first block
+                // in the range of missing chunks.
+                Some((block_context, _)) => &block_context.prev_block_hash,
+                // Otherwise it is the previous block for the chunk.
+                None => &chunk_prev_hash,
+            };
+            let shard_uid = self.epoch_manager.shard_id_to_uid(
+                shard_id,
+                &self.epoch_manager.get_epoch_id(chunk_extra_block_hash)?,
+            )?;
+            match self.get_chunk_extra(chunk_extra_block_hash, &shard_uid) {
+                Ok(chunk_extra) => Some(ChunkExtra::clone(chunk_extra.as_ref())),
+                Err(e) => {
+                    debug!(target: "client", ?chunk_extra_block_hash, ?shard_uid, "Chunk extra is missing: {e}");
+                    None
+                }
+            }
+        } else {
+            None
+        };
+
+        Ok((execution_contexts, chunk_extra))
+    }
+
     /// Returns closure which should validate a chunk without state, if chunk is present.
     /// TODO(logunov):
     /// 1. Currently result of this job is not applied and used only to validate with
@@ -4275,127 +4344,124 @@
         )?;
         let receipts = collect_receipts_from_response(receipts_response);
 
-        // Get sequence of blocks for which we will process shard update.
-        let blocks_to_execute = self.get_blocks_until_height(
-            prev_chunk_block_hash,
-            prev_prev_chunk_height_included,
-            false,
-        )?;
-        let mut execution_contexts: Vec<(ApplyTransactionsBlockContext, ShardContext)> = vec![];
-        for block_hash in blocks_to_execute {
-            let block_header = self.get_block_header(&block_hash)?;
+        // Get execution contexts for blocks with missing chunks between
+        // the previous-previous chunk and the previous chunk.
+        // Also get the starting chunk extra.
+        let (execution_contexts_before, maybe_chunk_extra) = self
+            .get_blocks_range_with_missing_chunks(
+                me,
+                prev_chunk_prev_hash,
+                prev_prev_chunk_height_included,
+                shard_id,
+                mode,
+                true,
+            )?;
+        let mut current_chunk_extra = match maybe_chunk_extra {
+            Some(c) => c,
+            None => {
+                debug!(target: "client", "Warning: chunk extra is missing");
+                return Ok(None);
+            }
+        };
+
+        // Get the execution context for the previous chunk.
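+        // Note: unlike the chunkless blocks above, where the block context is
+        // built with `false` as the third argument, this block actually
+        // contains the previous chunk, so the flag is `true` (the old loop
+        // computed it as `block_hash == prev_chunk_block_hash`).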
+        let (prev_chunk_block_context, prev_chunk_shard_context) = {
+            let block_header = self.get_block_header(&prev_chunk_block_hash)?;
             let prev_block_header = self.get_previous_header(&block_header)?;
             let shard_context = self.get_shard_context(me, &block_header, shard_id, mode)?;
             if shard_context.need_to_split_states {
                 return Ok(None);
             }
-            execution_contexts.push((
-                self.get_apply_transactions_block_context(
-                    &block_header,
-                    &prev_block_header,
-                    block_hash == prev_chunk_block_hash,
-                )?,
+            (
+                self.get_apply_transactions_block_context(&block_header, &prev_block_header, true)?,
                 shard_context,
-            ));
-        }
-        execution_contexts.reverse();
-
-        // Create stateless validation job.
-        // Its initial state corresponds to the block at which `prev_prev_chunk` was created.
-        // Then, we process updates for missing chunks, until we find a block at which
-        // `prev_chunk` was created.
-        // And finally we process update for the `prev_chunk`.
-        let mut current_chunk_extra = match self.get_chunk_extra(
-            &execution_contexts[0].0.prev_block_hash,
-            &execution_contexts[0].1.shard_uid,
-        ) {
-            Ok(c) => ChunkExtra::clone(c.as_ref()),
-            Err(e) => {
-                let block_height = block.header().height();
-                let block_hash = block.hash();
-                let requested_block_hash = execution_contexts[0].0.prev_block_hash;
-                let requested_shard_uid = execution_contexts[0].1.shard_uid;
-                debug!(target: "client", block_height, ?block_hash, ?requested_block_hash, ?requested_shard_uid, "Chunk extra is missing: {e}");
-                return Ok(None);
-            }
+            )
         };
-        let (last_block_context, last_shard_context) = execution_contexts.pop().unwrap();
         let prev_chunk = self.get_chunk_clone_from_header(&prev_chunk_header.clone())?;
+
+        // Get execution contexts for blocks with missing chunks between
+        // the previous chunk and the current chunk.
+        let (execution_contexts_after, _) = self.get_blocks_range_with_missing_chunks(
+            me,
+            *prev_block.hash(),
+            prev_chunk_height_included,
+            shard_id,
+            mode,
+            false,
+        )?;
+
+        // Create the stateless validation job.
+        // Its initial state corresponds to the block at which `prev_prev_chunk`
+        // was created, and then it:
+        // 1. processes updates for missing chunks until a block at which
+        //    `prev_chunk` was created;
+        // 2. processes the update for `prev_chunk`;
+        // 3. processes updates for missing chunks until the last chunk
+        //    is reached.
         Ok(Some((
             shard_id,
             Box::new(move |parent_span| -> Result<ShardUpdateResult, Error> {
-                let mut result = vec![];
-                for (block_context, shard_context) in execution_contexts {
-                    let block_result = process_shard_update(
-                        parent_span,
-                        runtime.as_ref(),
-                        epoch_manager.as_ref(),
-                        ShardUpdateReason::OldChunk(OldChunkData {
-                            block: block_context.clone(),
-                            split_state_roots: None,
-                            prev_chunk_extra: current_chunk_extra.clone(),
-                            storage_context: StorageContext {
-                                storage_data_source: StorageDataSource::DbTrieOnly,
-                                state_patch: Default::default(),
-                            },
-                        }),
-                        shard_context,
-                    )?;
-                    if let ShardBlockUpdateResult::OldChunk(OldChunkResult {
-                        shard_uid,
-                        apply_result,
-                        apply_split_result_or_state_changes: _,
-                    }) = block_result
-                    {
-                        *current_chunk_extra.state_root_mut() = apply_result.new_root;
-                        result.push((
-                            block_context.block_hash,
-                            shard_uid,
-                            current_chunk_extra.clone(),
-                        ));
-                    }
-                }
-                // TODO(logunov): use `validate_chunk_with_chunk_extra`
-                assert_eq!(current_chunk_extra.state_root(), &prev_chunk.prev_state_root());
-                let block_result = process_shard_update(
+                // Process missing chunks before the previous chunk.
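+                // Each replayed block chains the resulting state root into the
+                // next update, so `result.last()` below holds the chunk extra
+                // as of the block just before `prev_chunk`.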
+                let mut result = process_missing_chunks_range(
                     parent_span,
+                    current_chunk_extra.clone(),
                     runtime.as_ref(),
                     epoch_manager.as_ref(),
-                    ShardUpdateReason::NewChunk(NewChunkData {
+                    execution_contexts_before,
+                )?;
+                current_chunk_extra = match result.last() {
+                    Some((_, _, chunk_extra)) => chunk_extra.clone(),
+                    None => current_chunk_extra,
+                };
+                // TODO(logunov): use `validate_chunk_with_chunk_extra`
+                assert_eq!(current_chunk_extra.state_root(), &prev_chunk.prev_state_root());
+                // Process the previous chunk.
+                let NewChunkResult {
+                    gas_limit,
+                    shard_uid,
+                    apply_result,
+                    apply_split_result_or_state_changes: _,
+                } = apply_new_chunk(
+                    parent_span,
+                    NewChunkData {
                         chunk: prev_chunk,
                         receipts,
                         split_state_roots: None,
-                        block: last_block_context.clone(),
+                        block: prev_chunk_block_context.clone(),
                         is_first_block_with_chunk_of_version: false,
                         storage_context: StorageContext {
                             storage_data_source: StorageDataSource::DbTrieOnly,
                             state_patch: Default::default(),
                         },
-                    }),
-                    last_shard_context,
+                    },
+                    prev_chunk_shard_context,
+                    runtime.as_ref(),
+                    epoch_manager.as_ref(),
                 )?;
-                if let ShardBlockUpdateResult::NewChunk(NewChunkResult {
+                let (outcome_root, _) =
+                    ApplyTransactionResult::compute_outcomes_proof(&apply_result.outcomes);
+                current_chunk_extra = ChunkExtra::new(
+                    &apply_result.new_root,
+                    outcome_root,
+                    apply_result.validator_proposals,
+                    apply_result.total_gas_burnt,
                     gas_limit,
+                    apply_result.total_balance_burnt,
+                );
+                result.push((
+                    prev_chunk_block_context.block_hash,
                     shard_uid,
-                    apply_result,
-                    apply_split_result_or_state_changes: _,
-                }) = block_result
-                {
-                    let (outcome_root, _) =
-                        ApplyTransactionResult::compute_outcomes_proof(&apply_result.outcomes);
-                    result.push((
-                        last_block_context.block_hash,
-                        shard_uid,
-                        ChunkExtra::new(
-                            &apply_result.new_root,
-                            outcome_root,
-                            apply_result.validator_proposals,
-                            apply_result.total_gas_burnt,
-                            gas_limit,
-                            apply_result.total_balance_burnt,
-                        ),
-                    ));
-                }
+                    current_chunk_extra.clone(),
+                ));
+                // Process missing chunks after the previous chunk.
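+                // This yields the post state root for the latest processed
+                // block, which becomes the pre state root for the newly
+                // processed chunk (see the PR description).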
+                let result_after = process_missing_chunks_range(
+                    parent_span,
+                    current_chunk_extra,
+                    runtime.as_ref(),
+                    epoch_manager.as_ref(),
+                    execution_contexts_after,
+                )?;
+                result.extend(result_after.into_iter());
                 Ok(ShardUpdateResult::Stateless(result))
             }),
         )))
diff --git a/chain/chain/src/update_shard.rs b/chain/chain/src/update_shard.rs
index d0a93409c4b..f76e90debd2 100644
--- a/chain/chain/src/update_shard.rs
+++ b/chain/chain/src/update_shard.rs
@@ -134,28 +134,71 @@ pub(crate) fn process_shard_update(
     shard_update_reason: ShardUpdateReason,
     shard_context: ShardContext,
 ) -> Result<ShardBlockUpdateResult, Error> {
-    match shard_update_reason {
-        ShardUpdateReason::NewChunk(data) => {
-            apply_new_chunk(parent_span, data, shard_context, runtime, epoch_manager)
-        }
-        ShardUpdateReason::OldChunk(data) => {
-            apply_old_chunk(parent_span, data, shard_context, runtime, epoch_manager)
-        }
-        ShardUpdateReason::StateSplit(data) => {
-            apply_state_split(parent_span, data, shard_context.shard_uid, runtime, epoch_manager)
-        }
-    }
+    Ok(match shard_update_reason {
+        ShardUpdateReason::NewChunk(data) => ShardBlockUpdateResult::NewChunk(apply_new_chunk(
+            parent_span,
+            data,
+            shard_context,
+            runtime,
+            epoch_manager,
+        )?),
+        ShardUpdateReason::OldChunk(data) => ShardBlockUpdateResult::OldChunk(apply_old_chunk(
+            parent_span,
+            data,
+            shard_context,
+            runtime,
+            epoch_manager,
+        )?),
+        ShardUpdateReason::StateSplit(data) => ShardBlockUpdateResult::StateSplit(
+            apply_state_split(parent_span, data, shard_context.shard_uid, runtime, epoch_manager)?,
+        ),
+    })
 }
+/// Processes shard updates for a range of execution contexts, which must
+/// correspond to missing chunks for some shard.
+/// `current_chunk_extra` must correspond to the `ChunkExtra` just before
+/// execution; at the end it will correspond to the latest execution
+/// result.
+pub(crate) fn process_missing_chunks_range(
+    parent_span: &tracing::Span,
+    mut current_chunk_extra: ChunkExtra,
+    runtime: &dyn RuntimeAdapter,
+    epoch_manager: &dyn EpochManagerAdapter,
+    execution_contexts: Vec<(ApplyTransactionsBlockContext, ShardContext)>,
+) -> Result<Vec<(CryptoHash, ShardUId, ChunkExtra)>, Error> {
+    let mut result = vec![];
+    for (block_context, shard_context) in execution_contexts {
+        let OldChunkResult { shard_uid, apply_result, apply_split_result_or_state_changes: _ } =
+            apply_old_chunk(
+                parent_span,
+                OldChunkData {
+                    block: block_context.clone(),
+                    split_state_roots: None,
+                    prev_chunk_extra: current_chunk_extra.clone(),
+                    storage_context: StorageContext {
+                        storage_data_source: StorageDataSource::DbTrieOnly,
+                        state_patch: Default::default(),
+                    },
+                },
+                shard_context,
+                runtime,
+                epoch_manager,
+            )?;
+        *current_chunk_extra.state_root_mut() = apply_result.new_root;
+        result.push((block_context.block_hash, shard_uid, current_chunk_extra.clone()));
+    }
+    Ok(result)
+}
+
 /// Applies new chunk, which includes applying transactions from chunk and
 /// receipts filtered from outgoing receipts from previous chunks.
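+/// Note: now `pub(crate)` so the stateless validation job in `chain.rs` can
+/// apply the previous chunk directly and build its `ChunkExtra` from the
+/// returned `NewChunkResult`.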
-fn apply_new_chunk(
+pub(crate) fn apply_new_chunk(
     parent_span: &tracing::Span,
     data: NewChunkData,
     shard_context: ShardContext,
     runtime: &dyn RuntimeAdapter,
     epoch_manager: &dyn EpochManagerAdapter,
-) -> Result<ShardBlockUpdateResult, Error> {
+) -> Result<NewChunkResult, Error> {
     let NewChunkData {
         block,
         chunk,
@@ -207,12 +250,12 @@ fn apply_new_chunk(
         } else {
             None
         };
-            Ok(ShardBlockUpdateResult::NewChunk(NewChunkResult {
+            Ok(NewChunkResult {
                 gas_limit,
                 shard_uid: shard_context.shard_uid,
                 apply_result,
                 apply_split_result_or_state_changes,
-            }))
+            })
         }
         Err(err) => Err(err),
     }
 }
@@ -227,7 +270,7 @@ fn apply_old_chunk(
     shard_context: ShardContext,
     runtime: &dyn RuntimeAdapter,
     epoch_manager: &dyn EpochManagerAdapter,
-) -> Result<ShardBlockUpdateResult, Error> {
+) -> Result<OldChunkResult, Error> {
     let OldChunkData { prev_chunk_extra, split_state_roots, block, storage_context } = data;
     let shard_id = shard_context.shard_uid.shard_id();
     let _span = tracing::debug_span!(
@@ -269,11 +312,11 @@ fn apply_old_chunk(
         } else {
             None
         };
-            Ok(ShardBlockUpdateResult::OldChunk(OldChunkResult {
+            Ok(OldChunkResult {
                 shard_uid: shard_context.shard_uid,
                 apply_result,
                 apply_split_result_or_state_changes,
-            }))
+            })
         }
         Err(err) => Err(err),
     }
 }
@@ -286,7 +329,7 @@ fn apply_state_split(
     shard_uid: ShardUId,
     runtime: &dyn RuntimeAdapter,
     epoch_manager: &dyn EpochManagerAdapter,
-) -> Result<ShardBlockUpdateResult, Error> {
+) -> Result<StateSplitResult, Error> {
     let StateSplitData { split_state_roots, state_changes, block_height: height, block_hash } =
         data;
     let shard_id = shard_uid.shard_id();
@@ -306,7 +349,7 @@ fn apply_state_split(
         &next_epoch_shard_layout,
         state_changes,
     )?;
-    Ok(ShardBlockUpdateResult::StateSplit(StateSplitResult { shard_uid, results }))
+    Ok(StateSplitResult { shard_uid, results })
 }
 
 /// Process ApplyTransactionResult to apply changes to split states