Skip to content

Commit

Permalink
Merge branch 'main' into fix-nonce-block-prod
Browse files Browse the repository at this point in the history
  • Loading branch information
antiyro authored Oct 14, 2024
2 parents e050d7f + d7892b2 commit 8bb4dd7
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/starknet-js-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
fail-on-cache-miss: true
- name: Setup dev chain and run tests
run: |
./target/release/madara --name madara --base-path ../madara_db --telemetry-disabled --rpc-port 9944 --rpc-cors all --devnet --preset devnet &
./target/release/madara --name madara --base-path ../madara_db --rpc-port 9944 --rpc-cors all --devnet --preset devnet &
MADARA_PID=$!
while ! echo exit | nc localhost 9944; do sleep 1; done
cd tests/js_tests
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
## Next release

- fix(db): storing a block needs to clear the current pending block
- fix(sync): Fixed pipeline stalling on machines with few cpu cores
- fix(rpc): handle batched requests in middleware
- chore: padded devnet address display with 64 chars
- feat(script): added more capabilities to the launcher script
- fix(fgw): sync from other nodes and block signature
- fix: added more launcher capabilities
Expand Down
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@ hyper = { version = "0.14", features = ["server"] }
ip_network = "0.4"
lazy_static = { version = "1.4", default-features = false }
once_cell = "1.19"
log = { version = "0.4", features = ["std", "kv_std"] }
log = { version = "0.4", features = [
"std",
"kv_std",
"release_max_level_debug",
] }
num-traits = "0.2"
num-bigint = "0.4"
primitive-types = "0.12"
Expand Down
2 changes: 1 addition & 1 deletion crates/client/block_import/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl BlockImporter {
};

Ok(Self {
verify_apply: VerifyApply::new(Arc::clone(&backend), Arc::clone(&pool)),
verify_apply: VerifyApply::new(Arc::clone(&backend)),
pool,
metrics: BlockMetrics::register(starting_block, metrics_registry)
.context("Registering metrics for block import")?,
Expand Down
10 changes: 8 additions & 2 deletions crates/client/block_import/src/pre_validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ pub async fn pre_validate(
block: UnverifiedFullBlock,
validation: BlockValidationContext,
) -> Result<PreValidatedBlock, BlockImportError> {
pool.spawn_rayon_task(move || pre_validate_inner(block, validation)).await
log::debug!("spawning pre_validate");
let res = pool.spawn_rayon_task(move || pre_validate_inner(block, validation)).await;
log::debug!("finished pre_validate");
res
}

/// See [`pre_validate`].
Expand All @@ -31,7 +34,10 @@ pub async fn pre_validate_pending(
block: UnverifiedPendingFullBlock,
validation: BlockValidationContext,
) -> Result<PreValidatedPendingBlock, BlockImportError> {
pool.spawn_rayon_task(move || pre_validate_pending_inner(block, validation)).await
log::debug!("spawning pre_validate (pending)");
let res = pool.spawn_rayon_task(move || pre_validate_pending_inner(block, validation)).await;
log::debug!("finished pre_validate (pending)");
res
}

/// This runs on the [`rayon`] threadpool.
Expand Down
43 changes: 32 additions & 11 deletions crates/client/block_import/src/rayon.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{panic::AssertUnwindSafe, thread};
use std::{panic::AssertUnwindSafe, sync::atomic::AtomicUsize, thread};
use tokio::sync::Semaphore;

/// Wraps the rayon pool in a tokio-friendly way.
Expand All @@ -8,6 +8,9 @@ use tokio::sync::Semaphore;
/// The tasks are added in FIFO order.
pub struct RayonPool {
semaphore: Semaphore,
max_tasks: usize,
permit_id: AtomicUsize,
n_acquired_permits: AtomicUsize,
}

impl Default for RayonPool {
Expand All @@ -19,26 +22,44 @@ impl Default for RayonPool {
impl RayonPool {
pub fn new() -> Self {
let n_cores = thread::available_parallelism().expect("Getting the number of cores").get();

let max_tasks = n_cores * 2;
Self { semaphore: Semaphore::new(max_tasks) }
Self { semaphore: Semaphore::new(max_tasks), max_tasks, permit_id: 0.into(), n_acquired_permits: 0.into() }
}

pub async fn spawn_rayon_task<F, R>(&self, func: F) -> R
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let _permit = self.semaphore.acquire().await;
let max_tasks = self.max_tasks;
let permit_id = self.permit_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
log::debug!("acquire permit {permit_id}");
let permit = self.semaphore.acquire().await.expect("Poisoned semaphore");
let n_acquired_permits = self.n_acquired_permits.fetch_add(1, std::sync::atomic::Ordering::SeqCst) + 1;
log::debug!("acquired permit {permit_id} ({n_acquired_permits}/{max_tasks})");

let (tx, rx) = tokio::sync::oneshot::channel();
let res = global_spawn_rayon_task(func).await;

// Important: fifo mode.
rayon::spawn_fifo(move || {
// We bubble up the panics to the tokio pool.
let _result = tx.send(std::panic::catch_unwind(AssertUnwindSafe(func)));
});
drop(permit);

rx.await.expect("tokio channel closed").expect("rayon task panicked")
let n_acquired_permits = self.n_acquired_permits.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
log::debug!("released permit {permit_id} ({n_acquired_permits}/{max_tasks})");
res
}
}

pub async fn global_spawn_rayon_task<F, R>(func: F) -> R
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel();

// Important: fifo mode.
rayon::spawn_fifo(move || {
// We bubble up the panics to the tokio pool.
let _result = tx.send(std::panic::catch_unwind(AssertUnwindSafe(func)));
});

rx.await.expect("Tokio channel closed").expect("Rayon task panicked")
}
25 changes: 18 additions & 7 deletions crates/client/block_import/src/verify_apply.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
BlockImportError, BlockImportResult, BlockValidationContext, PendingBlockImportResult, PreValidatedBlock,
PreValidatedPendingBlock, RayonPool, UnverifiedHeader, ValidatedCommitments,
global_spawn_rayon_task, BlockImportError, BlockImportResult, BlockValidationContext, PendingBlockImportResult,
PreValidatedBlock, PreValidatedPendingBlock, UnverifiedHeader, ValidatedCommitments,
};
use itertools::Itertools;
use mc_db::{MadaraBackend, MadaraStorageError};
Expand All @@ -18,28 +18,35 @@ mod classes;
mod contracts;

pub struct VerifyApply {
pool: Arc<RayonPool>,
pub(crate) backend: Arc<MadaraBackend>,
// Only one thread at once can verify_apply. This is the update trie step cannot be parallelized over blocks, and in addition
// our database does not support concurrent write access.
mutex: tokio::sync::Mutex<()>,
}

impl VerifyApply {
pub fn new(backend: Arc<MadaraBackend>, pool: Arc<RayonPool>) -> Self {
Self { pool, backend, mutex: Default::default() }
pub fn new(backend: Arc<MadaraBackend>) -> Self {
Self { backend, mutex: Default::default() }
}

/// This function wraps the [`verify_apply_inner`] step, which runs on the rayon pool, in a tokio-friendly future.
///
/// NOTE: we do not use [`crate::rayon::RayonPool`], but [`global_spawn_rayon_task`] - this is because that would allow for a deadlock if we were to share
/// the semaphore with the [`crate::pre_validate`] task.
/// This is fine because the [`VerifyApply::mutex`] ensures correct backpressure handling.
pub async fn verify_apply(
&self,
block: PreValidatedBlock,
validation: BlockValidationContext,
) -> Result<BlockImportResult, BlockImportError> {
log::debug!("acquiring verify_apply exclusive");
let _exclusive = self.mutex.lock().await;
log::debug!("acquired verify_apply exclusive");

let backend = Arc::clone(&self.backend);
self.pool.spawn_rayon_task(move || verify_apply_inner(&backend, block, validation)).await
let res = global_spawn_rayon_task(move || verify_apply_inner(&backend, block, validation)).await;
log::debug!("releasing verify_apply exclusive");
res
}

/// See [`Self::verify_apply`].
Expand All @@ -48,10 +55,14 @@ impl VerifyApply {
block: PreValidatedPendingBlock,
validation: BlockValidationContext,
) -> Result<PendingBlockImportResult, BlockImportError> {
log::debug!("acquiring verify_apply exclusive (pending)");
let _exclusive = self.mutex.lock().await;
log::debug!("acquired verify_apply exclusive (pending)");

let backend = Arc::clone(&self.backend);
self.pool.spawn_rayon_task(move || verify_apply_pending_inner(&backend, block, validation)).await
let res = global_spawn_rayon_task(move || verify_apply_pending_inner(&backend, block, validation)).await;
log::debug!("releasing verify_apply exclusive (pending)");
res
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/client/devnet/src/predeployed_contracts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ impl fmt::Display for DevnetKeys {
writeln!(f, "==== DEVNET PREDEPLOYED CONTRACTS ====")?;
writeln!(f)?;
for (i, contract) in self.0.iter().enumerate() {
writeln!(f, "(#{}) Address: {:#x}", i + 1, contract.address,)?;
writeln!(f, " Private key: {:#x}", contract.secret.secret_scalar())?;
writeln!(f, "(#{}) Address: {}", i + 1, contract.address.to_fixed_hex_string())?;
writeln!(f, " Private key: {}", contract.secret.secret_scalar().to_fixed_hex_string())?;
match contract.balance.as_u128_fri_wei() {
Ok((fri, wei)) => {
let (strk, eth) = (fri / STRK_FRI_DECIMALS, wei / ETH_WEI_DECIMALS);
Expand Down
34 changes: 20 additions & 14 deletions crates/node/src/service/rpc/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,6 @@ enum VersionMiddlewareError {
BodyReadError(#[from] hyper::Error),
#[error("Failed to parse JSON: {0}")]
JsonParseError(#[from] serde_json::Error),
#[error("Invalid request format")]
InvalidRequestFormat,
#[error("Invalid URL format")]
InvalidUrlFormat,
#[error("Invalid version specified")]
Expand Down Expand Up @@ -236,9 +234,6 @@ where
VersionMiddlewareError::InvalidVersion => {
ErrorObject::owned(-32600, "Invalid RPC version specified", None::<()>)
}
VersionMiddlewareError::InvalidRequestFormat => {
ErrorObject::owned(-32600, "Invalid JSON-RPC request format", None::<()>)
}
VersionMiddlewareError::UnsupportedVersion => {
ErrorObject::owned(-32601, "Unsupported RPC version specified", None::<()>)
}
Expand All @@ -248,7 +243,7 @@ where
let body = json!({
"jsonrpc": "2.0",
"error": error,
"id": 0
"id": null
})
.to_string();

Expand All @@ -267,18 +262,29 @@ async fn add_rpc_version_to_method(req: &mut hyper::Request<Body>) -> Result<(),
let version = RpcVersion::from_request_path(&path)?;

let whole_body = hyper::body::to_bytes(req.body_mut()).await?;
let mut json: Value = serde_json::from_slice(&whole_body)?;

if let Some(method) = json.get_mut("method").as_deref().and_then(Value::as_str) {
let new_method = format!("starknet_{}_{}", version.name(), method.strip_prefix("starknet_").unwrap_or(method));
let json: Value = serde_json::from_slice(&whole_body)?;

json["method"] = Value::String(new_method);
// in case of batched requests, the request is an array of JSON-RPC requests
let mut batched_request = false;
let mut items = if let Value::Array(items) = json {
batched_request = true;
items
} else {
return Err(VersionMiddlewareError::InvalidRequestFormat);
vec![json]
};

for item in items.iter_mut() {
if let Some(method) = item.get_mut("method").as_deref().and_then(Value::as_str) {
let new_method =
format!("starknet_{}_{}", version.name(), method.strip_prefix("starknet_").unwrap_or(method));

item["method"] = Value::String(new_method);
}
// we don't need to throw an error here, the request will be rejected later if the method is not supported
}

let new_body = Body::from(serde_json::to_vec(&json)?);
*req.body_mut() = new_body;
let response = if batched_request { serde_json::to_vec(&items)? } else { serde_json::to_vec(&items[0])? };
*req.body_mut() = Body::from(response);

Ok(())
}
82 changes: 82 additions & 0 deletions crates/tests/src/rpc/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,88 @@ mod test_rpc_read_calls {
assert_eq!(result, 1);
}

/// Fetches the latest block hash and number.
///
/// Example curl command:
///
/// ```bash
/// curl --location 'https://free-rpc.nethermind.io/sepolia-juno/' \
/// --header 'Content-Type: application/json' \
/// --data '[
/// {
/// "jsonrpc": "2.0",
/// "method": "starknet_blockHashAndNumber",
/// "params": {},
/// "id": 0
/// },
/// {
/// "jsonrpc": "2.0",
/// "method": "starknet_getBlockTransactionCount",
/// "params": {
/// "block_id": {
/// "block_number": 2
/// }
/// },
/// "id": 1
/// }
/// ]'
/// ```
#[rstest]
#[tokio::test]
async fn test_batched_requests_work() {
let madara = get_shared_state().await;

// use reqwest to send a batch request to the madara rpc.
// TODO: use a jsonrpc client instead of reqwest when we move
// to starknet-providers 0.12.0
let client = reqwest::Client::new();
let res = client
.post(madara.rpc_url.clone())
.json(&[
serde_json::json!({
"jsonrpc": "2.0",
"method": "starknet_blockHashAndNumber",
"params": {},
"id": 0
}),
serde_json::json!({
"jsonrpc": "2.0",
"method": "starknet_getBlockTransactionCount",
"params": {
"block_id": {
"block_number": 2
}
},
"id": 1
}),
])
.send()
.await
.unwrap();

let result = res.json::<serde_json::Value>().await.unwrap();

assert_eq!(
result[0],
serde_json::json!({
"jsonrpc": "2.0",
"result": {
"block_hash": "0x4177d1ba942a4ab94f86a476c06f0f9e02363ad410cdf177c54064788c9bcb5",
"block_number": 19
},
"id": 0
})
);
assert_eq!(
result[1],
serde_json::json!({
"jsonrpc": "2.0",
"result": 1,
"id": 1
})
);
}

/// Fetches a block with its transactions and receipts.
///
/// Example curl command:
Expand Down

0 comments on commit 8bb4dd7

Please sign in to comment.