diff --git a/Cargo.toml b/Cargo.toml index 0009b1cac..bc1b58e19 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -136,6 +136,7 @@ tycho-util = { path = "./util", version = "0.1.4" } [patch.crates-io] weedb = { version = "0.3.8", git = "https://github.com/broxus/weedb.git", branch = "next-rocksdb" } +everscale-types = { git = "https://github.com/broxus/everscale-types.git", rev = "8ea6b16e81f1ffd88006263327004b891b5ef47c"} [workspace.lints.rust] future_incompatible = "warn" @@ -241,7 +242,4 @@ opt-level = 3 [profile.dev.package.hashbrown] opt-level = 3 [profile.dev.package."*"] -opt-level = 1 - -[patch.crates-io] -everscale-types = { git = "https://github.com/broxus/everscale-types.git", rev = "8ea6b16e81f1ffd88006263327004b891b5ef47c"} \ No newline at end of file +opt-level = 1 \ No newline at end of file diff --git a/block-util/src/block/block_stuff.rs b/block-util/src/block/block_stuff.rs index ea0683747..a0bae6568 100644 --- a/block-util/src/block/block_stuff.rs +++ b/block-util/src/block/block_stuff.rs @@ -24,7 +24,7 @@ impl BlockStuff { pub const BOOT_OFFSET: Duration = Duration::from_secs(12 * 3600); pub fn compute_is_persistent(block_utime: u32, prev_utime: u32) -> bool { - block_utime >> 10 != prev_utime >> 10 + block_utime >> 17 != prev_utime >> 17 } pub fn can_use_for_boot(block_utime: u32, now_utime: u32) -> bool { diff --git a/collator/src/collator/do_collate/mod.rs b/collator/src/collator/do_collate/mod.rs index 134976eae..3f34bc6da 100644 --- a/collator/src/collator/do_collate/mod.rs +++ b/collator/src/collator/do_collate/mod.rs @@ -1272,7 +1272,7 @@ impl CollatorStdImpl { read_ext_msgs={}, read_int_msgs={}, \ read_new_msgs_from_iterator={}, inserted_new_msgs_to_iterator={} has_unprocessed_messages={}, \ total_execute_msgs_time_mc={}, \ - wu_used_for_prepare_msgs_groups={}, wu_used_for_execute: {}, wu_used_for_finalize: {}", + wu_used_for_prepare_msgs_groups={}, wu_used_for_execute: {}, wu_used_for_finalize: {}, diffs_tail_len: {}", block_id, collation_data.start_lt, collation_data.next_lt, collation_data.execute_count_all, collation_data.execute_count_ext, collation_data.ext_msgs_error_count, @@ -1283,7 +1283,7 @@ impl CollatorStdImpl { collation_data.read_ext_msgs_count, collation_data.read_int_msgs_from_iterator_count, collation_data.read_new_msgs_from_iterator_count, collation_data.inserted_new_msgs_to_iterator_count, final_result.has_unprocessed_messages, collation_data.total_execute_msgs_time_mc, - execute_result.prepare_groups_wu_total, execute_result.execute_groups_wu_total, finalize_wu_total + execute_result.prepare_groups_wu_total, execute_result.execute_groups_wu_total, finalize_wu_total, collation_data.diff_tail_len, ); tracing::debug!( diff --git a/core/src/block_strider/subscriber/gc_subscriber.rs b/core/src/block_strider/subscriber/gc_subscriber.rs index 5435df095..ec84ee810 100644 --- a/core/src/block_strider/subscriber/gc_subscriber.rs +++ b/core/src/block_strider/subscriber/gc_subscriber.rs @@ -14,6 +14,7 @@ use tycho_block_util::block::BlockStuff; use tycho_storage::{BlocksGcType, Storage}; use tycho_util::metrics::HistogramGuard; +use crate::block_strider::subscriber::find_longest_diffs_tail; use crate::block_strider::{ BlockSubscriber, BlockSubscriberContext, StateSubscriber, StateSubscriberContext, }; @@ -254,7 +255,19 @@ impl GcSubscriber { // TODO: Must be in sync with the largest possible archive size (in mc blocks). const MIN_SAFE_DISTANCE: u32 = 100; - let safe_distance = std::cmp::max(safe_distance, MIN_SAFE_DISTANCE); + let tail_len = find_longest_diffs_tail(tick.mc_block_id, &storage).await; + + let tail_len = tail_len.unwrap_or_else(|e| { + tracing::error!(?e, "failed to find longest diffs tail"); + 0 + }) as u32; + + tracing::info!(tail_len, "found longest diffs tail"); + + let safe_distance = [safe_distance, MIN_SAFE_DISTANCE, tail_len + 1] + .into_iter() + .max() + .unwrap(); // Compute the target masterchain block seqno let target_seqno = match tick.mc_block_id.seqno.checked_sub(safe_distance) { @@ -399,6 +412,7 @@ impl GcSubscriber { tracing::info!("starting GC for target seqno: {}", target_seqno); let hist = HistogramGuard::begin("tycho_gc_states_time"); + if let Err(e) = storage .shard_state_storage() .remove_outdated_states(target_seqno) diff --git a/core/src/block_strider/subscriber/mod.rs b/core/src/block_strider/subscriber/mod.rs index 2491e49f1..bb9a4a973 100644 --- a/core/src/block_strider/subscriber/mod.rs +++ b/core/src/block_strider/subscriber/mod.rs @@ -7,6 +7,7 @@ use futures_util::future::{self, BoxFuture}; use tycho_block_util::archive::ArchiveData; use tycho_block_util::block::BlockStuff; use tycho_block_util::state::ShardStateStuff; +use tycho_storage::{BlockHandle, Storage}; pub use self::futures::{OptionHandleFut, OptionPrepareFut}; pub use self::gc_subscriber::{GcSubscriber, ManualGcTrigger}; @@ -420,3 +421,59 @@ pub mod test { } } } + +pub async fn find_longest_diffs_tail(mc_block: BlockId, storage: &Storage) -> Result { + let mc_block_stuff = load_mc_block_stuff(mc_block, storage).await?; + + let shard_block_handles = load_shard_block_handles(&mc_block_stuff, storage).await?; + + let mut max_tail_len = None; + + for block_handle in shard_block_handles { + let block = storage + .block_storage() + .load_block_data(&block_handle) + .await?; + let tail_len = block.block().out_msg_queue_updates.tail_len as usize; + + max_tail_len = Some(max_tail_len.map_or(tail_len, |max: usize| max.max(tail_len))); + } + + let mc_tail_len = mc_block_stuff.block().out_msg_queue_updates.tail_len as usize; + let result_tail_len = max_tail_len.map_or(mc_tail_len, |max: usize| mc_tail_len.max(max)); + + Ok(result_tail_len) +} + +async fn load_mc_block_stuff(mc_seqno: BlockId, storage: &Storage) -> Result { + let mc_handle = storage.block_handle_storage().load_handle(&mc_seqno); + if let Some(mc_handle) = mc_handle { + let mc_block_stuff = storage.block_storage().load_block_data(&mc_handle).await?; + Ok(mc_block_stuff) + } else { + anyhow::bail!("mc block handle not found: {mc_seqno}"); + } +} + +async fn load_shard_block_handles( + mc_block_stuff: &BlockStuff, + storage: &Storage, +) -> Result> { + let block_handles = storage.block_handle_storage(); + let mut shard_block_handles = Vec::new(); + + for entry in mc_block_stuff.load_custom()?.shards.latest_blocks() { + let block_id = entry?; + if block_id.seqno == 0 { + continue; + } + + let Some(block_handle) = block_handles.load_handle(&block_id) else { + anyhow::bail!("top shard block handle not found: {block_id}"); + }; + + shard_block_handles.push(block_handle); + } + + Ok(shard_block_handles) +} diff --git a/storage/src/store/persistent_state/mod.rs b/storage/src/store/persistent_state/mod.rs index e414d1fc2..260e4a82f 100644 --- a/storage/src/store/persistent_state/mod.rs +++ b/storage/src/store/persistent_state/mod.rs @@ -436,6 +436,10 @@ impl PersistentStateStorage { messages.push(queue_diff.zip(&out_messages)); queue_diffs.push(queue_diff.diff().clone()); + if tail_len == 1 { + break; + } + let prev_block_id = match top_block_info.load_prev_ref()? { PrevBlockRef::Single(block_ref) => block_ref.as_block_id(shard_ident), PrevBlockRef::AfterMerge { .. } => anyhow::bail!("merge not supported yet"),