From fd93db0c647e434e00f37261e3ee38914354b1c5 Mon Sep 17 00:00:00 2001
From: cchudant
Date: Tue, 7 May 2024 04:31:52 -0700
Subject: [PATCH] l2 sync: refactor a bit, and add polling (#101)

Co-authored-by: antiyro <74653697+antiyro@users.noreply.github.com>
---
 CHANGELOG.md                             |   3 +-
 crates/client/sync/src/fetch/fetchers.rs |  42 ++--
 crates/client/sync/src/fetch/mod.rs      |  65 +++++++
 crates/client/sync/src/l2.rs             | 235 ++++++++++++-----------
 crates/client/sync/src/lib.rs            |  14 +-
 crates/node/src/commands/run.rs          |   7 +
 6 files changed, 235 insertions(+), 131 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f204ec3fd..075c541e6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,8 @@
-git # Deoxys Changelog
+# Deoxys Changelog
 
 ## Next release
 
+- feat(l2 sync): polling to get new blocks once sync has caught up with the chain
 - perf: store key
 - fix: sync, remove `unwrap` in storage
 - fix(classes): Fixed classes on the RPC level by adding ordering and complete deserialisation
diff --git a/crates/client/sync/src/fetch/fetchers.rs b/crates/client/sync/src/fetch/fetchers.rs
index c72564dbc..9c9b458d9 100644
--- a/crates/client/sync/src/fetch/fetchers.rs
+++ b/crates/client/sync/src/fetch/fetchers.rs
@@ -41,6 +41,8 @@ pub struct FetchConfig {
     pub verify: bool,
     /// The optional API_KEY to avoid rate limiting from the sequencer gateway.
     pub api_key: Option,
+    /// Polling interval for new blocks, once sync has caught up with the chain.
+    pub pending_polling_interval: Duration,
 }
 
 pub async fn fetch_block(client: &SequencerGatewayProvider, block_number: u64) -> Result {
@@ -49,10 +51,17 @@ pub async fn fetch_block(client: &SequencerGatewayProvider, block_number: u64) -
     Ok(block)
 }
 
+pub struct L2BlockAndUpdates {
+    pub block_n: u64,
+    pub block: p::Block,
+    pub state_update: StateUpdate,
+    pub class_update: Vec,
+}
+
 pub async fn fetch_block_and_updates(
     block_n: u64,
     provider: Arc<SequencerGatewayProvider>,
-) -> Result<(p::Block, StateUpdate, Vec), L2SyncError> {
+) -> Result<L2BlockAndUpdates, L2SyncError> {
     const MAX_RETRY: u32 = 15;
     let mut attempt = 0;
     let base_delay = Duration::from_secs(1);
@@ -64,23 +73,32 @@ pub async fn fetch_block_and_updates(
         let (block, state_update) = tokio::join!(block, state_update);
         log::debug!("fetch_block_and_updates: done {block_n}");
 
-        match block.as_ref().err().or(state_update.as_ref().err()) {
-            Some(L2SyncError::Provider(ProviderError::RateLimited)) => {
-                log::info!("The fetching process has been rate limited");
-                log::debug!("The fetching process has been rate limited, retrying in {:?} seconds", base_delay);
+        match (block, state_update) {
+            (Err(L2SyncError::Provider(err)), _) | (_, Err(L2SyncError::Provider(err))) => {
+                // Exponential backoff with a cap on the delay
+                let delay = base_delay * 2_u32.pow(attempt).min(6); // Cap to prevent overly long delays
+                match err {
+                    ProviderError::RateLimited => {
+                        log::info!("The fetching process has been rate limited, retrying in {:?} seconds", delay)
+                    }
+                    // sometimes the sequencer just errors out when trying to rate limit us (??) so retry in that case
+                    _ => log::info!("The provider has returned an error, retrying in {:?} seconds", delay),
+                }
                 attempt += 1;
                 if attempt >= MAX_RETRY {
-                    return Err(L2SyncError::FetchRetryLimit);
+                    return Err(if matches!(err, ProviderError::RateLimited) {
+                        L2SyncError::FetchRetryLimit
+                    } else {
+                        L2SyncError::Provider(err)
+                    });
                 }
-                // Exponential backoff with a cap on the delay
-                let delay = base_delay * 2_u32.pow(attempt - 1).min(6); // Cap to prevent overly long delays
                 tokio::time::sleep(delay).await;
             }
-            _ => {
-                let (block, (state_update, class_update)) = (block?, state_update?);
-                return Ok((block, state_update, class_update));
+            (Err(err), _) | (_, Err(err)) => return Err(err),
+            (Ok(block), Ok((state_update, class_update))) => {
+                return Ok(L2BlockAndUpdates { block_n, block, state_update, class_update });
             }
-        }
+        };
     }
 }
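Note on the retry policy above: `fetch_block_and_updates` retries transient provider failures with an exponential backoff whose delay is capped. A minimal standalone sketch of that pattern, generic over the error type; the `fetch_once` closure, `retry_with_backoff` name, `MAX_RETRY` and `base_delay` values are illustrative stand-ins (assuming tokio), not the crate's real API:

use std::future::Future;
use std::time::Duration;

/// Retry an async operation, sleeping 1s, 2s, 4s, ... (capped) between attempts,
/// and giving up after a fixed number of tries.
async fn retry_with_backoff<T, E, F, Fut>(mut fetch_once: F) -> Result<T, E>
where
    E: std::fmt::Debug,
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    const MAX_RETRY: u32 = 15;
    let base_delay = Duration::from_secs(1);

    let mut attempt = 0;
    loop {
        match fetch_once().await {
            Ok(val) => return Ok(val),
            Err(err) => {
                attempt += 1;
                if attempt >= MAX_RETRY {
                    return Err(err);
                }
                // Multiplier is capped so a long outage never produces a huge sleep.
                let delay = base_delay * 2_u32.pow(attempt - 1).min(6);
                eprintln!("fetch failed ({err:?}), retrying in {delay:?}");
                tokio::time::sleep(delay).await;
            }
        }
    }
}

The patch additionally distinguishes rate-limit errors from other provider errors, so that only genuine rate limiting is reported as `FetchRetryLimit` once the attempts are exhausted.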
diff --git a/crates/client/sync/src/fetch/mod.rs b/crates/client/sync/src/fetch/mod.rs
index 5ff58298d..ff08fb2b0 100644
--- a/crates/client/sync/src/fetch/mod.rs
+++ b/crates/client/sync/src/fetch/mod.rs
@@ -1 +1,66 @@
+use std::pin::pin;
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures::prelude::*;
+use starknet_core::types::StarknetError;
+use starknet_providers::{ProviderError, SequencerGatewayProvider};
+use tokio::sync::mpsc;
+
+use self::fetchers::L2BlockAndUpdates;
+use crate::fetch::fetchers::fetch_block_and_updates;
+use crate::l2::L2SyncError;
+
 pub mod fetchers;
+
+pub async fn l2_fetch_task(
+    first_block: u64,
+    fetch_stream_sender: mpsc::Sender<L2BlockAndUpdates>,
+    provider: Arc<SequencerGatewayProvider>,
+    pending_polling_interval: Duration,
+) -> Result<(), L2SyncError> {
+    // First, catch up with the chain
+
+    let mut last_block = {
+        // Fetch blocks and updates in parallel one time before looping
+        let fetch_stream = (first_block..).map(|block_n| {
+            let provider = Arc::clone(&provider);
+            async move { (block_n, fetch_block_and_updates(block_n, provider).await) }
+        });
+
+        // Have 10 fetches in parallel at once, using futures Buffered
+        let mut fetch_stream = stream::iter(fetch_stream).buffered(10);
+        loop {
+            let (block_n, val) = pin!(fetch_stream.next()).await.unwrap(); // UNWRAP: stream is infinite
+            log::debug!("got {:?}", block_n);
+
+            match val {
+                Err(L2SyncError::Provider(ProviderError::StarknetError(StarknetError::BlockNotFound))) => {
+                    break block_n;
+                }
+                val => fetch_stream_sender.send(val?).await.expect("receiver task is closed"),
+            }
+        }
+    };
+
+    log::debug!("We caught up with the chain yay");
+
+    // Polling
+
+    let mut interval = tokio::time::interval(pending_polling_interval);
+    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+    loop {
+        interval.tick().await;
+
+        loop {
+            match fetch_block_and_updates(last_block + 1, Arc::clone(&provider)).await {
+                Err(L2SyncError::Provider(ProviderError::StarknetError(StarknetError::BlockNotFound))) => {
+                    break;
+                }
+                val => fetch_stream_sender.send(val?).await.expect("receiver task is closed"),
+            }
+
+            last_block += 1;
+        }
+    }
+}
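The catch-up phase above leans on `StreamExt::buffered`: up to 10 `fetch_block_and_updates` futures run concurrently, but their results are yielded strictly in block order, which is what the downstream verify/apply stage needs. A small self-contained sketch of that behaviour, where the sleeping closure is a made-up stand-in for the real gateway call:

use std::time::Duration;

use futures::{stream, StreamExt};

async fn buffered_fetch_demo() {
    // Start up to 10 "fetches" at once; results still arrive as 0, 1, 2, ...
    let mut results = Box::pin(
        stream::iter((0u64..20).map(|block_n| async move {
            // stand-in for `fetch_block_and_updates(block_n, provider)`
            tokio::time::sleep(Duration::from_millis(50)).await;
            block_n
        }))
        .buffered(10),
    );

    while let Some(block_n) = results.next().await {
        println!("got block {block_n}");
    }
}

That ordering is what lets the task remember only the last height it reached and fall back to simple sequential polling once the gateway answers `BlockNotFound`.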
diff --git a/crates/client/sync/src/l2.rs b/crates/client/sync/src/l2.rs
index 357ea3ac9..5d632c139 100644
--- a/crates/client/sync/src/l2.rs
+++ b/crates/client/sync/src/l2.rs
@@ -3,7 +3,6 @@ use std::pin::pin;
 use std::str::FromStr;
 use std::sync::{Arc, RwLock};
 
-use futures::prelude::*;
 use lazy_static::lazy_static;
 use mc_db::storage_handler::primitives::contract_class::ClassUpdateWrapper;
 use mc_db::storage_updates::{store_class_update, store_key_update, store_state_update};
@@ -15,7 +14,7 @@ use serde::Deserialize;
 use sp_blockchain::HeaderBackend;
 use sp_core::H256;
 use starknet_api::hash::{StarkFelt, StarkHash};
-use starknet_core::types::{PendingStateUpdate, StarknetError, StateUpdate};
+use starknet_core::types::{PendingStateUpdate, StateUpdate};
 use starknet_ff::FieldElement;
 use starknet_providers::sequencer::models::BlockId;
 use starknet_providers::{ProviderError, SequencerGatewayProvider};
@@ -25,10 +24,13 @@ use tokio::sync::mpsc::Sender;
 use tokio::time::Duration;
 
 use crate::commitments::lib::{build_commitment_state_diff, update_state_root};
-use crate::fetch::fetchers::fetch_block_and_updates;
+use crate::fetch::fetchers::L2BlockAndUpdates;
+use crate::fetch::l2_fetch_task;
 use crate::l1::ETHEREUM_STATE_UPDATE;
 use crate::CommandSink;
 
+/// Prefer this over [`tokio::spawn_blocking`], as `spawn_blocking` creates new OS threads and
+/// we don't really need that here
 async fn spawn_compute<F, R>(func: F) -> R
 where
     F: FnOnce() -> R + Send + 'static,
@@ -126,33 +128,130 @@ pub struct SenderConfig {
     pub command_sink: CommandSink,
 }
 
+async fn l2_verify_and_apply_task(
+    mut fetch_stream_receiver: mpsc::Receiver<L2BlockAndUpdates>,
+    block_sender: Sender,
+    mut command_sink: CommandSink,
+    verify: bool,
+) -> Result<(), L2SyncError> {
+    let block_sender = Arc::new(block_sender);
+
+    let mut last_block_hash = None;
+
+    while let Some(L2BlockAndUpdates { block_n, block, state_update, class_update }) =
+        pin!(fetch_stream_receiver.recv()).await
+    {
+        let (state_update, block_conv) = {
+            let state_update = Arc::new(state_update);
+            let state_update_1 = Arc::clone(&state_update);
+
+            let block_conv = spawn_compute(move || {
+                let convert_block = |block| {
+                    let start = std::time::Instant::now();
+                    let block_conv = crate::convert::convert_block_sync(block);
+                    log::debug!("convert::convert_block_sync {}: {:?}", block_n, std::time::Instant::now() - start);
+                    block_conv
+                };
+                let ver_l2 = || {
+                    let start = std::time::Instant::now();
+                    let state_root = verify_l2(block_n, &state_update);
+                    log::debug!("verify_l2: {:?}", std::time::Instant::now() - start);
+                    state_root
+                };
+
+                if verify {
+                    let (state_root, block_conv) = rayon::join(ver_l2, || convert_block(block));
+                    if (block_conv.header().global_state_root) != state_root {
+                        log::info!(
+                            "❗ Verified state: {} doesn't match fetched state: {}",
+                            state_root,
+                            block_conv.header().global_state_root
+                        );
+                    }
+                    block_conv
+                } else {
+                    convert_block(block)
+                }
+            })
+            .await;
+
+            // UNWRAP: we need a 'static future as we are spawning tokio tasks further down the line
+            // this is a hack to achieve that, we put the update in an arc and then unwrap it at the end
+            // this will not panic as the Arc should not be aliased.
+            let state_update = Arc::try_unwrap(state_update_1).unwrap();
+            (state_update, block_conv)
+        };
+
+        let block_sender = Arc::clone(&block_sender);
+        let storage_diffs = state_update.state_diff.storage_diffs.clone();
+        tokio::join!(
+            async move {
+                block_sender.send(block_conv).await.expect("block receiver channel is closed");
+            },
+            async {
+                let start = std::time::Instant::now();
+                if store_state_update(block_n, state_update).await.is_err() {
+                    log::info!("❗ Failed to store state update for block {block_n}");
+                };
+                log::debug!("end store_state {}: {:?}", block_n, std::time::Instant::now() - start);
+            },
+            async {
+                let start = std::time::Instant::now();
+                if store_class_update(block_n, ClassUpdateWrapper(class_update)).await.is_err() {
+                    log::info!("❗ Failed to store class update for block {block_n}");
+                };
+                log::debug!("end store_class {}: {:?}", block_n, std::time::Instant::now() - start);
+            },
+            async {
+                let start = std::time::Instant::now();
+                if store_key_update(block_n, &storage_diffs).await.is_err() {
+                    log::info!("❗ Failed to store key update for block {block_n}");
+                };
+                log::debug!("end store_key {}: {:?}", block_n, std::time::Instant::now() - start);
+            },
+            async {
+                let start = std::time::Instant::now();
+                create_block(&mut command_sink, &mut last_block_hash).await.expect("creating block");
+                log::debug!("end create_block {}: {:?}", block_n, std::time::Instant::now() - start);
+            }
+        );
+
+        // compact DB every 1k blocks
+        if block_n % 1000 == 0 {
+            DeoxysBackend::compact();
+        }
+    }
+
+    Ok(())
+}
+
 /// Spawns workers to fetch blocks and state updates from the feeder.
 /// `n_blocks` is optionally the total number of blocks to sync, for debugging/benchmark purposes.
 pub async fn sync<C>(
     block_sender: Sender,
-    mut command_sink: CommandSink,
+    command_sink: CommandSink,
     provider: SequencerGatewayProvider,
     first_block: u64,
     verify: bool,
     client: Arc<C>,
-) where
+    pending_polling_interval: Duration,
+) -> Result<(), L2SyncError>
+where
     C: HeaderBackend + 'static,
 {
+    let (fetch_stream_sender, fetch_stream_receiver) = mpsc::channel(10);
     let provider = Arc::new(provider);
-    let mut last_block_hash = None;
-
-    // Fetch blocks and updates in parallel one time before looping
-    let fetch_stream = (first_block..).map(|block_n| {
-        let provider = Arc::clone(&provider);
-        async move { tokio::spawn(fetch_block_and_updates(block_n, provider)).await.expect("tokio join error") }
-    });
-    // Have 10 fetches in parallel at once, using futures Buffered
-    let fetch_stream = stream::iter(fetch_stream).buffered(10);
-    let (fetch_stream_sender, mut fetch_stream_receiver) = mpsc::channel(10);
+
+    // [Fetch task] ==new blocks and updates=> [Verification and apply task]
+    // Fetch task does parallel fetching, verification is sequential and does all the compute and db
+    // updates
+    // TODO: make this cancel-safe; tasks outlive their parent here when an error occurs
+    // we are using separate tasks so that fetches don't get clogged up if by any chance the verify task
+    // starves the tokio worker
     tokio::select!(
         // update highest block hash and number, update pending block and state update
+        // TODO: remove
         _ = async {
             let mut interval = tokio::time::interval(Duration::from_secs(5));
             interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -162,110 +261,14 @@ pub async fn sync(
                     log::error!("Failed to update highest block hash and number: {}", e);
                 }
             }
-        } => {},
+        } => Ok(()),
         // fetch blocks and updates in parallel
-        _ = async {
-            fetch_stream.for_each(|val| async {
-                fetch_stream_sender.send(val).await.expect("receiver is closed");
-            }).await;
-
-            drop(fetch_stream_sender); // dropping the channel makes the recieving task stop once the queue is empty.
-
-            std::future::pending().await
-        } => {},
+        res = tokio::spawn(l2_fetch_task(first_block, fetch_stream_sender, Arc::clone(&provider), pending_polling_interval)) => res.expect("join error"),
         // apply blocks and updates sequentially
-        _ = async {
-            let mut block_n = first_block;
-            let block_sender = Arc::new(block_sender);
-
-            while let Some(val) = pin!(fetch_stream_receiver.recv()).await {
-                if matches!(val, Err(L2SyncError::Provider(ProviderError::StarknetError(StarknetError::BlockNotFound)))) {
-                    break;
-                }
-
-                let (block, state_update, class_update) = val.expect("fetching block");
-
-                let (state_update, block_conv) = {
-                    let state_update = Arc::new(state_update);
-                    let state_update_1 = Arc::clone(&state_update);
-
-                    let block_conv = spawn_compute(move || {
-                        let convert_block = |block| {
-                            let start = std::time::Instant::now();
-                            let block_conv = crate::convert::convert_block_sync(block);
-                            log::debug!("convert::convert_block_sync {}: {:?}",block_n, std::time::Instant::now() - start);
-                            block_conv
-                        };
-                        let ver_l2 = || {
-                            let start = std::time::Instant::now();
-                            let state_root = verify_l2(block_n, &state_update);
-                            log::debug!("verify_l2: {:?}", std::time::Instant::now() - start);
-                            state_root
-                        };
-
-                        if verify {
-                            let (state_root, block_conv) = rayon::join(ver_l2, || convert_block(block));
-                            if (block_conv.header().global_state_root) != state_root {
-                                log::info!(
-                                    "❗ Verified state: {} doesn't match fetched state: {}",
-                                    state_root,
-                                    block_conv.header().global_state_root
-                                );
-                            }
-                            block_conv
-                        } else {
-                            convert_block(block)
-                        }
-                    })
-                    .await;
-
-                    (Arc::try_unwrap(state_update_1).expect("arc should not be aliased"), block_conv)
-                };
-
-                let block_sender = Arc::clone(&block_sender);
-                let storage_diffs = state_update.state_diff.storage_diffs.clone();
-                tokio::join!(
-                    async move {
-                        block_sender.send(block_conv).await.expect("block reciever channel is closed");
-                    },
-                    async {
-                        let start = std::time::Instant::now();
-                        if store_state_update(block_n, state_update).await.is_err() {
-                            log::info!("❗ Failed to store state update for block {block_n}");
-                        };
-                        log::debug!("end store_state {}: {:?}",block_n, std::time::Instant::now() - start);
-                    },
-                    async {
-                        let start = std::time::Instant::now();
-                        if store_class_update(block_n, ClassUpdateWrapper(class_update)).await.is_err() {
-                            log::info!("❗ Failed to store class update for block {block_n}");
-                        };
-                        log::debug!("end store_class {}: {:?}", block_n, std::time::Instant::now() - start);
-                    },
-                    async {
-                        let start = std::time::Instant::now();
-                        if store_key_update(block_n, &storage_diffs).await.is_err() {
-                            log::info!("❗ Failed to store key update for block {block_n}");
-                        };
-                        log::debug!("end store_key {}: {:?}", block_n, std::time::Instant::now() - start);
-                    },
-                    async {
-                        let start = std::time::Instant::now();
-                        create_block(&mut command_sink, &mut last_block_hash).await.expect("creating block");
-                        log::debug!("end create_block {}: {:?}", block_n, std::time::Instant::now() - start);
-                    }
-                );
-                block_n += 1;
+        res = tokio::spawn(l2_verify_and_apply_task(fetch_stream_receiver, block_sender, command_sink, verify)) => res.expect("join error"),
+    )?;
 
-                // compact DB every 1k blocks
-                if block_n % 1000 == 0 {
-                    DeoxysBackend::compact();
-                }
-            }
-        } => {},
-    );
-
-    log::debug!("L2 sync finished :)");
+    Ok(())
 }
 
 /// Notifies the consensus engine that a new block should be created.
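The refactor in `l2.rs` boils down to a two-task pipeline: a fetch task produces blocks into a small bounded channel, a verify/apply task consumes them, and `tokio::select!` surfaces whichever side finishes (or fails) first. A reduced sketch of that wiring, with `u64` block numbers standing in for `L2BlockAndUpdates` and made-up task bodies:

use tokio::sync::mpsc;

async fn run_pipeline() -> Result<(), &'static str> {
    // Bounded channel: fetching can run ahead of verification by at most 10 blocks.
    let (tx, mut rx) = mpsc::channel::<u64>(10);

    // Producer: stand-in for `l2_fetch_task`.
    let fetcher = tokio::spawn(async move {
        for block_n in 0u64..100 {
            if tx.send(block_n).await.is_err() {
                break; // consumer went away
            }
        }
        Ok::<(), &'static str>(())
    });

    // Consumer: stand-in for `l2_verify_and_apply_task`.
    let applier = tokio::spawn(async move {
        while let Some(block_n) = rx.recv().await {
            println!("verifying and applying block {block_n}");
        }
        Ok::<(), &'static str>(())
    });

    // Return as soon as either task completes or errors.
    tokio::select! {
        res = fetcher => res.expect("join error"),
        res = applier => res.expect("join error"),
    }
}

As the TODO in the patch notes, the branch that loses the `select!` is only dropped, not cancelled: the spawned task keeps running until its channel closes.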
diff --git a/crates/client/sync/src/lib.rs b/crates/client/sync/src/lib.rs
index 15cb1f888..efc8b454f 100644
--- a/crates/client/sync/src/lib.rs
+++ b/crates/client/sync/src/lib.rs
@@ -67,9 +67,19 @@ pub mod starknet_sync_worker {
             verify_l2(0, &state_update);
         }
 
-        let _ = tokio::join!(
+        let (_, l2_res) = tokio::join!(
             l1::sync(l1_url.clone()),
-            l2::sync(block_sender, command_sink, provider, starting_block.into(), fetch_config.verify, client)
+            l2::sync(
+                block_sender,
+                command_sink,
+                provider,
+                starting_block.into(),
+                fetch_config.verify,
+                client,
+                fetch_config.pending_polling_interval
+            )
         );
+
+        l2_res.unwrap(); // TODO: error handling
     }
 }
diff --git a/crates/node/src/commands/run.rs b/crates/node/src/commands/run.rs
index bc1e60df0..8b5af4eaa 100644
--- a/crates/node/src/commands/run.rs
+++ b/crates/node/src/commands/run.rs
@@ -1,5 +1,6 @@
 use std::path::PathBuf;
 use std::result::Result as StdResult;
+use std::time::Duration;
 
 use deoxys_runtime::SealingMode;
 use mc_sync::fetch::fetchers::{fetch_apply_genesis_block, FetchConfig};
@@ -90,6 +91,7 @@ impl NetworkType {
             l1_core_address,
             verify: true,
             api_key: None,
+            pending_polling_interval: Duration::from_secs(2),
         }
     }
 }
@@ -143,6 +145,10 @@ pub struct ExtendedRunCmd {
     #[clap(long)]
     pub gateway_key: Option,
 
+    /// Polling interval, in seconds.
+    #[clap(long, default_value = "2")]
+    pub pending_polling_interval: u64,
+
     /// A flag to run the TUI dashboard
     #[cfg(feature = "tui")]
     #[clap(long)]
@@ -187,6 +193,7 @@ pub fn run_node(mut cli: Cli) -> Result<()> {
     fetch_block_config.sound = cli.run.sound;
     fetch_block_config.verify = !cli.run.disable_root;
     fetch_block_config.api_key = cli.run.gateway_key.clone();
+    fetch_block_config.pending_polling_interval = Duration::from_secs(cli.run.pending_polling_interval);
     update_config(&fetch_block_config);
 
     let genesis_block = fetch_apply_genesis_block(fetch_block_config.clone()).await.unwrap();
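The CLI side is a plain clap field: `#[clap(long)]` on `pending_polling_interval` exposes a `--pending-polling-interval` flag (in seconds), which is converted to a `Duration` before being stored in `FetchConfig`. A cut-down sketch of the same wiring, assuming the clap derive feature; the `RunCmd` struct and `main` here are made up for illustration:

use std::time::Duration;

use clap::Parser;

#[derive(Parser)]
struct RunCmd {
    /// Polling interval, in seconds.
    #[clap(long, default_value = "2")]
    pending_polling_interval: u64,
}

fn main() {
    let cmd = RunCmd::parse();
    // Seconds from the command line become the `Duration` used by the fetch task.
    let pending_polling_interval = Duration::from_secs(cmd.pending_polling_interval);
    println!("polling every {pending_polling_interval:?}");
}

Passing `--pending-polling-interval 10`, for instance, would make a caught-up node ask the gateway for a new block every 10 seconds instead of the 2-second default.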