Skip to content

Commit

Permalink
perf: convert blocks in parallel (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
cchudant authored May 8, 2024
1 parent b64fddd commit a6d28fa
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 56 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Next release

- perf: convert blocks in parallel
- feat(commitments): Joined hash computation in event and tx commitments
- feat(l2 sync): polling to get new blocks once sync has caught up with the chain
- perf: store key
Expand Down
2 changes: 1 addition & 1 deletion crates/client/sync/src/fetch/fetchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub async fn fetch_apply_genesis_block(config: FetchConfig) -> Result<DeoxysBloc
};
let block = client.get_block(BlockId::Number(0)).await.map_err(|e| format!("failed to get block: {e}"))?;

Ok(crate::convert::block(block).await)
Ok(crate::convert::convert_block(block).expect("invalid genesis block"))
}

#[allow(clippy::too_many_arguments)]
Expand Down
123 changes: 74 additions & 49 deletions crates/client/sync/src/l2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::pin::pin;
use std::str::FromStr;
use std::sync::{Arc, RwLock};

use futures::{stream, StreamExt, TryStreamExt};
use lazy_static::lazy_static;
use mc_db::storage_handler::primitives::contract_class::ClassUpdateWrapper;
use mc_db::storage_handler::primitives::contract_class::{ClassUpdateWrapper, ContractClassData};
use mc_db::storage_updates::{store_class_update, store_key_update, store_state_update};
use mc_db::DeoxysBackend;
use mp_block::DeoxysBlock;
Expand All @@ -24,10 +25,12 @@ use tokio::sync::mpsc::Sender;
use tokio::time::Duration;

use crate::commitments::lib::{build_commitment_state_diff, update_state_root};
use crate::convert::convert_block;
use crate::fetch::fetchers::L2BlockAndUpdates;
use crate::fetch::l2_fetch_task;
use crate::l1::ETHEREUM_STATE_UPDATE;
use crate::CommandSink;
use crate::utils::PerfStopwatch;
use crate::{stopwatch_end, CommandSink};

/// Prefer this compared to [`tokio::spawn_blocking`], as spawn_blocking creates new OS threads and
/// we don't really need that
Expand Down Expand Up @@ -129,7 +132,7 @@ pub struct SenderConfig {
}

async fn l2_verify_and_apply_task(
mut fetch_stream_receiver: mpsc::Receiver<L2BlockAndUpdates>,
mut updates_receiver: mpsc::Receiver<L2ConvertedBlockAndUpdates>,
block_sender: Sender<DeoxysBlock>,
mut command_sink: CommandSink,
verify: bool,
Expand All @@ -138,81 +141,64 @@ async fn l2_verify_and_apply_task(

let mut last_block_hash = None;

while let Some(L2BlockAndUpdates { block_n, block, state_update, class_update }) =
pin!(fetch_stream_receiver.recv()).await
while let Some(L2ConvertedBlockAndUpdates { block_n, block, state_update, class_update }) =
pin!(updates_receiver.recv()).await
{
let (state_update, block_conv) = {
let state_update = if verify {
let state_update = Arc::new(state_update);
let state_update_1 = Arc::clone(&state_update);
let global_state_root = block.header().global_state_root;

let block_conv = spawn_compute(move || {
let convert_block = |block| {
let start = std::time::Instant::now();
let block_conv = crate::convert::convert_block_sync(block);
log::debug!("convert::convert_block_sync {}: {:?}", block_n, std::time::Instant::now() - start);
block_conv
};
let ver_l2 = || {
let start = std::time::Instant::now();
let state_root = verify_l2(block_n, &state_update);
log::debug!("verify_l2: {:?}", std::time::Instant::now() - start);
state_root
};
spawn_compute(move || {
let sw = PerfStopwatch::new();
let state_root = verify_l2(block_n, &state_update);
stopwatch_end!(sw, "verify_l2: {:?}");

if verify {
let (state_root, block_conv) = rayon::join(ver_l2, || convert_block(block));
if (block_conv.header().global_state_root) != state_root {
log::info!(
"❗ Verified state: {} doesn't match fetched state: {}",
state_root,
block_conv.header().global_state_root
);
}
block_conv
} else {
convert_block(block)
if global_state_root != state_root {
log::info!("❗ Verified state: {} doesn't match fetched state: {}", state_root, global_state_root);
}
})
.await;

// UNWRAP: we need a 'static future as we are spawning tokio tasks further down the line
// this is a hack to achieve that, we put the update in an arc and then unwrap it at the end
// this will not panic as the Arc should not be aliased.
let state_update = Arc::try_unwrap(state_update_1).unwrap();
(state_update, block_conv)
Arc::try_unwrap(state_update_1).unwrap()
} else {
state_update
};

let block_sender = Arc::clone(&block_sender);
let storage_diffs = state_update.state_diff.storage_diffs.clone();
tokio::join!(
async move {
block_sender.send(block_conv).await.expect("block reciever channel is closed");
block_sender.send(block).await.expect("block reciever channel is closed");
},
async {
let start = std::time::Instant::now();
let sw = PerfStopwatch::new();
if store_state_update(block_n, state_update).await.is_err() {
log::info!("❗ Failed to store state update for block {block_n}");
};
log::debug!("end store_state {}: {:?}", block_n, std::time::Instant::now() - start);
stopwatch_end!(sw, "end store_state {}: {:?}", block_n);
},
async {
let start = std::time::Instant::now();
let sw = PerfStopwatch::new();
if store_class_update(block_n, ClassUpdateWrapper(class_update)).await.is_err() {
log::info!("❗ Failed to store class update for block {block_n}");
};
log::debug!("end store_class {}: {:?}", block_n, std::time::Instant::now() - start);
stopwatch_end!(sw, "end store_class {}: {:?}", block_n);
},
async {
let start = std::time::Instant::now();
let sw = PerfStopwatch::new();
if store_key_update(block_n, &storage_diffs).await.is_err() {
log::info!("❗ Failed to store key update for block {block_n}");
};
log::debug!("end store_key {}: {:?}", block_n, std::time::Instant::now() - start);
stopwatch_end!(sw, "end store_key {}: {:?}", block_n);
},
async {
let start = std::time::Instant::now();
let sw = PerfStopwatch::new();
create_block(&mut command_sink, &mut last_block_hash).await.expect("creating block");
log::debug!("end create_block {}: {:?}", block_n, std::time::Instant::now() - start);
stopwatch_end!(sw, "end create_block {}: {:?}", block_n);
}
);

Expand All @@ -225,6 +211,42 @@ async fn l2_verify_and_apply_task(
Ok(())
}

pub struct L2ConvertedBlockAndUpdates {
pub block_n: u64,
pub block: DeoxysBlock,
pub state_update: StateUpdate,
pub class_update: Vec<ContractClassData>,
}

async fn l2_block_conversion_task(
updates_receiver: mpsc::Receiver<L2BlockAndUpdates>,
output: mpsc::Sender<L2ConvertedBlockAndUpdates>,
) -> Result<(), L2SyncError> {
// Items of this stream are futures that resolve to blocks, which becomes a regular stream of blocks
// using futures buffered.
let conversion_stream = stream::unfold(updates_receiver, |mut updates_recv| async {
updates_recv.recv().await.map(|L2BlockAndUpdates { block_n, block, state_update, class_update }| {
(
spawn_compute(move || {
let sw = PerfStopwatch::new();
let block = convert_block(block)?;
stopwatch_end!(sw, "convert_block: {:?}");
Ok(L2ConvertedBlockAndUpdates { block_n, block, state_update, class_update })
}),
updates_recv,
)
})
});

conversion_stream
.buffered(10)
.try_for_each(|block| async {
output.send(block).await.expect("downstream task is not running");
Ok(())
})
.await
}

/// Spawns workers to fetch blocks and state updates from the feeder.
/// `n_blocks` is optionally the total number of blocks to sync, for debugging/benchmark purposes.
pub async fn sync<C>(
Expand All @@ -240,11 +262,15 @@ where
C: HeaderBackend<DBlockT> + 'static,
{
let (fetch_stream_sender, fetch_stream_receiver) = mpsc::channel(10);
let (block_conv_sender, block_conv_receiver) = mpsc::channel(10);
let provider = Arc::new(provider);

// [Fetch task] ==new blocks and updates=> [Verification and apply task]
// Fetch task does parallel fetching, verification is sequential and does all the compute and db
// updates
// [Fetch task] ==new blocks and updates=> [Block conversion task] ======> [Verification and apply
// task]
// - Fetch task does parallel fetching
// - Block conversion is compute heavy and parallel wrt. the next few blocks,
// - Verification is sequential and does a lot of compute when state root verification is enabled.
// DB updates happen here too.

// TODO: make it cancel-safe, tasks outlive their parent here when error occurs here
// we are using separate tasks so that fetches don't get clogged up if by any chance the verify task
Expand All @@ -262,10 +288,9 @@ where
}
}
} => Ok(()),
// fetch blocks and updates in parallel
res = tokio::spawn(l2_fetch_task(first_block, fetch_stream_sender, Arc::clone(&provider), pending_polling_interval)) => res.expect("join error"),
// apply blocks and updates sequentially
res = tokio::spawn(l2_verify_and_apply_task(fetch_stream_receiver, block_sender, command_sink, verify)) => res.expect("join error"),
res = tokio::spawn(l2_block_conversion_task(fetch_stream_receiver, block_conv_sender)) => res.expect("join error"),
res = tokio::spawn(l2_verify_and_apply_task(block_conv_receiver, block_sender, command_sink, verify)) => res.expect("join error"),
)?;

Ok(())
Expand Down Expand Up @@ -340,7 +365,7 @@ where
.map_err(|e| format!("Failed to get pending state update: {e}"))?;

*STARKNET_PENDING_BLOCK.write().expect("Failed to acquire write lock on STARKNET_PENDING_BLOCK") =
Some(crate::convert::block(block).await);
Some(spawn_compute(|| crate::convert::convert_block(block)).await.unwrap());

*STARKNET_PENDING_STATE_UPDATE.write().expect("Failed to aquire write lock on STARKNET_PENDING_STATE_UPDATE") =
Some(crate::convert::state_update(state_update));
Expand Down
10 changes: 4 additions & 6 deletions crates/client/sync/src/utils/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@ use starknet_providers::sequencer::models::state_update::{
use starknet_providers::sequencer::models::{self as p, StateUpdate as StateUpdateProvider};

use crate::commitments::lib::calculate_commitments;
use crate::l2::L2SyncError;
use crate::utility::get_config;

pub async fn block(block: p::Block) -> DeoxysBlock {
tokio::task::spawn_blocking(|| convert_block_sync(block)).await.expect("join error")
}

pub fn convert_block_sync(block: p::Block) -> DeoxysBlock {
/// Compute heavy, this should only be called in a rayon ctx
pub fn convert_block(block: p::Block) -> Result<DeoxysBlock, L2SyncError> {
// converts starknet_provider transactions and events to mp_transactions and starknet_api events
let transactions = transactions(block.transactions);
let events = events(&block.transaction_receipts);
Expand Down Expand Up @@ -72,7 +70,7 @@ pub fn convert_block_sync(block: p::Block) -> DeoxysBlock {
.map(|(i, r)| mp_block::OrderedEvents::new(i as u128, r.events.iter().map(event).collect()))
.collect();

DeoxysBlock::new(header, transactions, ordered_events)
Ok(DeoxysBlock::new(header, transactions, ordered_events))
}

fn transactions(txs: Vec<p::TransactionType>) -> Vec<Transaction> {
Expand Down
19 changes: 19 additions & 0 deletions crates/client/sync/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
#![macro_use]
#![allow(clippy::new_without_default)]
use std::time::Instant;

pub mod constant;
pub mod convert;
#[cfg(feature = "m")]
pub mod m;
pub mod utility;

pub struct PerfStopwatch(pub Instant);

impl PerfStopwatch {
pub fn new() -> PerfStopwatch {
PerfStopwatch(Instant::now())
}
}

#[macro_export]
macro_rules! stopwatch_end {
($stopwatch:expr, $($arg:tt)+) => {
log::debug!($($arg)+, $stopwatch.0.elapsed())
}
}

0 comments on commit a6d28fa

Please sign in to comment.