Skip to content

Commit

Permalink
fix(epoch-sync): write EpochSyncInfo after full epoch finalisation (#…
Browse files Browse the repository at this point in the history
…10109)

#10031
#10031 (comment)

1. Header potentially needed for `EpochInfo` validation in
`EpochSyncInfo` via `epoch_sync_data_hash` is actually in the next
epoch. This header should be recorded in `EpochSyncInfo` too.
2. All data in `EpochSyncInfo` should be finalised.
3. `test_continuous_epoch_sync_info_population` is changed to reflect
that `EpochSyncInfo` is only written after everything is finalised.

P.S. I don't think it is the last change to `EpochSyncInfo` (this week).
  • Loading branch information
posvyatokum authored Nov 7, 2023
1 parent 3ff4e11 commit 984c935
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 24 deletions.
90 changes: 70 additions & 20 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1950,10 +1950,8 @@ impl Chain {
#[cfg(feature = "new_epoch_sync")]
{
// At this point BlockInfo for this header should be in DB and in `epoch_manager`s cache because of `add_validator_proposals` call.
let block_info = self.epoch_manager.get_block_info(header.hash())?;

let mut chain_update = self.chain_update();
chain_update.save_epoch_sync_info(&block_info)?;
chain_update.save_epoch_sync_info_if_finalised(header)?;
chain_update.commit()?;
}
}
Expand Down Expand Up @@ -5486,8 +5484,7 @@ impl<'a> ChainUpdate<'a> {
#[cfg(feature = "new_epoch_sync")]
{
// BlockInfo should be already recorded in epoch_manager cache because of `add_validator_proposals` call
let block_info = self.epoch_manager.get_block_info(block.hash())?;
self.save_epoch_sync_info(&block_info)?;
self.save_epoch_sync_info_if_finalised(block.header())?;
}

// Add validated block to the db, even if it's not the canonical fork.
Expand Down Expand Up @@ -5865,22 +5862,66 @@ impl<'a> ChainUpdate<'a> {
Ok(true)
}

/// This function assumes `BlockInfo` is already retrievable from `epoch_manager`.
/// This can be achieved by calling `add_validator_proposals`.
#[cfg(feature = "new_epoch_sync")]
fn save_epoch_sync_info_if_finalised(&mut self, header: &BlockHeader) -> Result<(), Error> {
let block_info = self.epoch_manager.get_block_info(header.hash())?;
let epoch_first_block_hash = block_info.epoch_first_block();

if *epoch_first_block_hash == CryptoHash::default() {
// This is the genesis epoch. We don't have any fully finalised epoch yet.
return Ok(());
}

let epoch_first_block_info = self.epoch_manager.get_block_info(epoch_first_block_hash)?;
let prev_epoch_last_block_hash = epoch_first_block_info.prev_hash();

if *prev_epoch_last_block_hash == CryptoHash::default() {
// This is the genesis epoch. We don't have any fully finalised epoch yet.
return Ok(());
}
let prev_epoch_last_block_info =
self.epoch_manager.get_block_info(prev_epoch_last_block_hash)?;

if prev_epoch_last_block_info.epoch_id() == epoch_first_block_info.epoch_id() {
// Previous epoch is the genesis epoch. We don't have any fully finalised epoch yet.
return Ok(());
}

// Check that last finalised block is after epoch first block.
// So, that it is in the current epoch.
let last_final_block_hash = header.last_final_block();
if *last_final_block_hash == CryptoHash::default() {
// We didn't finalise any blocks yet. We don't have any fully finalised epoch yet.
return Ok(());
}
let last_final_block_info = self.epoch_manager.get_block_info(last_final_block_hash)?;
if last_final_block_info.epoch_id() != epoch_first_block_info.epoch_id() {
// Last finalised block is in the previous epoch.
// We didn't finalise header with `epoch_sync_data_hash` for the previous epoch yet.
return Ok(());
}
self.save_epoch_sync_info_impl(&prev_epoch_last_block_info, epoch_first_block_hash)
}

/// If the block is the last one in the epoch
/// construct and record `EpochSyncInfo` to `self.chain_store_update`.
#[cfg(feature = "new_epoch_sync")]
fn save_epoch_sync_info(&mut self, last_block_info: &BlockInfo) -> Result<(), Error> {
let epoch_id = last_block_info.epoch_id();
if self.epoch_manager.is_next_block_epoch_start(last_block_info.hash())? {
let mut store_update = self.chain_store_update.store().store_update();
store_update
.set_ser(
DBCol::EpochSyncInfo,
epoch_id.as_ref(),
&self.create_epoch_sync_info(last_block_info)?,
)
.map_err(EpochError::from)?;
self.chain_store_update.merge(store_update);
}
fn save_epoch_sync_info_impl(
&mut self,
last_block_info: &BlockInfo,
next_epoch_first_hash: &CryptoHash,
) -> Result<(), Error> {
let mut store_update = self.chain_store_update.store().store_update();
store_update
.set_ser(
DBCol::EpochSyncInfo,
last_block_info.epoch_id().as_ref(),
&self.create_epoch_sync_info(last_block_info, next_epoch_first_hash)?,
)
.map_err(EpochError::from)?;
self.chain_store_update.merge(store_update);
Ok(())
}

Expand Down Expand Up @@ -5909,6 +5950,7 @@ impl<'a> ChainUpdate<'a> {
}

/// For epoch sync we need to save:
/// - (*) first header of the next epoch (contains `epoch_sync_data_hash` for `EpochInfo` validation)
/// - first header of the epoch
/// - last header of the epoch
/// - prev last header of the epoch
Expand All @@ -5923,6 +5965,7 @@ impl<'a> ChainUpdate<'a> {
fn get_epoch_sync_info_headers(
&self,
last_block_info: &BlockInfo,
next_epoch_first_hash: &CryptoHash,
) -> Result<(HashMap<CryptoHash, BlockHeader>, HashSet<CryptoHash>), Error> {
let mut headers = HashMap::new();
let mut headers_to_save = HashSet::new();
Expand All @@ -5935,6 +5978,7 @@ impl<'a> ChainUpdate<'a> {
Ok(())
};

add_header(next_epoch_first_hash)?;
add_header(last_block_info.epoch_first_block())?;
add_header(last_block_info.hash())?;
add_header(last_block_info.prev_hash())?;
Expand Down Expand Up @@ -5968,11 +6012,16 @@ impl<'a> ChainUpdate<'a> {

/// Data that is necessary to prove Epoch in new Epoch Sync.
#[cfg(feature = "new_epoch_sync")]
fn create_epoch_sync_info(&self, last_block_info: &BlockInfo) -> Result<EpochSyncInfo, Error> {
fn create_epoch_sync_info(
&self,
last_block_info: &BlockInfo,
next_epoch_first_hash: &CryptoHash,
) -> Result<EpochSyncInfo, Error> {
let mut all_block_hashes = self.epoch_manager.get_all_epoch_hashes(last_block_info)?;
all_block_hashes.reverse();

let (headers, headers_to_save) = self.get_epoch_sync_info_headers(last_block_info)?;
let (headers, headers_to_save) =
self.get_epoch_sync_info_headers(last_block_info, next_epoch_first_hash)?;

let epoch_id = last_block_info.epoch_id();
let next_epoch_id = self.epoch_manager.get_next_epoch_id(last_block_info.hash())?;
Expand All @@ -5982,6 +6031,7 @@ impl<'a> ChainUpdate<'a> {
all_block_hashes,
headers,
headers_to_save,
next_epoch_first_hash: *next_epoch_first_hash,
epoch_info: (*self.epoch_manager.get_epoch_info(epoch_id)?).clone(),
next_epoch_info: (*self.epoch_manager.get_epoch_info(&next_epoch_id)?).clone(),
next_next_epoch_info: (*self.epoch_manager.get_epoch_info(&next_next_epoch_id)?)
Expand Down
7 changes: 7 additions & 0 deletions core/primitives/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1244,9 +1244,16 @@ pub mod epoch_sync {
pub all_block_hashes: Vec<CryptoHash>,
/// All headers relevant to epoch sync.
/// Contains epoch headers that need to be saved + supporting headers needed for validation.
/// Probably contains one header from the previous epoch.
/// It refers to `last_final_block` of the first block of the epoch.
/// Also contains first header from the next epoch.
/// It refers to `next_epoch_first_hash`.
pub headers: HashMap<CryptoHash, BlockHeader>,
/// Hashes of headers that need to be validated and saved.
pub headers_to_save: HashSet<CryptoHash>,
/// Hash of the first block of the next epoch.
/// Header of this block contains `epoch_sync_data_hash`.
pub next_epoch_first_hash: CryptoHash,
pub epoch_info: EpochInfo,
pub next_epoch_info: EpochInfo,
pub next_next_epoch_info: EpochInfo,
Expand Down
20 changes: 16 additions & 4 deletions integration-tests/src/tests/client/epoch_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use actix::Actor;
use actix_rt::System;
use futures::{future, FutureExt};
use near_actix_test_utils::run_actix;
use near_chain::ChainStoreAccess;
use near_chain::{ChainGenesis, Provenance};
use near_chain_configs::Genesis;
use near_client::test_utils::TestEnv;
Expand All @@ -18,6 +19,7 @@ use near_primitives::test_utils::create_test_signer;
use near_primitives::transaction::{
Action, DeployContractAction, FunctionCallAction, SignedTransaction,
};
use near_primitives::types::EpochId;
use near_primitives_core::hash::CryptoHash;
use near_primitives_core::types::BlockHeight;
use near_store::Mode::ReadOnly;
Expand Down Expand Up @@ -75,13 +77,13 @@ fn generate_transactions(last_hash: &CryptoHash, h: BlockHeight) -> Vec<SignedTr
}

/// Produce 4 epochs with some transactions.
/// At the end of each epoch check that `EpochSyncInfo` has been recorded.
/// When the first block of the next epoch is finalised check that `EpochSyncInfo` has been recorded.
#[test]
fn test_continuous_epoch_sync_info_population() {
init_test_logger();

let epoch_length = 5;
let max_height = epoch_length * 4 + 1;
let max_height = epoch_length * 4 + 3;

let mut genesis = Genesis::test(vec!["test0".parse().unwrap(), "test1".parse().unwrap()], 1);

Expand All @@ -94,6 +96,7 @@ fn test_continuous_epoch_sync_info_population() {
.build();

let mut last_hash = *env.clients[0].chain.genesis().hash();
let mut last_epoch_id = EpochId::default();

for h in 1..max_height {
for tx in generate_transactions(&last_hash, h) {
Expand All @@ -104,13 +107,22 @@ fn test_continuous_epoch_sync_info_population() {
env.process_block(0, block.clone(), Provenance::PRODUCED);
last_hash = *block.hash();

if env.clients[0].epoch_manager.is_next_block_epoch_start(&last_hash).unwrap() {
let epoch_id = block.header().epoch_id().clone();
let last_final_hash = block.header().last_final_block();
if *last_final_hash == CryptoHash::default() {
continue;
}
let last_final_header =
env.clients[0].chain.store().get_block_header(last_final_hash).unwrap();

if *last_final_header.epoch_id() != last_epoch_id {
let epoch_id = last_epoch_id.clone();

tracing::debug!("Checking epoch: {:?}", &epoch_id);
assert!(env.clients[0].chain.store().get_epoch_sync_info(&epoch_id).is_ok());
tracing::debug!("OK");
}

last_epoch_id = last_final_header.epoch_id().clone();
}
}

Expand Down

0 comments on commit 984c935

Please sign in to comment.