
Commit 70c83f3
do it
wacban committed Jan 8, 2024
1 parent a231d5e commit 70c83f3
Showing 38 changed files with 446 additions and 452 deletions.
129 changes: 63 additions & 66 deletions chain/chain/src/chain.rs
@@ -18,8 +18,8 @@ use crate::types::{
};
use crate::update_shard::{
apply_new_chunk, process_missing_chunks_range, process_shard_update, NewChunkData,
NewChunkResult, OldChunkData, ShardContext, ShardUpdateReason, ShardUpdateResult,
StateSplitData, StorageContext,
NewChunkResult, OldChunkData, ReshardingData, ShardContext, ShardUpdateReason,
ShardUpdateResult, StorageContext,
};
use crate::validate::{
validate_challenge, validate_chunk_proofs, validate_chunk_with_chunk_extra,
@@ -35,7 +35,7 @@ use chrono::Duration;
use crossbeam_channel::{unbounded, Receiver, Sender};
use itertools::Itertools;
use lru::LruCache;
use near_chain_configs::{MutableConfigValue, StateSplitConfig, StateSplitHandle};
use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle};
#[cfg(feature = "new_epoch_sync")]
use near_chain_primitives::error::epoch_sync::EpochSyncInfoError;
use near_chain_primitives::error::{BlockKnownError, Error, LogTransientStorageError};
@@ -268,11 +268,11 @@ pub struct Chain {
snapshot_callbacks: Option<SnapshotCallbacks>,

/// Configuration for resharding.
pub(crate) state_split_config: MutableConfigValue<near_chain_configs::StateSplitConfig>,
pub(crate) resharding_config: MutableConfigValue<near_chain_configs::ReshardingConfig>,

// A handle that allows the main process to interrupt resharding if needed.
// This typically happens when the main process is interrupted.
pub state_split_handle: StateSplitHandle,
pub resharding_handle: ReshardingHandle,
}
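
ReshardingHandle's internals are not part of this diff. A minimal sketch of what such an interruptible handle could look like, assuming it simply wraps a shared atomic flag that the resharding worker polls between batches of work (the type and method names below are illustrative, not taken from near_chain_configs):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Illustrative stand-in for near_chain_configs::ReshardingHandle (names assumed).
#[derive(Clone)]
pub struct ReshardingHandleSketch {
    keep_going: Arc<AtomicBool>,
}

impl ReshardingHandleSketch {
    pub fn new() -> Self {
        Self { keep_going: Arc::new(AtomicBool::new(true)) }
    }

    /// Called from the main process, e.g. on shutdown, to ask resharding to stop.
    pub fn stop(&self) {
        self.keep_going.store(false, Ordering::Relaxed);
    }

    /// Polled by the resharding worker; false means abort the current job early.
    pub fn get(&self) -> bool {
        self.keep_going.load(Ordering::Relaxed)
    }
}
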

impl Drop for Chain {
@@ -366,11 +366,11 @@ impl Chain {
pending_state_patch: Default::default(),
requested_state_parts: StateRequestTracker::new(),
snapshot_callbacks: None,
state_split_config: MutableConfigValue::new(
StateSplitConfig::default(),
"state_split_config",
resharding_config: MutableConfigValue::new(
ReshardingConfig::default(),
"resharding_config",
),
state_split_handle: StateSplitHandle::new(),
resharding_handle: ReshardingHandle::new(),
})
}

@@ -546,8 +546,8 @@ impl Chain {
pending_state_patch: Default::default(),
requested_state_parts: StateRequestTracker::new(),
snapshot_callbacks,
state_split_config: chain_config.state_split_config,
state_split_handle: StateSplitHandle::new(),
resharding_config: chain_config.resharding_config,
resharding_handle: ReshardingHandle::new(),
})
}

@@ -3154,19 +3154,20 @@ impl Chain {
})
}

fn get_split_state_roots(
fn get_resharding_state_roots(
&self,
block: &Block,
shard_id: ShardId,
) -> Result<HashMap<ShardUId, StateRoot>, Error> {
let next_shard_layout =
self.epoch_manager.get_shard_layout(block.header().next_epoch_id())?;
let new_shards = next_shard_layout.get_split_shard_uids(shard_id).unwrap_or_else(|| {
panic!(
"shard layout must contain maps of all shards to its split shards {} {:?}",
shard_id, next_shard_layout,
);
});
let new_shards =
next_shard_layout.get_children_shards_uids(shard_id).unwrap_or_else(|| {
panic!(
"shard layout must contain maps of all shards to its children shards {} {:?}",
shard_id, next_shard_layout,
);
});
new_shards
.iter()
.map(|shard_uid| {
@@ -3298,14 +3299,14 @@
cares_about_shard_this_epoch,
cares_about_shard_next_epoch,
);
let need_to_split_states = will_shard_layout_change && cares_about_shard_next_epoch;
let need_to_reshard = will_shard_layout_change && cares_about_shard_next_epoch;
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, epoch_id)?;
Ok(ShardContext {
shard_uid,
cares_about_shard_this_epoch,
will_shard_layout_change,
should_apply_transactions,
need_to_split_states,
need_to_reshard,
})
}

@@ -3326,24 +3327,24 @@
let prev_hash = block.header().prev_hash();
let shard_context = self.get_shard_context(me, block.header(), shard_id, mode)?;

// We can only split states when states are ready, i.e., mode != ApplyChunksMode::NotCaughtUp
// 1) if should_apply_transactions == true && split_state_roots.is_some(),
// that means split states are ready.
// `apply_split_state_changes` will apply updates to split_states
// 2) if should_apply_transactions == true && split_state_roots.is_none(),
// that means split states are not ready yet.
// `apply_split_state_changes` will return `state_changes_for_split_states`,
// We can only perform resharding when states are ready, i.e., mode != ApplyChunksMode::NotCaughtUp
// 1) if should_apply_transactions == true && resharding_state_roots.is_some(),
// that means children shards are ready.
// `apply_resharding_state_changes` will apply updates to the children shards
// 2) if should_apply_transactions == true && resharding_state_roots.is_none(),
// that means children shards are not ready yet.
// `apply_resharding_state_changes` will return `state_changes_for_resharding`,
// which will be stored to the database in `process_apply_chunks`
// 3) if should_apply_transactions == false && split_state_roots.is_some()
// 3) if should_apply_transactions == false && resharding_state_roots.is_some()
// This implies mode == CatchingUp and cares_about_shard_this_epoch == true,
// otherwise should_apply_transactions will be true
// That means transactions have already been applied last time when apply_chunks are
// called with mode NotCaughtUp, therefore `state_changes_for_split_states` have been
// called with mode NotCaughtUp, therefore `state_changes_for_resharding` have been
// stored in the database. Then we can safely read that and apply that to the split
// states
let split_state_roots =
if shard_context.need_to_split_states && mode != ApplyChunksMode::NotCaughtUp {
Some(self.get_split_state_roots(block, shard_id)?)
let resharding_state_roots =
if shard_context.need_to_reshard && mode != ApplyChunksMode::NotCaughtUp {
Some(self.get_resharding_state_roots(block, shard_id)?)
} else {
None
};
@@ -3425,7 +3426,7 @@ impl Chain {
is_first_block_with_chunk_of_version,
chunk,
receipts,
split_state_roots,
resharding_state_roots,
storage_context,
})
} else {
@@ -3434,21 +3435,21 @@ impl Chain {
prev_chunk_extra: ChunkExtra::clone(
self.get_chunk_extra(prev_hash, &shard_context.shard_uid)?.as_ref(),
),
split_state_roots,
resharding_state_roots,
storage_context,
})
}
} else if let Some(split_state_roots) = split_state_roots {
} else if let Some(resharding_state_roots) = resharding_state_roots {
assert!(
mode == ApplyChunksMode::CatchingUp && shard_context.cares_about_shard_this_epoch
);
let state_changes =
self.store().get_state_changes_for_split_states(block.hash(), shard_id)?;
ShardUpdateReason::StateSplit(StateSplitData {
self.store().get_state_changes_for_resharding(block.hash(), shard_id)?;
ShardUpdateReason::Resharding(ReshardingData {
block_hash: *block.hash(),
block_height: block.header().height(),
state_changes,
split_state_roots,
resharding_state_roots,
})
} else {
return Ok(None);
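
The comment above enumerates the three meaningful combinations of should_apply_transactions and resharding_state_roots, and the if/else chain acts on them. A simplified restatement of that decision as a standalone function (the ShardAction enum is illustrative only; the real code constructs a ShardUpdateReason with NewChunkData, OldChunkData, or ReshardingData):

/// Illustrative summary of the decision described in the comment above.
enum ShardAction {
    // Apply the chunk; if the children state roots are already known the update is
    // forwarded to the children shards as well, otherwise the state changes are
    // stored as state_changes_for_resharding for a later CatchingUp pass.
    ApplyChunk { children_roots_ready: bool },
    // CatchingUp for a shard that was already applied in the NotCaughtUp pass:
    // only replay the stored state changes onto the children shards.
    ReshardingCatchup,
    // Nothing to do for this shard in this pass.
    Skip,
}

fn decide(should_apply_transactions: bool, children_roots_ready: bool) -> ShardAction {
    match (should_apply_transactions, children_roots_ready) {
        (true, ready) => ShardAction::ApplyChunk { children_roots_ready: ready },
        (false, true) => ShardAction::ReshardingCatchup,
        (false, false) => ShardAction::Skip,
    }
}
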
Expand Down Expand Up @@ -3496,9 +3497,9 @@ impl Chain {
let block_header = self.get_block_header(&block_hash)?;
let prev_block_header = self.get_previous_header(&block_header)?;
let shard_context = self.get_shard_context(me, &block_header, shard_id, mode)?;
if shard_context.need_to_split_states {
if shard_context.need_to_reshard {
return Err(Error::Other(String::from(
"State split occurred in blocks range, it is not supported yet",
"Resharding occurred in blocks range, it is not supported yet",
)));
}
execution_contexts.push((
@@ -3660,7 +3661,7 @@ impl Chain {
let block_header = self.get_block_header(&prev_chunk_block_hash)?;
let prev_block_header = self.get_previous_header(&block_header)?;
let shard_context = self.get_shard_context(me, &block_header, shard_id, mode)?;
if shard_context.need_to_split_states {
if shard_context.need_to_reshard {
return Ok(None);
}
(
Expand Down Expand Up @@ -3707,28 +3708,24 @@ impl Chain {
// TODO(logunov): use `validate_chunk_with_chunk_extra`
assert_eq!(current_chunk_extra.state_root(), &prev_chunk.prev_state_root());
// Process previous chunk.
let NewChunkResult {
gas_limit,
shard_uid,
apply_result,
apply_split_result_or_state_changes: _,
} = apply_new_chunk(
parent_span,
NewChunkData {
chunk: prev_chunk,
receipts,
split_state_roots: None,
block: prev_chunk_block_context.clone(),
is_first_block_with_chunk_of_version: false,
storage_context: StorageContext {
storage_data_source: StorageDataSource::DbTrieOnly,
state_patch: Default::default(),
let NewChunkResult { gas_limit, shard_uid, apply_result, resharding_results: _ } =
apply_new_chunk(
parent_span,
NewChunkData {
chunk: prev_chunk,
receipts,
resharding_state_roots: None,
block: prev_chunk_block_context.clone(),
is_first_block_with_chunk_of_version: false,
storage_context: StorageContext {
storage_data_source: StorageDataSource::DbTrieOnly,
state_patch: Default::default(),
},
},
},
prev_chunk_shard_context,
runtime.as_ref(),
epoch_manager.as_ref(),
)?;
prev_chunk_shard_context,
runtime.as_ref(),
epoch_manager.as_ref(),
)?;
let (outcome_root, _) =
ApplyTransactionResult::compute_outcomes_proof(&apply_result.outcomes);
current_chunk_extra = ChunkExtra::new(
@@ -3862,8 +3859,8 @@ fn sync_hash_not_first_hash(sync_hash: CryptoHash) -> Error {
/// We want to guarantee that transactions are only applied once for each shard,
/// even though apply_chunks may be called twice, once with
/// ApplyChunksMode::NotCaughtUp once with ApplyChunksMode::CatchingUp. Note
/// that it does not guard whether we split states or not, see the comments
/// before `need_to_split_state`
/// that it does not guard whether the children shards are ready or not, see the
/// comments before `need_to_reshard`
fn get_should_apply_transactions(
mode: ApplyChunksMode,
cares_about_shard_this_epoch: bool,
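
The body of get_should_apply_transactions is cut off in this hunk. One plausible reconstruction based purely on the doc comment above (an assumption, not code from this commit) is a match over the apply mode, so that a shard tracked this epoch is applied in the NotCaughtUp (or fully caught-up) pass and the CatchingUp pass only picks up shards that become tracked next epoch:

#[derive(Clone, Copy)]
enum ApplyChunksMode {
    IsCaughtUp,
    NotCaughtUp,
    CatchingUp,
}

// Assumed reconstruction: no shard is applied in both the NotCaughtUp and the
// CatchingUp pass, because CatchingUp excludes shards tracked this epoch.
fn get_should_apply_transactions(
    mode: ApplyChunksMode,
    cares_about_shard_this_epoch: bool,
    cares_about_shard_next_epoch: bool,
) -> bool {
    match mode {
        ApplyChunksMode::NotCaughtUp => cares_about_shard_this_epoch,
        ApplyChunksMode::IsCaughtUp => {
            cares_about_shard_this_epoch || cares_about_shard_next_epoch
        }
        ApplyChunksMode::CatchingUp => {
            !cares_about_shard_this_epoch && cares_about_shard_next_epoch
        }
    }
}
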
Expand Down Expand Up @@ -4225,8 +4222,8 @@ impl Chain {
shard_ids = shard_ids
.into_iter()
.flat_map(|id| {
next_shard_layout.get_split_shard_ids(id).unwrap_or_else(|| {
panic!("invalid shard layout {:?} because it does not contain split shards for parent shard {}", next_shard_layout, id)
next_shard_layout.get_children_shards_ids(id).unwrap_or_else(|| {
panic!("invalid shard layout {:?} because it does not contain children shards for parent shard {}", next_shard_layout, id)
})
})
.collect();
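
The expansion above turns each tracked parent shard into its children under the next shard layout while catching up. A self-contained toy version of the same flat_map pattern (the children map and shard numbers are made up; the real lookup is next_shard_layout.get_children_shards_ids):

use std::collections::HashMap;

type ShardId = u64;

// Toy stand-in for the next shard layout's parent -> children mapping.
fn children_shards_ids(map: &HashMap<ShardId, Vec<ShardId>>, id: ShardId) -> Option<Vec<ShardId>> {
    map.get(&id).cloned()
}

fn expand_for_catchup(shard_ids: Vec<ShardId>, map: &HashMap<ShardId, Vec<ShardId>>) -> Vec<ShardId> {
    shard_ids
        .into_iter()
        .flat_map(|id| {
            children_shards_ids(map, id)
                .unwrap_or_else(|| panic!("no children for parent shard {}", id))
        })
        .collect()
}

fn main() {
    // Example: parent shard 3 splits into shards 3, 4 and 5 under the next layout.
    let map = HashMap::from([(0, vec![0]), (1, vec![1]), (2, vec![2]), (3, vec![3, 4, 5])]);
    assert_eq!(expand_for_catchup(vec![0, 3], &map), vec![0, 3, 4, 5]);
}
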
