diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e9649ed0..c9a0619f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Next release +- feat(warp): added warp update to madara - docs(readme): updated README.md docs and added Docker Compose support - fix(log): define RUST_LOG=info by default - fix(tracing): RUST_LOG filtering support diff --git a/Cargo.lock b/Cargo.lock index 7a991995f..c279e3c4d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5957,6 +5957,7 @@ dependencies = [ "mp-receipt", "mp-state-update", "mp-transactions", + "mp-utils", "rstest 0.18.2", "serde", "serde_json", @@ -5966,6 +5967,7 @@ dependencies = [ "starknet_api", "thiserror", "tokio", + "tokio-util", "tracing", ] @@ -5977,11 +5979,13 @@ dependencies = [ "futures", "httpmock", "hyper 1.5.1", + "jsonrpsee", "m-cairo-test-contracts", "mc-analytics", "mc-block-import", "mc-db", "mc-gateway-client", + "mc-rpc", "mc-telemetry", "mp-block", "mp-chain-config", diff --git a/README.md b/README.md index 9a00d483d..9dd0f44ae 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ Madara is a powerful Starknet client written in Rust. - [Starknet Compliant](#starknet-compliant) - [Feeder-Gateway State Synchronization](#feeder-gateway-state-synchronization) - [State Commitment Computation](#state-commitment-computation) + - [Database Migration](#database-migration) - 💬 [Get in touch](#-get-in-touch) - [Contributing](#contributing) - [Partnerships](#partnerships) @@ -573,6 +574,54 @@ Besu Bonsai Merkle Tries. See the [bonsai lib](https://github.com/madara-allianc You can read more about Starknet Block structure and how it affects state commitment [here](https://docs.starknet.io/architecture-and-concepts/network-architecture/block-structure/). +### Database Migration + +When migrating to a newer version of Madara, you might need to update your +database. Instead of re-synchronizing the entirety of your chain's state from +genesis, you can use Madara's **warp update** feature. + +> [!NOTE] +> Warp update requires an already synchronized _local_ node with a working +> database. + +To begin the database migration, you will need to start an existing node with +[admin methods](#madara-specific-json-rpc-methods) and +[feeder gateway](#feeder-gateway-state-synchronization) enabled. This will be +the _source_ of the migration. You can do this with the `--warp-update-sender` +[preset](#4.-presets): + +```bash +cargo run --release -- \ + --name Sender \ + --full \ # This also works with other types of nodes + --network mainnet \ + --warp-update-sender +``` + +You will then need to start a second node to synchronize the state of your +database: + +```bash +cargo run --release -- \ + --name Receiver \ + --base-path /tmp/madara_new \ # Where you want the new database to be stored + --full \ + --network mainnet \ + --l1-endpoint https://*** \ + --warp-update-receiver +``` + +This will start generating a new up-to-date database under `/tmp/madara_new`. +Once this process is over, the warp update sender node will automatically +shut down while the warp update receiver will take its place. + +> [!WARNING] +> As of now, the warp update receiver has its RPC disabled, even after the +> migration process has completed. This will be fixed in the future, so that +> services that would otherwise conflict with the sender node will automatically +> start after the migration has finished, allowing for migrations with zero +> downtime. 
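+
+The two steps above can also be chained from a single script. The following is
+a minimal sketch using only the flags shown above (adapt the network, paths,
+startup delay and L1 endpoint to your setup); the sender is expected to shut
+down on its own once the migration completes, so the final `kill` is only a
+safety net:
+
+```bash
+# Start the source node in the background and remember its pid.
+cargo run --release -- \
+  --name Sender \
+  --full \
+  --network mainnet \
+  --warp-update-sender &
+SENDER_PID=$!
+
+# Give the sender some time to come up before starting the migration.
+sleep 5
+
+# Start the receiver: it pulls blocks from the sender, writes the new
+# database under /tmp/madara_new, and then keeps running in its place.
+cargo run --release -- \
+  --name Receiver \
+  --base-path /tmp/madara_new \
+  --full \
+  --network mainnet \
+  --l1-endpoint https://*** \
+  --warp-update-receiver
+
+# Clean up the sender in case it did not exit on its own.
+kill "$SENDER_PID" 2>/dev/null || true
+```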
+ ## 💬 Get in touch [⬅️ back to top](#-madara-starknet-client) diff --git a/crates/client/block_import/src/lib.rs b/crates/client/block_import/src/lib.rs index fe2e5d617..113e2fea1 100644 --- a/crates/client/block_import/src/lib.rs +++ b/crates/client/block_import/src/lib.rs @@ -119,16 +119,11 @@ pub struct BlockImporter { backend: Arc, verify_apply: VerifyApply, metrics: BlockMetrics, - always_force_flush: bool, } impl BlockImporter { /// The starting block is used for metrics. Setting it to None means it will look at the database latest block number. - pub fn new( - backend: Arc, - starting_block: Option, - always_force_flush: bool, - ) -> anyhow::Result { + pub fn new(backend: Arc, starting_block: Option) -> anyhow::Result { let pool = Arc::new(RayonPool::new()); let starting_block = if let Some(n) = starting_block { n @@ -145,7 +140,6 @@ impl BlockImporter { pool, metrics: BlockMetrics::register(starting_block).context("Registering metrics for block import")?, backend, - always_force_flush, }) } @@ -176,11 +170,6 @@ impl BlockImporter { validation: BlockValidationContext, ) -> Result { let result = self.verify_apply.verify_apply(block, validation).await?; - // Flush step. - let force = self.always_force_flush; - self.backend - .maybe_flush(force) - .map_err(|err| BlockImportError::Internal(format!("DB flushing error: {err:#}").into()))?; self.metrics.update(&result.header, &self.backend); Ok(result) } diff --git a/crates/client/db/src/lib.rs b/crates/client/db/src/lib.rs index 487ed1884..52cfdb41d 100644 --- a/crates/client/db/src/lib.rs +++ b/crates/client/db/src/lib.rs @@ -1,19 +1,18 @@ //! Madara database -use anyhow::{Context, Result}; +use anyhow::Context; use bonsai_db::{BonsaiDb, DatabaseKeyMapping}; use bonsai_trie::id::BasicId; use bonsai_trie::{BonsaiStorage, BonsaiStorageConfig}; use db_metrics::DbMetrics; use mp_chain_config::ChainConfig; -use mp_utils::service::Service; +use mp_utils::service::{MadaraService, Service}; use rocksdb::backup::{BackupEngine, BackupEngineOptions}; use rocksdb::{BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, Env, FlushOptions, MultiThreaded}; use rocksdb_options::rocksdb_global_options; use starknet_types_core::hash::{Pedersen, Poseidon, StarkHash}; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, Instant}; +use std::sync::Arc; use std::{fmt, fs}; use tokio::sync::{mpsc, oneshot}; @@ -37,7 +36,7 @@ pub type WriteBatchWithTransaction = rocksdb::WriteBatchWithTransaction; const DB_UPDATES_BATCH_SIZE: usize = 1024; -pub fn open_rocksdb(path: &Path) -> Result> { +pub fn open_rocksdb(path: &Path) -> anyhow::Result> { let opts = rocksdb_global_options()?; tracing::debug!("opening db at {:?}", path.display()); let db = DB::open_cf_descriptors( @@ -49,14 +48,14 @@ pub fn open_rocksdb(path: &Path) -> Result> { Ok(Arc::new(db)) } -/// This runs in anothr thread as the backup engine is not thread safe +/// This runs in another thread as the backup engine is not thread safe fn spawn_backup_db_task( backup_dir: &Path, restore_from_latest_backup: bool, db_path: &Path, db_restored_cb: oneshot::Sender<()>, mut recv: mpsc::Receiver, -) -> Result<()> { +) -> anyhow::Result<()> { let mut backup_opts = BackupEngineOptions::new(backup_dir).context("Creating backup options")?; let cores = std::thread::available_parallelism().map(|e| e.get() as i32).unwrap_or(1); backup_opts.set_max_background_operations(cores); @@ -254,7 +253,6 @@ impl DatabaseExt for DB { pub struct MadaraBackend { backup_handle: Option>, db: 
Arc, - last_flush_time: Mutex>, chain_config: Arc, db_metrics: DbMetrics, sender_block_info: tokio::sync::broadcast::Sender, @@ -305,7 +303,11 @@ impl DatabaseService { } } -impl Service for DatabaseService {} +impl Service for DatabaseService { + fn id(&self) -> MadaraService { + MadaraService::Database + } +} struct BackupRequest { callback: oneshot::Sender<()>, @@ -315,7 +317,7 @@ struct BackupRequest { impl Drop for MadaraBackend { fn drop(&mut self) { tracing::info!("⏳ Gracefully closing the database..."); - self.maybe_flush(true).expect("Error when flushing the database"); // flush :) + self.flush().expect("Error when flushing the database"); // flush :) } } @@ -330,7 +332,6 @@ impl MadaraBackend { Arc::new(Self { backup_handle: None, db: open_rocksdb(temp_dir.as_ref()).unwrap(), - last_flush_time: Default::default(), chain_config, db_metrics: DbMetrics::register().unwrap(), sender_block_info: tokio::sync::broadcast::channel(100).0, @@ -344,7 +345,7 @@ impl MadaraBackend { backup_dir: Option, restore_from_latest_backup: bool, chain_config: Arc, - ) -> Result> { + ) -> anyhow::Result> { let db_path = db_config_dir.join("db"); // when backups are enabled, a thread is spawned that owns the rocksdb BackupEngine (it is not thread safe) and it receives backup requests using a mpsc channel @@ -374,7 +375,6 @@ impl MadaraBackend { db_metrics: DbMetrics::register().context("Registering db metrics")?, backup_handle, db, - last_flush_time: Default::default(), chain_config: Arc::clone(&chain_config), sender_block_info: tokio::sync::broadcast::channel(100).0, #[cfg(feature = "testing")] @@ -384,30 +384,21 @@ impl MadaraBackend { Ok(backend) } - pub fn maybe_flush(&self, force: bool) -> Result { - let mut inst = self.last_flush_time.lock().expect("poisoned mutex"); - let will_flush = force - || match *inst { - Some(inst) => inst.elapsed() >= Duration::from_secs(5), - None => true, - }; - if will_flush { - tracing::debug!("doing a db flush"); - let mut opts = FlushOptions::default(); - opts.set_wait(true); - // we have to collect twice here :/ - let columns = Column::ALL.iter().map(|e| self.db.get_column(*e)).collect::>(); - let columns = columns.iter().collect::>(); - self.db.flush_cfs_opt(&columns, &opts).context("Flushing database")?; - - *inst = Some(Instant::now()); - } + pub fn flush(&self) -> anyhow::Result<()> { + tracing::debug!("doing a db flush"); + let mut opts = FlushOptions::default(); + opts.set_wait(true); + // we have to collect twice here :/ + let columns = Column::ALL.iter().map(|e| self.db.get_column(*e)).collect::>(); + let columns = columns.iter().collect::>(); - Ok(will_flush) + self.db.flush_cfs_opt(&columns, &opts).context("Flushing database")?; + + Ok(()) } #[tracing::instrument(skip(self))] - pub async fn backup(&self) -> Result<()> { + pub async fn backup(&self) -> anyhow::Result<()> { let (callback_sender, callback_recv) = oneshot::channel(); let _res = self .backup_handle diff --git a/crates/client/devnet/src/lib.rs b/crates/client/devnet/src/lib.rs index 85562d77d..0254df6f0 100644 --- a/crates/client/devnet/src/lib.rs +++ b/crates/client/devnet/src/lib.rs @@ -302,7 +302,7 @@ mod tests { let chain_config = Arc::new(ChainConfig::madara_devnet()); let block = g.build(&chain_config).unwrap(); let backend = MadaraBackend::open_for_testing(Arc::clone(&chain_config)); - let importer = Arc::new(BlockImporter::new(Arc::clone(&backend), None, true).unwrap()); + let importer = Arc::new(BlockImporter::new(Arc::clone(&backend), None).unwrap()); println!("{:?}", block.state_diff); 
tokio::runtime::Runtime::new() diff --git a/crates/client/eth/Cargo.toml b/crates/client/eth/Cargo.toml index 9c3bb8d6c..36f4c06b6 100644 --- a/crates/client/eth/Cargo.toml +++ b/crates/client/eth/Cargo.toml @@ -84,3 +84,4 @@ httpmock = { workspace = true } tracing-test = "0.2.5" serial_test = { workspace = true } lazy_static = { workspace = true } +mp-utils = { workspace = true, features = ["testing"] } diff --git a/crates/client/eth/src/l1_gas_price.rs b/crates/client/eth/src/l1_gas_price.rs index 72317a5bc..33a7f7a9d 100644 --- a/crates/client/eth/src/l1_gas_price.rs +++ b/crates/client/eth/src/l1_gas_price.rs @@ -5,7 +5,7 @@ use anyhow::Context; use mc_mempool::{GasPriceProvider, L1DataProvider}; use std::time::{Duration, UNIX_EPOCH}; -use mp_utils::wait_or_graceful_shutdown; +use mp_utils::{service::ServiceContext, wait_or_graceful_shutdown}; use std::time::SystemTime; pub async fn gas_price_worker_once( @@ -36,12 +36,12 @@ pub async fn gas_price_worker( eth_client: &EthereumClient, l1_gas_provider: GasPriceProvider, gas_price_poll_ms: Duration, - cancellation_token: tokio_util::sync::CancellationToken, + ctx: ServiceContext, ) -> anyhow::Result<()> { l1_gas_provider.update_last_update_timestamp(); let mut interval = tokio::time::interval(gas_price_poll_ms); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - while wait_or_graceful_shutdown(interval.tick(), &cancellation_token).await.is_some() { + while wait_or_graceful_shutdown(interval.tick(), &ctx).await.is_some() { gas_price_worker_once(eth_client, l1_gas_provider.clone(), gas_price_poll_ms).await?; } Ok(()) @@ -135,7 +135,7 @@ mod eth_client_gas_price_worker_test { ð_client, l1_gas_provider, Duration::from_millis(200), - tokio_util::sync::CancellationToken::new(), + ServiceContext::new_for_testing(), ) .await } @@ -280,7 +280,7 @@ mod eth_client_gas_price_worker_test { ð_client, l1_gas_provider.clone(), Duration::from_millis(200), - tokio_util::sync::CancellationToken::new(), + ServiceContext::new_for_testing(), ), ) .await; diff --git a/crates/client/eth/src/l1_messaging.rs b/crates/client/eth/src/l1_messaging.rs index 4a50bb0ee..75fcee712 100644 --- a/crates/client/eth/src/l1_messaging.rs +++ b/crates/client/eth/src/l1_messaging.rs @@ -10,6 +10,7 @@ use futures::StreamExt; use mc_db::{l1_db::LastSyncedEventBlock, MadaraBackend}; use mc_mempool::{Mempool, MempoolProvider}; use mp_utils::channel_wait_or_graceful_shutdown; +use mp_utils::service::ServiceContext; use starknet_api::core::{ChainId, ContractAddress, EntryPointSelector, Nonce}; use starknet_api::transaction::{Calldata, Fee, L1HandlerTransaction, Transaction, TransactionVersion}; use starknet_api::transaction_hash::get_transaction_hash; @@ -42,7 +43,7 @@ pub async fn sync( client: &EthereumClient, chain_id: &ChainId, mempool: Arc, - cancellation_token: tokio_util::sync::CancellationToken, + ctx: ServiceContext, ) -> anyhow::Result<()> { tracing::info!("⟠ Starting L1 Messages Syncing..."); @@ -67,7 +68,7 @@ pub async fn sync( "Failed to watch event filter - Ensure you are using an L1 RPC endpoint that points to an archive node", )? 
.into_stream(); - while let Some(event_result) = channel_wait_or_graceful_shutdown(event_stream.next(), &cancellation_token).await { + while let Some(event_result) = channel_wait_or_graceful_shutdown(event_stream.next(), &ctx).await { if let Ok((event, meta)) = event_result { tracing::info!( "⟠ Processing L1 Message from block: {:?}, transaction_hash: {:?}, log_index: {:?}, fromAddress: {:?}", @@ -245,6 +246,7 @@ mod l1_messaging_tests { use mc_db::DatabaseService; use mc_mempool::{GasPriceProvider, L1DataProvider, Mempool}; use mp_chain_config::ChainConfig; + use mp_utils::service::ServiceContext; use rstest::*; use starknet_api::core::Nonce; use starknet_types_core::felt::Felt; @@ -411,14 +413,8 @@ mod l1_messaging_tests { let worker_handle = { let db = Arc::clone(&db); tokio::spawn(async move { - sync( - db.backend(), - ð_client, - &chain_config.chain_id, - mempool, - tokio_util::sync::CancellationToken::new(), - ) - .await + sync(db.backend(), ð_client, &chain_config.chain_id, mempool, ServiceContext::new_for_testing()) + .await }) }; @@ -472,14 +468,8 @@ mod l1_messaging_tests { let worker_handle = { let db = Arc::clone(&db); tokio::spawn(async move { - sync( - db.backend(), - ð_client, - &chain_config.chain_id, - mempool, - tokio_util::sync::CancellationToken::new(), - ) - .await + sync(db.backend(), ð_client, &chain_config.chain_id, mempool, ServiceContext::new_for_testing()) + .await }) }; @@ -528,14 +518,8 @@ mod l1_messaging_tests { let worker_handle = { let db = Arc::clone(&db); tokio::spawn(async move { - sync( - db.backend(), - ð_client, - &chain_config.chain_id, - mempool, - tokio_util::sync::CancellationToken::new(), - ) - .await + sync(db.backend(), ð_client, &chain_config.chain_id, mempool, ServiceContext::new_for_testing()) + .await }) }; diff --git a/crates/client/eth/src/state_update.rs b/crates/client/eth/src/state_update.rs index 464782631..ba7d4d1a1 100644 --- a/crates/client/eth/src/state_update.rs +++ b/crates/client/eth/src/state_update.rs @@ -9,6 +9,7 @@ use mc_db::MadaraBackend; use mp_convert::ToFelt; use mp_transactions::MAIN_CHAIN_ID; use mp_utils::channel_wait_or_graceful_shutdown; +use mp_utils::service::ServiceContext; use serde::Deserialize; use starknet_api::core::ChainId; use starknet_types_core::felt::Felt; @@ -36,7 +37,7 @@ pub async fn listen_and_update_state( backend: &MadaraBackend, block_metrics: &L1BlockMetrics, chain_id: ChainId, - cancellation_token: tokio_util::sync::CancellationToken, + ctx: ServiceContext, ) -> anyhow::Result<()> { let event_filter = eth_client.l1_core_contract.event_filter::(); @@ -48,7 +49,7 @@ pub async fn listen_and_update_state( )? 
.into_stream(); - while let Some(event_result) = channel_wait_or_graceful_shutdown(event_stream.next(), &cancellation_token).await { + while let Some(event_result) = channel_wait_or_graceful_shutdown(event_stream.next(), &ctx).await { let log = event_result.context("listening for events")?; let format_event: L1StateUpdate = convert_log_state_update(log.0.clone()).context("formatting event into an L1StateUpdate")?; @@ -90,7 +91,7 @@ pub async fn state_update_worker( backend: &MadaraBackend, eth_client: &EthereumClient, chain_id: ChainId, - cancellation_token: tokio_util::sync::CancellationToken, + ctx: ServiceContext, ) -> anyhow::Result<()> { // Clear L1 confirmed block at startup backend.clear_last_confirmed_block().context("Clearing l1 last confirmed block number")?; @@ -103,7 +104,7 @@ pub async fn state_update_worker( update_l1(backend, initial_state, ð_client.l1_block_metrics, chain_id.clone())?; // Listen to LogStateUpdate (0x77552641) update and send changes continusly - listen_and_update_state(eth_client, backend, ð_client.l1_block_metrics, chain_id, cancellation_token) + listen_and_update_state(eth_client, backend, ð_client.l1_block_metrics, chain_id, ctx) .await .context("Subscribing to the LogStateUpdate event")?; @@ -198,7 +199,7 @@ mod eth_client_event_subscription_test { db.backend(), ð_client.l1_block_metrics, chain_info.chain_id.clone(), - tokio_util::sync::CancellationToken::new(), + ServiceContext::new_for_testing(), ) .await }) diff --git a/crates/client/eth/src/sync.rs b/crates/client/eth/src/sync.rs index ac9cff3bf..a4794a7a7 100644 --- a/crates/client/eth/src/sync.rs +++ b/crates/client/eth/src/sync.rs @@ -3,6 +3,7 @@ use crate::l1_gas_price::gas_price_worker; use crate::l1_messaging::sync; use crate::state_update::state_update_worker; use mc_mempool::{GasPriceProvider, Mempool}; +use mp_utils::service::ServiceContext; use starknet_api::core::ChainId; use std::sync::Arc; use std::time::Duration; @@ -18,17 +19,17 @@ pub async fn l1_sync_worker( gas_price_sync_disabled: bool, gas_price_poll_ms: Duration, mempool: Arc, - cancellation_token: tokio_util::sync::CancellationToken, + ctx: ServiceContext, ) -> anyhow::Result<()> { tokio::try_join!( - state_update_worker(backend, eth_client, chain_id.clone(), cancellation_token.clone()), + state_update_worker(backend, eth_client, chain_id.clone(), ctx.clone()), async { if !gas_price_sync_disabled { - gas_price_worker(eth_client, l1_gas_provider, gas_price_poll_ms, cancellation_token.clone()).await?; + gas_price_worker(eth_client, l1_gas_provider, gas_price_poll_ms, ctx.clone()).await?; } Ok(()) }, - sync(backend, eth_client, &chain_id, mempool, cancellation_token.clone()) + sync(backend, eth_client, &chain_id, mempool, ctx.clone()) )?; Ok(()) diff --git a/crates/client/gateway/server/src/handler.rs b/crates/client/gateway/server/src/handler.rs index c809f25b6..13c9c1bbf 100644 --- a/crates/client/gateway/server/src/handler.rs +++ b/crates/client/gateway/server/src/handler.rs @@ -16,6 +16,7 @@ use mp_gateway::{ block::{BlockStatus, ProviderBlock, ProviderBlockPending, ProviderBlockSignature}, state_update::{ProviderStateUpdate, ProviderStateUpdatePending}, }; +use mp_utils::service::ServiceContext; use serde::Serialize; use serde_json::json; use starknet_core::types::{ @@ -238,7 +239,13 @@ pub async fn handle_get_block_traces( traces: Vec, } - let traces = v0_7_1_trace_block_transactions(&Starknet::new(backend, add_transaction_provider), block_id).await?; + // TODO: we should probably use the actual service context here instead 
of + // creating a new one! + let traces = v0_7_1_trace_block_transactions( + &Starknet::new(backend, add_transaction_provider, ServiceContext::new()), + block_id, + ) + .await?; let block_traces = BlockTraces { traces }; Ok(create_json_response(hyper::StatusCode::OK, &block_traces)) diff --git a/crates/client/gateway/server/src/service.rs b/crates/client/gateway/server/src/service.rs index ea5779b25..d199fd549 100644 --- a/crates/client/gateway/server/src/service.rs +++ b/crates/client/gateway/server/src/service.rs @@ -8,7 +8,7 @@ use hyper::{server::conn::http1, service::service_fn}; use hyper_util::rt::TokioIo; use mc_db::MadaraBackend; use mc_rpc::providers::AddTransactionProvider; -use mp_utils::graceful_shutdown; +use mp_utils::{graceful_shutdown, service::ServiceContext}; use tokio::{net::TcpListener, sync::Notify}; use super::router::main_router; @@ -20,7 +20,7 @@ pub async fn start_server( gateway_enable: bool, gateway_external: bool, gateway_port: u16, - cancellation_token: tokio_util::sync::CancellationToken, + ctx: ServiceContext, ) -> anyhow::Result<()> { if !feeder_gateway_enable && !gateway_enable { return Ok(()); @@ -41,7 +41,7 @@ pub async fn start_server( { let shutdown_notify = Arc::clone(&shutdown_notify); tokio::spawn(async move { - graceful_shutdown(&cancellation_token).await; + graceful_shutdown(&ctx).await; shutdown_notify.notify_waiters(); }); } diff --git a/crates/client/mempool/src/block_production.rs b/crates/client/mempool/src/block_production.rs index e324fcaf4..e80ceaabe 100644 --- a/crates/client/mempool/src/block_production.rs +++ b/crates/client/mempool/src/block_production.rs @@ -23,6 +23,7 @@ use mp_state_update::{ }; use mp_transactions::TransactionWithHash; use mp_utils::graceful_shutdown; +use mp_utils::service::ServiceContext; use opentelemetry::KeyValue; use starknet_api::core::ContractAddress; use starknet_types_core::felt::Felt; @@ -418,9 +419,7 @@ impl BlockProductionTask { // todo, prefer using the block import pipeline? self.backend.store_block(self.block.clone().into(), state_diff, self.declared_classes.clone())?; // do not forget to flush :) - self.backend - .maybe_flush(true) - .map_err(|err| BlockImportError::Internal(format!("DB flushing error: {err:#}").into()))?; + self.backend.flush().map_err(|err| BlockImportError::Internal(format!("DB flushing error: {err:#}").into()))?; Ok(()) } @@ -482,7 +481,11 @@ impl BlockProductionTask { declared_classes, ) .await?; - self.block.info.header.parent_block_hash = import_result.block_hash; // fix temp parent block hash for new pending :) + // do not forget to flush :) + self.backend.flush().map_err(|err| BlockImportError::Internal(format!("DB flushing error: {err:#}").into()))?; + + // fix temp parent block hash for new pending :) + self.block.info.header.parent_block_hash = import_result.block_hash; // Prepare for next block. 
self.executor = @@ -504,11 +507,8 @@ impl BlockProductionTask { Ok(()) } - #[tracing::instrument(skip(self), fields(module = "BlockProductionTask"))] - pub async fn block_production_task( - &mut self, - cancellation_token: tokio_util::sync::CancellationToken, - ) -> Result<(), anyhow::Error> { + #[tracing::instrument(skip(self, ctx), fields(module = "BlockProductionTask"))] + pub async fn block_production_task(&mut self, ctx: ServiceContext) -> Result<(), anyhow::Error> { let start = tokio::time::Instant::now(); let mut interval_block_time = tokio::time::interval_at(start, self.backend.chain_config().block_time); @@ -553,7 +553,7 @@ impl BlockProductionTask { } self.current_pending_tick += 1; }, - _ = graceful_shutdown(&cancellation_token) => break, + _ = graceful_shutdown(&ctx) => break, } } diff --git a/crates/client/rpc/Cargo.toml b/crates/client/rpc/Cargo.toml index 531a7dae2..5a75fc3c9 100644 --- a/crates/client/rpc/Cargo.toml +++ b/crates/client/rpc/Cargo.toml @@ -18,6 +18,7 @@ targets = ["x86_64-unknown-linux-gnu"] rstest = { workspace = true } mc-db = { workspace = true, features = ["testing"] } +mp-utils = { workspace = true, features = ["testing"] } [dependencies] @@ -35,6 +36,7 @@ mp-gateway = { workspace = true } mp-receipt = { workspace = true } mp-state-update = { workspace = true } mp-transactions = { workspace = true } +mp-utils = { workspace = true } # Starknet blockifier = { workspace = true, default-features = true } @@ -53,4 +55,5 @@ serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } tracing = { workspace = true } diff --git a/crates/client/rpc/src/lib.rs b/crates/client/rpc/src/lib.rs index e121ad904..c48b3d7ca 100644 --- a/crates/client/rpc/src/lib.rs +++ b/crates/client/rpc/src/lib.rs @@ -12,6 +12,7 @@ pub mod utils; pub mod versions; use jsonrpsee::RpcModule; +use mp_utils::service::ServiceContext; use starknet_types_core::felt::Felt; use std::sync::Arc; @@ -26,15 +27,29 @@ use providers::AddTransactionProvider; use utils::ResultExt; /// A Starknet RPC server for Madara -#[derive(Clone)] pub struct Starknet { backend: Arc, pub(crate) add_transaction_provider: Arc, + pub ctx: ServiceContext, +} + +impl Clone for Starknet { + fn clone(&self) -> Self { + Self { + backend: Arc::clone(&self.backend), + add_transaction_provider: Arc::clone(&self.add_transaction_provider), + ctx: self.ctx.clone(), + } + } } impl Starknet { - pub fn new(backend: Arc, add_transaction_provider: Arc) -> Self { - Self { backend, add_transaction_provider } + pub fn new( + backend: Arc, + add_transaction_provider: Arc, + ctx: ServiceContext, + ) -> Self { + Self { backend, add_transaction_provider, ctx } } pub fn clone_backend(&self) -> Arc { @@ -107,6 +122,8 @@ pub fn rpc_api_admin(starknet: &Starknet) -> anyhow::Result> { let mut rpc_api = RpcModule::new(()); rpc_api.merge(versions::admin::v0_1_0::MadaraWriteRpcApiV0_1_0Server::into_rpc(starknet.clone()))?; + rpc_api.merge(versions::admin::v0_1_0::MadaraStatusRpcApiV0_1_0Server::into_rpc(starknet.clone()))?; + rpc_api.merge(versions::admin::v0_1_0::MadaraServicesRpcApiV0_1_0Server::into_rpc(starknet.clone()))?; Ok(rpc_api) } diff --git a/crates/client/rpc/src/test_utils.rs b/crates/client/rpc/src/test_utils.rs index 1f5939e9b..6629f1df6 100644 --- a/crates/client/rpc/src/test_utils.rs +++ b/crates/client/rpc/src/test_utils.rs @@ -14,6 +14,7 @@ use mp_state_update::{ StorageEntry, }; use mp_transactions::{BroadcastedDeclareTransactionV0, 
InvokeTransaction, InvokeTransactionV0, Transaction}; +use mp_utils::service::ServiceContext; use rstest::fixture; use starknet_core::types::{ BroadcastedDeclareTransaction, BroadcastedDeployAccountTransaction, BroadcastedInvokeTransaction, @@ -60,7 +61,7 @@ impl AddTransactionProvider for TestTransactionProvider { pub fn rpc_test_setup() -> (Arc, Starknet) { let chain_config = Arc::new(ChainConfig::madara_test()); let backend = MadaraBackend::open_for_testing(chain_config.clone()); - let rpc = Starknet::new(backend.clone(), Arc::new(TestTransactionProvider)); + let rpc = Starknet::new(backend.clone(), Arc::new(TestTransactionProvider), ServiceContext::new_for_testing()); (backend, rpc) } diff --git a/crates/client/rpc/src/versions/admin/v0_1_0/api.rs b/crates/client/rpc/src/versions/admin/v0_1_0/api.rs index 9a5ff6882..31e1752af 100644 --- a/crates/client/rpc/src/versions/admin/v0_1_0/api.rs +++ b/crates/client/rpc/src/versions/admin/v0_1_0/api.rs @@ -14,3 +14,99 @@ pub trait MadaraWriteRpcApi { declare_transaction_v0: BroadcastedDeclareTransactionV0, ) -> RpcResult; } + +#[versioned_rpc("V0_1_0", "madara")] +pub trait MadaraStatusRpcApi { + /// Can be used to check node availability and network latency + /// + /// # Returns + /// + /// * Ping time in unix time. + #[method(name = "ping")] + async fn ping(&self) -> RpcResult; + + /// Stops the node by gracefully shutting down each of its services. + /// + /// # Returns + /// + /// * Time of shutdown in unix time. + #[method(name = "shutdown")] + async fn shutdown(&self) -> RpcResult; + + /// Periodically sends a signal that the node is alive. + /// + /// # Sends + /// + /// * Current time in unix time + #[subscription(name = "pulse", unsubscribe = "unsubscribe", item = u64)] + async fn pulse(&self) -> jsonrpsee::core::SubscriptionResult; +} + +#[versioned_rpc("V0_1_0", "madara")] +pub trait MadaraServicesRpcApi { + /// Disables user-facing rpc services. + /// + /// This only works if user rpc has been enabled on startup, otherwise this + /// does nothing. + /// + /// # Returns + /// + /// True if user rpc was previously enabled. + #[method(name = "rpcDisable")] + async fn service_rpc_disable(&self) -> RpcResult; + + /// Enables user-facing rpc services. + /// + /// This only works if user rpc has been enabled on startup, otherwise this + /// does nothing. + /// + /// # Returns + /// + /// True if user rpc was previously enabled. + #[method(name = "rpcEnable")] + async fn service_rpc_enable(&self) -> RpcResult; + + /// Restarts user-facing rpc services, with a 5s grace period in between. + /// + /// This only works if user rpc has been enabled on startup, otherwise this + /// does nothing. + /// + /// # Returns + /// + /// True if user rpc was previously enabled. + #[method(name = "rpcRestart")] + async fn service_rpc_restart(&self) -> RpcResult; + + /// Disables l1 and l2 sync services. + /// + /// This only works if sync services have been enabled on startup, otherwise + /// this does nothing. + /// + /// # Returns + /// + /// True if any of l1 or l2 sync was previously enabled. + #[method(name = "syncDisable")] + async fn service_sync_disable(&self) -> RpcResult; + + /// Enables l1 and l2 sync services. + /// + /// This only works if sync services have been enabled on startup, otherwise + /// this does nothing. + /// + /// # Returns + /// + /// True if any of l1 or l2 sync was previously enabled. 
+ #[method(name = "syncEnable")] + async fn service_sync_enable(&self) -> RpcResult; + + /// Disables l1 and l2 sync services, with a 5s grace period in between. + /// + /// This only works if sync services have been enabled on startup, otherwise + /// this does nothing. + /// + /// # Returns + /// + /// True if l1 or l2 sync was previously enabled. + #[method(name = "syncRestart")] + async fn service_sync_restart(&self) -> RpcResult; +} diff --git a/crates/client/rpc/src/versions/admin/v0_1_0/methods/mod.rs b/crates/client/rpc/src/versions/admin/v0_1_0/methods/mod.rs index d692799d7..1deb074ca 100644 --- a/crates/client/rpc/src/versions/admin/v0_1_0/methods/mod.rs +++ b/crates/client/rpc/src/versions/admin/v0_1_0/methods/mod.rs @@ -1 +1,3 @@ +pub mod services; +pub mod status; pub mod write; diff --git a/crates/client/rpc/src/versions/admin/v0_1_0/methods/services.rs b/crates/client/rpc/src/versions/admin/v0_1_0/methods/services.rs new file mode 100644 index 000000000..2257aec7c --- /dev/null +++ b/crates/client/rpc/src/versions/admin/v0_1_0/methods/services.rs @@ -0,0 +1,70 @@ +use std::time::Duration; + +use jsonrpsee::core::{async_trait, RpcResult}; +use mp_utils::service::MadaraService; + +use crate::{versions::admin::v0_1_0::MadaraServicesRpcApiV0_1_0Server, Starknet}; + +const RESTART_INTERVAL: Duration = Duration::from_secs(5); + +#[async_trait] +impl MadaraServicesRpcApiV0_1_0Server for Starknet { + #[tracing::instrument(skip(self), fields(module = "Admin"))] + async fn service_rpc_disable(&self) -> RpcResult { + tracing::info!("🔌 Stopping RPC service..."); + Ok(self.ctx.service_remove(MadaraService::Rpc)) + } + + #[tracing::instrument(skip(self), fields(module = "Admin"))] + async fn service_rpc_enable(&self) -> RpcResult { + tracing::info!("🔌 Starting RPC service..."); + Ok(self.ctx.service_add(MadaraService::Rpc)) + } + + #[tracing::instrument(skip(self), fields(module = "Admin"))] + async fn service_rpc_restart(&self) -> RpcResult { + tracing::info!("🔌 Restarting RPC service..."); + + let res = self.ctx.service_remove(MadaraService::Rpc); + tokio::time::sleep(RESTART_INTERVAL).await; + self.ctx.service_add(MadaraService::Rpc); + + tracing::info!("🔌 Restart complete (Rpc)"); + + return Ok(res); + } + + #[tracing::instrument(skip(self), fields(module = "Admin"))] + async fn service_sync_disable(&self) -> RpcResult { + tracing::info!("🔌 Stopping Sync service..."); + + let res = self.ctx.service_remove(MadaraService::L1Sync) | self.ctx.service_remove(MadaraService::L2Sync); + + Ok(res) + } + + #[tracing::instrument(skip(self), fields(module = "Admin"))] + async fn service_sync_enable(&self) -> RpcResult { + tracing::info!("🔌 Starting Sync service..."); + + let res = self.ctx.service_add(MadaraService::L1Sync) | self.ctx.service_add(MadaraService::L2Sync); + + Ok(res) + } + + #[tracing::instrument(skip(self), fields(module = "Admin"))] + async fn service_sync_restart(&self) -> RpcResult { + tracing::info!("🔌 Stopping Sync service..."); + + let res = self.ctx.service_remove(MadaraService::L1Sync) | self.ctx.service_remove(MadaraService::L2Sync); + + tokio::time::sleep(RESTART_INTERVAL).await; + + self.ctx.service_add(MadaraService::L1Sync); + self.ctx.service_add(MadaraService::L2Sync); + + tracing::info!("🔌 Restart complete (Sync)"); + + Ok(res) + } +} diff --git a/crates/client/rpc/src/versions/admin/v0_1_0/methods/status.rs b/crates/client/rpc/src/versions/admin/v0_1_0/methods/status.rs new file mode 100644 index 000000000..b1e2b25cb --- /dev/null +++ 
b/crates/client/rpc/src/versions/admin/v0_1_0/methods/status.rs @@ -0,0 +1,42 @@ +use std::time::{Duration, SystemTime}; + +use jsonrpsee::core::async_trait; + +use crate::{errors::ErrorExtWs, versions::admin::v0_1_0::MadaraStatusRpcApiV0_1_0Server, Starknet}; + +#[async_trait] +impl MadaraStatusRpcApiV0_1_0Server for Starknet { + async fn ping(&self) -> jsonrpsee::core::RpcResult { + Ok(unix_now()) + } + + #[tracing::instrument(skip(self), fields(module = "Admin"))] + async fn shutdown(&self) -> jsonrpsee::core::RpcResult { + self.ctx.cancel_global(); + tracing::info!("🔌 Shutting down node..."); + Ok(unix_now()) + } + + async fn pulse( + &self, + subscription_sink: jsonrpsee::PendingSubscriptionSink, + ) -> jsonrpsee::core::SubscriptionResult { + let sink = + subscription_sink.accept().await.or_internal_server_error("Failed to establish websocket connection")?; + + while !self.ctx.is_cancelled() { + let now = unix_now(); + let msg = jsonrpsee::SubscriptionMessage::from_json(&now) + .or_else_internal_server_error(|| format!("Failed to create response message at unix time {now}"))?; + sink.send(msg).await.or_internal_server_error("Failed to respond to websocket request")?; + + tokio::time::sleep(Duration::from_secs(10)).await; + } + + Ok(()) + } +} + +fn unix_now() -> u64 { + SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap_or_default().as_secs() +} diff --git a/crates/client/sync/Cargo.toml b/crates/client/sync/Cargo.toml index fbbb34f0d..4dec773a9 100644 --- a/crates/client/sync/Cargo.toml +++ b/crates/client/sync/Cargo.toml @@ -24,6 +24,7 @@ regex.workspace = true mc-db = { workspace = true, features = ["testing"] } mc-block-import = { workspace = true, features = ["testing"] } +mp-utils = { workspace = true, features = ["testing"] } # Compile the test contracts in test cfg. m-cairo-test-contracts.workspace = true @@ -34,6 +35,7 @@ mc-analytics.workspace = true mc-block-import.workspace = true mc-db.workspace = true mc-gateway-client.workspace = true +mc-rpc.workspace = true mc-telemetry.workspace = true mp-block.workspace = true @@ -70,6 +72,7 @@ tracing-subscriber = { workspace = true, features = ["env-filter"] } anyhow.workspace = true futures = { workspace = true, default-features = true } hyper.workspace = true +jsonrpsee.workspace = true serde_json.workspace = true thiserror.workspace = true tokio = { workspace = true, features = [ diff --git a/crates/client/sync/src/fetch/fetchers.rs b/crates/client/sync/src/fetch/fetchers.rs index 3e4c8b513..36e3f6e7f 100644 --- a/crates/client/sync/src/fetch/fetchers.rs +++ b/crates/client/sync/src/fetch/fetchers.rs @@ -13,6 +13,7 @@ use mp_gateway::block::{ProviderBlock, ProviderBlockPending}; use mp_gateway::error::{SequencerError, StarknetError, StarknetErrorCode}; use mp_gateway::state_update::ProviderStateUpdateWithBlockPendingMaybe::{self}; use mp_gateway::state_update::{ProviderStateUpdate, ProviderStateUpdatePending, StateDiff}; +use mp_utils::service::ServiceContext; use mp_utils::{stopwatch_end, wait_or_graceful_shutdown, PerfStopwatch}; use starknet_api::core::ChainId; use starknet_types_core::felt::Felt; @@ -40,15 +41,27 @@ pub struct FetchConfig { pub sync_polling_interval: Option, /// Number of blocks to sync (for testing purposes). 
pub n_blocks_to_sync: Option, + /// Number of blocks between db flushes + pub flush_every_n_blocks: u64, + /// Number of seconds between db flushes + pub flush_every_n_seconds: u64, /// Stops the node once all blocks have been synced (for testing purposes) pub stop_on_sync: bool, + /// Number of blocks to fetch in parallel during the sync process + pub sync_parallelism: u8, + /// True if the node is called with `--warp-update-receiver` + pub warp_update: bool, + /// The port used for nodes to make rpc calls during a warp update. + pub warp_update_port_rpc: u16, + /// The port used for nodes to send blocks during a warp update. + pub warp_update_port_fgw: u16, } pub async fn fetch_pending_block_and_updates( parent_block_hash: Felt, chain_id: &ChainId, provider: &GatewayProvider, - cancellation_token: &tokio_util::sync::CancellationToken, + ctx: &ServiceContext, ) -> Result, FetchError> { let block_id = BlockId::Tag(BlockTag::Pending); let sw = PerfStopwatch::new(); @@ -67,7 +80,7 @@ pub async fn fetch_pending_block_and_updates( }, MAX_RETRY, BASE_DELAY, - cancellation_token, + ctx, ) .await?; @@ -86,8 +99,7 @@ pub async fn fetch_pending_block_and_updates( ); return Ok(None); } - let class_update = - fetch_class_updates(chain_id, &state_update.state_diff, block_id.clone(), provider, cancellation_token).await?; + let class_update = fetch_class_updates(chain_id, &state_update.state_diff, block_id.clone(), provider, ctx).await?; stopwatch_end!(sw, "fetching {:?}: {:?}", block_id); @@ -101,7 +113,7 @@ pub async fn fetch_block_and_updates( chain_id: &ChainId, block_n: u64, provider: &GatewayProvider, - cancellation_token: &tokio_util::sync::CancellationToken, + ctx: &ServiceContext, ) -> Result { let block_id = BlockId::Number(block_n); @@ -115,11 +127,10 @@ pub async fn fetch_block_and_updates( }, MAX_RETRY, BASE_DELAY, - cancellation_token, + ctx, ) .await?; - let class_update = - fetch_class_updates(chain_id, state_update.state_diff(), block_id, provider, cancellation_token).await?; + let class_update = fetch_class_updates(chain_id, state_update.state_diff(), block_id, provider, ctx).await?; stopwatch_end!(sw, "fetching {:?}: {:?}", block_n); @@ -136,7 +147,7 @@ async fn retry( mut f: F, max_retries: u32, base_delay: Duration, - cancellation_token: &tokio_util::sync::CancellationToken, + ctx: &ServiceContext, ) -> Result where F: FnMut() -> Fut, @@ -162,7 +173,7 @@ where tracing::warn!("The provider has returned an error: {}, retrying in {:?}", err, delay) } - if wait_or_graceful_shutdown(tokio::time::sleep(delay), cancellation_token).await.is_none() { + if wait_or_graceful_shutdown(tokio::time::sleep(delay), ctx).await.is_none() { return Err(SequencerError::StarknetError(StarknetError::block_not_found())); } } @@ -176,7 +187,7 @@ async fn fetch_class_updates( state_diff: &StateDiff, block_id: BlockId, provider: &GatewayProvider, - cancellation_token: &tokio_util::sync::CancellationToken, + ctx: &ServiceContext, ) -> anyhow::Result> { // for blocks before 2597 on mainnet new classes are not declared in the state update // https://github.com/madara-alliance/madara/issues/233 @@ -196,13 +207,8 @@ async fn fetch_class_updates( let legacy_class_futures = legacy_classes.into_iter().map(|class_hash| { let block_id = block_id.clone(); async move { - let (class_hash, contract_class) = retry( - || fetch_class(class_hash, block_id.clone(), provider), - MAX_RETRY, - BASE_DELAY, - cancellation_token, - ) - .await?; + let (class_hash, contract_class) = + retry(|| fetch_class(class_hash, block_id.clone(), 
provider), MAX_RETRY, BASE_DELAY, ctx).await?; let ContractClass::Legacy(contract_class) = contract_class else { return Err(L2SyncError::UnexpectedClassType { class_hash }); @@ -218,13 +224,8 @@ async fn fetch_class_updates( let sierra_class_futures = sierra_classes.into_iter().map(|(class_hash, &compiled_class_hash)| { let block_id = block_id.clone(); async move { - let (class_hash, contract_class) = retry( - || fetch_class(class_hash, block_id.clone(), provider), - MAX_RETRY, - BASE_DELAY, - cancellation_token, - ) - .await?; + let (class_hash, contract_class) = + retry(|| fetch_class(class_hash, block_id.clone(), provider), MAX_RETRY, BASE_DELAY, ctx).await?; let ContractClass::Sierra(contract_class) = contract_class else { return Err(L2SyncError::UnexpectedClassType { class_hash }); @@ -348,7 +349,7 @@ mod test_l2_fetchers { Felt::from_hex_unchecked("0x1db054847816dbc0098c88915430c44da2c1e3f910fbcb454e14282baba0e75"), &ctx.backend.chain_config().chain_id, &ctx.provider, - &tokio_util::sync::CancellationToken::new(), + &ServiceContext::new_for_testing(), ) .await; @@ -434,7 +435,7 @@ mod test_l2_fetchers { Felt::from_hex_unchecked("0x1db054847816dbc0098c88915430c44da2c1e3f910fbcb454e14282baba0e75"), &ctx.backend.chain_config().chain_id, &ctx.provider, - &tokio_util::sync::CancellationToken::new(), + &ServiceContext::new_for_testing(), ) .await; @@ -647,7 +648,7 @@ mod test_l2_fetchers { state_diff, BlockId::Number(5), &ctx.provider, - &tokio_util::sync::CancellationToken::new(), + &ServiceContext::new_for_testing(), ) .await .expect("Failed to fetch class updates"); @@ -684,7 +685,7 @@ mod test_l2_fetchers { state_diff, BlockId::Number(5), &ctx.provider, - &tokio_util::sync::CancellationToken::new(), + &ServiceContext::new_for_testing(), ) .await; diff --git a/crates/client/sync/src/fetch/fetchers_real_fgw_test.rs b/crates/client/sync/src/fetch/fetchers_real_fgw_test.rs index ad416196c..5c194aac3 100644 --- a/crates/client/sync/src/fetch/fetchers_real_fgw_test.rs +++ b/crates/client/sync/src/fetch/fetchers_real_fgw_test.rs @@ -15,7 +15,7 @@ async fn test_can_fetch_pending_block(client_mainnet_fixture: GatewayProvider) { Felt::ZERO, &ChainId::Mainnet, &client_mainnet_fixture, - &tokio_util::sync::CancellationToken::new(), + &ServiceContext::new_for_testing(), ) .await .unwrap(); @@ -36,7 +36,7 @@ async fn test_can_fetch_and_convert_block(client_mainnet_fixture: GatewayProvide &ChainId::Mainnet, block_n, &client_mainnet_fixture, - &tokio_util::sync::CancellationToken::new(), + &ServiceContext::new_for_testing(), ) .await .unwrap(); diff --git a/crates/client/sync/src/fetch/mod.rs b/crates/client/sync/src/fetch/mod.rs index 90b24f736..8e9c6de71 100644 --- a/crates/client/sync/src/fetch/mod.rs +++ b/crates/client/sync/src/fetch/mod.rs @@ -1,96 +1,113 @@ -use std::sync::Arc; use std::time::Duration; +use std::{num::NonZeroUsize, sync::Arc}; use futures::prelude::*; use mc_block_import::UnverifiedFullBlock; use mc_db::MadaraBackend; use mc_gateway_client::GatewayProvider; +use mc_rpc::versions::admin::v0_1_0::MadaraStatusRpcApiV0_1_0Client; use mp_gateway::error::{SequencerError, StarknetError, StarknetErrorCode}; -use mp_utils::{channel_wait_or_graceful_shutdown, wait_or_graceful_shutdown}; +use mp_utils::{channel_wait_or_graceful_shutdown, service::ServiceContext, wait_or_graceful_shutdown}; use tokio::sync::{mpsc, oneshot}; +use url::Url; use crate::fetch::fetchers::fetch_block_and_updates; pub mod fetchers; -#[allow(clippy::too_many_arguments)] +pub struct L2FetchConfig { + pub 
first_block: u64, + pub fetch_stream_sender: mpsc::Sender, + pub once_caught_up_sender: oneshot::Sender<()>, + pub sync_polling_interval: Option, + pub n_blocks_to_sync: Option, + pub stop_on_sync: bool, + pub sync_parallelism: usize, + pub warp_update: bool, + pub warp_update_port_rpc: u16, + pub warp_update_port_fgw: u16, +} + pub async fn l2_fetch_task( backend: Arc, - first_block: u64, - n_blocks_to_sync: Option, - stop_on_sync: bool, - fetch_stream_sender: mpsc::Sender, provider: Arc, - sync_polling_interval: Option, - once_caught_up_callback: oneshot::Sender<()>, - cancellation_token: tokio_util::sync::CancellationToken, + ctx: ServiceContext, + mut config: L2FetchConfig, ) -> anyhow::Result<()> { // First, catch up with the chain - let backend = &backend; + // let backend = &backend; - let mut next_block = first_block; + let L2FetchConfig { first_block, warp_update, warp_update_port_rpc, warp_update_port_fgw, .. } = config; - { - // Fetch blocks and updates in parallel one time before looping - let fetch_stream = (first_block..).take(n_blocks_to_sync.unwrap_or(u64::MAX) as _).map(|block_n| { - let provider = Arc::clone(&provider); - let cancellation_token = cancellation_token.clone(); - async move { - ( - block_n, - fetch_block_and_updates(&backend.chain_config().chain_id, block_n, &provider, &cancellation_token) - .await, - ) - } - }); + if warp_update { + let client = jsonrpsee::http_client::HttpClientBuilder::default() + .build(format!("http://localhost:{warp_update_port_rpc}")) + .expect("Building client"); - // Have 10 fetches in parallel at once, using futures Buffered - let mut fetch_stream = stream::iter(fetch_stream).buffered(10); - while let Some((block_n, val)) = - channel_wait_or_graceful_shutdown(fetch_stream.next(), &cancellation_token).await - { - match val { - Err(FetchError::Sequencer(SequencerError::StarknetError(StarknetError { - code: StarknetErrorCode::BlockNotFound, - .. - }))) => { - tracing::info!("🥳 The sync process has caught up with the tip of the chain"); - break; - } - val => { - if fetch_stream_sender.send(val?).await.is_err() { - // join error - break; - } - } - } + if client.ping().await.is_err() { + tracing::error!("❗ Failed to connect to warp update sender on http://localhost:{warp_update_port_rpc}"); + ctx.cancel_global(); + return Ok(()); + } + + let provider = Arc::new(GatewayProvider::new( + Url::parse(&format!("http://localhost:{warp_update_port_fgw}/gateway/")) + .expect("Failed to parse warp update sender gateway url. This should not fail in prod"), + Url::parse(&format!("http://localhost:{warp_update_port_fgw}/feeder_gateway/")) + .expect("Failed to parse warp update sender feeder gateway url. This should not fail in prod"), + )); + + let save = config.sync_parallelism; + let available_parallelism = std::thread::available_parallelism() + .unwrap_or(NonZeroUsize::new(1usize).expect("1 should always be in usize bound")); + config.sync_parallelism = Into::::into(available_parallelism) * 2; + + let next_block = match sync_blocks(backend.as_ref(), &provider, &ctx, &config).await? 
{ + SyncStatus::Full(next_block) => next_block, + SyncStatus::UpTo(next_block) => next_block, + }; + + if client.shutdown().await.is_err() { + tracing::error!("❗ Failed to shutdown warp update sender"); + ctx.cancel_global(); + return Ok(()); + } + + config.n_blocks_to_sync = config.n_blocks_to_sync.map(|n| n - (next_block - first_block)); + config.first_block = next_block; + config.sync_parallelism = save; + } - next_block = block_n + 1; + let mut next_block = match sync_blocks(backend.as_ref(), &provider, &ctx, &config).await? { + SyncStatus::Full(next_block) => { + tracing::info!("🥳 The sync process has caught up with the tip of the chain"); + next_block } + SyncStatus::UpTo(next_block) => next_block, }; - // We do not call cancellation here as we still want the block to be stored + if config.stop_on_sync { + return anyhow::Ok(()); + } + + let L2FetchConfig { fetch_stream_sender, once_caught_up_sender, sync_polling_interval, stop_on_sync, .. } = config; + + // We do not call cancellation here as we still want the blocks to be stored if stop_on_sync { return anyhow::Ok(()); } - let _ = once_caught_up_callback.send(()); + // TODO: replace this with a tokio::sync::Notify + let _ = once_caught_up_sender.send(()); if let Some(sync_polling_interval) = sync_polling_interval { // Polling let mut interval = tokio::time::interval(sync_polling_interval); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - while wait_or_graceful_shutdown(interval.tick(), &cancellation_token).await.is_some() { + while wait_or_graceful_shutdown(interval.tick(), &ctx).await.is_some() { loop { - match fetch_block_and_updates( - &backend.chain_config().chain_id, - next_block, - &provider, - &cancellation_token, - ) - .await - { + match fetch_block_and_updates(&backend.chain_config().chain_id, next_block, &provider, &ctx).await { Err(FetchError::Sequencer(SequencerError::StarknetError(StarknetError { code: StarknetErrorCode::BlockNotFound, .. @@ -112,6 +129,74 @@ pub async fn l2_fetch_task( Ok(()) } +/// Whether a chain has been caught up to the tip or only a certain block number +/// +/// This is mostly relevant in the context of the `--n-blocks-to-sync` cli +/// argument which states that a node might stop synchronizing before it has +/// caught up with the tip of the chain. +enum SyncStatus { + Full(u64), + UpTo(u64), +} + +/// Sync blocks in parallel from a [GatewayProvider] +/// +/// This function is called during warp update as well as l2 catch up to sync +/// to the tip of a chain. In the case of warp update, this is the tip of the +/// chain provided by the warp update sender. In the case of l2 sync, this is +/// the tip of the entire chain (mainnet, sepolia, devnet). +/// +/// This function _is not_ called after the chain has been synced as this has +/// a different fetch logic which does not fetch block in parallel. +/// +/// Fetch config, including number of blocks to fetch and fetch parallelism, +/// is defined in [L2FetchConfig]. +async fn sync_blocks( + backend: &MadaraBackend, + provider: &Arc, + ctx: &ServiceContext, + config: &L2FetchConfig, +) -> anyhow::Result { + let L2FetchConfig { first_block, fetch_stream_sender, n_blocks_to_sync, sync_parallelism, .. 
} = config; + + // Fetch blocks and updates in parallel one time before looping + let fetch_stream = + (*first_block..).take(n_blocks_to_sync.unwrap_or(u64::MAX) as _).map(|block_n| { + let provider = Arc::clone(provider); + let ctx = ctx.clone(); + async move { + (block_n, fetch_block_and_updates(&backend.chain_config().chain_id, block_n, &provider, &ctx).await) + } + }); + + // Have `sync_parallelism` fetches in parallel at once, using futures Buffered + let mut next_block = *first_block; + let mut fetch_stream = stream::iter(fetch_stream).buffered(*sync_parallelism); + + loop { + let Some((block_n, val)) = channel_wait_or_graceful_shutdown(fetch_stream.next(), ctx).await else { + return anyhow::Ok(SyncStatus::UpTo(next_block)); + }; + + match val { + Err(FetchError::Sequencer(SequencerError::StarknetError(StarknetError { + code: StarknetErrorCode::BlockNotFound, + .. + }))) => { + return anyhow::Ok(SyncStatus::Full(next_block)); + } + val => { + if fetch_stream_sender.send(val?).await.is_err() { + // join error + return anyhow::Ok(SyncStatus::UpTo(next_block)); + } + } + } + + next_block = block_n + 1; + } +} + #[derive(thiserror::Error, Debug)] pub enum FetchError { #[error(transparent)] @@ -158,14 +243,20 @@ mod test_l2_fetch_task { Duration::from_secs(5), l2_fetch_task( backend, - 0, - Some(5), - false, - fetch_stream_sender, provider, - Some(polling_interval), - once_caught_up_sender, - tokio_util::sync::CancellationToken::new(), + ServiceContext::new_for_testing(), + L2FetchConfig { + first_block: 0, + fetch_stream_sender, + once_caught_up_sender, + sync_polling_interval: Some(polling_interval), + n_blocks_to_sync: Some(5), + stop_on_sync: false, + sync_parallelism: 10, + warp_update: false, + warp_update_port_rpc: 9943, + warp_update_port_fgw: 8080, + }, ), ) .await diff --git a/crates/client/sync/src/l2.rs b/crates/client/sync/src/l2.rs index 57e786abc..356edc08b 100644 --- a/crates/client/sync/src/l2.rs +++ b/crates/client/sync/src/l2.rs @@ -1,6 +1,7 @@ //! Contains the code required to sync data from the feeder efficiently. 
use crate::fetch::fetchers::fetch_pending_block_and_updates; use crate::fetch::l2_fetch_task; +use crate::fetch::L2FetchConfig; use crate::utils::trim_hash; use anyhow::Context; use futures::{stream, StreamExt}; @@ -14,6 +15,7 @@ use mc_telemetry::{TelemetryHandle, VerbosityLevel}; use mp_block::BlockId; use mp_block::BlockTag; use mp_gateway::error::SequencerError; +use mp_utils::service::ServiceContext; use mp_utils::{channel_wait_or_graceful_shutdown, wait_or_graceful_shutdown, PerfStopwatch}; use starknet_api::core::ChainId; use starknet_types_core::felt::Felt; @@ -44,22 +46,47 @@ pub struct L2StateUpdate { pub block_hash: Felt, } -#[allow(clippy::too_many_arguments)] -#[tracing::instrument(skip(backend, updates_receiver, block_import, validation), fields(module = "Sync"))] -async fn l2_verify_and_apply_task( - backend: Arc, - mut updates_receiver: mpsc::Receiver, +pub struct L2VerifyApplyConfig { block_import: Arc, - validation: BlockValidationContext, backup_every_n_blocks: Option, - telemetry: TelemetryHandle, + flush_every_n_blocks: u64, + flush_every_n_seconds: u64, stop_on_sync: bool, - cancellation_token: tokio_util::sync::CancellationToken, + telemetry: TelemetryHandle, + validation: BlockValidationContext, + block_conv_receiver: mpsc::Receiver, +} + +#[tracing::instrument(skip(backend, ctx, config), fields(module = "Sync"))] +async fn l2_verify_and_apply_task( + backend: Arc, + ctx: ServiceContext, + config: L2VerifyApplyConfig, ) -> anyhow::Result<()> { - while let Some(block) = channel_wait_or_graceful_shutdown(pin!(updates_receiver.recv()), &cancellation_token).await - { + let L2VerifyApplyConfig { + block_import, + backup_every_n_blocks, + flush_every_n_blocks, + flush_every_n_seconds, + stop_on_sync, + telemetry, + validation, + mut block_conv_receiver, + } = config; + + let mut last_block_n = 0; + let mut instant = std::time::Instant::now(); + let target_duration = std::time::Duration::from_secs(flush_every_n_seconds); + + while let Some(block) = channel_wait_or_graceful_shutdown(pin!(block_conv_receiver.recv()), &ctx).await { let BlockImportResult { header, block_hash } = block_import.verify_apply(block, validation.clone()).await?; + if header.block_number - last_block_n >= flush_every_n_blocks || instant.elapsed() >= target_duration { + last_block_n = header.block_number; + instant = std::time::Instant::now(); + backend.flush().context("Flushing database")?; + } + tracing::info!( "✨ Imported #{} ({}) and updated state root ({})", header.block_number, @@ -92,7 +119,7 @@ async fn l2_verify_and_apply_task( } if stop_on_sync { - cancellation_token.cancel() + ctx.cancel_global() } Ok(()) @@ -103,26 +130,26 @@ async fn l2_block_conversion_task( output: mpsc::Sender, block_import: Arc, validation: BlockValidationContext, - cancellation_token: tokio_util::sync::CancellationToken, + ctx: ServiceContext, ) -> anyhow::Result<()> { // Items of this stream are futures that resolve to blocks, which becomes a regular stream of blocks // using futures buffered. 
let conversion_stream = stream::unfold( - (updates_receiver, block_import, validation.clone(), cancellation_token.clone()), - |(mut updates_recv, block_import, validation, cancellation_token)| async move { - channel_wait_or_graceful_shutdown(updates_recv.recv(), &cancellation_token).await.map(|block| { + (updates_receiver, block_import, validation.clone(), ctx.clone()), + |(mut updates_recv, block_import, validation, ctx)| async move { + channel_wait_or_graceful_shutdown(updates_recv.recv(), &ctx).await.map(|block| { let block_import_ = Arc::clone(&block_import); let validation_ = validation.clone(); ( async move { block_import_.pre_validate(block, validation_).await }, - (updates_recv, block_import, validation, cancellation_token), + (updates_recv, block_import, validation, ctx), ) }) }, ); let mut stream = pin!(conversion_stream.buffered(10)); - while let Some(block) = channel_wait_or_graceful_shutdown(stream.next(), &cancellation_token).await { + while let Some(block) = channel_wait_or_graceful_shutdown(stream.next(), &ctx).await { if output.send(block?).await.is_err() { // channel closed break; @@ -131,15 +158,22 @@ async fn l2_block_conversion_task( Ok(()) } -async fn l2_pending_block_task( - backend: Arc, +struct L2PendingBlockConfig { block_import: Arc, + once_caught_up_receiver: oneshot::Receiver<()>, + pending_block_poll_interval: Duration, validation: BlockValidationContext, - sync_finished_cb: oneshot::Receiver<()>, +} + +async fn l2_pending_block_task( + backend: Arc, provider: Arc, - pending_block_poll_interval: Duration, - cancellation_token: tokio_util::sync::CancellationToken, + ctx: ServiceContext, + config: L2PendingBlockConfig, ) -> anyhow::Result<()> { + let L2PendingBlockConfig { block_import, once_caught_up_receiver, pending_block_poll_interval, validation } = + config; + // clear pending status { backend.clear_pending_block().context("Clearing pending block")?; @@ -147,7 +181,7 @@ async fn l2_pending_block_task( } // we start the pending block task only once the node has been fully sync - if sync_finished_cb.await.is_err() { + if once_caught_up_receiver.await.is_err() { // channel closed return Ok(()); } @@ -156,21 +190,17 @@ async fn l2_pending_block_task( let mut interval = tokio::time::interval(pending_block_poll_interval); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - while wait_or_graceful_shutdown(interval.tick(), &cancellation_token).await.is_some() { + while wait_or_graceful_shutdown(interval.tick(), &ctx).await.is_some() { tracing::debug!("Getting pending block..."); let current_block_hash = backend .get_block_hash(&BlockId::Tag(BlockTag::Latest)) .context("Getting latest block hash")? .unwrap_or(/* genesis parent block hash */ Felt::ZERO); - let Some(block) = fetch_pending_block_and_updates( - current_block_hash, - &backend.chain_config().chain_id, - &provider, - &cancellation_token, - ) - .await - .context("Getting pending block from FGW")? + let Some(block) = + fetch_pending_block_and_updates(current_block_hash, &backend.chain_config().chain_id, &provider, &ctx) + .await + .context("Getting pending block from FGW")? 
else { continue; }; @@ -195,29 +225,34 @@ pub struct L2SyncConfig { pub first_block: u64, pub n_blocks_to_sync: Option, pub stop_on_sync: bool, + pub sync_parallelism: u8, pub verify: bool, pub sync_polling_interval: Option, pub backup_every_n_blocks: Option, + pub flush_every_n_blocks: u64, + pub flush_every_n_seconds: u64, pub pending_block_poll_interval: Duration, pub ignore_block_order: bool, + pub warp_update: bool, + pub warp_update_port_rpc: u16, + pub warp_update_port_fgw: u16, + pub chain_id: ChainId, + pub telemetry: TelemetryHandle, + pub block_importer: Arc, } /// Spawns workers to fetch blocks and state updates from the feeder. -#[allow(clippy::too_many_arguments)] -#[tracing::instrument(skip(backend, provider, config, chain_id, telemetry, block_importer), fields(module = "Sync"))] +#[tracing::instrument(skip(backend, provider, ctx, config), fields(module = "Sync"))] pub async fn sync( backend: &Arc, provider: GatewayProvider, + ctx: ServiceContext, config: L2SyncConfig, - chain_id: ChainId, - telemetry: TelemetryHandle, - block_importer: Arc, - cancellation_token: tokio_util::sync::CancellationToken, ) -> anyhow::Result<()> { let (fetch_stream_sender, fetch_stream_receiver) = mpsc::channel(8); let (block_conv_sender, block_conv_receiver) = mpsc::channel(4); let provider = Arc::new(provider); - let (once_caught_up_cb_sender, once_caught_up_cb_receiver) = oneshot::channel(); + let (once_caught_up_sender, once_caught_up_receiver) = oneshot::channel(); // [Fetch task] ==new blocks and updates=> [Block conversion task] ======> [Verification and apply // task] @@ -231,7 +266,7 @@ pub async fn sync( let validation = BlockValidationContext { trust_transaction_hashes: false, trust_global_tries: !config.verify, - chain_id, + chain_id: config.chain_id, trust_class_hashes: false, ignore_block_order: config.ignore_block_order, }; @@ -239,40 +274,52 @@ pub async fn sync( let mut join_set = JoinSet::new(); join_set.spawn(l2_fetch_task( Arc::clone(backend), - config.first_block, - config.n_blocks_to_sync, - config.stop_on_sync, - fetch_stream_sender, Arc::clone(&provider), - config.sync_polling_interval, - once_caught_up_cb_sender, - cancellation_token.clone(), + ctx.clone(), + L2FetchConfig { + first_block: config.first_block, + fetch_stream_sender, + once_caught_up_sender, + sync_polling_interval: config.sync_polling_interval, + n_blocks_to_sync: config.n_blocks_to_sync, + stop_on_sync: config.stop_on_sync, + sync_parallelism: config.sync_parallelism as usize, + warp_update: config.warp_update, + warp_update_port_rpc: config.warp_update_port_rpc, + warp_update_port_fgw: config.warp_update_port_fgw, + }, )); join_set.spawn(l2_block_conversion_task( fetch_stream_receiver, block_conv_sender, - Arc::clone(&block_importer), + Arc::clone(&config.block_importer), validation.clone(), - cancellation_token.clone(), + ctx.clone(), )); join_set.spawn(l2_verify_and_apply_task( Arc::clone(backend), - block_conv_receiver, - Arc::clone(&block_importer), - validation.clone(), - config.backup_every_n_blocks, - telemetry, - config.stop_on_sync, - cancellation_token.clone(), + ctx.clone(), + L2VerifyApplyConfig { + block_import: Arc::clone(&config.block_importer), + backup_every_n_blocks: config.backup_every_n_blocks, + flush_every_n_blocks: config.flush_every_n_blocks, + flush_every_n_seconds: config.flush_every_n_seconds, + stop_on_sync: config.stop_on_sync, + telemetry: config.telemetry, + validation: validation.clone(), + block_conv_receiver, + }, )); join_set.spawn(l2_pending_block_task( 
Arc::clone(backend), - Arc::clone(&block_importer), - validation.clone(), - once_caught_up_cb_receiver, provider, - config.pending_block_poll_interval, - cancellation_token.clone(), + ctx.clone(), + L2PendingBlockConfig { + block_import: Arc::clone(&config.block_importer), + once_caught_up_receiver, + pending_block_poll_interval: config.pending_block_poll_interval, + validation: validation.clone(), + }, )); while let Some(res) = join_set.join_next().await { @@ -321,7 +368,7 @@ mod tests { async fn test_l2_verify_and_apply_task(test_setup: Arc) { let backend = test_setup; let (block_conv_sender, block_conv_receiver) = mpsc::channel(100); - let block_importer = Arc::new(BlockImporter::new(backend.clone(), None, true).unwrap()); + let block_import = Arc::new(BlockImporter::new(backend.clone(), None).unwrap()); let validation = BlockValidationContext::new(backend.chain_config().chain_id.clone()); let telemetry = TelemetryService::new(true, vec![]).unwrap().new_handle(); @@ -329,16 +376,20 @@ mod tests { let task_handle = tokio::spawn(l2_verify_and_apply_task( backend.clone(), - block_conv_receiver, - block_importer.clone(), - validation.clone(), - Some(1), - telemetry, - false, - tokio_util::sync::CancellationToken::new(), + ServiceContext::new_for_testing(), + L2VerifyApplyConfig { + block_import: block_import.clone(), + backup_every_n_blocks: Some(1), + flush_every_n_blocks: 1, + flush_every_n_seconds: 10, + stop_on_sync: false, + telemetry, + validation: validation.clone(), + block_conv_receiver, + }, )); - let mock_pre_validated_block = block_importer.pre_validate(mock_block, validation.clone()).await.unwrap(); + let mock_pre_validated_block = block_import.pre_validate(mock_block, validation.clone()).await.unwrap(); block_conv_sender.send(mock_pre_validated_block).await.unwrap(); drop(block_conv_sender); @@ -382,7 +433,7 @@ mod tests { let backend = test_setup; let (updates_sender, updates_receiver) = mpsc::channel(100); let (output_sender, mut output_receiver) = mpsc::channel(100); - let block_import = Arc::new(BlockImporter::new(backend.clone(), None, true).unwrap()); + let block_import = Arc::new(BlockImporter::new(backend.clone(), None).unwrap()); let validation = BlockValidationContext::new(backend.chain_config().chain_id.clone()); let mock_block = create_dummy_unverified_full_block(); @@ -394,7 +445,7 @@ mod tests { output_sender, block_import, validation, - tokio_util::sync::CancellationToken::new(), + ServiceContext::new_for_testing(), )); let result = tokio::time::timeout(std::time::Duration::from_secs(5), output_receiver.recv()).await; @@ -436,17 +487,19 @@ mod tests { async fn test_l2_pending_block_task(test_setup: Arc) { let backend = test_setup; let ctx = TestContext::new(backend.clone()); - let block_import = Arc::new(BlockImporter::new(backend.clone(), None, true).unwrap()); + let block_import = Arc::new(BlockImporter::new(backend.clone(), None).unwrap()); let validation = BlockValidationContext::new(backend.chain_config().chain_id.clone()); let task_handle = tokio::spawn(l2_pending_block_task( backend.clone(), - block_import.clone(), - validation.clone(), - ctx.once_caught_up_receiver, ctx.provider.clone(), - std::time::Duration::from_secs(5), - tokio_util::sync::CancellationToken::new(), + ServiceContext::new_for_testing(), + L2PendingBlockConfig { + block_import: block_import.clone(), + once_caught_up_receiver: ctx.once_caught_up_receiver, + pending_block_poll_interval: std::time::Duration::from_secs(5), + validation: validation.clone(), + }, )); // Simulate the 
"once_caught_up" signal diff --git a/crates/client/sync/src/lib.rs b/crates/client/sync/src/lib.rs index 7854b4261..6bb874f9e 100644 --- a/crates/client/sync/src/lib.rs +++ b/crates/client/sync/src/lib.rs @@ -7,6 +7,7 @@ use mc_db::MadaraBackend; use mc_gateway_client::GatewayProvider; use mc_telemetry::TelemetryHandle; use mp_block::{BlockId, BlockTag}; +use mp_utils::service::ServiceContext; use std::{sync::Arc, time::Duration}; pub mod fetch; @@ -16,19 +17,22 @@ pub mod metrics; pub mod tests; pub mod utils; -#[allow(clippy::too_many_arguments)] -#[tracing::instrument(skip(backend, block_importer, fetch_config, telemetry))] -pub async fn sync( +pub struct SyncConfig { + pub block_importer: Arc, + pub starting_block: Option, + pub backup_every_n_blocks: Option, + pub telemetry: TelemetryHandle, + pub pending_block_poll_interval: Duration, +} + +#[tracing::instrument(skip(backend, ctx, fetch_config, sync_config))] +pub async fn l2_sync_worker( backend: &Arc, - block_importer: Arc, + ctx: ServiceContext, fetch_config: FetchConfig, - starting_block: Option, - backup_every_n_blocks: Option, - telemetry: TelemetryHandle, - pending_block_poll_interval: Duration, - cancellation_token: tokio_util::sync::CancellationToken, + sync_config: SyncConfig, ) -> anyhow::Result<()> { - let (starting_block, ignore_block_order) = if let Some(starting_block) = starting_block { + let (starting_block, ignore_block_order) = if let Some(starting_block) = sync_config.starting_block { tracing::warn!("Forcing unordered state. This will most probably break your database."); (starting_block, true) } else { @@ -55,20 +59,26 @@ pub async fn sync( l2::sync( backend, provider, + ctx, L2SyncConfig { first_block: starting_block, n_blocks_to_sync: fetch_config.n_blocks_to_sync, stop_on_sync: fetch_config.stop_on_sync, verify: fetch_config.verify, sync_polling_interval: fetch_config.sync_polling_interval, - backup_every_n_blocks, - pending_block_poll_interval, + backup_every_n_blocks: sync_config.backup_every_n_blocks, + flush_every_n_blocks: fetch_config.flush_every_n_blocks, + flush_every_n_seconds: fetch_config.flush_every_n_seconds, + pending_block_poll_interval: sync_config.pending_block_poll_interval, ignore_block_order, + sync_parallelism: fetch_config.sync_parallelism, + warp_update: fetch_config.warp_update, + warp_update_port_rpc: fetch_config.warp_update_port_rpc, + warp_update_port_fgw: fetch_config.warp_update_port_fgw, + chain_id: backend.chain_config().chain_id.clone(), + telemetry: sync_config.telemetry, + block_importer: sync_config.block_importer, }, - backend.chain_config().chain_id.clone(), - telemetry, - block_importer, - cancellation_token, ) .await?; diff --git a/crates/client/telemetry/src/lib.rs b/crates/client/telemetry/src/lib.rs index f034b63f4..c5e3bebdd 100644 --- a/crates/client/telemetry/src/lib.rs +++ b/crates/client/telemetry/src/lib.rs @@ -4,7 +4,7 @@ use std::time::SystemTime; use anyhow::Context; use futures::SinkExt; use mp_utils::channel_wait_or_graceful_shutdown; -use mp_utils::service::Service; +use mp_utils::service::{MadaraService, Service, ServiceContext}; use reqwest_websocket::{Message, RequestBuilderExt}; use tokio::sync::mpsc; use tokio::task::JoinSet; @@ -93,11 +93,7 @@ impl TelemetryService { #[async_trait::async_trait] impl Service for TelemetryService { - async fn start( - &mut self, - join_set: &mut JoinSet>, - cancellation_token: tokio_util::sync::CancellationToken, - ) -> anyhow::Result<()> { + async fn start(&mut self, join_set: &mut JoinSet>, ctx: ServiceContext) -> 
anyhow::Result<()> { if !self.telemetry { return Ok(()); } @@ -128,7 +124,7 @@ impl Service for TelemetryService { let rx = &mut rx; - while let Some(event) = channel_wait_or_graceful_shutdown(rx.recv(), &cancellation_token).await { + while let Some(event) = channel_wait_or_graceful_shutdown(rx.recv(), &ctx).await { tracing::debug!( "Sending telemetry event '{}'.", event.message.get("msg").and_then(|e| e.as_str()).unwrap_or("") @@ -160,4 +156,8 @@ impl Service for TelemetryService { Ok(()) } + + fn id(&self) -> MadaraService { + MadaraService::Telemetry + } } diff --git a/crates/node/src/cli/gateway.rs b/crates/node/src/cli/gateway.rs index baeccdaf8..28e49fdce 100644 --- a/crates/node/src/cli/gateway.rs +++ b/crates/node/src/cli/gateway.rs @@ -1,14 +1,17 @@ use clap::Args; +/// The default port. +pub const FGW_DEFAULT_PORT: u16 = 8080; + /// Parameters used to config gateway. #[derive(Debug, Clone, Args)] pub struct GatewayParams { /// Enable the feeder gateway server. - #[arg(env = "MADARA_FEEDER_GATEWAY_ENABLE", long, alias = "feeder-gateway")] + #[arg(env = "MADARA_FEEDER_GATEWAY_ENABLE", long)] pub feeder_gateway_enable: bool, /// Enable the gateway server. - #[arg(env = "MADARA_GATEWAY_ENABLE", long, alias = "gateway")] + #[arg(env = "MADARA_GATEWAY_ENABLE", long)] pub gateway_enable: bool, /// Listen on all network interfaces. This usually means the gateway server will be accessible externally. @@ -16,6 +19,6 @@ pub struct GatewayParams { pub gateway_external: bool, /// The gateway port to listen at. - #[arg(env = "MADARA_GATEWAY_PORT", long, value_name = "GATEWAY PORT", default_value = "8080")] + #[arg(env = "MADARA_GATEWAY_PORT", long, value_name = "GATEWAY PORT", default_value_t = FGW_DEFAULT_PORT)] pub gateway_port: u16, } diff --git a/crates/node/src/cli/mod.rs b/crates/node/src/cli/mod.rs index f08fc99b5..8ebf30ecd 100644 --- a/crates/node/src/cli/mod.rs +++ b/crates/node/src/cli/mod.rs @@ -24,6 +24,95 @@ use mp_chain_config::ChainConfig; use std::path::PathBuf; use std::sync::Arc; +/// Combines multiple cli args into a single easy to use preset +/// +/// Some args configurations are getting pretty lengthy and easy to get wrong. +/// [ArgsPresetParams] tries to fix this: +/// +/// 1. Argument presets are evaluated _after_ user inputs are parsed. This means +/// it is not possible for users to override cli args set by a preset. This +/// is a limitation in the way in which [clap] currently works. +/// +/// 2. Argument presets are evaluated with [RunCmd::apply_arg_preset]. This has +/// to be called manually, or presets will not be applied! All this does is +/// set various cli flags to some predefined sensible value, tailoring +/// towards a general use case. +/// +/// # TODO +/// +/// ## User input precedence +/// +/// This is still very basic. Some nice improvements would be to allow arg +/// presets to be evaluated _before_ the rest of the user input, so for example +/// if an arg preset overrides `--some-arg`, then the user is still able to set +/// its value by specifying`--some-arg` themselves. This would allow for arg +/// presets to be used as the base for more complex user setups. +/// +/// ## Configuration file +/// +/// Another nice improvement would be the addition of _configuration files_ for +/// a reproducible, declarative setup. This is still a work in progress as of +/// [#285] and is a bit complicated due to this not being a core feature of +/// [clap]. 
The main issue in this case is that we want the user to be able to +/// pass a configuration file and ignore the rest of the cli arguments _if they +/// are set in the file_. This is non-trivial with the way in which [clap] +/// works (we still want good auto-generated docs and derive api support!). +/// +/// [#285]: https://github.com/madara-alliance/madara/issues/285 +#[derive(Clone, Debug, clap::Parser)] +#[clap( + group( + ArgGroup::new("args-preset") + .args(&["warp_update_sender", "warp_update_receiver", "gateway", "rpc"]) + .multiple(false) + ) +)] +pub struct ArgsPresetParams { + /// Sets up the node as a local feeder gateway, stopping any further sync. + /// This is used to rapidly synchronize local state onto another node with + /// --warp-update-receiver. You can use this to rapidly migrate to a new + /// version of madara without having to re-synchronize from genesis. + #[clap(env = "MADARA_WARP_UPDATE", long, value_name = "WARP UPDATE", group = "args-preset")] + pub warp_update_sender: bool, + + /// Sets up the node to rapidly synchronize state from a local feeder + /// gateway. The node and the feeder gateway will shutdown once this process + /// is complete. We assume the state of the feeder gateway is valid, and + /// therefore we do not re-compute the state root. You can use this to + /// rapidly migrate to a new version of madara without having to + /// re-synchronize from genesis. You can launch the local feeder gateway + /// using --warp-update-sender. + #[clap(env = "MADARA_WARP_UPDATE", long, value_name = "WARP UPDATE", group = "args-preset")] + pub warp_update_receiver: bool, + + /// Sets up the node as an externally facing feeder gateway exposed on + /// 0.0.0.0. Generally speaking, this means the node will be accessible + /// from the outside world. + #[clap(env = "MADARA_GATEWAY", long, value_name = "GATEWAY", group = "args-preset")] + pub gateway: bool, + + /// Sets up the node as an externally facing rpc provider exposed on + /// 0.0.0.0. Generally speaking, this means the node will be accessible + /// from the outside world. Admin rpc methods are also enabled, but are only + /// exposed on localhost. + #[clap(env = "MADARA_RPC", long, value_name = "RPC", group = "args-preset")] + pub rpc: bool, +} + +impl ArgsPresetParams { + pub fn greet(&self) { + if self.warp_update_sender { + tracing::info!("💫 Running Warp Update Sender preset") + } else if self.warp_update_receiver { + tracing::info!("💫 Running Warp Update Receiver preset") + } else if self.gateway { + tracing::info!("💫 Running Gateway preset") + } else if self.rpc { + tracing::info!("💫 Running Rpc preset") + } + } +} + /// Madara: High performance Starknet sequencer/full-node. #[derive(Clone, Debug, clap::Parser)] #[clap( @@ -44,13 +133,16 @@ use std::sync::Arc; .requires("full") ), )] - pub struct RunCmd { /// The human-readable name for this node. /// It is used as the network node name. #[arg(env = "MADARA_NAME", long, value_name = "NAME")] pub name: Option, + #[allow(missing_docs)] + #[clap(flatten)] + pub args_preset: ArgsPresetParams, + #[allow(missing_docs)] #[clap(flatten)] pub db_params: DbParams, @@ -113,6 +205,31 @@ pub struct RunCmd { } impl RunCmd { + // NOTE: (trantorian) I am not entirely satisfied with how this works. The + // main issue is that users cannot override presets as this resolves _after_ + // all arguments have been assigned. It might be worth forking clap for a + // better UX. 
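One possible direction for the precedence TODO above, sketched under the assumption that the CLI stays on clap's derive API: query `ArgMatches::value_source` before a preset touches a flag, and only override values that still carry their default. The `Cli` struct, port numbers, and `preset_may_override` helper below are hypothetical, not part of this diff.

```rust
use clap::parser::ValueSource;
use clap::{ArgMatches, CommandFactory, FromArgMatches, Parser};

#[derive(Parser)]
struct Cli {
    /// Example flag a preset might want to set.
    #[arg(long, default_value_t = 9944)]
    rpc_port: u16,
}

/// A preset may only override an argument the user did not set explicitly,
/// i.e. one that still holds its default value.
fn preset_may_override(matches: &ArgMatches, arg_id: &str) -> bool {
    matches!(matches.value_source(arg_id), Some(ValueSource::DefaultValue) | None)
}

fn main() {
    let matches = Cli::command().get_matches();
    let mut cli = Cli::from_arg_matches(&matches).expect("matches were produced by Cli::command()");

    // Hypothetical preset: pin the RPC to a fixed port, but only if the user
    // did not pick a port themselves (via the flag or its env var).
    if preset_may_override(&matches, "rpc_port") {
        cli.rpc_port = 9945;
    }
}
```

This would let a preset act as a base layer while explicit flags and environment variables still win, without forking clap.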
+ pub fn apply_arg_preset(mut self) -> Self { + if self.args_preset.warp_update_sender { + self.gateway_params.feeder_gateway_enable = true; + self.gateway_params.gateway_port = self.sync_params.warp_update_port_fgw; + self.rpc_params.rpc_admin = true; + self.rpc_params.rpc_admin_port = self.sync_params.warp_update_port_rpc; + } else if self.args_preset.warp_update_receiver { + self.rpc_params.rpc_disable = true; + } else if self.args_preset.gateway { + self.gateway_params.feeder_gateway_enable = true; + self.gateway_params.gateway_enable = true; + self.gateway_params.gateway_external = true; + } else if self.args_preset.rpc { + self.rpc_params.rpc_admin = true; + self.rpc_params.rpc_external = true; + self.rpc_params.rpc_cors = Some(Cors::All); + } + + self + } + pub async fn node_name_or_provide(&mut self) -> &str { if self.name.is_none() { let name = crate::util::get_random_pokemon_name().await.unwrap_or_else(|e| { diff --git a/crates/node/src/cli/rpc.rs b/crates/node/src/cli/rpc.rs index c90af95f4..02faa1200 100644 --- a/crates/node/src/cli/rpc.rs +++ b/crates/node/src/cli/rpc.rs @@ -95,7 +95,7 @@ pub struct RpcParams { /// The RPC port to listen at for admin RPC calls. #[arg(env = "MADARA_RPC_PORT_ADMIN", long, value_name = "ADMIN PORT", default_value_t = RPC_DEFAULT_PORT_ADMIN)] - pub rpc_port_admin: u16, + pub rpc_admin_port: u16, /// Maximum number of RPC server connections at a given time. #[arg(env = "MADARA_RPC_MAX_CONNECTIONS", long, value_name = "COUNT", default_value_t = RPC_DEFAULT_MAX_CONNECTIONS)] @@ -175,7 +175,7 @@ impl RpcParams { Ipv4Addr::LOCALHOST }; - SocketAddr::new(listen_addr.into(), self.rpc_port_admin) + SocketAddr::new(listen_addr.into(), self.rpc_admin_port) } pub fn batch_config(&self) -> BatchRequestConfig { diff --git a/crates/node/src/cli/sync.rs b/crates/node/src/cli/sync.rs index a7bccf312..987b91730 100644 --- a/crates/node/src/cli/sync.rs +++ b/crates/node/src/cli/sync.rs @@ -7,6 +7,9 @@ use mc_sync::fetch::fetchers::FetchConfig; use mp_utils::parsers::{parse_duration, parse_url}; use url::Url; +use super::FGW_DEFAULT_PORT; +use super::RPC_DEFAULT_PORT_ADMIN; + #[derive(Clone, Debug, clap::Args)] pub struct SyncParams { /// Disable the sync service. The sync service is responsible for listening for new blocks on starknet and ethereum. @@ -31,6 +34,14 @@ pub struct SyncParams { #[clap(env = "MADARA_GATEWAY_URL", long, value_parser = parse_url, value_name = "URL")] pub gateway_url: Option, + /// The port used for nodes to make rpc calls during a warp update. + #[arg(env = "MADARA_WARP_UPDATE_PORT_RPC", long, value_name = "WARP UPDATE PORT RPC", default_value_t = RPC_DEFAULT_PORT_ADMIN)] + pub warp_update_port_rpc: u16, + + /// The port used for nodes to send blocks during a warp update. + #[arg(env = "MADARA_WARP_UPDATE_PORT_FGW", long, value_name = "WARP UPDATE FGW", default_value_t = FGW_DEFAULT_PORT)] + pub warp_update_port_fgw: u16, + /// Polling interval, in seconds. This only affects the sync service once it has caught up with the blockchain tip. #[clap( env = "MADARA_SYNC_POLLING_INTERVAL", @@ -72,10 +83,67 @@ pub struct SyncParams { /// Periodically create a backup, for debugging purposes. Use it with `--backup-dir `. #[clap(env = "MADARA_BACKUP_EVERY_N_BLOCKS", long, value_name = "NUMBER OF BLOCKS")] pub backup_every_n_blocks: Option, + + /// Periodically flushes the database from ram to disk based on the number + /// of blocks synchronized since the last flush. 
You can set this to a
+    /// higher number depending on how fast your machine is at synchronizing
+    /// blocks and how much ram it has available.
+    ///
+    /// Be aware that blocks might still be flushed to db earlier based on the
+    /// value of --flush-every-n-seconds.
+    ///
+    /// Note that keeping this value high could lead to blocks being stored in
+    /// ram for longer periods of time before they are written to disk. This
+    /// might be an issue for chains which synchronize slowly.
+    #[clap(
+        env = "MADARA_FLUSH_EVERY_N_BLOCKS",
+        value_name = "FLUSH EVERY N BLOCKS",
+        long,
+        value_parser = clap::value_parser!(u64).range(..=10_000),
+        default_value_t = 1_000
+    )]
+    pub flush_every_n_blocks: u64,
+
+    /// Periodically flushes the database from ram to disk based on the elapsed
+    /// time since the last flush. You can set this to a higher number
+    /// depending on how fast your machine is at synchronizing blocks and how
+    /// much ram it has available.
+    ///
+    /// Be aware that blocks might still be flushed to db earlier based on the
+    /// value of --flush-every-n-blocks.
+    ///
+    /// Note that keeping this value high could lead to blocks being stored in
+    /// ram for longer periods of time before they are written to disk. This
+    /// might be an issue for chains which synchronize slowly.
+    #[clap(
+        env = "MADARA_FLUSH_EVERY_N_SECONDS",
+        value_name = "FLUSH EVERY N SECONDS",
+        long,
+        value_parser = clap::value_parser!(u64).range(..=3_600),
+        default_value_t = 5
+    )]
+    pub flush_every_n_seconds: u64,
+
+    /// Number of blocks to fetch in parallel. This only affects sync time, and
+    /// does not affect the node once it has reached the tip of the chain.
+    /// Increasing this can lead to lower sync times at the cost of higher cpu
+    /// and ram utilization.
+    #[clap(
+        env = "MADARA_SYNC_PARALLELISM",
+        long, value_name = "SYNC PARALLELISM",
+        default_value_t = 10,
+        value_parser = clap::value_parser!(u8).range(1..)
+ )] + pub sync_parallelism: u8, } impl SyncParams { - pub fn block_fetch_config(&self, chain_id: ChainId, chain_config: Arc) -> FetchConfig { + pub fn block_fetch_config( + &self, + chain_id: ChainId, + chain_config: Arc, + warp_update: bool, + ) -> FetchConfig { let (gateway, feeder_gateway) = match &self.gateway_url { Some(url) => ( url.join("/gateway/").expect("Error parsing url"), @@ -94,7 +162,13 @@ impl SyncParams { api_key: self.gateway_key.clone(), sync_polling_interval: polling, n_blocks_to_sync: self.n_blocks_to_sync, + flush_every_n_blocks: self.flush_every_n_blocks, + flush_every_n_seconds: self.flush_every_n_seconds, stop_on_sync: self.stop_on_sync, + sync_parallelism: self.sync_parallelism, + warp_update, + warp_update_port_rpc: self.warp_update_port_rpc, + warp_update_port_fgw: self.warp_update_port_fgw, } } } diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 2f7948287..aa22481ab 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -19,7 +19,7 @@ use mc_mempool::{GasPriceProvider, L1DataProvider, Mempool}; use mc_rpc::providers::{AddTransactionProvider, ForwardToProvider, MempoolAddTxProvider}; use mc_telemetry::{SysInfo, TelemetryService}; use mp_utils::service::{Service, ServiceGroup}; -use service::{BlockProductionService, GatewayService, L1SyncService, RpcService, SyncService}; +use service::{BlockProductionService, GatewayService, L1SyncService, L2SyncService, RpcService}; const GREET_IMPL_NAME: &str = "Madara"; const GREET_SUPPORT_URL: &str = "https://github.com/madara-alliance/madara/issues"; @@ -29,7 +29,7 @@ async fn main() -> anyhow::Result<()> { crate::util::setup_rayon_threadpool()?; crate::util::raise_fdlimit(); - let mut run_cmd: RunCmd = RunCmd::parse(); + let mut run_cmd: RunCmd = RunCmd::parse().apply_arg_preset(); // Setting up analytics @@ -59,6 +59,7 @@ async fn main() -> anyhow::Result<()> { let role = if run_cmd.is_sequencer() { "Sequencer" } else { "Full Node" }; tracing::info!("👤 Role: {}", role); tracing::info!("🌐 Network: {} (chain id `{}`)", chain_config.chain_name, chain_config.chain_id); + run_cmd.args_preset.greet(); let sys_info = SysInfo::probe(); sys_info.show(); @@ -79,14 +80,8 @@ async fn main() -> anyhow::Result<()> { .context("Initializing db service")?; let importer = Arc::new( - BlockImporter::new( - Arc::clone(db_service.backend()), - run_cmd.sync_params.unsafe_starting_block, - // Always flush when in authority mode as we really want to minimize the risk of losing a block when the app is unexpectedly killed :) - /* always_force_flush */ - run_cmd.is_sequencer(), - ) - .context("Initializing importer service")?, + BlockImporter::new(Arc::clone(db_service.backend()), run_cmd.sync_params.unsafe_starting_block) + .context("Initializing importer service")?, ); let l1_gas_setter = GasPriceProvider::new(); @@ -146,12 +141,13 @@ async fn main() -> anyhow::Result<()> { // Block sync service. (full node) false => { // Feeder gateway sync service. 
- let sync_service = SyncService::new( + let sync_service = L2SyncService::new( &run_cmd.sync_params, Arc::clone(&chain_config), &db_service, importer, telemetry_service.new_handle(), + run_cmd.args_preset.warp_update_receiver, ) .await .context("Initializing sync service")?; @@ -170,8 +166,8 @@ async fn main() -> anyhow::Result<()> { } }; - let rpc_service = RpcService::new(&run_cmd.rpc_params, &db_service, Arc::clone(&rpc_add_txs_method_provider)) - .context("Initializing rpc service")?; + let rpc_service = + RpcService::new(run_cmd.rpc_params, Arc::clone(db_service.backend()), Arc::clone(&rpc_add_txs_method_provider)); let gateway_service = GatewayService::new(run_cmd.gateway_params, &db_service, rpc_add_txs_method_provider) .await @@ -201,7 +197,6 @@ async fn main() -> anyhow::Result<()> { app.start_and_drive_to_end().await?; - tracing::info!("Shutting down analytics"); let _ = analytics.shutdown(); Ok(()) diff --git a/crates/node/src/service/block_production.rs b/crates/node/src/service/block_production.rs index 504d9d48d..39b397d68 100644 --- a/crates/node/src/service/block_production.rs +++ b/crates/node/src/service/block_production.rs @@ -8,7 +8,7 @@ use mc_mempool::{ block_production::BlockProductionTask, block_production_metrics::BlockProductionMetrics, L1DataProvider, Mempool, }; use mc_telemetry::TelemetryHandle; -use mp_utils::service::Service; +use mp_utils::service::{MadaraService, Service, ServiceContext}; use tokio::task::JoinSet; use crate::cli::block_production::BlockProductionParams; @@ -62,12 +62,8 @@ impl BlockProductionService { #[async_trait::async_trait] impl Service for BlockProductionService { // TODO(cchudant,2024-07-30): special threading requirements for the block production task - #[tracing::instrument(skip(self, join_set), fields(module = "BlockProductionService"))] - async fn start( - &mut self, - join_set: &mut JoinSet>, - cancellation_token: tokio_util::sync::CancellationToken, - ) -> anyhow::Result<()> { + #[tracing::instrument(skip(self, join_set, ctx), fields(module = "BlockProductionService"))] + async fn start(&mut self, join_set: &mut JoinSet>, ctx: ServiceContext) -> anyhow::Result<()> { if !self.enabled { return Ok(()); } @@ -117,11 +113,15 @@ impl Service for BlockProductionService { join_set.spawn(async move { BlockProductionTask::new(backend, block_import, mempool, metrics, l1_data_provider)? 
- .block_production_task(cancellation_token) + .block_production_task(ctx) .await?; Ok(()) }); Ok(()) } + + fn id(&self) -> MadaraService { + MadaraService::BlockProduction + } } diff --git a/crates/node/src/service/gateway.rs b/crates/node/src/service/gateway.rs index 7a0e2a162..6fbf6fc0e 100644 --- a/crates/node/src/service/gateway.rs +++ b/crates/node/src/service/gateway.rs @@ -1,7 +1,7 @@ use crate::cli::GatewayParams; use mc_db::{DatabaseService, MadaraBackend}; use mc_rpc::providers::AddTransactionProvider; -use mp_utils::service::Service; +use mp_utils::service::{MadaraService, Service, ServiceContext}; use std::sync::Arc; use tokio::task::JoinSet; @@ -24,11 +24,7 @@ impl GatewayService { #[async_trait::async_trait] impl Service for GatewayService { - async fn start( - &mut self, - join_set: &mut JoinSet>, - cancellation_token: tokio_util::sync::CancellationToken, - ) -> anyhow::Result<()> { + async fn start(&mut self, join_set: &mut JoinSet>, ctx: ServiceContext) -> anyhow::Result<()> { if self.config.feeder_gateway_enable || self.config.gateway_enable { let GatewayService { db_backend, add_transaction_provider, config } = self.clone(); @@ -40,11 +36,15 @@ impl Service for GatewayService { config.gateway_enable, config.gateway_external, config.gateway_port, - cancellation_token, + ctx, ) .await }); } Ok(()) } + + fn id(&self) -> MadaraService { + MadaraService::Gateway + } } diff --git a/crates/node/src/service/l1.rs b/crates/node/src/service/l1.rs index 9542a2d75..18c32f562 100644 --- a/crates/node/src/service/l1.rs +++ b/crates/node/src/service/l1.rs @@ -5,7 +5,7 @@ use mc_db::{DatabaseService, MadaraBackend}; use mc_eth::client::{EthereumClient, L1BlockMetrics}; use mc_mempool::{GasPriceProvider, Mempool}; use mp_block::H160; -use mp_utils::service::Service; +use mp_utils::service::{MadaraService, Service, ServiceContext}; use starknet_api::core::ChainId; use std::sync::Arc; use std::time::Duration; @@ -83,11 +83,7 @@ impl L1SyncService { #[async_trait::async_trait] impl Service for L1SyncService { - async fn start( - &mut self, - join_set: &mut JoinSet>, - cancellation_token: tokio_util::sync::CancellationToken, - ) -> anyhow::Result<()> { + async fn start(&mut self, join_set: &mut JoinSet>, ctx: ServiceContext) -> anyhow::Result<()> { let L1SyncService { l1_gas_provider, chain_id, gas_price_sync_disabled, gas_price_poll, mempool, .. 
} = self.clone(); @@ -104,7 +100,7 @@ impl Service for L1SyncService { gas_price_sync_disabled, gas_price_poll, mempool, - cancellation_token, + ctx, ) .await }); @@ -112,4 +108,8 @@ impl Service for L1SyncService { Ok(()) } + + fn id(&self) -> MadaraService { + MadaraService::L1Sync + } } diff --git a/crates/node/src/service/mod.rs b/crates/node/src/service/mod.rs index 0f5c8d1b9..6a95afd73 100644 --- a/crates/node/src/service/mod.rs +++ b/crates/node/src/service/mod.rs @@ -8,4 +8,4 @@ pub use block_production::BlockProductionService; pub use gateway::GatewayService; pub use l1::L1SyncService; pub use rpc::RpcService; -pub use sync::SyncService; +pub use sync::L2SyncService; diff --git a/crates/node/src/service/rpc/middleware.rs b/crates/node/src/service/rpc/middleware.rs index 1bc26174f..7fb119b95 100644 --- a/crates/node/src/service/rpc/middleware.rs +++ b/crates/node/src/service/rpc/middleware.rs @@ -5,6 +5,7 @@ use std::time::Instant; use futures::future::{BoxFuture, FutureExt}; use jsonrpsee::server::middleware::rpc::RpcServiceT; +use mc_rpc::utils::ResultExt; use mp_chain_config::RpcVersion; pub use super::metrics::Metrics; @@ -112,15 +113,21 @@ where return inner.call(req).await; } - let Ok(version) = RpcVersion::from_request_path(&path, version_default).map(|v| v.name()) else { - return jsonrpsee::MethodResponse::error( - req.id, - jsonrpsee::types::ErrorObject::owned( - jsonrpsee::types::error::PARSE_ERROR_CODE, - jsonrpsee::types::error::PARSE_ERROR_MSG, - None::<()>, - ), - ); + let version = match RpcVersion::from_request_path(&path, version_default) + .map(|v| v.name()) + .or_internal_server_error("Failed to get request path") + { + Ok(version) => version, + Err(_) => { + return jsonrpsee::MethodResponse::error( + req.id, + jsonrpsee::types::ErrorObject::owned( + jsonrpsee::types::error::PARSE_ERROR_CODE, + jsonrpsee::types::error::PARSE_ERROR_MSG, + None::<()>, + ), + ) + } }; let Some((namespace, method)) = req.method.split_once('_') else { @@ -134,6 +141,7 @@ where ); }; + let method = method.replacen(&format!("{version}_"), "", 1); let method_new = format!("{namespace}_{version}_{method}"); req.method = jsonrpsee::core::Cow::from(method_new); diff --git a/crates/node/src/service/rpc/mod.rs b/crates/node/src/service/rpc/mod.rs index 438ceebfd..f5a2bd50b 100644 --- a/crates/node/src/service/rpc/mod.rs +++ b/crates/node/src/service/rpc/mod.rs @@ -3,9 +3,9 @@ use std::sync::Arc; use jsonrpsee::server::ServerHandle; use tokio::task::JoinSet; -use mc_db::DatabaseService; +use mc_db::MadaraBackend; use mc_rpc::{providers::AddTransactionProvider, rpc_api_admin, rpc_api_user, Starknet}; -use mp_utils::service::Service; +use mp_utils::service::{MadaraService, Service, ServiceContext}; use metrics::RpcMetrics; use server::{start_server, ServerConfig}; @@ -19,18 +19,29 @@ mod middleware; mod server; pub struct RpcService { - server_config_user: Option, - server_config_admin: Option, + config: RpcParams, + backend: Arc, + add_txs_method_provider: Arc, server_handle_user: Option, server_handle_admin: Option, } + impl RpcService { pub fn new( - config: &RpcParams, - db: &DatabaseService, + config: RpcParams, + backend: Arc, add_txs_method_provider: Arc, - ) -> anyhow::Result { - let starknet = Starknet::new(Arc::clone(db.backend()), add_txs_method_provider); + ) -> Self { + Self { config, backend, add_txs_method_provider, server_handle_user: None, server_handle_admin: None } + } +} + +#[async_trait::async_trait] +impl Service for RpcService { + async fn start(&mut self, join_set: &mut 
JoinSet>, ctx: ServiceContext) -> anyhow::Result<()> { + let RpcService { config, backend, add_txs_method_provider, .. } = self; + + let starknet = Starknet::new(backend.clone(), add_txs_method_provider.clone(), ctx.clone()); let metrics = RpcMetrics::register()?; let server_config_user = if !config.rpc_disable { @@ -77,29 +88,22 @@ impl RpcService { None }; - Ok(Self { server_config_user, server_config_admin, server_handle_user: None, server_handle_admin: None }) - } -} - -#[async_trait::async_trait] -impl Service for RpcService { - async fn start( - &mut self, - join_set: &mut JoinSet>, - cancellation_token: tokio_util::sync::CancellationToken, - ) -> anyhow::Result<()> { - if let Some(server_config) = &self.server_config_user { + if let Some(server_config) = &server_config_user { // rpc enabled - self.server_handle_user = - Some(start_server(server_config.clone(), join_set, cancellation_token.clone()).await?); + self.server_handle_user = Some(start_server(server_config.clone(), join_set, ctx.clone()).await?); } - if let Some(server_config) = &self.server_config_admin { + if let Some(server_config) = &server_config_admin { // rpc enabled (admin) - self.server_handle_admin = - Some(start_server(server_config.clone(), join_set, cancellation_token.clone()).await?); + let ctx = ctx.child().with_id(MadaraService::RpcAdmin); + ctx.service_add(MadaraService::RpcAdmin); + self.server_handle_admin = Some(start_server(server_config.clone(), join_set, ctx).await?); } Ok(()) } + + fn id(&self) -> MadaraService { + MadaraService::Rpc + } } diff --git a/crates/node/src/service/rpc/server.rs b/crates/node/src/service/rpc/server.rs index 4f560405c..07e3a66f0 100644 --- a/crates/node/src/service/rpc/server.rs +++ b/crates/node/src/service/rpc/server.rs @@ -6,6 +6,7 @@ use std::net::SocketAddr; use std::time::Duration; use anyhow::Context; +use mp_utils::service::ServiceContext; use tokio::task::JoinSet; use tower::Service; @@ -48,7 +49,7 @@ struct PerConnection { pub async fn start_server( config: ServerConfig, join_set: &mut JoinSet>, - cancellation_token: tokio_util::sync::CancellationToken, + ctx: ServiceContext, ) -> anyhow::Result { let ServerConfig { name, @@ -97,9 +98,11 @@ pub async fn start_server( metrics, service_builder: builder.to_service_builder(), }; + let ctx1 = ctx.clone(); let make_service = hyper::service::make_service_fn(move |_| { let cfg = cfg.clone(); + let ctx1 = ctx1.clone(); async move { let cfg = cfg.clone(); @@ -119,9 +122,14 @@ pub async fn start_server( .layer(metrics_layer.clone()); let mut svc = service_builder.set_rpc_middleware(rpc_middleware).build(methods, stop_handle); + let ctx1 = ctx1.clone(); async move { - if req.uri().path() == "/health" { + if !ctx1.is_active() { + Ok(hyper::Response::builder() + .status(hyper::StatusCode::GONE) + .body(hyper::Body::from("GONE"))?) + } else if req.uri().path() == "/health" { Ok(hyper::Response::builder().status(hyper::StatusCode::OK).body(hyper::Body::from("OK"))?) 
} else { if is_websocket { @@ -157,7 +165,7 @@ pub async fn start_server( ); server .with_graceful_shutdown(async { - wait_or_graceful_shutdown(stop_handle.shutdown(), &cancellation_token).await; + wait_or_graceful_shutdown(stop_handle.shutdown(), &ctx).await; }) .await .context("Running rpc server") diff --git a/crates/node/src/service/sync.rs b/crates/node/src/service/sync.rs index bf6fdb3cb..42a747c4a 100644 --- a/crates/node/src/service/sync.rs +++ b/crates/node/src/service/sync.rs @@ -3,15 +3,16 @@ use anyhow::Context; use mc_block_import::BlockImporter; use mc_db::{DatabaseService, MadaraBackend}; use mc_sync::fetch::fetchers::FetchConfig; +use mc_sync::SyncConfig; use mc_telemetry::TelemetryHandle; use mp_chain_config::ChainConfig; -use mp_utils::service::Service; +use mp_utils::service::{MadaraService, Service, ServiceContext}; use std::sync::Arc; use std::time::Duration; use tokio::task::JoinSet; #[derive(Clone)] -pub struct SyncService { +pub struct L2SyncService { db_backend: Arc, block_importer: Arc, fetch_config: FetchConfig, @@ -22,15 +23,16 @@ pub struct SyncService { pending_block_poll_interval: Duration, } -impl SyncService { +impl L2SyncService { pub async fn new( config: &SyncParams, chain_config: Arc, db: &DatabaseService, block_importer: Arc, telemetry: TelemetryHandle, + warp_update: bool, ) -> anyhow::Result { - let fetch_config = config.block_fetch_config(chain_config.chain_id.clone(), chain_config.clone()); + let fetch_config = config.block_fetch_config(chain_config.chain_id.clone(), chain_config.clone(), warp_update); tracing::info!("🛰️ Using feeder gateway URL: {}", fetch_config.feeder_gateway.as_str()); @@ -48,16 +50,12 @@ impl SyncService { } #[async_trait::async_trait] -impl Service for SyncService { - async fn start( - &mut self, - join_set: &mut JoinSet>, - cancellation_token: tokio_util::sync::CancellationToken, - ) -> anyhow::Result<()> { +impl Service for L2SyncService { + async fn start(&mut self, join_set: &mut JoinSet>, ctx: ServiceContext) -> anyhow::Result<()> { if self.disabled { return Ok(()); } - let SyncService { + let L2SyncService { fetch_config, backup_every_n_blocks, starting_block, @@ -70,19 +68,25 @@ impl Service for SyncService { let db_backend = Arc::clone(&self.db_backend); join_set.spawn(async move { - mc_sync::sync( + mc_sync::l2_sync_worker( &db_backend, - block_importer, + ctx, fetch_config, - starting_block, - backup_every_n_blocks, - telemetry, - pending_block_poll_interval, - cancellation_token, + SyncConfig { + block_importer, + starting_block, + backup_every_n_blocks, + telemetry, + pending_block_poll_interval, + }, ) .await }); Ok(()) } + + fn id(&self) -> MadaraService { + MadaraService::L2Sync + } } diff --git a/crates/primitives/chain_config/src/rpc_version.rs b/crates/primitives/chain_config/src/rpc_version.rs index bb91611ba..0b7ae9564 100644 --- a/crates/primitives/chain_config/src/rpc_version.rs +++ b/crates/primitives/chain_config/src/rpc_version.rs @@ -1,7 +1,8 @@ use std::hash::Hash; use std::str::FromStr; -const SUPPORTED_RPC_VERSIONS: [RpcVersion; 2] = [RpcVersion::RPC_VERSION_0_7_1, RpcVersion::RPC_VERSION_0_8_0]; +const SUPPORTED_RPC_VERSIONS: [RpcVersion; 3] = + [RpcVersion::RPC_VERSION_0_7_1, RpcVersion::RPC_VERSION_0_8_0, RpcVersion::RPC_VERSION_ADMIN_0_1_0]; #[derive(Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize, Hash)] pub struct RpcVersion([u8; 3]); diff --git a/crates/primitives/utils/Cargo.toml b/crates/primitives/utils/Cargo.toml index 081f547d3..6688bb84c 
100644 --- a/crates/primitives/utils/Cargo.toml +++ b/crates/primitives/utils/Cargo.toml @@ -53,3 +53,6 @@ tracing-subscriber = { workspace = true, features = ["env-filter"] } [dev-dependencies] rstest.workspace = true + +[features] +testing = [] diff --git a/crates/primitives/utils/src/lib.rs b/crates/primitives/utils/src/lib.rs index 43d69f997..5aac81131 100644 --- a/crates/primitives/utils/src/lib.rs +++ b/crates/primitives/utils/src/lib.rs @@ -8,8 +8,8 @@ pub mod service; use std::time::{Duration, Instant}; use futures::Future; +use service::ServiceContext; use tokio::sync::oneshot; -use tokio_util::sync::CancellationToken; /// Prefer this compared to [`tokio::spawn_blocking`], as spawn_blocking creates new OS threads and /// we don't really need that @@ -27,7 +27,7 @@ where rx.await.expect("tokio channel closed") } -async fn graceful_shutdown_inner(cancellation_token: &CancellationToken) { +async fn graceful_shutdown_inner(ctx: &ServiceContext) { let sigterm = async { match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) { Ok(mut signal) => signal.recv().await, @@ -39,22 +39,19 @@ async fn graceful_shutdown_inner(cancellation_token: &CancellationToken) { tokio::select! { _ = tokio::signal::ctrl_c() => {}, _ = sigterm => {}, - _ = cancellation_token.cancelled() => {}, + _ = ctx.cancelled() => {}, }; - cancellation_token.cancel() + ctx.cancel_local() } -pub async fn graceful_shutdown(cancellation_token: &CancellationToken) { - graceful_shutdown_inner(cancellation_token).await +pub async fn graceful_shutdown(ctx: &ServiceContext) { + graceful_shutdown_inner(ctx).await } /// Should be used with streams/channels `next`/`recv` function. -pub async fn wait_or_graceful_shutdown( - future: impl Future, - cancellation_token: &CancellationToken, -) -> Option { +pub async fn wait_or_graceful_shutdown(future: impl Future, ctx: &ServiceContext) -> Option { tokio::select! { - _ = graceful_shutdown_inner(cancellation_token) => { None }, + _ = graceful_shutdown_inner(ctx) => { None }, res = future => { Some(res) }, } } @@ -62,9 +59,9 @@ pub async fn wait_or_graceful_shutdown( /// Should be used with streams/channels `next`/`recv` function. pub async fn channel_wait_or_graceful_shutdown( future: impl Future>, - cancellation_token: &CancellationToken, + ctx: &ServiceContext, ) -> Option { - wait_or_graceful_shutdown(future, cancellation_token).await? + wait_or_graceful_shutdown(future, ctx).await? } #[derive(Debug, Default)] diff --git a/crates/primitives/utils/src/service.rs b/crates/primitives/utils/src/service.rs index 43e9f080e..e8d5b1da0 100644 --- a/crates/primitives/utils/src/service.rs +++ b/crates/primitives/utils/src/service.rs @@ -1,9 +1,300 @@ //! Service trait and combinators. 
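For reference, the shutdown helpers reworked in `lib.rs` above are meant to wrap the blocking point of a task loop, so the loop exits on channel close, SIGINT/SIGTERM, or context cancellation, whichever happens first. A minimal consumer might look like this sketch (the channel payload and logging are placeholders):

```rust
use mp_utils::channel_wait_or_graceful_shutdown;
use mp_utils::service::ServiceContext;
use tokio::sync::mpsc;

/// Drains `rx` until the channel closes, a shutdown signal is received,
/// or `ctx` is cancelled.
async fn drain_until_shutdown(mut rx: mpsc::Receiver<u64>, ctx: ServiceContext) -> anyhow::Result<()> {
    while let Some(block_n) = channel_wait_or_graceful_shutdown(rx.recv(), &ctx).await {
        tracing::debug!("processing block {block_n}");
    }
    Ok(())
}
```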
 use anyhow::Context;
-use std::panic;
+use std::{fmt::Display, panic, sync::Arc};
 use tokio::task::JoinSet;
 
+#[repr(u8)]
+#[derive(Clone, Copy, PartialEq, Eq, Default, Debug)]
+pub enum MadaraService {
+    #[default]
+    None = 0,
+    Database = 1,
+    L1Sync = 2,
+    L2Sync = 4,
+    BlockProduction = 8,
+    Rpc = 16,
+    RpcAdmin = 32,
+    Gateway = 64,
+    Telemetry = 128,
+}
+
+impl Display for MadaraService {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{}",
+            match self {
+                MadaraService::None => "none",
+                MadaraService::Database => "database",
+                MadaraService::L1Sync => "l1 sync",
+                MadaraService::L2Sync => "l2 sync",
+                MadaraService::BlockProduction => "block production",
+                MadaraService::Rpc => "rpc",
+                MadaraService::RpcAdmin => "rpc admin",
+                MadaraService::Gateway => "gateway",
+                MadaraService::Telemetry => "telemetry",
+            }
+        )
+    }
+}
+
+#[repr(transparent)]
+#[derive(Default)]
+pub struct MadaraServiceMask(std::sync::atomic::AtomicU8);
+
+impl MadaraServiceMask {
+    #[cfg(feature = "testing")]
+    pub fn new_for_testing() -> Self {
+        Self(std::sync::atomic::AtomicU8::new(u8::MAX))
+    }
+
+    #[inline(always)]
+    pub fn is_active(&self, cap: u8) -> bool {
+        self.0.load(std::sync::atomic::Ordering::SeqCst) & cap > 0
+    }
+
+    #[inline(always)]
+    pub fn activate(&self, cap: MadaraService) -> bool {
+        let prev = self.0.fetch_or(cap as u8, std::sync::atomic::Ordering::SeqCst);
+        prev & cap as u8 > 0
+    }
+
+    #[inline(always)]
+    pub fn deactivate(&self, cap: MadaraService) -> bool {
+        let cap = cap as u8;
+        let prev = self.0.fetch_and(!cap, std::sync::atomic::Ordering::SeqCst);
+        prev & cap > 0
+    }
+}
+
+#[repr(u8)]
+#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
+pub enum MadaraState {
+    #[default]
+    Starting,
+    Warp,
+    Running,
+    Shutdown,
+}
+
+impl From<u8> for MadaraState {
+    fn from(value: u8) -> Self {
+        match value {
+            0 => Self::Starting,
+            1 => Self::Warp,
+            2 => Self::Running,
+            _ => Self::Shutdown,
+        }
+    }
+}
+
+/// Atomic state and cancellation context associated with a Service.
+///
+/// # Scope
+///
+/// You can create a hierarchy of services by calling `ServiceContext::child`.
+/// Services are said to be in the same _local scope_ if they inherit the same
+/// `token_local` cancellation token. You can think of services as being local
+/// if they can cancel each other without affecting the rest of the app (this
+/// is not exact but it serves as a good mental model).
+///
+/// All services which descend from the same context are also said to be in the
+/// same _global scope_, that is to say any service in this scope can cancel
+/// _all_ other services in the same scope (including child services) at any
+/// time. This is true of services in the same [ServiceGroup] for example.
+///
+/// # Services
+///
+/// - A service is said to be a _child service_ if it uses a context created
+///   with `ServiceContext::child`.
+///
+/// - A service is said to be a _parent service_ if it uses a context which was
+///   used to create child services.
+///
+/// > A parent service can always cancel all of its child services, but a child
+/// > service cannot cancel its parent service.
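Since `MadaraService` doubles as a bit flag, several services can be OR-ed into a single `u8` mask and probed with one atomic load through `ServiceContext::service_check` (defined further down). Note that `MadaraServiceMask::is_active` answers "is at least one of these bits set", not "are all of them set". A short usage sketch (the helper function is hypothetical):

```rust
use mp_utils::service::{MadaraService, ServiceContext};

/// True if either the user-facing RPC or the admin RPC is currently active.
/// The mask check is an OR over the selected bits, not an AND.
fn rpc_is_reachable(ctx: &ServiceContext) -> bool {
    ctx.service_check(MadaraService::Rpc as u8 | MadaraService::RpcAdmin as u8)
}

fn main() {
    let ctx = ServiceContext::new();
    ctx.service_add(MadaraService::Rpc);
    assert!(rpc_is_reachable(&ctx));

    ctx.service_remove(MadaraService::Rpc);
    assert!(!rpc_is_reachable(&ctx));
}
```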
+#[cfg_attr(not(feature = "testing"), derive(Default))] +pub struct ServiceContext { + token_global: tokio_util::sync::CancellationToken, + token_local: Option, + services: Arc, + services_notify: Arc, + state: Arc, + id: MadaraService, +} + +impl Clone for ServiceContext { + fn clone(&self) -> Self { + Self { + token_global: self.token_global.clone(), + token_local: self.token_local.clone(), + services: Arc::clone(&self.services), + services_notify: Arc::clone(&self.services_notify), + state: Arc::clone(&self.state), + id: self.id, + } + } +} + +impl ServiceContext { + pub fn new() -> Self { + Self { + token_global: tokio_util::sync::CancellationToken::new(), + token_local: None, + services: Arc::new(MadaraServiceMask::default()), + services_notify: Arc::new(tokio::sync::Notify::new()), + state: Arc::new(std::sync::atomic::AtomicU8::new(MadaraState::default() as u8)), + id: MadaraService::default(), + } + } + + #[cfg(feature = "testing")] + pub fn new_for_testing() -> Self { + Self { + token_global: tokio_util::sync::CancellationToken::new(), + token_local: None, + services: Arc::new(MadaraServiceMask::new_for_testing()), + services_notify: Arc::new(tokio::sync::Notify::new()), + state: Arc::new(std::sync::atomic::AtomicU8::new(MadaraState::default() as u8)), + id: MadaraService::default(), + } + } + + /// Stops all services under the same global context scope. + pub fn cancel_global(&self) { + self.token_global.cancel(); + } + + /// Stops all services under the same local context scope. + /// + /// A local context is created by calling `branch_local` and allows you to + /// reduce the scope of cancellation only to those services which will use + /// the new context. + pub fn cancel_local(&self) { + self.token_local.as_ref().unwrap_or(&self.token_global).cancel(); + } + + /// A future which completes when the service associated to this + /// [ServiceContext] is canceled. + /// + /// This happens after calling [ServiceContext::cancel_local] or + /// [ServiceContext::cancel_global]. + /// + /// Use this to race against other futures in a [tokio::select] for example. + #[inline(always)] + pub async fn cancelled(&self) { + if self.state() != MadaraState::Shutdown { + match &self.token_local { + Some(token_local) => tokio::select! { + _ = self.token_global.cancelled() => {}, + _ = token_local.cancelled() => {} + }, + None => tokio::select! { + _ = self.token_global.cancelled() => {}, + }, + } + } + } + + /// Check if the service associated to this [ServiceContext] was canceled. + /// + /// This happens after calling [ServiceContext::cancel_local] or + /// [ServiceContext::cancel_global]. + #[inline(always)] + pub fn is_cancelled(&self) -> bool { + self.token_global.is_cancelled() + || self.token_local.as_ref().map(|t| t.is_cancelled()).unwrap_or(false) + || !self.services.is_active(self.id as u8) + || self.state() == MadaraState::Shutdown + } + + /// The id of service associated to this [ServiceContext] + pub fn id(&self) -> MadaraService { + self.id + } + + /// Copies the context, maintaining its scope but with a new id. + pub fn with_id(mut self, id: MadaraService) -> Self { + self.id = id; + self + } + + /// Copies the context into a new local scope. + /// + /// Any service which uses this new context will be able to cancel the + /// services in the same local scope as itself, and any further child + /// services, without affecting the rest of the global scope. 
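This is the same pattern the RPC service uses further down to isolate its admin endpoint: branch a local scope, retag it, and mark the service active. Sketched here for clarity (the wrapper function is hypothetical):

```rust
use mp_utils::service::{MadaraService, ServiceContext};

/// Creates a context for the admin endpoint. Cancelling it, or anything that
/// shares its local token, stops the admin scope without touching the rest of
/// the node; `ctx.cancel_global()` still shuts everything down.
fn admin_scope(ctx: &ServiceContext) -> ServiceContext {
    let admin_ctx = ctx.child().with_id(MadaraService::RpcAdmin);
    admin_ctx.service_add(MadaraService::RpcAdmin);
    admin_ctx
}
```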
+ pub fn child(&self) -> Self { + let token_local = self.token_local.as_ref().unwrap_or(&self.token_global).child_token(); + + Self { + token_global: self.token_global.clone(), + token_local: Some(token_local), + services: Arc::clone(&self.services), + services_notify: Arc::clone(&self.services_notify), + state: Arc::clone(&self.state), + id: self.id, + } + } + + /// Atomically checks if a set of services are running. + /// + /// You can combine multiple [MadaraService] into a single bitmask to + /// check the state of multiple services at once. + #[inline(always)] + pub fn service_check(&self, cap: u8) -> bool { + self.services.is_active(cap) + } + + /// Atomically marks a service as active + /// + /// This will immediately be visible to all services in the same global + /// scope. This is true across threads. + #[inline(always)] + pub fn service_add(&self, cap: MadaraService) -> bool { + let res = self.services.activate(cap); + self.services_notify.notify_waiters(); + + res + } + + /// Atomically marks a service as inactive + /// + /// This will immediately be visible to all services in the same global + /// scope. This is true across threads. + #[inline(always)] + pub fn service_remove(&self, cap: MadaraService) -> bool { + self.services.deactivate(cap) + } + + /// Atomically checks if the service associated to this [ServiceContext] is + /// active. + /// + /// This can be updated across threads by calling [ServiceContext::service_remove] + /// or [ServiceContext::service_add] + #[inline(always)] + pub fn is_active(&self) -> bool { + self.services.is_active(self.id as u8) + } + + /// Atomically checks the state of the node + #[inline(always)] + pub fn state(&self) -> MadaraState { + self.state.load(std::sync::atomic::Ordering::SeqCst).into() + } + + /// Atomically sets the state of the node + /// + /// This will immediately be visible to all services in the same global + /// scope. This is true across threads. + pub fn state_advance(&mut self) -> MadaraState { + let state = self.state.load(std::sync::atomic::Ordering::SeqCst).saturating_add(1); + self.state.store(state, std::sync::atomic::Ordering::SeqCst); + state.into() + } +} + /// The app is divided into services, with each service having a different responsability within the app. /// Depending on the startup configuration, some services are enabled and some are disabled. /// @@ -11,11 +302,7 @@ use tokio::task::JoinSet; #[async_trait::async_trait] pub trait Service: 'static + Send + Sync { /// Default impl does not start any task. 
- async fn start( - &mut self, - _join_set: &mut JoinSet>, - _cancellation_token: tokio_util::sync::CancellationToken, - ) -> anyhow::Result<()> { + async fn start(&mut self, _join_set: &mut JoinSet>, _ctx: ServiceContext) -> anyhow::Result<()> { Ok(()) } @@ -24,10 +311,11 @@ pub trait Service: 'static + Send + Sync { Self: Sized, { let mut join_set = JoinSet::new(); - let cancellation_token = tokio_util::sync::CancellationToken::new(); - self.start(&mut join_set, cancellation_token).await.context("Starting service")?; + self.start(&mut join_set, ServiceContext::new()).await.context("Starting service")?; drive_joinset(join_set).await } + + fn id(&self) -> MadaraService; } pub struct ServiceGroup { @@ -62,20 +350,21 @@ impl ServiceGroup { #[async_trait::async_trait] impl Service for ServiceGroup { - async fn start( - &mut self, - join_set: &mut JoinSet>, - cancellation_token: tokio_util::sync::CancellationToken, - ) -> anyhow::Result<()> { + async fn start(&mut self, join_set: &mut JoinSet>, ctx: ServiceContext) -> anyhow::Result<()> { // drive the join set as a nested task let mut own_join_set = self.join_set.take().expect("Service has already been started."); for svc in self.services.iter_mut() { - svc.start(&mut own_join_set, cancellation_token.clone()).await.context("Starting service")?; + ctx.service_add(svc.id()); + svc.start(&mut own_join_set, ctx.child().with_id(svc.id())).await.context("Starting service")?; } join_set.spawn(drive_joinset(own_join_set)); Ok(()) } + + fn id(&self) -> MadaraService { + MadaraService::None + } } async fn drive_joinset(mut join_set: JoinSet>) -> anyhow::Result<()> { diff --git a/crates/tests/src/lib.rs b/crates/tests/src/lib.rs index f024f59b2..3b778068f 100644 --- a/crates/tests/src/lib.rs +++ b/crates/tests/src/lib.rs @@ -19,7 +19,11 @@ use std::{ }; use tempfile::TempDir; -async fn wait_for_cond>>(mut cond: impl FnMut() -> F, duration: Duration) { +async fn wait_for_cond>>( + mut cond: impl FnMut() -> F, + duration: Duration, + max_attempts: u32, +) { let mut attempt = 0; loop { let Err(err) = cond().await else { @@ -27,7 +31,7 @@ async fn wait_for_cond>>(mut cond: }; attempt += 1; - if attempt >= 10 { + if attempt >= max_attempts { panic!("No answer from the node after {attempt} attempts: {:#}", err) } @@ -65,19 +69,23 @@ impl MadaraCmd { res.error_for_status()?; anyhow::Ok(()) }, - Duration::from_millis(2000), + Duration::from_secs(2), + 10, ) .await; self.ready = true; self } + // TODO: replace this with `subscribeNewHeads` pub async fn wait_for_sync_to(&mut self, block_n: u64) -> &mut Self { let rpc = self.json_rpc(); wait_for_cond( || async { match rpc.block_hash_and_number().await { Ok(got) => { + tracing::info!("Received block number {} out of {block_n}", got.block_number); + if got.block_number < block_n { bail!("got block_n {}, expected {block_n}", got.block_number); } @@ -86,7 +94,8 @@ impl MadaraCmd { Err(err) => bail!(err), } }, - Duration::from_millis(20000), + Duration::from_secs(2), + 100, ) .await; self diff --git a/crates/tests/src/rpc/read.rs b/crates/tests/src/rpc/read.rs index 2dfdfb802..3ca268c1f 100644 --- a/crates/tests/src/rpc/read.rs +++ b/crates/tests/src/rpc/read.rs @@ -570,6 +570,7 @@ mod test_rpc_read_calls { /// ``` #[rstest] #[tokio::test] + // TODO: replace this with jsonrpsee client async fn test_get_txn_receipt_works() { let madara = get_shared_state().await; let json_client = JsonRpcClient::new(HttpTransport::new(madara.rpc_url.clone()));
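With the reworked `Service` trait above, `id()` is now a required method and `start` receives a `ServiceContext` rather than a raw cancellation token. A minimal implementation under the new shape could look like the following sketch (the heartbeat service is a placeholder, and it reuses an existing `MadaraService` variant rather than adding its own flag):

```rust
use mp_utils::service::{MadaraService, Service, ServiceContext};
use mp_utils::wait_or_graceful_shutdown;
use std::time::Duration;
use tokio::task::JoinSet;

struct HeartbeatService;

#[async_trait::async_trait]
impl Service for HeartbeatService {
    async fn start(&mut self, join_set: &mut JoinSet<anyhow::Result<()>>, ctx: ServiceContext) -> anyhow::Result<()> {
        join_set.spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));
            // Ticks until this service's scope (or the global scope) is cancelled.
            while wait_or_graceful_shutdown(interval.tick(), &ctx).await.is_some() {
                tracing::info!("heartbeat");
            }
            Ok(())
        });
        Ok(())
    }

    fn id(&self) -> MadaraService {
        // Placeholder: a real service would report its own variant here.
        MadaraService::Telemetry
    }
}
```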