From 3ff4e119f6a8eafdc96e05fdee8342ef4d3bd520 Mon Sep 17 00:00:00 2001 From: Marcelo Diop-Gonzalez Date: Mon, 6 Nov 2023 23:45:07 -0500 Subject: [PATCH] Revert "cleanup: removed mock-node (#9654)" (#10069) This reverts commit 7d7a18f2082260d5d42aa0e7a70e5ce577388595, and then removes the mock_node feature, which has been causing problems --- Cargo.lock | 36 +++ Cargo.toml | 49 +--- chain/chain/src/chain.rs | 6 +- tools/mock-node/Cargo.toml | 48 ++++ tools/mock-node/README.md | 90 +++++++ tools/mock-node/src/lib.rs | 490 +++++++++++++++++++++++++++++++++++ tools/mock-node/src/main.rs | 202 +++++++++++++++ tools/mock-node/src/setup.rs | 460 ++++++++++++++++++++++++++++++++ 8 files changed, 1339 insertions(+), 42 deletions(-) create mode 100644 tools/mock-node/Cargo.toml create mode 100644 tools/mock-node/README.md create mode 100644 tools/mock-node/src/lib.rs create mode 100644 tools/mock-node/src/main.rs create mode 100644 tools/mock-node/src/setup.rs diff --git a/Cargo.lock b/Cargo.lock index d2226f1927f..719ff12aa63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3318,6 +3318,42 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "mock-node" +version = "0.0.0" +dependencies = [ + "actix", + "actix-rt", + "anyhow", + "clap 4.2.4", + "futures", + "near-actix-test-utils", + "near-async", + "near-chain", + "near-chain-configs", + "near-chunks", + "near-client", + "near-crypto", + "near-epoch-manager", + "near-jsonrpc", + "near-jsonrpc-client", + "near-network", + "near-o11y", + "near-performance-metrics", + "near-primitives", + "near-store", + "near-telemetry", + "nearcore", + "pin-project", + "rand 0.8.5", + "rayon", + "serde", + "serde_json", + "tempfile", + "tokio", + "tracing", +] + [[package]] name = "more-asserts" version = "0.2.2" diff --git a/Cargo.toml b/Cargo.toml index 16e0789e074..51af476acfc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace.package] -version = "0.0.0" # managed by cargo-workspaces, see below +version = "0.0.0" # managed by cargo-workspaces, see below authors = ["Near Inc "] edition = "2021" rust-version = "1.73.0" @@ -69,6 +69,7 @@ members = [ "tools/fork-network", "tools/indexer/example", "tools/mirror", + "tools/mock-node", "tools/ping", "tools/restaked", "tools/rpctypegen/core", @@ -122,10 +123,7 @@ clap = { version = "4.2.0", features = ["derive", "env", "string"] } cloud-storage = "0.11.1" conqueue = "0.4.0" cpu-time = "1.0" -criterion = { version = "0.3.5", default_features = false, features = [ - "html_reports", - "cargo_bench_support", -] } +criterion = { version = "0.3.5", default_features = false, features = ["html_reports", "cargo_bench_support"] } crossbeam = "0.8" crossbeam-channel = "0.5.8" crossbeam-queue = "0.3.8" @@ -175,7 +173,7 @@ more-asserts = "0.2" near-account-id = { version = "1.0.0-alpha.2", features = ["internal_unstable", "serde", "borsh"] } near-actix-test-utils = { path = "test-utils/actix-test-utils" } near-amend-genesis = { path = "tools/amend-genesis" } -near-database-tool = { path = "tools/database" } +near-database-tool= { path = "tools/database" } near-async = { path = "core/async" } near-cache = { path = "utils/near-cache" } near-chain = { path = "chain/chain" } @@ -199,9 +197,7 @@ near-indexer-primitives = { path = "chain/indexer-primitives" } near-jsonrpc = { path = "chain/jsonrpc" } near-jsonrpc-adversarial-primitives = { path = "chain/jsonrpc-adversarial-primitives" } near-jsonrpc-client = { path = "chain/jsonrpc/client" } -near-jsonrpc-primitives = { path = "chain/jsonrpc-primitives", features = [ - "full", -] } +near-jsonrpc-primitives = { path = "chain/jsonrpc-primitives", features = ["full"] } near-jsonrpc-tests = { path = "chain/jsonrpc/jsonrpc-tests" } near-mainnet-res = { path = "utils/mainnet-res" } near-mirror = { path = "tools/mirror" } @@ -276,13 +272,7 @@ reqwest = { version = "0.11.14", features = ["blocking"] } ripemd = "0.1.1" rkyv = "0.7.31" rlimit = "0.7" -rocksdb = { version = "0.21.0", default-features = false, features = [ - "snappy", - "lz4", - "zstd", - "zlib", - "jemalloc", -] } +rocksdb = { version = "0.21.0", default-features = false, features = ["snappy", "lz4", "zstd", "zlib", "jemalloc"] } runtime-tester = { path = "test-utils/runtime-tester" } rusqlite = { version = "0.29.0", features = ["bundled", "chrono", "functions"] } rustc-demangle = "0.1" @@ -332,12 +322,7 @@ tracing = { version = "0.1.36", features = ["std"] } tracing-appender = "0.2.2" tracing-opentelemetry = "0.17.0" tracing-span-tree = "0.1" -tracing-subscriber = { version = "0.3.15", features = [ - "env-filter", - "fmt", - "registry", - "std", -] } +tracing-subscriber = { version = "0.3.15", features = ["env-filter", "fmt", "registry", "std"] } trybuild = "1.0.11" turn = "0.6" validator = "0.12" @@ -345,31 +330,19 @@ wasm-encoder = "0.27.0" wasmer-compiler = { package = "wasmer-compiler-near", version = "=2.4.1" } wasmer-compiler-singlepass = { package = "wasmer-compiler-singlepass-near", version = "=2.4.1" } wasmer-engine = { package = "wasmer-engine-near", version = "=2.4.1" } -wasmer-engine-universal = { package = "wasmer-engine-universal-near", version = "=2.4.1", features = [ - "compiler", -] } -wasmer-runtime = { version = "0.18.0", package = "wasmer-runtime-near", features = [ - "default-backend-singlepass", -], default-features = false } +wasmer-engine-universal = { package = "wasmer-engine-universal-near", version = "=2.4.1", features = ["compiler"] } +wasmer-runtime = { version = "0.18.0", package = "wasmer-runtime-near", features = ["default-backend-singlepass"], default-features = false } wasmer-runtime-core = { version = "0.18.2", package = "wasmer-runtime-core-near" } wasmer-types = { package = "wasmer-types-near", version = "=2.4.1" } wasmer-vm = { package = "wasmer-vm-near", version = "=2.4.1" } wasmparser = "0.78" # TODO: unify at least the versions of wasmparser we have in our codebase wasmprinter = "0.2" wasm-smith = "0.10" -wasmtime = { version = "9.0.3", default-features = false, features = [ - "cranelift", -] } +wasmtime = { version = "9.0.3", default-features = false, features = ["cranelift"] } wast = "40.0" wat = "1.0.40" webrtc-util = "0.7" -winapi = { version = "0.3", features = [ - "winbase", - "memoryapi", - "errhandlingapi", - "winnt", - "impl-default", -] } +winapi = { version = "0.3", features = ["winbase", "memoryapi", "errhandlingapi", "winnt", "impl-default"] } xshell = "0.2.1" xz2 = "0.1.6" diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index de6262cf1d8..13c4938fede 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -4052,6 +4052,7 @@ impl Chain { let protocol_version = self.epoch_manager.get_epoch_protocol_version(block.header().epoch_id())?; + if checked_feature!("stable", AccessKeyNonceRange, protocol_version) { let transaction_validity_period = self.transaction_validity_period; for transaction in transactions { @@ -4061,10 +4062,7 @@ impl Chain { &transaction.transaction.block_hash, transaction_validity_period, ) - .map_err(|_| { - tracing::warn!("Invalid Transactions for mock node"); - Error::from(Error::InvalidTransactions) - })?; + .map_err(|_| Error::from(Error::InvalidTransactions))?; } }; diff --git a/tools/mock-node/Cargo.toml b/tools/mock-node/Cargo.toml new file mode 100644 index 00000000000..ac7953d32f1 --- /dev/null +++ b/tools/mock-node/Cargo.toml @@ -0,0 +1,48 @@ +[package] +name = "mock-node" +version.workspace = true +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +repository.workspace = true +license.workspace = true +publish = false + +[dependencies] +actix-rt.workspace = true +actix.workspace = true +anyhow.workspace = true +clap.workspace = true +futures.workspace = true +pin-project.workspace = true +rand.workspace = true +rayon.workspace = true +serde.workspace = true +serde_json.workspace = true +tempfile.workspace = true +tokio.workspace = true +tracing.workspace = true + +near-actix-test-utils.workspace = true +near-async.workspace = true +near-chain.workspace = true +near-chain-configs.workspace = true +near-client.workspace = true +near-crypto.workspace = true +near-chunks.workspace = true +near-epoch-manager.workspace = true +near-jsonrpc.workspace = true +near-jsonrpc-client.workspace = true +near-network.workspace = true +near-store.workspace = true +near-o11y.workspace = true +near-telemetry.workspace = true +near-performance-metrics.workspace = true +near-primitives.workspace = true +nearcore.workspace = true + +[[bin]] +name = "mock-node" + +[features] +test_features = ["nearcore/test_features"] diff --git a/tools/mock-node/README.md b/tools/mock-node/README.md new file mode 100644 index 00000000000..e8602119940 --- /dev/null +++ b/tools/mock-node/README.md @@ -0,0 +1,90 @@ +# mock-node +This crate hosts libraries to start a test env for a single node by replacing the network module with a mock network environment. +The mock network environment simulates the interaction that the client will usually have with other nodes by +responding to the client's network messages and broadcasting new blocks. The mock network reads a pre-generated chain +history from storage. + +## Quick Start + +```console +$ cargo run --release -p mock-node -- ~/.near/localnet/node0 +``` + +where the `node0` directory contains some pre-generated chain history in storage. +You can find two examples in the ./benches directory. + +If you are running a mock node for mainnet or testnet on a GCP node, you want to place the new client home +dir on a SSD disk for optimal rocksdb performance. Note that the +default booting disk of GCP notes are HDD, so you need to mount a new SSD disk on +your node and put the mock node's home dir there. See https://cloud.google.com/compute/docs/disks/add-persistent-disk +for how to attach and mount a new disk to an existing GCP node. + +See `$ cargo run -p mock-node -- --help` for the list of available options and their documentation. + +## Examples + +#### Replay localnet history + +```console +$ cargo r -r -p mock-node -- ~/.near/localnet/node0 +``` +Here we take the home dir of an existing node in a localnet as chain history home dir, +so the mock network will reproduce the client catching up with the entire history of the localnet from genesis. + +#### Replay mainnet history from a certain height + +To replay mainnet or testnet history, in most use cases, we want to start replaying from a certain height, instead +of from genesis block. The following comment replays mainnet history from block height 60925880 to block height 60925900. + +```console +$ cargo r -r -p mock-node -- ~/.near ~/mock_node_home_dir --start_height 60925880 --target-height 60925900 +``` + +By providing a starting height, +the binary will set up the data dir before starting the client, by copying the state at the specified height +and other chain info necessary for processing the blocks afterwards (such as block headers and blocks). +This initial setup may take a long time (The exact depends on your +source dir, my experiment takes about an hour from a non-archival source dir. Copying from archival node source +dir may take longer as rocksdb is slower). So we suggest specifying a client dir (the `~/mock_node_home_dir` argument) +so you can reuse it again without having to copy the state again. + +Note that the start height must be the last block of an epoch. + +Once you have the source dir already set up, you can run the command without `--start_height`, + +```console +$ cargo r -r -p mock-node -- ~/.near ~/mock_node_home_dir --target-height 60926000 +``` +Without `--starting_height`, the binary will not modify the client home dir before starting the mock node. Therefore, +the mock node will start from the chain head stored in the client dir. + +## Mock Network Configuration + +Certain details around how the mock network behaves can be configured with the file `mock.json` in the chain history +home directory. Currently, the only supported configuration options tell how long to wait before replying to requests +(the same as the --network_delay flag), and how often to send unrequested blocks and chunk part requests. By default, +no such unrequested messages are sent, but the following config file will have the mock code produce unrequested +blocks every 100 milliseconds, and chunk part requests every 50 milliseconds. + +```json +{ + "response_delay": { + "secs": 0, + "nanos": 100000000 + }, + "incoming_requests": { + "block": { + "interval": { + "secs": 0, + "nanos": 100000000 + } + }, + "chunk_request": { + "interval": { + "secs": 0, + "nanos": 50000000 + } + } + } +} +``` diff --git a/tools/mock-node/src/lib.rs b/tools/mock-node/src/lib.rs new file mode 100644 index 00000000000..2ad02b8e01a --- /dev/null +++ b/tools/mock-node/src/lib.rs @@ -0,0 +1,490 @@ +//! Implements `ChainHistoryAccess` and `MockPeerManagerActor`, which is the main +//! components of the mock network. + +use anyhow::{anyhow, Context as AnyhowContext}; +use near_async::time; +use near_chain::{Block, Chain, ChainStoreAccess, Error}; +use near_client::sync::header::MAX_BLOCK_HEADERS; +use near_crypto::SecretKey; +use near_network::raw::{DirectMessage, Listener, Message, RoutedMessage}; +use near_network::tcp; +use near_network::types::{PartialEncodedChunkRequestMsg, PartialEncodedChunkResponseMsg}; +use near_primitives::sharding::ChunkHash; +use near_primitives::types::{BlockHeight, ShardId}; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::future::Future; +use std::path::Path; +use std::pin::Pin; +use std::task::Poll; +use std::time::Duration; + +pub mod setup; + +// For now this is a simple struct with one field just to leave the door +// open for adding stuff and/or having different configs for different message types later. +#[derive(Clone, Debug, serde::Deserialize)] +pub struct MockIncomingRequestConfig { + // How long we wait between sending each incoming request + interval: Duration, +} + +#[derive(Clone, Debug, serde::Deserialize)] +pub struct MockIncomingRequestsConfig { + // Options for sending unrequested blocks + block: Option, + // Options for sending chunk part requests + chunk_request: Option, +} + +#[derive(Clone, Debug, serde::Deserialize)] +pub struct MockNetworkConfig { + #[serde(default = "default_delay")] + // How long we'll wait until sending replies to the client + pub response_delay: Duration, + pub incoming_requests: Option, +} + +impl MockNetworkConfig { + pub fn with_delay(response_delay: Duration) -> Self { + let mut ret = Self::default(); + ret.response_delay = response_delay; + ret + } + + pub fn from_file>(path: &P) -> anyhow::Result { + let s = std::fs::read_to_string(path)?; + Ok(serde_json::from_str(&s)?) + } +} + +pub const MOCK_DEFAULT_NETWORK_DELAY: Duration = Duration::from_millis(100); + +fn default_delay() -> Duration { + MOCK_DEFAULT_NETWORK_DELAY +} + +impl Default for MockNetworkConfig { + fn default() -> Self { + Self { response_delay: default_delay(), incoming_requests: None } + } +} + +// A request we want to spam the node under test with over and over +#[derive(Debug)] +struct PeriodicRequest { + interval: tokio::time::Interval, + message: Message, +} + +async fn next_request(r: Option<&mut PeriodicRequest>) -> Message { + match r { + Some(r) => { + r.interval.tick().await; + r.message.clone() + } + None => futures::future::pending().await, + } +} + +#[derive(Debug)] +// Info related to unrequested messages we'll send to the client +struct IncomingRequests { + block: Option, + chunk_request: Option, +} + +// get some chunk hash to serve as the source of unrequested incoming chunks. +// For now we find the first chunk hash we know about starting from the height the client will start at. +// The lower the height, the better, so that the client will actually do some work on these +// requests instead of just seeing that the chunk hash is unknown. +fn retrieve_starting_chunk_hash( + chain: &Chain, + head_height: BlockHeight, +) -> anyhow::Result { + let mut last_err = None; + for height in (chain.tail().context("failed fetching chain tail")? + 1..=head_height).rev() { + match chain + .store() + .get_block_hash_by_height(height) + .and_then(|hash| chain.store().get_block(&hash)) + .map(|block| block.chunks().iter().next().unwrap().chunk_hash()) + { + Ok(hash) => return Ok(hash), + Err(e) => { + last_err = Some(e); + } + } + } + match last_err { + Some(e) => { + Err(e).with_context(|| format!("Last error (retrieving chunk hash @ #{})", head_height)) + } + None => Err(anyhow!("given head_height is not after the chain tail?")), + } +} + +// get some block to serve as the source of unrequested incoming blocks. +fn retrieve_incoming_block(chain: &Chain, head_height: BlockHeight) -> anyhow::Result { + let mut last_err = None; + for height in (chain.tail().context("failed fetching chain tail")? + 1..=head_height).rev() { + match chain.get_block_by_height(height) { + Ok(b) => return Ok(b), + Err(e) => { + last_err = Some(e); + } + } + } + match last_err { + Some(e) => { + Err(e).with_context(|| format!("Last error (retrieving block #{})", head_height)) + } + None => Err(anyhow!("given head_height is not after the chain tail?")), + } +} + +impl IncomingRequests { + fn new( + config: &Option, + chain: &Chain, + head_height: BlockHeight, + ) -> Self { + let now = std::time::Instant::now(); + let mut block = None; + let mut chunk_request = None; + + if let Some(config) = config { + if let Some(block_config) = &config.block { + match retrieve_incoming_block(chain, head_height) { + Ok(b) => { + block = Some(PeriodicRequest { + interval: tokio::time::interval_at( + (now + block_config.interval).into(), + block_config.interval, + ), + message: Message::Direct(DirectMessage::Block(b)), + }); + } + Err(e) => { + tracing::error!("Can't retrieve block suitable for mock messages: {:?}", e); + } + }; + } + if let Some(chunk_request_config) = &config.chunk_request { + match retrieve_starting_chunk_hash(chain, head_height) { + Ok(chunk_hash) => { + chunk_request = Some(PeriodicRequest { + interval: tokio::time::interval_at( + (now + chunk_request_config.interval).into(), + chunk_request_config.interval, + ), + message: Message::Routed(RoutedMessage::PartialEncodedChunkRequest( + PartialEncodedChunkRequestMsg { + chunk_hash, + part_ords: vec![0], + tracking_shards: std::iter::once(0).collect::>(), + }, + )), + }); + } + Err(e) => { + tracing::error!( + "Can't construct chunk part request suitable for mock messages: {:?}", + e + ); + } + }; + } + } + + Self { block, chunk_request } + } + + // If the user told us to spam the node with incoming messages via the mock.json + // config file, this function will produce them at the rate specified there. + async fn next(&mut self) -> Message { + tokio::select! { + msg = next_request(self.block.as_mut()) => { + msg + } + msg = next_request(self.chunk_request.as_mut()) => { + msg + } + } + } +} + +struct InFlightMessage { + message: Message, + sent_at: tokio::time::Instant, +} + +// type that simulates network latency by waiting for `response_delay` +// before delivering queued up messages +#[pin_project::pin_project] +struct InFlightMessages { + #[pin] + next_delivery: tokio::time::Sleep, + messages: VecDeque, + response_delay: Duration, +} + +impl InFlightMessages { + fn new(response_delay: Duration) -> Self { + Self { + next_delivery: tokio::time::sleep(Duration::ZERO), + messages: VecDeque::new(), + response_delay, + } + } + + fn queue_message(self: Pin<&mut Self>, message: Message) { + let me = self.project(); + let now = tokio::time::Instant::now(); + if me.messages.is_empty() { + me.next_delivery.reset(now + *me.response_delay); + } + tracing::debug!( + "mock peer queueing up message {} to be delivered in {:?}", + &message, + me.response_delay + ); + me.messages.push_back(InFlightMessage { message, sent_at: now }); + } +} + +impl Future for InFlightMessages { + type Output = Message; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + if self.messages.is_empty() { + Poll::Pending + } else { + let mut me = self.project(); + match me.next_delivery.as_mut().poll(cx) { + Poll::Ready(()) => { + let msg = me.messages.pop_front().unwrap(); + if let Some(m) = me.messages.front() { + // if there's another message after the one we're returning here, reset + // the time til the next message gets delivered accordingly. + me.next_delivery.as_mut().reset(m.sent_at + *me.response_delay); + } + Poll::Ready(msg.message) + } + Poll::Pending => Poll::Pending, + } + } + } +} + +struct MockPeer { + listener: Listener, + chain: Chain, + current_height: BlockHeight, + network_config: MockNetworkConfig, + block_production: tokio::time::Interval, + incoming_requests: IncomingRequests, +} + +impl MockPeer { + async fn new( + chain: Chain, + secret_key: SecretKey, + listen_addr: tcp::ListenerAddr, + chain_id: String, + archival: bool, + block_production_delay: Duration, + num_shards: ShardId, + network_start_height: BlockHeight, + network_config: MockNetworkConfig, + ) -> anyhow::Result { + let listener = Listener::bind( + listen_addr, + secret_key, + &chain_id, + *chain.genesis().hash(), + network_start_height, + (0..num_shards).collect(), + archival, + 30 * time::Duration::SECOND, + ) + .await?; + let incoming_requests = + IncomingRequests::new(&network_config.incoming_requests, &chain, network_start_height); + // make sure we start at a height that actually exists, because we want self.produce_block() + // to give the first block immediately. Otherwise the node won't even try asking us for block headers + // until we give it a block. + let tail = chain.tail().context("failed getting chain tail")?; + let mut current_height = None; + for height in (tail..=network_start_height).rev() { + if chain.get_block_by_height(height).is_ok() { + current_height = Some(height); + break; + } + } + let current_height = match current_height { + Some(h) => h, + None => anyhow::bail!( + "No block found between tail {} and network start height {}", + tail, + network_start_height + ), + }; + Ok(Self { + listener, + chain, + current_height, + network_config, + block_production: tokio::time::interval(block_production_delay), + incoming_requests, + }) + } + + fn handle_message( + &self, + message: Message, + outbound: Pin<&mut InFlightMessages>, + ) -> anyhow::Result<()> { + tracing::debug!("mock peer received message: {}", &message); + match message { + Message::Direct(msg) => { + match msg { + DirectMessage::BlockHeadersRequest(hashes) => { + let headers = self + .chain + .retrieve_headers(hashes, MAX_BLOCK_HEADERS, Some(self.current_height)) + .with_context(|| { + format!( + "failed retrieving block headers up to {}", + self.current_height + ) + })?; + outbound + .queue_message(Message::Direct(DirectMessage::BlockHeaders(headers))); + } + DirectMessage::BlockRequest(hash) => { + let block = self + .chain + .get_block(&hash) + .with_context(|| format!("failed getting block {}", &hash))?; + outbound.queue_message(Message::Direct(DirectMessage::Block(block))); + } + _ => {} + }; + } + Message::Routed(r) => { + match r { + RoutedMessage::PartialEncodedChunkRequest(request) => { + let response = retrieve_partial_encoded_chunk(&self.chain, &request) + .with_context(|| { + format!( + "failed getting partial encoded chunk response for {:?}", + &request + ) + })?; + outbound.queue_message(Message::Routed( + RoutedMessage::PartialEncodedChunkResponse(response), + )); + } + // TODO: add state sync requests to possible request types so we can either + // respond or just exit, saying we don't know how to do that + _ => {} + } + } + }; + Ok(()) + } + + // simulate the normal block production of the network by sending out a + // "new" block at an interval set by the config's block_production_delay field + fn produce_block(&mut self) -> anyhow::Result> { + let height = self.current_height; + self.current_height += 1; + match self.chain.get_block_by_height(height) { + Ok(b) => Ok(Some(b)), + Err(near_chain::Error::DBNotFoundErr(_)) => Ok(None), + Err(e) => Err(e.into()), + } + } + + // returns a message produced by this mock peer. Right now this includes a new block + // at a rate given by block_production_delay in the config, and extra chunk part requests + // and blocks as specified by the mock.json config + async fn incoming_message(&mut self, target_height: BlockHeight) -> anyhow::Result { + loop { + tokio::select! { + msg = self.incoming_requests.next() => { + return Ok(msg); + } + _ = self.block_production.tick(), if self.current_height <= target_height => { + if let Some(block) = self.produce_block()? { + return Ok(Message::Direct(DirectMessage::Block(block))); + } + } + } + } + } + + // listen on the addr passed to MockPeer::new() and wait til someone connects. + // Then respond to messages indefinitely until an error occurs + async fn run(mut self, target_height: BlockHeight) -> anyhow::Result<()> { + let mut conn = self.listener.accept().await?; + let messages = InFlightMessages::new(self.network_config.response_delay); + tokio::pin!(messages); + + loop { + tokio::select! { + res = conn.recv() => { + let (msg, _timestamp) = res.with_context(|| format!("failed receiving message from {:?}", &conn))?; + + self.handle_message(msg, messages.as_mut())?; + } + msg = &mut messages => { + tracing::debug!("mock peer sending message {}", &msg); + match msg { + Message::Direct(msg) => conn.send_message(msg).await?, + Message::Routed(msg) => conn.send_routed_message(msg, conn.peer_id().clone(), 100).await?, + }; + } + msg = self.incoming_message(target_height) => { + let msg = msg?; + messages.as_mut().queue_message(msg); + } + } + } + } +} + +// TODO: this is not currently correct if we're an archival node and we get +// asked about an old chunk. In that case it needs to be reconstructed like +// in ShardsManager::prepare_partial_encoded_chunk_response() +fn retrieve_partial_encoded_chunk( + chain: &Chain, + request: &PartialEncodedChunkRequestMsg, +) -> Result { + let num_total_parts = chain.epoch_manager.num_total_parts(); + let partial_chunk = chain.store().get_partial_chunk(&request.chunk_hash)?; + let present_parts: HashMap = + partial_chunk.parts().iter().map(|part| (part.part_ord, part)).collect(); + assert_eq!( + present_parts.len(), + num_total_parts, + "chunk {:?} doesn't have all parts", + request.chunk_hash + ); + let parts: Vec<_> = request + .part_ords + .iter() + .map(|ord| present_parts.get(ord).cloned().cloned().unwrap()) + .collect(); + + // Same process for receipts as above for parts. + let present_receipts: HashMap = + partial_chunk.receipts().iter().map(|receipt| (receipt.1.to_shard_id, receipt)).collect(); + let receipts: Vec<_> = request + .tracking_shards + .iter() + .map(|shard_id| present_receipts.get(shard_id).cloned().cloned().unwrap()) + .collect(); + + Ok(PartialEncodedChunkResponseMsg { chunk_hash: request.chunk_hash.clone(), parts, receipts }) +} diff --git a/tools/mock-node/src/main.rs b/tools/mock-node/src/main.rs new file mode 100644 index 00000000000..1d18afeb652 --- /dev/null +++ b/tools/mock-node/src/main.rs @@ -0,0 +1,202 @@ +//! A binary that starts a mock testing environment for ClientActor. It +//! simulates the entire network by substituting PeerManagerActor with a mock +//! network, responding to the client's network requests by reading from a +//! pre-generated chain history in storage. + +use actix::System; +use anyhow::Context; +use mock_node::setup::{setup_mock_node, MockNode}; +use mock_node::MockNetworkConfig; +use near_actix_test_utils::run_actix; +use near_chain_configs::GenesisValidationMode; +use near_crypto::{InMemorySigner, KeyType}; +use near_jsonrpc_client::JsonRpcClient; +use near_network::tcp; +use near_o11y::testonly::init_integration_logger; +use near_primitives::types::BlockHeight; +use std::net::SocketAddr; +use std::path::{Path, PathBuf}; +use std::time::{Duration, Instant}; + +/// Program to start a mock node, which runs a regular client in a mock network environment. +/// The mock network simulates the entire network by replaying a pre-generated chain history +/// from storage and responds to the client's network requests. +/// +/// There are two ways to replay the stored history: +/// * catchup: client is behind the network and applies the blocks as fast as possible +/// * normal block production: client accept "new" blocks as they are produced +/// (in reality, blocks are just fetched from the pre-generated store). +/// +/// This is controlled by two flags: +/// * `--client-height` specifies the height the client starts at. Defaults to 0. +/// * `--network-height` specifies the hight the rest of the (simulated) +/// network starts at. Defaults to the latest recorded height. +/// +/// As a shortcut, `--start-height` sets both. +/// +/// +/// Examples +/// +/// ```console +/// # Pure catchup from genesis height to the end of the recorded history. +/// $ mock-node ~/.near/localnet/node0 +/// +/// # Pure block production starting from block height 61. +/// $ mock-node ~/.near/localnet/node0 --start-height 61 +/// +/// # Mixed: client starts at genesis and tries to catch up with the network, which starts at height 20. +/// $ mock-node ~/.near/localnet/node0 --network-height 20 +/// ``` +#[derive(clap::Parser)] +struct Cli { + /// Existing home dir for the pre-generated chain history. For example, you can use + /// the home dir of a near node. + chain_history_home_dir: String, + /// Home dir for the new client that will be started. If not specified, the binary will + /// generate a temporary directory + client_home_dir: Option, + /// Simulated network delay (in ms) + #[clap(short = 'd', long)] + network_delay: Option, + /// If specified, the binary will set up client home dir before starting the + /// client node so head of the client chain will be the specified height + /// when the client starts. The given height must be the last block in an + /// epoch. + #[clap(long, default_value = "0")] + client_height: BlockHeight, + /// The height at which the mock network starts. The client would have to + /// catch up to this height before participating in new block production. + /// + /// Defaults to the largest height in history. + #[clap(long)] + network_height: Option, + /// Shortcut to set both `--client-height` and `--network-height`. + #[clap(long, conflicts_with_all(&["client-height", "network-height"]))] + start_height: Option, + /// Target height that the client should sync to before stopping. If not specified, + /// use the height of the last block in chain history + #[clap(long)] + target_height: Option, + /// If true, use in memory storage instead of rocksdb for the client + #[clap(short = 'i', long)] + in_memory_storage: bool, + /// port the mock node should listen on + #[clap(long)] + mock_port: Option, +} + +async fn target_height_reached(client: &JsonRpcClient, target_height: BlockHeight) -> bool { + let t = Instant::now(); + let status = client.status().await; + let latency = t.elapsed(); + if latency > Duration::from_millis(100) { + tracing::warn!( + target: "mock_node", latency = %format_args!("{latency:0.2?}"), + "client is unresponsive, took too long to handle status request" + ); + } + match status { + Ok(status) => status.sync_info.latest_block_height >= target_height, + Err(_) => false, + } +} + +fn main() -> anyhow::Result<()> { + init_integration_logger(); + let args: Cli = clap::Parser::parse(); + let home_dir = Path::new(&args.chain_history_home_dir); + let mut near_config = nearcore::config::load_config(home_dir, GenesisValidationMode::Full) + .context("Error loading config")?; + near_config.validator_signer = None; + near_config.client_config.min_num_peers = 1; + let signer = InMemorySigner::from_random("mock_node".parse().unwrap(), KeyType::ED25519); + near_config.network_config.node_key = signer.secret_key; + near_config.client_config.tracked_shards = + (0..near_config.genesis.config.shard_layout.num_shards()).collect(); + if near_config.rpc_config.is_none() { + near_config.rpc_config = Some(near_jsonrpc::RpcConfig::default()); + } + let tempdir; + let client_home_dir = match &args.client_home_dir { + Some(it) => it.as_path(), + None => { + tempdir = tempfile::Builder::new().prefix("mock_node").tempdir().unwrap(); + tempdir.path() + } + }; + + let mock_config_path = home_dir.join("mock.json"); + let mut network_config = if mock_config_path.exists() { + MockNetworkConfig::from_file(&mock_config_path).with_context(|| { + format!("Error loading mock config from {}", mock_config_path.display()) + })? + } else { + MockNetworkConfig::default() + }; + if let Some(delay) = args.network_delay { + network_config.response_delay = Duration::from_millis(delay); + } + + let client_height = args.start_height.unwrap_or(args.client_height); + let network_height = args.start_height.or(args.network_height); + let addr = tcp::ListenerAddr::new(SocketAddr::new( + "127.0.0.1".parse().unwrap(), + args.mock_port.unwrap_or(24566), + )); + + run_actix(async move { + let MockNode { target_height, mut mock_peer, rpc_client } = setup_mock_node( + Path::new(&client_home_dir), + home_dir, + near_config, + &network_config, + client_height, + network_height, + args.target_height, + args.in_memory_storage, + addr, + ); + + // TODO: would be nice to be able to somehow quit right after the target block + // is applied rather than polling like this + let mut interval = tokio::time::interval(Duration::from_millis(100)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + let start = Instant::now(); + // Let's set the timeout to 5 seconds per block - just in case we test on very full blocks. + let timeout = target_height * 5; + let timeout = u32::try_from(timeout).unwrap_or(u32::MAX) * Duration::from_secs(1); + + loop { + if start.elapsed() > timeout { + tracing::error!( + "node still hasn't made it to #{} after {:?}", + target_height, + timeout + ); + mock_peer.abort(); + break; + } + tokio::select! { + _ = interval.tick() => { + if target_height_reached(&rpc_client, target_height).await { + tracing::info!("node reached target height"); + mock_peer.abort(); + break; + } + } + result = &mut mock_peer => { + match result { + Ok(Ok(_)) => tracing::info!("mock peer exited"), + Ok(Err(e)) => tracing::error!("mock peer exited with error: {:?}", e), + Err(e) => tracing::error!("failed running mock peer task: {:?}", e), + }; + break; + } + } + } + + System::current().stop(); + }); + Ok(()) +} diff --git a/tools/mock-node/src/setup.rs b/tools/mock-node/src/setup.rs new file mode 100644 index 00000000000..276d03ee233 --- /dev/null +++ b/tools/mock-node/src/setup.rs @@ -0,0 +1,460 @@ +//! Provides functions for setting up a mock network from configs and home dirs. + +use crate::{MockNetworkConfig, MockPeer}; +use anyhow::Context; +use near_chain::types::RuntimeAdapter; +use near_chain::ChainStoreUpdate; +use near_chain::{Chain, ChainGenesis, ChainStore, ChainStoreAccess, DoomslugThresholdMode}; +use near_crypto::{KeyType, SecretKey}; +use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig}; +use near_epoch_manager::{EpochManager, EpochManagerAdapter, EpochManagerHandle}; +use near_jsonrpc_client::JsonRpcClient; +use near_network::tcp; +use near_network::types::PeerInfo; +use near_primitives::network::PeerId; +use near_primitives::state_part::PartId; +use near_primitives::state_sync::get_num_state_parts; +use near_primitives::types::{BlockHeight, ShardId}; +use near_store::test_utils::create_test_store; +use nearcore::{NearConfig, NightshadeRuntime}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use std::cmp::min; +use std::path::Path; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +fn setup_runtime( + home_dir: &Path, + config: &NearConfig, + in_memory_storage: bool, +) -> (Arc, ShardTracker, Arc) { + let store = if in_memory_storage { + create_test_store() + } else { + near_store::NodeStorage::opener(home_dir, config.config.archive, &config.config.store, None) + .open() + .unwrap() + .get_hot_store() + }; + let epoch_manager = EpochManager::new_arc_handle(store.clone(), &config.genesis.config); + let shard_tracker = + ShardTracker::new(TrackedConfig::from_config(&config.client_config), epoch_manager.clone()); + let runtime = NightshadeRuntime::from_config(home_dir, store, config, epoch_manager.clone()); + (epoch_manager, shard_tracker, runtime) +} + +fn setup_mock_peer( + chain: Chain, + config: &mut NearConfig, + network_start_height: Option, + network_config: MockNetworkConfig, + target_height: BlockHeight, + num_shards: ShardId, + mock_listen_addr: tcp::ListenerAddr, +) -> tokio::task::JoinHandle> { + let network_start_height = match network_start_height { + None => target_height, + Some(0) => chain.genesis_block().header().height(), + Some(it) => it, + }; + let secret_key = SecretKey::from_random(KeyType::ED25519); + config + .network_config + .peer_store + .boot_nodes + .push(PeerInfo::new(PeerId::new(secret_key.public_key()), *mock_listen_addr)); + let chain_id = config.genesis.config.chain_id.clone(); + let block_production_delay = config.client_config.min_block_production_delay; + let archival = config.client_config.archive; + actix::spawn(async move { + let mock = MockPeer::new( + chain, + secret_key, + mock_listen_addr, + chain_id, + archival, + block_production_delay, + num_shards, + network_start_height, + network_config, + ) + .await?; + mock.run(target_height).await + }) +} + +pub struct MockNode { + // target height actually available to sync to in the chain history database + pub target_height: BlockHeight, + pub mock_peer: tokio::task::JoinHandle>, + // client that allows making RPC requests to the node under test + pub rpc_client: JsonRpcClient, +} + +/// Setup up a mock node, including setting up +/// a MockPeerManagerActor and a ClientActor and a ViewClientActor +/// `client_home_dir`: home dir for the new client +/// `network_home_dir`: home dir that contains the pre-generated chain history, will be used +/// to construct `MockPeerManagerActor` +/// `config`: config for the new client +/// `network_delay`: delay for getting response from the simulated network +/// `client_start_height`: start height for client +/// `network_start_height`: height at which the simulated network starts producing blocks +/// `target_height`: height that the simulated peers will produce blocks until. If None, will +/// use the height from the chain head in storage +/// `in_memory_storage`: if true, make client use in memory storage instead of rocksdb +/// +/// Returns a struct representing the node under test +pub fn setup_mock_node( + client_home_dir: &Path, + network_home_dir: &Path, + mut config: NearConfig, + network_config: &MockNetworkConfig, + client_start_height: BlockHeight, + network_start_height: Option, + target_height: Option, + in_memory_storage: bool, + mock_listen_addr: tcp::ListenerAddr, +) -> MockNode { + let parent_span = tracing::debug_span!(target: "mock_node", "setup_mock_node").entered(); + let (mock_network_epoch_manager, mock_network_shard_tracker, mock_network_runtime) = + setup_runtime(network_home_dir, &config, false); + tracing::info!(target: "mock_node", ?network_home_dir, "Setup network runtime"); + + let chain_genesis = ChainGenesis::new(&config.genesis); + + // set up client dir to be ready to process blocks from client_start_height + if client_start_height > 0 { + tracing::info!(target: "mock_node", "Preparing client data dir to be able to start at the specified start height {}", client_start_height); + let (client_epoch_manager, _, client_runtime) = + setup_runtime(client_home_dir, &config, in_memory_storage); + tracing::info!(target: "mock_node", ?client_home_dir, "Setup client runtime"); + let mut chain_store = ChainStore::new( + client_runtime.store().clone(), + config.genesis.config.genesis_height, + config.client_config.save_trie_changes, + ); + let mut network_chain_store = ChainStore::new( + mock_network_runtime.store().clone(), + config.genesis.config.genesis_height, + config.client_config.save_trie_changes, + ); + + let network_tail_height = network_chain_store.tail().unwrap(); + let network_head_height = network_chain_store.head().unwrap().height; + tracing::info!(target: "mock_node", network_tail_height, network_head_height, "network data chain"); + assert!( + client_start_height <= network_head_height + && client_start_height >= network_tail_height, + "client start height {} is not within the network chain range [{}, {}]", + client_start_height, + network_tail_height, + network_head_height + ); + let hash = network_chain_store.get_block_hash_by_height(client_start_height).unwrap(); + tracing::info!(target: "mock_node", "Checking whether the given start height is the last block of an epoch."); + if !mock_network_epoch_manager.is_next_block_epoch_start(&hash).unwrap() { + let epoch_start_height = + mock_network_epoch_manager.get_epoch_start_height(&hash).unwrap(); + panic!( + "start height must be the last block of an epoch, try using {} instead.", + epoch_start_height - 1 + ); + } + + // copy chain info + let chain_store_update = ChainStoreUpdate::copy_chain_state_as_of_block( + &mut chain_store, + &hash, + mock_network_epoch_manager.as_ref(), + &mut network_chain_store, + ) + .unwrap(); + chain_store_update.commit().unwrap(); + tracing::info!(target: "mock_node", "Done preparing chain state"); + + client_epoch_manager + .write() + .copy_epoch_info_as_of_block(&hash, &mock_network_epoch_manager.read()) + .unwrap(); + tracing::info!(target: "mock_node", "Done preparing epoch info"); + + // copy state for all shards + let block = network_chain_store.get_block(&hash).unwrap(); + let prev_hash = *block.header().prev_hash(); + for (shard_id, chunk_header) in block.chunks().iter().enumerate() { + let shard_id = shard_id as u64; + let state_root = chunk_header.prev_state_root(); + let state_root_node = + mock_network_runtime.get_state_root_node(shard_id, &hash, &state_root).unwrap(); + let num_parts = get_num_state_parts(state_root_node.memory_usage); + let finished_parts_count = Arc::new(AtomicUsize::new(0)); + tracing::info!(target: "mock_node", ?shard_id, ?state_root, num_parts, "Preparing state for a shard"); + + (0..num_parts) + .into_par_iter() + .try_for_each(|part_id| -> anyhow::Result<()> { + let _span = tracing::debug_span!( + target: "mock_node", + parent: &parent_span, + "obtain_and_apply_state_part", + part_id, + shard_id) + .entered(); + + let state_part = mock_network_runtime + .obtain_state_part( + shard_id, + &prev_hash, + &state_root, + PartId::new(part_id, num_parts), + ) + .with_context(|| { + format!("Obtaining state part {} in shard {}", part_id, shard_id) + })?; + client_runtime + .apply_state_part( + shard_id, + &state_root, + PartId::new(part_id, num_parts), + &state_part, + &mock_network_epoch_manager.get_epoch_id_from_prev_block(&hash)?, + ) + .with_context(|| { + format!("Applying state part {} in shard {}", part_id, shard_id) + })?; + finished_parts_count.fetch_add(1, Ordering::SeqCst); + tracing::info!( + target: "mock_node", + "Done {}/{} parts for shard {}", + finished_parts_count.load(Ordering::SeqCst), + num_parts, + shard_id, + ); + Ok(()) + }) + .unwrap(); + } + } + + let chain = Chain::new_for_view_client( + mock_network_epoch_manager.clone(), + mock_network_shard_tracker, + mock_network_runtime, + &chain_genesis, + DoomslugThresholdMode::NoApprovals, + config.client_config.save_trie_changes, + ) + .unwrap(); + let head = chain.head().unwrap(); + let target_height = min(target_height.unwrap_or(head.height), head.height); + let num_shards = mock_network_epoch_manager.num_shards(&head.epoch_id).unwrap(); + + config.network_config.peer_store.boot_nodes.clear(); + let mock_peer = setup_mock_peer( + chain, + &mut config, + network_start_height, + network_config.clone(), + target_height, + num_shards, + mock_listen_addr, + ); + + let rpc_client = near_jsonrpc_client::new_client(&format!( + "http://{}", + &config.rpc_config.as_ref().expect("the JSON RPC config must be set").addr + )); + let _node = nearcore::start_with_config(client_home_dir, config).unwrap(); + + MockNode { target_height, mock_peer, rpc_client } +} + +#[cfg(test)] +mod tests { + use crate::setup::{setup_mock_node, MockNode}; + use crate::MockNetworkConfig; + use actix::{Actor, System}; + use futures::{future, FutureExt}; + use near_actix_test_utils::{run_actix, spawn_interruptible}; + use near_chain::{ChainStore, ChainStoreAccess}; + use near_chain_configs::Genesis; + use near_client::{GetBlock, ProcessTxRequest}; + use near_crypto::{InMemorySigner, KeyType}; + use near_epoch_manager::{EpochManager, EpochManagerAdapter}; + use near_network::tcp; + use near_network::test_utils::{wait_or_timeout, WaitOrTimeoutActor}; + use near_o11y::testonly::init_integration_logger; + use near_o11y::WithSpanContextExt; + use near_primitives::transaction::SignedTransaction; + use near_store::test_utils::gen_account_from_alphabet; + use nearcore::config::GenesisExt; + use nearcore::{load_test_config, start_with_config, NEAR_BASE}; + use rand::thread_rng; + use std::ops::ControlFlow; + use std::sync::{Arc, RwLock}; + use std::time::Duration; + + // Test the basic mocknet setup. + // This test first starts a localnet with one validator node that generates 2 epochs of blocks + // to generate a chain history. + // Then start a mock network with this chain history and test that the client in the mock network can catch up these 2 epochs. + // The localnet needs to have state snapshots enabled. It copies state from + // one instance to another by using the state sync mechanism, which relies + // on the flat storage snapshots. + #[test] + fn test_mock_node_basic() { + init_integration_logger(); + + // first set up a network with only one validator and generate some blocks + let mut genesis = + Genesis::test(vec!["test0".parse().unwrap(), "test1".parse().unwrap()], 1); + let epoch_length = 50; + genesis.config.epoch_length = epoch_length; + let mut near_config = + load_test_config("test0", tcp::ListenerAddr::reserve_for_test(), genesis.clone()); + near_config.client_config.min_num_peers = 0; + near_config.config.store.state_snapshot_enabled = true; + near_config.config.tracked_shards = vec![0]; // Track all shards. + + let dir = tempfile::Builder::new().prefix("test0").tempdir().unwrap(); + let path1 = dir.path(); + run_actix(async move { + let nearcore::NearNode { view_client, client, .. } = + start_with_config(path1, near_config).expect("start_with_config"); + + let view_client1 = view_client; + let nonce = Arc::new(RwLock::new(10)); + WaitOrTimeoutActor::new( + Box::new(move |_ctx| { + let nonce = nonce.clone(); + let client1 = client.clone(); + let actor = view_client1.send(GetBlock::latest().with_span_context()); + let actor = actor.then(move |res| { + if let Ok(Ok(block)) = res { + let next_nonce = *nonce.read().unwrap(); + if next_nonce < 100 { + WaitOrTimeoutActor::new( + Box::new(move |_ctx| { + let signer0 = InMemorySigner::from_seed( + "test1".parse().unwrap(), + KeyType::ED25519, + "test1", + ); + let mut rng = thread_rng(); + let transaction = SignedTransaction::create_account( + next_nonce, + "test1".parse().unwrap(), + gen_account_from_alphabet(&mut rng, b"abcdefghijklmn"), + 5 * NEAR_BASE, + signer0.public_key.clone(), + &signer0, + block.header.hash, + ); + spawn_interruptible( + client1 + .send( + ProcessTxRequest { + transaction, + is_forwarded: false, + check_only: false, + } + .with_span_context(), + ) + .then(move |_res| future::ready(())), + ); + }), + 100, + 30000, + ) + .start(); + *nonce.write().unwrap() = next_nonce + 1; + } + + // This is the flaky part. + // The node needs to stop as late into an epoch as + // possible without going over into the next epoch. + let expected_height = epoch_length * 3 - 5; + if block.header.height >= expected_height { + tracing::info!( + block_height = block.header.height, + expected_height, + "Time to stop" + ); + System::current().stop() + } + } + future::ready(()) + }); + spawn_interruptible(actor); + }), + // Keep this number low to ensure the node is stopped late in + // the epoch without going into the next epoch. + 100, + 60000, + ) + .start(); + }); + + // start the mock network to simulate a new node "test1" to sync up + // start the client at height 10 (end of the first epoch) + let dir1 = tempfile::Builder::new().prefix("test1").tempdir().unwrap(); + let mut near_config1 = load_test_config("", tcp::ListenerAddr::reserve_for_test(), genesis); + near_config1.client_config.min_num_peers = 1; + near_config1.client_config.tracked_shards = vec![0]; // Track all shards. + near_config1.config.store.state_snapshot_enabled = true; + let network_config = MockNetworkConfig::with_delay(Duration::from_millis(10)); + + let client_start_height = { + tracing::info!(target: "mock_node", store_path = ?dir.path(), "Opening the created store to get client_start_height"); + let store = near_store::NodeStorage::opener( + dir.path(), + near_config1.config.archive, + &near_config1.config.store, + None, + ) + .open() + .unwrap() + .get_hot_store(); + let epoch_manager = + EpochManager::new_arc_handle(store.clone(), &near_config1.genesis.config); + let chain_store = ChainStore::new( + store, + near_config1.genesis.config.genesis_height, + near_config1.client_config.save_trie_changes, + ); + let network_head_hash = chain_store.head().unwrap().last_block_hash; + let last_epoch_start_height = + epoch_manager.get_epoch_start_height(&network_head_hash).unwrap(); + tracing::info!(target: "mock_node", ?network_head_hash, last_epoch_start_height); + // Needs to be the last block of an epoch. + last_epoch_start_height - 1 + }; + tracing::info!(target: "mock_node", client_start_height); + + run_actix(async { + let MockNode { rpc_client, .. } = setup_mock_node( + dir1.path(), + dir.path(), + near_config1, + &network_config, + client_start_height, + None, + None, + false, + tcp::ListenerAddr::reserve_for_test(), + ); + wait_or_timeout(100, 60000, || async { + if let Ok(status) = rpc_client.status().await { + if status.sync_info.latest_block_height >= client_start_height { + System::current().stop(); + return ControlFlow::Break(()); + } + } + ControlFlow::Continue(()) + }) + .await + .unwrap(); + }) + } +}