Skip to content

Commit

Permalink
fix: state parts dump monitoring was not checking latest dumped directory (#10076)
Browse files Browse the repository at this point in the history

Fix the bug where the monitoring was checking the previous epoch's directory,
prev_epoch_height/prev_epoch_id/shard_id=x, instead of the latest dumped directory,
latest_epoch_height/latest_epoch_id/shard_id=x.
  • Loading branch information
ppca authored Nov 2, 2023
1 parent d3a0a83 commit b26a15b
Showing 1 changed file with 22 additions and 23 deletions.
45 changes: 22 additions & 23 deletions tools/state-parts-dump-check/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ enum StatePartsDumpCheckStatus {

#[derive(Clone)]
struct DumpCheckIterInfo {
prev_epoch_id: EpochId,
prev_epoch_height: u64,
prev_epoch_state_roots: Vec<CryptoHash>,
epoch_id: EpochId,
epoch_height: u64,
state_roots: Vec<CryptoHash>,
}

fn create_external_connection(
Expand Down Expand Up @@ -284,15 +284,15 @@ fn run_loop_all_shards(
match last_check_status_vec[shard_id] {
Ok(StatePartsDumpCheckStatus::Done { epoch_height }) => {
tracing::info!("last one was done, epoch_height: {}", epoch_height);
if epoch_height >= dump_check_iter_info.prev_epoch_height {
if epoch_height >= dump_check_iter_info.epoch_height {
tracing::info!("current height was already checked. sleeping for 60s.");
sleep(Duration::from_secs(60));
continue;
}

tracing::info!("current height was not already checked, will start checking.");
if dump_check_iter_info.prev_epoch_height > epoch_height + 1 {
tracing::info!("there is a skip between last done epoch at epoch_height: {}, and latest available epoch at {}", epoch_height, dump_check_iter_info.prev_epoch_height);
if dump_check_iter_info.epoch_height > epoch_height + 1 {
tracing::info!("there is a skip between last done epoch at epoch_height: {}, and latest available epoch at {}", epoch_height, dump_check_iter_info.epoch_height);
crate::metrics::STATE_SYNC_DUMP_CHECK_HAS_SKIPPED_EPOCH
.with_label_values(&[&shard_id.to_string(), &chain_id.to_string()])
.set(1);
Expand All @@ -304,8 +304,8 @@ fn run_loop_all_shards(
}
Ok(StatePartsDumpCheckStatus::WaitingForParts { epoch_height }) => {
tracing::info!("last one was waiting, epoch_height: {}", epoch_height);
if dump_check_iter_info.prev_epoch_height > epoch_height {
tracing::info!("last one was never finished. There is a skip between last waiting epoch at epoch_height: {}, and latest available epoch at {}", epoch_height, dump_check_iter_info.prev_epoch_height);
if dump_check_iter_info.epoch_height > epoch_height {
tracing::info!("last one was never finished. There is a skip between last waiting epoch at epoch_height: {}, and latest available epoch at {}", epoch_height, dump_check_iter_info.epoch_height);
crate::metrics::STATE_SYNC_DUMP_CHECK_HAS_SKIPPED_EPOCH
.with_label_values(&[&shard_id.to_string(), &chain_id.to_string()])
.set(1);
Expand Down Expand Up @@ -344,10 +344,10 @@ fn run_loop_all_shards(

run_single_check_with_3_retries(
chain_id,
dump_check_iter_info.prev_epoch_id,
dump_check_iter_info.prev_epoch_height,
dump_check_iter_info.epoch_id,
dump_check_iter_info.epoch_height,
shard_id as u64,
dump_check_iter_info.prev_epoch_state_roots[shard_id],
dump_check_iter_info.state_roots[shard_id],
root_dir,
s3_bucket,
s3_region,
Expand Down Expand Up @@ -470,6 +470,7 @@ async fn run_single_check(

let directory_path =
external_storage_location_directory(&chain_id, &epoch_id, epoch_height, shard_id);
tracing::info!("the storage location for the state parts being checked is: {}", directory_path);
let part_file_names = external.list_state_parts(shard_id, &directory_path).await?;
if part_file_names.is_empty() {
return Ok(StatePartsDumpCheckStatus::WaitingForParts { epoch_height: epoch_height });
Expand Down Expand Up @@ -639,27 +640,25 @@ async fn get_processing_epoch_information(
.await
.or_else(|_| Err(anyhow!("get final block failed")))?;
let latest_epoch_id = latest_block_response.header.epoch_id;
let latest_epoch_response =
rpc_client
.validators_by_epoch_id(EpochId(latest_epoch_id))
.await
.or_else(|_| Err(anyhow!("validators_by_epoch_id for prev_epoch_id failed")))?;
let latest_epoch_height = latest_epoch_response.epoch_height;
let prev_epoch_last_block_response =
get_previous_epoch_last_block_response(rpc_client, latest_epoch_id).await?;
let prev_epoch_id = prev_epoch_last_block_response.header.epoch_id;
let prev_epoch_response = rpc_client
.validators_by_epoch_id(EpochId(prev_epoch_id))
.await
.or_else(|_| Err(anyhow!("validators_by_epoch_id for prev_epoch_id failed")))?;
let prev_epoch_height = prev_epoch_response.epoch_height;
let prev_prev_epoch_last_block_response =
get_previous_epoch_last_block_response(rpc_client, prev_epoch_id).await?;
let shard_ids: Vec<usize> = (0..4).collect();
// state roots ordered by shard_id
let prev_epoch_state_roots: Vec<CryptoHash> = shard_ids
.iter()
.map(|&shard_id| prev_prev_epoch_last_block_response.chunks[shard_id].prev_state_root)
.map(|&shard_id| prev_epoch_last_block_response.chunks[shard_id].prev_state_root)
.collect();

Ok(DumpCheckIterInfo {
prev_epoch_id: EpochId(prev_epoch_id),
prev_epoch_height: prev_epoch_height,
prev_epoch_state_roots: prev_epoch_state_roots,
epoch_id: EpochId(latest_epoch_id),
epoch_height: latest_epoch_height,
state_roots: prev_epoch_state_roots,
})
}

Expand Down

0 comments on commit b26a15b

Please sign in to comment.