diff --git a/chain/chain/src/runtime/test_utils.rs b/chain/chain/src/runtime/test_utils.rs
index 2f36e46a40b..2810c5d3e2c 100644
--- a/chain/chain/src/runtime/test_utils.rs
+++ b/chain/chain/src/runtime/test_utils.rs
@@ -48,6 +48,7 @@ impl NightshadeRuntime {
         runtime_config_store: Option<RuntimeConfigStore>,
         trie_config: TrieConfig,
         state_snapshot_type: StateSnapshotType,
+        gc_num_epochs_to_keep: u64,
     ) -> Arc<Self> {
         Self::new(
             store,
@@ -57,7 +58,7 @@ impl NightshadeRuntime {
             None,
             None,
             runtime_config_store,
-            DEFAULT_GC_NUM_EPOCHS_TO_KEEP,
+            gc_num_epochs_to_keep,
             trie_config,
             StateSnapshotConfig {
                 state_snapshot_type,
diff --git a/integration-tests/src/test_loop/builder.rs b/integration-tests/src/test_loop/builder.rs
index 436eb230501..377d2fee833 100644
--- a/integration-tests/src/test_loop/builder.rs
+++ b/integration-tests/src/test_loop/builder.rs
@@ -593,6 +593,7 @@ impl TestLoopBuilder {
             self.runtime_config_store.clone(),
             TrieConfig::from_store_config(&store_config),
             StateSnapshotType::EveryEpoch,
+            client_config.gc.gc_num_epochs_to_keep,
         );
 
         let state_snapshot = StateSnapshotActor::new(
@@ -671,6 +672,7 @@ impl TestLoopBuilder {
                 self.runtime_config_store.clone(),
                 TrieConfig::from_store_config(&store_config),
                 StateSnapshotType::EveryEpoch,
+                client_config.gc.gc_num_epochs_to_keep,
             );
             (view_epoch_manager, view_shard_tracker, view_runtime_adapter)
         } else {
diff --git a/integration-tests/src/test_loop/tests/resharding_v3.rs b/integration-tests/src/test_loop/tests/resharding_v3.rs
index f6064417095..0dc4441d884 100644
--- a/integration-tests/src/test_loop/tests/resharding_v3.rs
+++ b/integration-tests/src/test_loop/tests/resharding_v3.rs
@@ -21,8 +21,8 @@ use crate::test_loop::utils::receipts::{
 #[cfg(feature = "test_features")]
 use crate::test_loop::utils::resharding::fork_before_resharding_block;
 use crate::test_loop::utils::resharding::{
-    call_burn_gas_contract, call_promise_yield, execute_money_transfers,
-    temporary_account_during_resharding,
+    call_burn_gas_contract, call_promise_yield, check_state_cleanup_after_resharding,
+    execute_money_transfers, temporary_account_during_resharding, TrackedShardSchedule,
 };
 use crate::test_loop::utils::sharding::print_and_assert_shard_accounts;
 use crate::test_loop::utils::transactions::{
@@ -45,6 +45,18 @@ const DEFAULT_EPOCH_LENGTH: u64 = 6;
 /// and later we would hit the `DBNotFoundErr("Transaction ...)` error in tests.
 const INCREASED_EPOCH_LENGTH: u64 = 8;
 
+/// Garbage collection window length.
+const GC_NUM_EPOCHS_TO_KEEP: u64 = 3;
+
+/// Maximum number of epochs within which the test should finish.
+const TESTLOOP_NUM_EPOCHS_TO_WAIT: u64 = 8;
+
+/// Default shard layout version used in resharding tests.
+const DEFAULT_SHARD_LAYOUT_VERSION: u64 = 2;
+
+/// Account used in resharding tests as a split boundary.
+const NEW_BOUNDARY_ACCOUNT: &str = "account6";
+
 #[derive(derive_builder::Builder)]
 #[builder(pattern = "owned", build_fn(skip))]
 #[allow(unused)]
@@ -84,6 +96,12 @@ struct TestReshardingParameters {
     chunk_ranges_to_drop: HashMap<ShardUId, std::ops::Range<i64>>,
     shuffle_shard_assignment_for_chunk_producers: bool,
     track_all_shards: bool,
+    // Manually specify which shards will be tracked for a given client ID.
+    // The client ID must not be used for any other role (validator, RPC, etc.).
+    // The schedule length must be more than `TESTLOOP_NUM_EPOCHS_TO_WAIT` so that it covers all epoch heights used in the test.
+    // The suffix must consist of `GC_NUM_EPOCHS_TO_KEEP` repetitions of the same shard,
+    // so that we can assert at the end of the test that the state of all other shards has been cleaned up.
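+    // E.g. `test_resharding_v3_state_cleanup` below tracks the parent shard before resharding,
+    // then its child, and finally an unrelated shard for the remainder of the schedule.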
+    tracked_shard_schedule: Option<TrackedShardSchedule>,
     load_mem_tries_for_tracked_shards: bool,
     /// Custom behavior executed at every iteration of test loop.
     #[builder(setter(custom))]
@@ -115,7 +133,10 @@ struct TestReshardingParameters {
 
 impl TestReshardingParametersBuilder {
     fn build(self) -> TestReshardingParameters {
+        // Give enough time for GC to kick in after resharding.
+        assert!(GC_NUM_EPOCHS_TO_KEEP + 2 < TESTLOOP_NUM_EPOCHS_TO_WAIT);
         let epoch_length = self.epoch_length.unwrap_or(DEFAULT_EPOCH_LENGTH);
+        let tracked_shard_schedule = self.tracked_shard_schedule.unwrap_or(None);
         let num_accounts = self.num_accounts.unwrap_or(8);
         let num_clients = self.num_clients.unwrap_or(7);
@@ -123,8 +144,12 @@ impl TestReshardingParametersBuilder {
         let num_validators = self.num_validators.unwrap_or(2);
         let num_rpcs = self.num_rpcs.unwrap_or(1);
         let num_archivals = self.num_archivals.unwrap_or(1);
+        let num_extra_nodes = if tracked_shard_schedule.is_some() { 1 } else { 0 };
 
-        assert!(num_clients >= num_producers + num_validators + num_rpcs + num_archivals);
+        assert!(
+            num_clients
+                >= num_producers + num_validators + num_rpcs + num_archivals + num_extra_nodes
+        );
 
         // #12195 prevents number of BPs bigger than `epoch_length`.
         assert!(num_producers > 0 && num_producers <= epoch_length);
@@ -157,9 +182,23 @@ impl TestReshardingParametersBuilder {
         let validators = validators.to_vec();
         let (rpcs, tmp) = tmp.split_at(num_rpcs as usize);
         let rpcs = rpcs.to_vec();
-        let (archivals, _) = tmp.split_at(num_archivals as usize);
+        let (archivals, clients_without_role) = tmp.split_at(num_archivals as usize);
         let archivals = archivals.to_vec();
 
+        if let Some(tracked_shard_schedule) = &tracked_shard_schedule {
+            assert!(clients_without_role.contains(&clients[tracked_shard_schedule.client_index]));
+            let schedule_length = tracked_shard_schedule.schedule.len();
+            assert!(schedule_length > TESTLOOP_NUM_EPOCHS_TO_WAIT as usize);
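+            // Check that all entries from index
+            // `TESTLOOP_NUM_EPOCHS_TO_WAIT - GC_NUM_EPOCHS_TO_KEEP - 2` to the end are identical.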
+            for i in
+                (TESTLOOP_NUM_EPOCHS_TO_WAIT - GC_NUM_EPOCHS_TO_KEEP - 1) as usize..schedule_length
+            {
+                assert_eq!(
+                    tracked_shard_schedule.schedule[i - 1],
+                    tracked_shard_schedule.schedule[i]
+                );
+            }
+        }
+
         let client_index =
             if rpcs.is_empty() { 0 } else { num_producers + num_validators } as usize;
         let client_id = clients[client_index].clone();
@@ -167,10 +206,12 @@ impl TestReshardingParametersBuilder {
         println!("Clients setup:");
         println!("Producers: {producers:?}");
         println!("Validators: {validators:?}");
-        println!("Rpcs: {rpcs:?}, to serve requests we use client: {client_id}");
+        println!("Rpcs: {rpcs:?}");
         println!("Archivals: {archivals:?}");
+        println!("To serve requests, we use client: {client_id}");
+        println!("Num extra nodes: {num_extra_nodes}");
 
-        let new_boundary_account: AccountId = "account6".parse().unwrap();
+        let new_boundary_account: AccountId = NEW_BOUNDARY_ACCOUNT.parse().unwrap();
         let temporary_account_id: AccountId =
             format!("{}.{}", new_boundary_account, new_boundary_account).parse().unwrap();
         let mut loop_actions = self.loop_actions.unwrap_or_default();
@@ -186,7 +227,9 @@ impl TestReshardingParametersBuilder {
         }
 
         TestReshardingParameters {
-            base_shard_layout_version: self.base_shard_layout_version.unwrap_or(2),
+            base_shard_layout_version: self
+                .base_shard_layout_version
+                .unwrap_or(DEFAULT_SHARD_LAYOUT_VERSION),
             num_accounts,
             num_clients,
             num_producers,
@@ -208,6 +251,7 @@ impl TestReshardingParametersBuilder {
                 .shuffle_shard_assignment_for_chunk_producers
                 .unwrap_or(false),
             track_all_shards: self.track_all_shards.unwrap_or(false),
+            tracked_shard_schedule,
             load_mem_tries_for_tracked_shards: self
                 .load_mem_tries_for_tracked_shards
                 .unwrap_or(true),
@@ -266,12 +310,20 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
     init_test_logger();
     let mut builder = TestLoopBuilder::new();
+    let tracked_shard_schedule = params.tracked_shard_schedule.clone();
 
-    // Adjust the resharding configuration to make the tests faster.
-    builder = builder.config_modifier(|config, _| {
+    builder = builder.config_modifier(move |config, client_index| {
+        // Adjust the resharding configuration to make the tests faster.
         let mut resharding_config = config.resharding_config.get();
         resharding_config.batch_delay = Duration::milliseconds(1);
         config.resharding_config.update(resharding_config);
+        // Set the tracked shard schedule if specified for the client at the given index.
+        if let Some(tracked_shard_schedule) = &tracked_shard_schedule {
+            if client_index == tracked_shard_schedule.client_index {
+                config.tracked_shards = vec![];
+                config.tracked_shard_schedule = tracked_shard_schedule.schedule.clone();
+            }
+        }
     });
 
     // Prepare shard split configuration.
@@ -356,6 +408,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
         base_protocol_version + 1,
         params.chunk_ranges_to_drop.clone(),
     )
+    .gc_num_epochs_to_keep(GC_NUM_EPOCHS_TO_KEEP)
     .build();
 
     let mut test_setup_transactions = vec![];
@@ -403,7 +456,6 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
         client_handles.iter().map(|handle| &env.test_loop.data.get(handle).client).collect_vec();
     let mut trie_sanity_check =
         TrieSanityCheck::new(&clients, params.load_mem_tries_for_tracked_shards);
-    let gc_num_epochs_to_keep = clients[client_index].config.gc.gc_num_epochs_to_keep;
 
     let latest_block_height = Cell::new(0u64);
     // Height of a block after resharding.
@@ -456,6 +508,8 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
                 // Just passed an epoch with increased number of shards.
                 new_layout_block_height.set(Some(latest_block_height.get()));
                 new_layout_epoch_height.set(Some(epoch_height));
+                // Assert that we will have a chance for GC to kick in before the test is over.
+                assert!(epoch_height + GC_NUM_EPOCHS_TO_KEEP < TESTLOOP_NUM_EPOCHS_TO_WAIT);
                 println!("State after resharding:");
                 print_and_assert_shard_accounts(&clients, &tip);
             }
@@ -467,7 +521,7 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
         );
 
         // Return false if garbage collection window has not passed yet since resharding.
-        if epoch_height <= new_layout_epoch_height.get().unwrap() + gc_num_epochs_to_keep {
+        if epoch_height <= new_layout_epoch_height.get().unwrap() + GC_NUM_EPOCHS_TO_KEEP {
             return false;
         }
         for loop_action in &params.loop_actions {
@@ -478,8 +532,8 @@ fn test_resharding_v3_base(params: TestReshardingParameters) {
 
     env.test_loop.run_until(
         success_condition,
-        // Give enough time to produce ~7 epochs.
-        Duration::seconds((7 * params.epoch_length) as i64),
+        // Give enough time to produce ~TESTLOOP_NUM_EPOCHS_TO_WAIT epochs.
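+        // (E.g. ~8 * 6 = 48 virtual seconds with `DEFAULT_EPOCH_LENGTH`.)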
+        Duration::seconds((TESTLOOP_NUM_EPOCHS_TO_WAIT * params.epoch_length) as i64),
     );
     let client = &env.test_loop.data.get(&client_handles[client_index]).client;
     trie_sanity_check.check_epochs(client);
@@ -492,6 +546,48 @@ fn test_resharding_v3() {
     test_resharding_v3_base(TestReshardingParametersBuilder::default().build());
 }
 
+// Takes a sequence of shard ids to track in consecutive epochs,
+// repeats the last element `TESTLOOP_NUM_EPOCHS_TO_WAIT` times,
+// and maps each element: |id| -> vec![id], to the format required by `TrackedShardSchedule`.
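+// E.g. `[0, 1]` becomes `[[0], [1], [1], ..., [1]]`, with `[1]` repeated `TESTLOOP_NUM_EPOCHS_TO_WAIT` more times.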
+fn shard_sequence_to_schedule(mut shard_sequence: Vec<ShardId>) -> Vec<Vec<ShardId>> {
+    shard_sequence.extend(
+        std::iter::repeat(*shard_sequence.last().unwrap())
+            .take(TESTLOOP_NUM_EPOCHS_TO_WAIT as usize),
+    );
+    shard_sequence.iter().map(|shard_id| vec![*shard_id]).collect()
+}
+
+#[test]
+// TODO(resharding): fix nearcore and un-ignore this test
+#[ignore]
+fn test_resharding_v3_state_cleanup() {
+    // Track parent shard before resharding, child shard after resharding, and then an unrelated shard forever.
+    // Eventually, the State column should only contain entries belonging to the last tracked shard.
+    let account_in_stable_shard: AccountId = "account0".parse().unwrap();
+    let split_boundary_account: AccountId = NEW_BOUNDARY_ACCOUNT.parse().unwrap();
+    let base_shard_layout = get_base_shard_layout(DEFAULT_SHARD_LAYOUT_VERSION);
+    let new_shard_layout =
+        ShardLayout::derive_shard_layout(&base_shard_layout, split_boundary_account.clone());
+    let parent_shard_id = base_shard_layout.account_id_to_shard_id(&split_boundary_account);
+    let child_shard_id = new_shard_layout.account_id_to_shard_id(&split_boundary_account);
+    let unrelated_shard_id = new_shard_layout.account_id_to_shard_id(&account_in_stable_shard);
+
+    let tracked_shard_sequence =
+        vec![parent_shard_id, parent_shard_id, child_shard_id, unrelated_shard_id];
+    let num_clients = 8;
+    let tracked_shard_schedule = TrackedShardSchedule {
+        client_index: (num_clients - 1) as usize,
+        schedule: shard_sequence_to_schedule(tracked_shard_sequence),
+    };
+    test_resharding_v3_base(
+        TestReshardingParametersBuilder::default()
+            .num_clients(num_clients)
+            .tracked_shard_schedule(Some(tracked_shard_schedule.clone()))
+            .add_loop_action(check_state_cleanup_after_resharding(tracked_shard_schedule))
+            .build(),
+    );
+}
+
 #[test]
 fn test_resharding_v3_track_all_shards() {
     test_resharding_v3_base(
diff --git a/integration-tests/src/test_loop/utils/resharding.rs b/integration-tests/src/test_loop/utils/resharding.rs
index 005fc3f3fad..6f05c2607ba 100644
--- a/integration-tests/src/test_loop/utils/resharding.rs
+++ b/integration-tests/src/test_loop/utils/resharding.rs
@@ -1,16 +1,25 @@
 use std::cell::Cell;
+use std::collections::HashSet;
+use std::num::NonZero;
 
 use assert_matches::assert_matches;
+use borsh::BorshDeserialize;
 use itertools::Itertools;
 use near_async::test_loop::data::TestLoopData;
+use near_chain::ChainStoreAccess;
+use near_client::Client;
 use near_client::{Query, QueryError::GarbageCollectedBlock};
 use near_crypto::Signer;
+use near_primitives::hash::CryptoHash;
 use near_primitives::test_utils::create_user_test_signer;
 use near_primitives::transaction::SignedTransaction;
-use near_primitives::types::{AccountId, BlockId, BlockReference, Gas};
+use near_primitives::types::{AccountId, BlockId, BlockReference, Gas, ShardId};
 use near_primitives::views::{
     FinalExecutionStatus, QueryRequest, QueryResponse, QueryResponseKind,
 };
+use near_store::adapter::StoreAdapter;
+use near_store::db::refcount::decode_value_with_rc;
+use near_store::{DBCol, ShardUId};
 use rand::seq::SliceRandom;
 use rand::{Rng, SeedableRng};
 use rand_chacha::ChaCha20Rng;
@@ -24,6 +33,14 @@ use crate::test_loop::utils::transactions::{
 };
 use crate::test_loop::utils::{get_node_data, retrieve_client_actor, ONE_NEAR, TGAS};
 
+/// A config specifying which shards will be tracked by the client at the given index.
+/// For more details, see `TrackedConfig::Schedule`.
+#[derive(Clone, Debug)]
+pub(crate) struct TrackedShardSchedule {
+    pub client_index: usize,
+    pub schedule: Vec<Vec<ShardId>>,
+}
+
 // Returns a callable function that, when invoked inside a test loop iteration, can force the creation of a chain fork.
 #[cfg(feature = "test_features")]
 pub(crate) fn fork_before_resharding_block(double_signing: bool) -> LoopAction {
@@ -481,3 +498,100 @@ pub(crate) fn temporary_account_during_resharding(
     );
     LoopAction::new(action_fn, succeeded)
 }
+
+/// Removes from the State column all entries whose key does not start with the `the_only_shard_uid` prefix.
+fn retain_the_only_shard_state(client: &Client, the_only_shard_uid: ShardUId) {
+    let store = client.chain.chain_store.store().trie_store();
+    let mut store_update = store.store_update();
+    for kv in store.store().iter_raw_bytes(DBCol::State) {
+        let (key, value) = kv.unwrap();
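+        // Keys in `DBCol::State` are an 8-byte `ShardUId` prefix followed by the trie node hash.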
+        let shard_uid = ShardUId::try_from_slice(&key[0..8]).unwrap();
+        if shard_uid == the_only_shard_uid {
+            continue;
+        }
+        let (_, rc) = decode_value_with_rc(&value);
+        assert!(rc > 0);
+        let node_hash = CryptoHash::try_from_slice(&key[8..]).unwrap();
+        store_update.decrement_refcount_by(shard_uid, &node_hash, NonZero::new(rc as u32).unwrap());
+    }
+    store_update.commit().unwrap();
+}
+
+/// Asserts that the State of all shards other than `the_only_shard_uid` has been cleaned up.
+fn check_has_the_only_shard_state(client: &Client, the_only_shard_uid: ShardUId) {
+    let store = client.chain.chain_store.store().trie_store();
+    let mut shard_uid_prefixes = HashSet::new();
+    for kv in store.store().iter_raw_bytes(DBCol::State) {
+        let (key, _) = kv.unwrap();
+        let shard_uid = ShardUId::try_from_slice(&key[0..8]).unwrap();
+        shard_uid_prefixes.insert(shard_uid);
+    }
+    let shard_uid_prefixes = shard_uid_prefixes.into_iter().collect_vec();
+    assert_eq!(shard_uid_prefixes, [the_only_shard_uid]);
+}
+
+// Loop action testing state cleanup after resharding.
+// It assumes single-shard tracking and waits for GC after resharding.
+// Then it checks whether the last shard tracked by the client
+// is the only ShardUId prefix for nodes in the State column.
+pub(crate) fn check_state_cleanup_after_resharding(
+    tracked_shard_schedule: TrackedShardSchedule,
+) -> LoopAction {
+    let client_index = tracked_shard_schedule.client_index;
+    let latest_height = Cell::new(0);
+    let target_height = Cell::new(None);
+
+    let (done, succeeded) = LoopAction::shared_success_flag();
+    let action_fn = Box::new(
+        move |node_datas: &[TestData], test_loop_data: &mut TestLoopData, _: AccountId| {
+            if done.get() {
+                return;
+            }
+
+            let client_handle = node_datas[client_index].client_sender.actor_handle();
+            let client = &test_loop_data.get_mut(&client_handle).client;
+            let tip = client.chain.head().unwrap();
+
+            // Run this action only once at every block height.
+            if latest_height.get() == tip.height {
+                return;
+            }
+
+            let epoch_height = client
+                .epoch_manager
+                .get_epoch_height_from_prev_block(&tip.prev_block_hash)
+                .unwrap();
+            let [tracked_shard_id] =
+                tracked_shard_schedule.schedule[epoch_height as usize].clone().try_into().unwrap();
+            let tracked_shard_uid =
+                client.epoch_manager.shard_id_to_uid(tracked_shard_id, &tip.epoch_id).unwrap();
+
+            if latest_height.get() == 0 {
+                // This is the beginning of the test, and the first epoch after genesis has height 1.
+                assert_eq!(epoch_height, 1);
+                // Get rid of the part of the Genesis State other than the shard we initially track.
+                retain_the_only_shard_state(client, tracked_shard_uid);
+            }
+            latest_height.set(tip.height);
+
+            if target_height.get().is_none() {
+                if !this_block_has_new_shard_layout(client.epoch_manager.as_ref(), &tip) {
+                    return;
+                }
+                // Just resharded. Set the target height high enough so that GC will kick in.
+                let epoch_length = client.config.epoch_length;
+                let gc_num_epochs_to_keep = client.config.gc.gc_num_epochs_to_keep;
+                target_height
+                    .set(Some(latest_height.get() + (gc_num_epochs_to_keep + 1) * epoch_length));
+            }
+
+            if latest_height.get() < target_height.get().unwrap() {
+                return;
+            }
+            // At this point, we should only have State from the last tracked shard.
+            check_has_the_only_shard_state(client, tracked_shard_uid);
+            done.set(true);
+        },
+    );
+    LoopAction::new(action_fn, succeeded)
+}
diff --git a/nearcore/src/test_utils.rs b/nearcore/src/test_utils.rs
index 6586172dfb6..baafc533102 100644
--- a/nearcore/src/test_utils.rs
+++ b/nearcore/src/test_utils.rs
@@ -1,5 +1,5 @@
 use near_chain::types::RuntimeAdapter;
-use near_chain_configs::Genesis;
+use near_chain_configs::{Genesis, DEFAULT_GC_NUM_EPOCHS_TO_KEEP};
 use near_client::test_utils::TestEnvBuilder;
 use near_epoch_manager::EpochManagerHandle;
 use near_parameters::RuntimeConfigStore;
@@ -101,6 +101,7 @@ impl TestEnvNightshadeSetupExt for TestEnvBuilder {
             Some(runtime_config_store),
             trie_config,
             state_snapshot_type.clone(),
+            DEFAULT_GC_NUM_EPOCHS_TO_KEEP,
         )
     };
     let dummy_runtime_configs =