Skip to content

Commit

Permalink
feat(resharding): making resharding interruptible (#10328)
Browse files Browse the repository at this point in the history
Currently ctrl-c doesn't fully stop neard while it's in the middle of
resharding.
Adding StateSplitHandle that allows for stopping resharding in the
`build_state_for_split_shards` method.

to be tested in mocknet soon
  • Loading branch information
wacban authored Dec 14, 2023
1 parent 4fce209 commit 686acb3
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 9 deletions.
9 changes: 8 additions & 1 deletion chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use chrono::Duration;
use crossbeam_channel::{unbounded, Receiver, Sender};
use itertools::Itertools;
use lru::LruCache;
use near_chain_configs::{MutableConfigValue, StateSplitConfig};
use near_chain_configs::{MutableConfigValue, StateSplitConfig, StateSplitHandle};
#[cfg(feature = "new_epoch_sync")]
use near_chain_primitives::error::epoch_sync::EpochSyncInfoError;
use near_chain_primitives::error::{BlockKnownError, Error, LogTransientStorageError};
Expand Down Expand Up @@ -511,7 +511,12 @@ pub struct Chain {
/// A callback to initiate state snapshot.
snapshot_callbacks: Option<SnapshotCallbacks>,

/// Configuration for resharding.
pub(crate) state_split_config: MutableConfigValue<near_chain_configs::StateSplitConfig>,

// A handle that allows the main process to interrupt resharding if needed.
// This typically happens when the main process is interrupted.
pub state_split_handle: StateSplitHandle,
}

impl Drop for Chain {
Expand Down Expand Up @@ -609,6 +614,7 @@ impl Chain {
StateSplitConfig::default(),
"state_split_config",
),
state_split_handle: StateSplitHandle::new(),
})
}

Expand Down Expand Up @@ -785,6 +791,7 @@ impl Chain {
requested_state_parts: StateRequestTracker::new(),
snapshot_callbacks,
state_split_config: chain_config.state_split_config,
state_split_handle: StateSplitHandle::new(),
})
}

Expand Down
15 changes: 13 additions & 2 deletions chain/chain/src/resharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::metrics::{
};
use crate::Chain;
use itertools::Itertools;
use near_chain_configs::{MutableConfigValue, StateSplitConfig};
use near_chain_configs::{MutableConfigValue, StateSplitConfig, StateSplitHandle};
use near_chain_primitives::error::Error;
use near_primitives::errors::StorageError::StorageInconsistentState;
use near_primitives::hash::CryptoHash;
Expand Down Expand Up @@ -46,13 +46,17 @@ pub struct StateSplitRequest {
pub prev_prev_hash: CryptoHash,
// Parent shardUId to be split into child shards.
pub shard_uid: ShardUId,
// state root of the parent shardUId. This is different from block sync_hash
// The state root of the parent ShardUId. This is different from block sync_hash
pub state_root: StateRoot,
// The shard layout in the next epoch.
pub next_epoch_shard_layout: ShardLayout,
// Time we've spent polling for the state snapshot to be ready. We autofail after a certain time.
pub curr_poll_time: Duration,
// Configuration for resharding. Can be used to throttle resharding if needed.
pub config: MutableConfigValue<StateSplitConfig>,
// A handle that allows the main process to interrupt resharding if needed.
// This typically happens when the main process is interrupted.
pub handle: StateSplitHandle,
}

// Skip `runtime_adapter`, because it's a complex object that has complex logic
Expand Down Expand Up @@ -222,6 +226,7 @@ impl Chain {
next_epoch_shard_layout,
curr_poll_time: Duration::ZERO,
config: self.state_split_config.clone(),
handle: self.state_split_handle.clone(),
});

RESHARDING_STATUS
Expand Down Expand Up @@ -289,6 +294,7 @@ impl Chain {
state_root,
next_epoch_shard_layout,
config,
handle,
..
} = state_split_request;
tracing::debug!(target: "resharding", config=?config.get(), ?shard_uid, "build_state_for_split_shards_impl starting");
Expand Down Expand Up @@ -355,6 +361,11 @@ impl Chain {
// Once we build the iterator, we break it into batches using the get_trie_update_batch function.
let mut iter = iter;
loop {
if !handle.get() {
// The keep_going is set to false, interrupt processing.
tracing::info!(target: "resharding", ?shard_uid, "build_state_for_split_shards_impl interrupted");
return Err(Error::Other("Resharding interrupted.".to_string()));
}
// Prepare the batch.
let batch = {
let histogram = RESHARDING_BATCH_PREPARE_TIME.with_label_values(&metrics_labels);
Expand Down
7 changes: 4 additions & 3 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use near_chain::{
byzantine_assert, near_chain_primitives, Block, BlockHeader, BlockProcessingArtifact,
ChainGenesis, DoneApplyChunkCallback, Provenance,
};
use near_chain_configs::{ClientConfig, LogSummaryStyle};
use near_chain_configs::{ClientConfig, LogSummaryStyle, StateSplitHandle};
use near_chain_primitives::error::EpochErrorResultToChainError;
use near_chunks::adapter::ShardsManagerRequestFromClient;
use near_chunks::client::ShardsManagerResponse;
Expand Down Expand Up @@ -2002,7 +2002,7 @@ pub fn start_client(
sender: Option<broadcast::Sender<()>>,
adv: crate::adversarial::Controls,
config_updater: Option<ConfigUpdater>,
) -> (Addr<ClientActor>, ArbiterHandle) {
) -> (Addr<ClientActor>, ArbiterHandle, StateSplitHandle) {
let client_arbiter = Arbiter::new();
let client_arbiter_handle = client_arbiter.handle();

Expand All @@ -2022,6 +2022,7 @@ pub fn start_client(
snapshot_callbacks,
)
.unwrap();
let state_split_handle = client.chain.state_split_handle.clone();
let client_addr = ClientActor::start_in_arbiter(&client_arbiter_handle, move |ctx| {
ClientActor::new(
client,
Expand All @@ -2038,5 +2039,5 @@ pub fn start_client(
)
.unwrap()
});
(client_addr, client_arbiter_handle)
(client_addr, client_arbiter_handle, state_split_handle)
}
24 changes: 24 additions & 0 deletions core/chain-configs/src/client_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use near_primitives::types::{
use near_primitives::version::Version;
use std::cmp::{max, min};
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;

pub const TEST_STATE_SYNC_TIMEOUT: u64 = 5;
Expand Down Expand Up @@ -149,6 +151,28 @@ impl SyncConfig {
}
}

// A handle that allows the main process to interrupt resharding if needed.
// This typically happens when the main process is interrupted.
#[derive(Clone)]
pub struct StateSplitHandle {
keep_going: Arc<AtomicBool>,
}

impl StateSplitHandle {
pub fn new() -> Self {
Self { keep_going: Arc::new(AtomicBool::new(true)) }
}

pub fn get(&self) -> bool {
self.keep_going.load(std::sync::atomic::Ordering::Relaxed)
}

pub fn stop(&self) -> () {
self.keep_going.store(false, std::sync::atomic::Ordering::Relaxed);
}
}

/// Configuration for resharding.
#[derive(serde::Serialize, serde::Deserialize, Clone, Copy, Debug, PartialEq)]
#[serde(default)]
pub struct StateSplitConfig {
Expand Down
5 changes: 3 additions & 2 deletions core/chain-configs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ pub use client_config::{
default_transaction_pool_size_limit, default_trie_viewer_state_size_limit,
default_tx_routing_height_horizon, default_view_client_threads,
default_view_client_throttle_period, ClientConfig, DumpConfig, ExternalStorageConfig,
ExternalStorageLocation, GCConfig, LogSummaryStyle, StateSplitConfig, StateSyncConfig,
SyncConfig, DEFAULT_GC_NUM_EPOCHS_TO_KEEP, DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL,
ExternalStorageLocation, GCConfig, LogSummaryStyle, StateSplitConfig, StateSplitHandle,
StateSyncConfig, SyncConfig, DEFAULT_GC_NUM_EPOCHS_TO_KEEP,
DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_EXTERNAL,
DEFAULT_STATE_SYNC_NUM_CONCURRENT_REQUESTS_ON_CATCHUP_EXTERNAL, MIN_GC_NUM_EPOCHS_TO_KEEP,
TEST_STATE_SYNC_TIMEOUT,
};
Expand Down
7 changes: 6 additions & 1 deletion nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use near_chain::state_snapshot_actor::{
};
use near_chain::types::RuntimeAdapter;
use near_chain::{Chain, ChainGenesis};
use near_chain_configs::StateSplitHandle;
use near_chain_configs::SyncConfig;
use near_chunks::shards_manager_actor::start_shards_manager;
use near_client::sync::adapter::SyncAdapter;
Expand Down Expand Up @@ -213,6 +214,9 @@ pub struct NearNode {
/// A handle to control background flat state values inlining migration.
/// Needed temporarily, will be removed after the migration is completed.
pub flat_state_migration_handle: FlatStateValuesInliningMigrationHandle,
// A handle that allows the main process to interrupt resharding if needed.
// This typically happens when the main process is interrupted.
pub state_split_handle: StateSplitHandle,
}

pub fn start_with_config(home_dir: &Path, config: NearConfig) -> anyhow::Result<NearNode> {
Expand Down Expand Up @@ -329,7 +333,7 @@ pub fn start_with_config_and_synchronization(
get_make_snapshot_callback(state_snapshot_actor, runtime.get_flat_storage_manager());
let snapshot_callbacks = SnapshotCallbacks { make_snapshot_callback, delete_snapshot_callback };

let (client_actor, client_arbiter_handle) = start_client(
let (client_actor, client_arbiter_handle, state_split_handle) = start_client(
config.client_config.clone(),
chain_genesis.clone(),
epoch_manager.clone(),
Expand Down Expand Up @@ -442,6 +446,7 @@ pub fn start_with_config_and_synchronization(
cold_store_loop_handle,
state_sync_dump_handle,
flat_state_migration_handle,
state_split_handle,
})
}

Expand Down
2 changes: 2 additions & 0 deletions neard/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ impl RunCmd {
cold_store_loop_handle,
state_sync_dump_handle,
flat_state_migration_handle,
state_split_handle,
..
} = nearcore::start_with_config_and_synchronization(
home_dir,
Expand All @@ -582,6 +583,7 @@ impl RunCmd {
if let Some(handle) = state_sync_dump_handle {
handle.stop()
}
state_split_handle.stop();
flat_state_migration_handle.stop();
futures::future::join_all(rpc_servers.iter().map(|(name, server)| async move {
server.stop(true).await;
Expand Down

0 comments on commit 686acb3

Please sign in to comment.