From 4c5f02eb0f01719cc954cb81288e9e6a6e4f2c35 Mon Sep 17 00:00:00 2001 From: cchudant Date: Thu, 14 Mar 2024 17:16:59 +0000 Subject: [PATCH 1/6] l2 sync: parallel fetching --- Cargo.lock | 5 +- Cargo.toml | 4 +- crates/client/sync/Cargo.toml | 1 + crates/client/sync/src/l2.rs | 325 +++++++++++++++----------------- crates/node/src/commands/run.rs | 4 +- 5 files changed, 159 insertions(+), 180 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 83049b015..615f0a0fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4415,7 +4415,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite 0.2.13", - "socket2 0.4.10", + "socket2 0.5.5", "tokio", "tower-service", "tracing", @@ -6120,6 +6120,7 @@ dependencies = [ "starknet-providers", "starknet-types-core", "starknet_api", + "thiserror", "tokio", "tokio-tungstenite", "url", @@ -12336,7 +12337,7 @@ checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ "cfg-if", "digest 0.10.7", - "rand 0.7.3", + "rand 0.8.5", "static_assertions", ] diff --git a/Cargo.toml b/Cargo.toml index 524d4f398..dd9a3dcf7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -223,7 +223,9 @@ mp-block = { path = "crates/primitives/block", default-features = false } mp-chain-id = { path = "crates/primitives/chain-id", default-features = false } mp-contract = { path = "crates/primitives/contract", default-features = false } mp-convert = { path = "crates/primitives/convert", default-features = false } -mp-digest-log = { path = "crates/primitives/digest-log", default-features = false } +mp-digest-log = { path = "crates/primitives/digest-log", default-features = false, features = [ + "std", +] } mp-fee = { path = "crates/primitives/fee", default-features = false } mp-felt = { path = "crates/primitives/felt", default-features = false } mp-genesis-config = { path = "crates/primitives/genesis-config", default-features = false } diff --git a/crates/client/sync/Cargo.toml b/crates/client/sync/Cargo.toml index d95859dda..aa8d78ccc 100644 --- a/crates/client/sync/Cargo.toml +++ b/crates/client/sync/Cargo.toml @@ -77,6 +77,7 @@ starknet-types-core = { workspace = true, default-features = false, features = [ "hash", "parity-scale-codec", ] } +thiserror.workspace = true [dev-dependencies] # test_utils = { path = "./test_utils" } diff --git a/crates/client/sync/src/l2.rs b/crates/client/sync/src/l2.rs index 0a8647a5f..a2911643f 100644 --- a/crates/client/sync/src/l2.rs +++ b/crates/client/sync/src/l2.rs @@ -1,11 +1,12 @@ //! Contains the code required to fetch data from the feeder efficiently. 
+use std::pin::pin;
 use std::sync::{Arc, Mutex, RwLock};
-use std::time::Duration;
 
 use bitvec::order::Msb0;
 use bitvec::view::AsBits;
 use bonsai_trie::id::BasicId;
 use bonsai_trie::BonsaiStorage;
+use futures::{stream, StreamExt};
 use itertools::Itertools;
 use lazy_static::lazy_static;
 use mc_db::bonsai_db::BonsaiDb;
@@ -25,10 +26,13 @@ use starknet_api::api_core::ClassHash;
 use starknet_api::hash::StarkHash;
 use starknet_core::types::BlockId as BlockIdCore;
 use starknet_ff::FieldElement;
+use starknet_providers::sequencer::models as p;
 use starknet_providers::sequencer::models::state_update::{DeclaredContract, DeployedContract};
 use starknet_providers::sequencer::models::{BlockId, StateUpdate};
-use starknet_providers::{Provider, SequencerGatewayProvider};
+use starknet_providers::{Provider, ProviderError, SequencerGatewayProvider};
 use starknet_types_core::hash::{Pedersen, Poseidon};
+use thiserror::Error;
+use tokio::sync::mpsc;
 use tokio::sync::mpsc::Sender;
 use tokio::task::JoinSet;
 
@@ -36,6 +40,15 @@ use crate::commitments::lib::{build_commitment_state_diff, update_state_root};
 use crate::utility::update_config;
 use crate::CommandSink;
 
+// TODO: add more error variants, which are more explicit
+#[derive(Error, Debug)]
+pub enum L2SyncError {
+    #[error("provider error")]
+    Provider(#[from] ProviderError),
+    #[error("fetch retry limit exceeded")]
+    FetchRetryLimit,
+}
+
 /// Contains the Starknet verified state on L2
 #[derive(Debug, Clone, Deserialize)]
 pub struct L2StateUpdate {
@@ -92,6 +105,39 @@ pub struct SenderConfig {
     pub overrides: Arc<OverrideHandle<Block<Header<u32, BlakeTwo256>, OpaqueExtrinsic>>>,
 }
 
+async fn fetch_block_and_updates<B, C>(
+    block_n: u64,
+    provider: &SequencerGatewayProvider,
+    overrides: &Arc<OverrideHandle<Block<Header<u32, BlakeTwo256>, OpaqueExtrinsic>>>,
+    client: &C,
+) -> Result<(u64, p::Block, StateUpdate, Vec<ContractClassData>), L2SyncError>
+where
+    B: BlockT,
+    C: HeaderBackend<B>,
+{
+    // retry loop
+    const MAX_RETRY: usize = 15;
+    for _ in 0..MAX_RETRY {
+        log::debug!("fetch_block_and_updates {}", block_n);
+        let block = fetch_block(&provider, block_n);
+        let state_update = fetch_state_and_class_update(provider, block_n, overrides, client);
+        let (block, state_update) = tokio::join!(block, state_update);
+        log::debug!("fetch_block_and_updates: done {block_n}");
+
+        if matches!(
+            block.as_ref().err().or(state_update.as_ref().err()),
+            Some(L2SyncError::Provider(ProviderError::RateLimited))
+        ) {
+            continue; // retry api call
+        }
+        let (block, (state_update, class_update)) = (block?, state_update?);
+
+        return Ok((block_n, block, state_update, class_update));
+    }
+
+    Err(L2SyncError::FetchRetryLimit)
+}
+
 /// Spawns workers to fetch blocks and state updates from the feeder.
pub async fn sync( mut sender_config: SenderConfig, @@ -111,105 +157,77 @@ pub async fn sync( fetch_config.feeder_gateway.clone(), fetch_config.chain_id, ); - let mut current_block_number = first_block; let mut last_block_hash = None; - let mut got_block = false; - let mut got_state_update = false; - let mut last_update_highest_block = tokio::time::Instant::now() - Duration::from_secs(20); // TODO: move this somewhere else - if current_block_number == 1 { - let _ = fetch_genesis_state_update( - &provider, - Arc::clone(overrides), - Arc::clone(bonsai_contract), - Arc::clone(bonsai_class), - ) - .await; + if first_block == 1 { + let state_update = + provider.get_state_update(BlockId::Number(0)).await.expect("getting state update for genesis block"); + verify_l2(0, &state_update, overrides, bonsai_contract, bonsai_class, None).expect("verifying genesis block"); } - loop { - if last_update_highest_block.elapsed() > Duration::from_secs(20) { - last_update_highest_block = tokio::time::Instant::now(); - if let Err(e) = update_highest_block_hash_and_number(&provider).await { - eprintln!("Failed to update highest block hash and number: {}", e); + let fetch_stream = + (first_block..).map(|block_n| fetch_block_and_updates(block_n, &provider, overrides, client.as_ref())); + // Have 10 fetches in parallel at once, using futures Buffered + let mut fetch_stream = stream::iter(fetch_stream).buffered(10); + let (fetch_stream_sender, mut fetch_stream_receiver) = mpsc::channel(1); + + tokio::join!( + // fetch blocks and updates in parallel + async { + // FIXME: make it cleaner by replacing this with tokio_util::sync::PollSender to make the channel a + // Sink and have the fetch Stream pipe into it + while let Some(val) = pin!(fetch_stream.next()).await { + fetch_stream_sender.send(val).await.expect("receiver is closed") } - } - let (block, state_update) = match (got_block, got_state_update) { - (false, false) => { - let block = fetch_block(&provider, block_sender, current_block_number); - let state_update = fetch_state_and_class_update( - &provider, - current_block_number, - Arc::clone(overrides), - Arc::clone(bonsai_contract), - Arc::clone(bonsai_class), - state_update_sender, - class_sender, - client.as_ref(), + }, + // apply blocks and updates sequentially + async { + while let Some(val) = pin!(fetch_stream_receiver.recv()).await { + let (block_n, block, state_update, class_update) = val.expect("fetching block"); + + let block_hash = block_hash_substrate(client.as_ref(), block_n - 1); + + verify_l2(block_n, &state_update, overrides, bonsai_contract, bonsai_class, block_hash) + .expect("verifying block"); + + tokio::join!( + async { + let block_conv = crate::convert::block(block).await; + block_sender.send(block_conv).await.expect("block reciever channel is closed"); + }, + async { + // Now send state_update, which moves it. 
This will be received + // by QueryBlockConsensusDataProvider in deoxys/crates/node/src/service.rs + state_update_sender + .send(StateUpdateWrapper::from(state_update)) + .await + .expect("state updater is not running"); + }, + async { + // do the same to class update + class_sender + .send(ClassUpdateWrapper(class_update)) + .await + .expect("class updater is not running"); + } ); - tokio::join!(block, state_update) - } - (false, true) => (fetch_block(&provider, block_sender, current_block_number).await, Ok(())), - (true, false) => ( - Ok(()), - fetch_state_and_class_update( - &provider, - current_block_number, - Arc::clone(overrides), - Arc::clone(bonsai_contract), - Arc::clone(bonsai_class), - state_update_sender, - class_sender, - client.as_ref(), - ) - .await, - ), - (true, true) => unreachable!(), - }; - - got_block = got_block || block.is_ok(); - got_state_update = got_state_update || state_update.is_ok(); - - match (block, state_update) { - (Ok(()), Ok(())) => match create_block(command_sink, &mut last_block_hash).await { - Ok(()) => { - current_block_number += 1; - got_block = false; - got_state_update = false; - } - Err(e) => { - eprintln!("Failed to create block: {}", e); - return; - } - }, - (Err(a), Ok(())) => { - eprintln!("Failed to fetch block {}: {}", current_block_number, a); - tokio::time::sleep(Duration::from_secs(10)).await; - } - (_, Err(b)) => { - eprintln!("Failed to fetch state update {}: {}", current_block_number, b); - tokio::time::sleep(Duration::from_secs(10)).await; + + create_block(command_sink, &mut last_block_hash).await.expect("creating block") } } - } + ); } -async fn fetch_block( - client: &SequencerGatewayProvider, - block_sender: &Sender, - block_number: u64, -) -> Result<(), String> { - let block = - client.get_block(BlockId::Number(block_number)).await.map_err(|e| format!("failed to get block: {e}"))?; - - let block_conv = crate::convert::block(block).await; - block_sender.send(block_conv).await.map_err(|e| format!("failed to dispatch block: {e}"))?; +async fn fetch_block(client: &SequencerGatewayProvider, block_number: u64) -> Result { + let block = client.get_block(BlockId::Number(block_number)).await?; - Ok(()) + Ok(block) } -pub async fn fetch_genesis_block(config: FetchConfig) -> Result { +// FIXME: I don't know why this is required by the CLI,, this is not used within the sync +// procedure. 
+pub async fn fetch_apply_genesis_block(config: FetchConfig) -> Result { let client = SequencerGatewayProvider::new(config.gateway.clone(), config.feeder_gateway.clone(), config.chain_id); let block = client.get_block(BlockId::Number(0)).await.map_err(|e| format!("failed to get block: {e}"))?; @@ -220,71 +238,28 @@ pub async fn fetch_genesis_block(config: FetchConfig) -> Result( provider: &SequencerGatewayProvider, block_number: u64, - overrides: Arc, OpaqueExtrinsic>>>, - bonsai_contract: Arc, Pedersen>>>, - bonsai_class: Arc, Poseidon>>>, - state_update_sender: &Sender, - class_sender: &Sender, + overrides: &Arc, OpaqueExtrinsic>>>, client: &C, -) -> Result<(), String> +) -> Result<(StateUpdate, Vec), L2SyncError> where B: BlockT, C: HeaderBackend, { - let state_update = - fetch_state_update(provider, block_number, overrides.clone(), bonsai_contract, bonsai_class, client).await?; + // Children tasks need StateUpdate as an Arc, because of task spawn 'static requirement + // We make an Arc, and then unwrap the StateUpdate out of the Arc + let state_update = Arc::new(fetch_state_update(provider, block_number).await?); let class_update = fetch_class_update(provider, &state_update, overrides, block_number, client).await?; + let state_update = Arc::try_unwrap(state_update).expect("arc should not be aliased"); - // Now send state_update, which moves it. This will be received - // by QueryBlockConsensusDataProvider in deoxys/crates/node/src/service.rs - state_update_sender - .send(StateUpdateWrapper::from(state_update)) - .await - .map_err(|e| format!("failed to dispatch state update: {e}"))?; - - // do the same to class update - class_sender - .send(ClassUpdateWrapper(class_update)) - .await - .map_err(|e| format!("failed to dispatch class update: {e}"))?; - - Ok(()) + Ok((state_update, class_update)) } /// retrieves state update from Starknet sequencer -async fn fetch_state_update( +async fn fetch_state_update( provider: &SequencerGatewayProvider, block_number: u64, - overrides: Arc, OpaqueExtrinsic>>>, - bonsai_contract: Arc, Pedersen>>>, - bonsai_class: Arc, Poseidon>>>, - client: &C, -) -> Result -where - B: BlockT, - C: HeaderBackend, -{ - let state_update = provider - .get_state_update(BlockId::Number(block_number)) - .await - .map_err(|e| format!("failed to get state update: {e}"))?; - - let block_hash = block_hash_substrate(client, block_number - 1); - verify_l2(block_number, &state_update, overrides, bonsai_contract, bonsai_class, block_hash)?; - - Ok(state_update) -} - -pub async fn fetch_genesis_state_update( - provider: &SequencerGatewayProvider, - overrides: Arc, OpaqueExtrinsic>>>, - bonsai_contract: Arc, Pedersen>>>, - bonsai_class: Arc, Poseidon>>>, -) -> Result { - let state_update = - provider.get_state_update(BlockId::Number(0)).await.map_err(|e| format!("failed to get state update: {e}"))?; - - verify_l2(0, &state_update, overrides, bonsai_contract, bonsai_class, None)?; +) -> Result { + let state_update = provider.get_state_update(BlockId::Number(block_number)).await?; Ok(state_update) } @@ -292,11 +267,11 @@ pub async fn fetch_genesis_state_update( /// retrieves class updates from Starknet sequencer async fn fetch_class_update( provider: &SequencerGatewayProvider, - state_update: &StateUpdate, - overrides: Arc, OpaqueExtrinsic>>>, + state_update: &Arc, + overrides: &Arc, OpaqueExtrinsic>>>, block_number: u64, client: &C, -) -> Result, String> +) -> Result, L2SyncError> where B: BlockT, C: HeaderBackend, @@ -304,31 +279,24 @@ where // defaults to downloading ALL classes if 
a substrate block hash could not be determined
     let missing_classes = match block_hash_substrate(client, block_number) {
         Some(block_hash_substrate) => fetch_missing_classes(state_update, overrides, block_hash_substrate),
-        None => aggregate_classes(state_update),
+        None => aggregate_classes(&state_update),
     };
 
     let arc_provider = Arc::new(provider.clone());
     let mut task_set = missing_classes.into_iter().fold(JoinSet::new(), |mut set, class_hash| {
-        set.spawn(download_class(*class_hash, block_hash_madara(state_update), Arc::clone(&arc_provider)));
+        let provider = Arc::clone(&arc_provider);
+        let state_update = Arc::clone(&state_update);
+        let class_hash = *class_hash;
+        set.spawn(async move { download_class(class_hash, block_hash_madara(&state_update), &provider).await });
         set
     });
 
     // WARNING: all class downloads will abort if even a single class fails to download.
     let mut classes = vec![];
     while let Some(res) = task_set.join_next().await {
-        match res {
-            Ok(result) => match result {
-                Ok(contract) => classes.push(contract),
-                Err(e) => {
-                    task_set.abort_all();
-                    return Err(e.to_string());
-                }
-            },
-            Err(e) => {
-                task_set.abort_all();
-                return Err(e.to_string());
-            }
-        }
+        classes.push(res.expect("Join error")?);
+        // No need to `abort_all()` the `task_set` in case of errors, as dropping the `task_set`
+        // will abort all the tasks.
     }
 
     Ok(classes)
@@ -356,8 +324,8 @@ where
 async fn download_class(
     class_hash: FieldElement,
     block_hash: FieldElement,
-    provider: Arc<SequencerGatewayProvider>,
-) -> anyhow::Result<ContractClassData> {
+    provider: &SequencerGatewayProvider,
+) -> Result<ContractClassData, L2SyncError> {
     // log::info!("💾 Downloading class {class_hash:#x}");
     let core_class = provider.get_class(BlockIdCore::Hash(block_hash), class_hash).await?;
 
@@ -366,22 +334,22 @@ async fn download_class(
     Ok(ContractClassData {
         // TODO: find a less roundabout way of converting from a Felt252Wrapper
         hash: ClassHash(Felt252Wrapper::from(class_hash).into()),
-        contract_class: ContractClassWrapper::try_from(core_class)?,
+        // TODO: remove this expect when ContractClassWrapper::try_from does proper error handling using
+        // thiserror
+        contract_class: ContractClassWrapper::try_from(core_class).expect("converting contract class"),
    })
 }
 
 /// Filters out class declarations in the Starknet sequencer state update
 /// and retains only those which are not stored in the local Substrate db.
-fn fetch_missing_classes(
-    state_update: &StateUpdate,
-    overrides: Arc<OverrideHandle<Block<Header<u32, BlakeTwo256>, OpaqueExtrinsic>>>,
+fn fetch_missing_classes<'a>(
+    state_update: &'a StateUpdate,
+    overrides: &Arc<OverrideHandle<Block<Header<u32, BlakeTwo256>, OpaqueExtrinsic>>>,
     block_hash_substrate: H256,
-) -> Vec<&FieldElement> {
+) -> Vec<&'a FieldElement> {
     aggregate_classes(state_update)
         .into_iter()
-        .filter(|class_hash| {
-            is_missing_class(Arc::clone(&overrides), block_hash_substrate, Felt252Wrapper::from(**class_hash))
-        })
+        .filter(|class_hash| is_missing_class(overrides, block_hash_substrate, Felt252Wrapper::from(**class_hash)))
         .collect()
 }
 
@@ -412,7 +380,7 @@ fn aggregate_classes(state_update: &StateUpdate) -> Vec<&FieldElement> {
 /// Since a change in class definition will result in a change in class hash,
 /// this means we only need to check for class hashes in the db.
fn is_missing_class( - overrides: Arc, OpaqueExtrinsic>>>, + overrides: &Arc, OpaqueExtrinsic>>>, block_hash_substrate: H256, class_hash: Felt252Wrapper, ) -> bool { @@ -456,17 +424,24 @@ pub fn update_l2(state_update: L2StateUpdate) { pub fn verify_l2( block_number: u64, state_update: &StateUpdate, - overrides: Arc, OpaqueExtrinsic>>>, - bonsai_contract: Arc, Pedersen>>>, - bonsai_class: Arc, Poseidon>>>, + overrides: &Arc, OpaqueExtrinsic>>>, + bonsai_contract: &Arc, Pedersen>>>, + bonsai_class: &Arc, Poseidon>>>, substrate_block_hash: Option, -) -> Result<(), String> { +) -> Result<(), L2SyncError> { let state_update_wrapper = StateUpdateWrapper::from(state_update); let csd = build_commitment_state_diff(state_update_wrapper.clone()); - let state_root = - update_state_root(csd, overrides, bonsai_contract, bonsai_class, block_number, substrate_block_hash); + let state_root = update_state_root( + csd, + Arc::clone(overrides), + Arc::clone(bonsai_contract), + Arc::clone(bonsai_class), + block_number, + substrate_block_hash, + ); let block_hash = state_update.block_hash.expect("Block hash not found in state update"); + log::debug!("update_state_root {} -- block_hash: {block_hash:?}, state_root: {state_root:?}", block_number); update_l2(L2StateUpdate { block_number, diff --git a/crates/node/src/commands/run.rs b/crates/node/src/commands/run.rs index 9dc28d7e7..c38d77978 100644 --- a/crates/node/src/commands/run.rs +++ b/crates/node/src/commands/run.rs @@ -2,7 +2,7 @@ use std::path::PathBuf; use std::result::Result as StdResult; use madara_runtime::SealingMode; -use mc_sync::l2::fetch_genesis_block; +use mc_sync::l2::fetch_apply_genesis_block; use mc_sync::utility::update_config; use mc_sync::utils::constant::starknet_core_address; use reqwest::Url; @@ -185,7 +185,7 @@ pub fn run_node(mut cli: Cli) -> Result<()> { let sealing = cli.run.sealing.map(Into::into).unwrap_or_default(); let cache = cli.run.cache; let mut fetch_block_config = cli.run.network.block_fetch_config(); - let genesis_block = fetch_genesis_block(fetch_block_config.clone()).await.unwrap(); + let genesis_block = fetch_apply_genesis_block(fetch_block_config.clone()).await.unwrap(); fetch_block_config.sound = cli.run.sound; update_config(&fetch_block_config); From a5bfb26d04bd18e428e4f09df5a4ef7523e8f531 Mon Sep 17 00:00:00 2001 From: cchudant Date: Thu, 14 Mar 2024 17:18:10 +0000 Subject: [PATCH 2/6] l2 sync: use tokio blocking task for state root computation --- crates/client/sync/src/l2.rs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/crates/client/sync/src/l2.rs b/crates/client/sync/src/l2.rs index a2911643f..7d4b0d3a5 100644 --- a/crates/client/sync/src/l2.rs +++ b/crates/client/sync/src/l2.rs @@ -169,8 +169,8 @@ pub async fn sync( let fetch_stream = (first_block..).map(|block_n| fetch_block_and_updates(block_n, &provider, overrides, client.as_ref())); // Have 10 fetches in parallel at once, using futures Buffered - let mut fetch_stream = stream::iter(fetch_stream).buffered(10); - let (fetch_stream_sender, mut fetch_stream_receiver) = mpsc::channel(1); + let mut fetch_stream = stream::iter(fetch_stream).buffered(5); + let (fetch_stream_sender, mut fetch_stream_receiver) = mpsc::channel(5); tokio::join!( // fetch blocks and updates in parallel @@ -188,8 +188,24 @@ pub async fn sync( let block_hash = block_hash_substrate(client.as_ref(), block_n - 1); - verify_l2(block_n, &state_update, overrides, bonsai_contract, bonsai_class, block_hash) - .expect("verifying block"); + let 
state_update = { + let overrides = Arc::clone(&overrides); + let bonsai_contract = Arc::clone(&bonsai_contract); + let bonsai_class = Arc::clone(&bonsai_class); + let state_update = Arc::new(state_update); + let state_update_1 = Arc::clone(&state_update); + + // verify_l2 takes a long time, we don't want to starve the event loop + tokio::task::spawn_blocking(move || { + verify_l2(block_n, &state_update, &overrides, &bonsai_contract, &bonsai_class, block_hash) + .expect("verifying block"); + }) + .await + .expect("verification task panicked"); + + // hack because tokio tasks need to be 'static + Arc::try_unwrap(state_update_1).expect("arc should not be aliased") + }; tokio::join!( async { From c2a903312b51924ac1278b50b98ea7d231dcf4af Mon Sep 17 00:00:00 2001 From: cchudant Date: Thu, 14 Mar 2024 19:52:36 +0000 Subject: [PATCH 3/6] l2 sync: fix sync exit condition --- crates/client/sync/src/l2.rs | 54 +++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/crates/client/sync/src/l2.rs b/crates/client/sync/src/l2.rs index 7d4b0d3a5..b9779aab5 100644 --- a/crates/client/sync/src/l2.rs +++ b/crates/client/sync/src/l2.rs @@ -24,7 +24,7 @@ use sp_runtime::traits::{BlakeTwo256, Block as BlockT, UniqueSaturatedInto}; use sp_runtime::OpaqueExtrinsic; use starknet_api::api_core::ClassHash; use starknet_api::hash::StarkHash; -use starknet_core::types::BlockId as BlockIdCore; +use starknet_core::types::{BlockId as BlockIdCore, StarknetError}; use starknet_ff::FieldElement; use starknet_providers::sequencer::models as p; use starknet_providers::sequencer::models::state_update::{DeclaredContract, DeployedContract}; @@ -110,7 +110,7 @@ async fn fetch_block_and_updates( provider: &SequencerGatewayProvider, overrides: &Arc, OpaqueExtrinsic>>>, client: &C, -) -> Result<(u64, p::Block, StateUpdate, Vec), L2SyncError> +) -> Result<(p::Block, StateUpdate, Vec), L2SyncError> where B: BlockT, C: HeaderBackend, @@ -132,7 +132,7 @@ where } let (block, (state_update, class_update)) = (block?, state_update?); - return Ok((block_n, block, state_update, class_update)); + return Ok((block, state_update, class_update)); } Err(L2SyncError::FetchRetryLimit) @@ -169,22 +169,29 @@ pub async fn sync( let fetch_stream = (first_block..).map(|block_n| fetch_block_and_updates(block_n, &provider, overrides, client.as_ref())); // Have 10 fetches in parallel at once, using futures Buffered - let mut fetch_stream = stream::iter(fetch_stream).buffered(5); - let (fetch_stream_sender, mut fetch_stream_receiver) = mpsc::channel(5); + let mut fetch_stream = stream::iter(fetch_stream).buffered(10); + let (fetch_stream_sender, mut fetch_stream_receiver) = mpsc::channel(10); - tokio::join!( + tokio::select!( // fetch blocks and updates in parallel - async { + _ = async { // FIXME: make it cleaner by replacing this with tokio_util::sync::PollSender to make the channel a // Sink and have the fetch Stream pipe into it while let Some(val) = pin!(fetch_stream.next()).await { fetch_stream_sender.send(val).await.expect("receiver is closed") } - }, + + } => {}, // apply blocks and updates sequentially - async { + _ = async { + let mut block_n = first_block; while let Some(val) = pin!(fetch_stream_receiver.recv()).await { - let (block_n, block, state_update, class_update) = val.expect("fetching block"); + if matches!(val, Err(L2SyncError::Provider(ProviderError::StarknetError(StarknetError::BlockNotFound)))) { + // found the tip of the blockchain :) + break; + } + + let (block, state_update, class_update) = 
val.expect("fetching block"); let block_hash = block_hash_substrate(client.as_ref(), block_n - 1); @@ -229,10 +236,14 @@ pub async fn sync( } ); - create_block(command_sink, &mut last_block_hash).await.expect("creating block") + create_block(command_sink, &mut last_block_hash).await.expect("creating block"); + + block_n += 1; } - } + } => {}, ); + + log::debug!("L2 sync finished :)"); } async fn fetch_block(client: &SequencerGatewayProvider, block_number: u64) -> Result { @@ -429,11 +440,9 @@ async fn create_block(cmds: &mut CommandSink, parent_hash: &mut Option) -> /// Update the L2 state with the latest data pub fn update_l2(state_update: L2StateUpdate) { - { - let mut last_state_update = - STARKNET_STATE_UPDATE.lock().expect("Failed to acquire lock on STARKNET_STATE_UPDATE"); - *last_state_update = state_update.clone(); - } + let mut last_state_update = + STARKNET_STATE_UPDATE.lock().expect("Failed to acquire lock on STARKNET_STATE_UPDATE"); + *last_state_update = state_update.clone(); } /// Verify and update the L2 state according to the latest state update @@ -459,6 +468,8 @@ pub fn verify_l2( let block_hash = state_update.block_hash.expect("Block hash not found in state update"); log::debug!("update_state_root {} -- block_hash: {block_hash:?}, state_root: {state_root:?}", block_number); + set_highest_block_hash_and_number(block_hash, block_number); + update_l2(L2StateUpdate { block_number, global_root: state_root.into(), @@ -468,18 +479,11 @@ pub fn verify_l2( Ok(()) } -async fn update_highest_block_hash_and_number(client: &SequencerGatewayProvider) -> Result<(), String> { - let block = client.get_block(BlockId::Latest).await.map_err(|e| format!("failed to get block: {e}"))?; - - let hash = block.block_hash.ok_or("block hash not found")?; - let number = block.block_number.ok_or("block number not found")?; - +fn set_highest_block_hash_and_number(hash: FieldElement, number: u64) { let mut highest_block_hash_and_number = STARKNET_HIGHEST_BLOCK_HASH_AND_NUMBER .write() .expect("Failed to acquire write lock on STARKNET_HIGHEST_BLOCK_HASH_AND_NUMBER"); *highest_block_hash_and_number = (hash, number); - - Ok(()) } pub fn get_highest_block_hash_and_number() -> (FieldElement, u64) { From ac2fa21d73baf2649732564c6562d832b92f9603 Mon Sep 17 00:00:00 2001 From: cchudant Date: Thu, 14 Mar 2024 20:56:13 +0000 Subject: [PATCH 4/6] l2 sync: fix clippy and fmt --- crates/client/sync/src/l2.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/crates/client/sync/src/l2.rs b/crates/client/sync/src/l2.rs index b9779aab5..267c6ffb4 100644 --- a/crates/client/sync/src/l2.rs +++ b/crates/client/sync/src/l2.rs @@ -119,7 +119,7 @@ where const MAX_RETRY: usize = 15; for _ in 0..MAX_RETRY { log::debug!("fetch_block_and_updates {}", block_n); - let block = fetch_block(&provider, block_n); + let block = fetch_block(provider, block_n); let state_update = fetch_state_and_class_update(provider, block_n, overrides, client); let (block, state_update) = tokio::join!(block, state_update); log::debug!("fetch_block_and_updates: done {block_n}"); @@ -180,7 +180,7 @@ pub async fn sync( while let Some(val) = pin!(fetch_stream.next()).await { fetch_stream_sender.send(val).await.expect("receiver is closed") } - + } => {}, // apply blocks and updates sequentially _ = async { @@ -196,12 +196,12 @@ pub async fn sync( let block_hash = block_hash_substrate(client.as_ref(), block_n - 1); let state_update = { - let overrides = Arc::clone(&overrides); - let bonsai_contract = 
Arc::clone(&bonsai_contract); - let bonsai_class = Arc::clone(&bonsai_class); + let overrides = Arc::clone(overrides); + let bonsai_contract = Arc::clone(bonsai_contract); + let bonsai_class = Arc::clone(bonsai_class); let state_update = Arc::new(state_update); let state_update_1 = Arc::clone(&state_update); - + // verify_l2 takes a long time, we don't want to starve the event loop tokio::task::spawn_blocking(move || { verify_l2(block_n, &state_update, &overrides, &bonsai_contract, &bonsai_class, block_hash) @@ -306,13 +306,13 @@ where // defaults to downloading ALL classes if a substrate block hash could not be determined let missing_classes = match block_hash_substrate(client, block_number) { Some(block_hash_substrate) => fetch_missing_classes(state_update, overrides, block_hash_substrate), - None => aggregate_classes(&state_update), + None => aggregate_classes(state_update), }; let arc_provider = Arc::new(provider.clone()); let mut task_set = missing_classes.into_iter().fold(JoinSet::new(), |mut set, class_hash| { let provider = Arc::clone(&arc_provider); - let state_update = Arc::clone(&state_update); + let state_update = Arc::clone(state_update); let class_hash = *class_hash; set.spawn(async move { download_class(class_hash, block_hash_madara(&state_update), &provider).await }); set @@ -440,8 +440,7 @@ async fn create_block(cmds: &mut CommandSink, parent_hash: &mut Option) -> /// Update the L2 state with the latest data pub fn update_l2(state_update: L2StateUpdate) { - let mut last_state_update = - STARKNET_STATE_UPDATE.lock().expect("Failed to acquire lock on STARKNET_STATE_UPDATE"); + let mut last_state_update = STARKNET_STATE_UPDATE.lock().expect("Failed to acquire lock on STARKNET_STATE_UPDATE"); *last_state_update = state_update.clone(); } From 1ec39dd259bb9938c406b626d6c537c2ad959ad7 Mon Sep 17 00:00:00 2001 From: cchudant Date: Fri, 15 Mar 2024 12:50:01 +0000 Subject: [PATCH 5/6] l2 sync: changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f210f7b28..b33c76700 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ git # Deoxys Changelog ## Next release +- perf(l2 sync): parallel fetching of blocks, classes, state updates - fix l1 thread to reflect correct state_root, block_number, block_hash - fix: remove gas_price and update starknet-rs from fork (temporary fix) - fix(root): got state root to work (does not support class root yet) From d12017af37cadd45d2df7a89222a7b0680b0dbd9 Mon Sep 17 00:00:00 2001 From: cchudant Date: Fri, 15 Mar 2024 12:55:18 +0000 Subject: [PATCH 6/6] l2 sync: fix fixme comment --- crates/client/sync/src/l2.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/client/sync/src/l2.rs b/crates/client/sync/src/l2.rs index 267c6ffb4..2e8b9331a 100644 --- a/crates/client/sync/src/l2.rs +++ b/crates/client/sync/src/l2.rs @@ -252,8 +252,9 @@ async fn fetch_block(client: &SequencerGatewayProvider, block_number: u64) -> Re Ok(block) } -// FIXME: I don't know why this is required by the CLI,, this is not used within the sync -// procedure. +// FIXME: This is an artefact of an older version of the code when this was used to retrieve the +// head of the chain during initialization, but is since no longer used. 
+ pub async fn fetch_apply_genesis_block(config: FetchConfig) -> Result { let client = SequencerGatewayProvider::new(config.gateway.clone(), config.feeder_gateway.clone(), config.chain_id); let block = client.get_block(BlockId::Number(0)).await.map_err(|e| format!("failed to get block: {e}"))?;
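
The core pattern this patch series introduces (a buffered stream of fetch futures feeding a bounded channel, with blocks applied strictly in order on the other side) can be seen in isolation in the sketch below. It is a standalone illustration, not code from the deoxys tree: `fetch_block_mock`, the hard-coded chain tip at block 20, and the channel capacity of 10 are stand-ins for the real `fetch_block_and_updates`, the `BlockNotFound` exit condition, and the configured buffering.

// Standalone sketch of the fetch/apply pipeline (assumed deps: tokio with the
// "full" feature, futures). All names here are illustrative, not deoxys APIs.
use futures::{stream, StreamExt};
use tokio::sync::mpsc;

// Stand-in for fetch_block_and_updates: pretends block 20 is past the tip.
async fn fetch_block_mock(block_n: u64) -> Result<u64, String> {
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    if block_n >= 20 { Err("block not found".to_string()) } else { Ok(block_n) }
}

#[tokio::main]
async fn main() {
    let first_block = 0u64;
    // An infinite range of block numbers turned into a stream of futures;
    // `buffered(10)` keeps up to 10 fetches in flight but yields them in order.
    let fetch_stream = stream::iter((first_block..).map(fetch_block_mock)).buffered(10);
    let mut fetch_stream = Box::pin(fetch_stream);
    // Bounded channel: back-pressure so fetching never runs far ahead of applying.
    let (tx, mut rx) = mpsc::channel(10);

    tokio::select! {
        // Producer: drive the buffered stream and forward each result.
        _ = async {
            while let Some(res) = fetch_stream.next().await {
                if tx.send(res).await.is_err() {
                    break; // consumer hung up
                }
            }
        } => {},
        // Consumer: apply blocks strictly sequentially, stop at the tip.
        _ = async {
            while let Some(res) = rx.recv().await {
                match res {
                    Ok(block_n) => println!("applied block {block_n}"),
                    Err(_) => break, // reached the head of the chain
                }
            }
        } => {},
    }
}

Note that `buffered`, unlike `buffer_unordered`, yields results in the order the futures were produced, which is what lets the consumer side apply blocks and verify state roots sequentially while the network requests still overlap.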