Skip to content

Commit

Permalink
l2 sync: refactor a bit, and add polling (#101)
Browse files Browse the repository at this point in the history
Co-authored-by: antiyro <[email protected]>
  • Loading branch information
cchudant and antiyro authored May 7, 2024
1 parent ff92916 commit fd93db0
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 131 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
git # Deoxys Changelog
# Deoxys Changelog

## Next release

- feat(l2 sync): polling to get new blocks once sync has caught up with the chain
- perf: store key
- fix: sync, remove `unwrap` in storage
- fix(classes): Fixed classes on the RPC level by adding ordering and complete deserialisation
Expand Down
42 changes: 30 additions & 12 deletions crates/client/sync/src/fetch/fetchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub struct FetchConfig {
pub verify: bool,
/// The optional API_KEY to avoid rate limiting from the sequencer gateway.
pub api_key: Option<String>,
/// Polling interval
pub pending_polling_interval: Duration,
}

pub async fn fetch_block(client: &SequencerGatewayProvider, block_number: u64) -> Result<p::Block, L2SyncError> {
Expand All @@ -49,10 +51,17 @@ pub async fn fetch_block(client: &SequencerGatewayProvider, block_number: u64) -
Ok(block)
}

/// A fetched L2 block bundled with the state and class updates retrieved for it.
///
/// Produced by [`fetch_block_and_updates`], which fetches the block and its state
/// update (including declared classes) from the sequencer gateway in parallel.
pub struct L2BlockAndUpdates {
    /// Number of this block on the L2 chain.
    pub block_n: u64,
    /// The block as returned by the provider (sequencer gateway block format).
    pub block: p::Block,
    /// State diff associated with this block.
    pub state_update: StateUpdate,
    /// Class definitions fetched alongside the state update.
    pub class_update: Vec<ContractClassData>,
}

pub async fn fetch_block_and_updates(
block_n: u64,
provider: Arc<SequencerGatewayProvider>,
) -> Result<(p::Block, StateUpdate, Vec<ContractClassData>), L2SyncError> {
) -> Result<L2BlockAndUpdates, L2SyncError> {
const MAX_RETRY: u32 = 15;
let mut attempt = 0;
let base_delay = Duration::from_secs(1);
Expand All @@ -64,23 +73,32 @@ pub async fn fetch_block_and_updates(
let (block, state_update) = tokio::join!(block, state_update);
log::debug!("fetch_block_and_updates: done {block_n}");

match block.as_ref().err().or(state_update.as_ref().err()) {
Some(L2SyncError::Provider(ProviderError::RateLimited)) => {
log::info!("The fetching process has been rate limited");
log::debug!("The fetching process has been rate limited, retrying in {:?} seconds", base_delay);
match (block, state_update) {
(Err(L2SyncError::Provider(err)), _) | (_, Err(L2SyncError::Provider(err))) => {
// Exponential backoff with a cap on the delay
let delay = base_delay * 2_u32.pow(attempt - 1).min(6); // Cap to prevent overly long delays
match err {
ProviderError::RateLimited => {
log::info!("The fetching process has been rate limited, retrying in {:?} seconds", delay)
}
// sometimes the sequencer just errors out when trying to rate limit us (??) so retry in that case
_ => log::info!("The provider has returned an error, retrying in {:?} seconds", delay),
}
attempt += 1;
if attempt >= MAX_RETRY {
return Err(L2SyncError::FetchRetryLimit);
return Err(if matches!(err, ProviderError::RateLimited) {
L2SyncError::FetchRetryLimit
} else {
L2SyncError::Provider(err)
});
}
// Exponential backoff with a cap on the delay
let delay = base_delay * 2_u32.pow(attempt - 1).min(6); // Cap to prevent overly long delays
tokio::time::sleep(delay).await;
}
_ => {
let (block, (state_update, class_update)) = (block?, state_update?);
return Ok((block, state_update, class_update));
(Err(err), _) | (_, Err(err)) => return Err(err),
(Ok(block), Ok((state_update, class_update))) => {
return Ok(L2BlockAndUpdates { block_n, block, state_update, class_update });
}
}
};
}
}

Expand Down
65 changes: 65 additions & 0 deletions crates/client/sync/src/fetch/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,66 @@
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use futures::prelude::*;
use starknet_core::types::StarknetError;
use starknet_providers::{ProviderError, SequencerGatewayProvider};
use tokio::sync::mpsc;

use self::fetchers::L2BlockAndUpdates;
use crate::fetch::fetchers::fetch_block_and_updates;
use crate::l2::L2SyncError;

pub mod fetchers;

/// Fetch blocks and their updates from the sequencer gateway, starting at
/// `first_block`, and forward them in order over `fetch_stream_sender`.
///
/// The task first catches up with the chain, keeping up to 10 fetches in flight
/// at once. Once the gateway reports `BlockNotFound` (we reached the tip), it
/// switches to polling for new blocks every `pending_polling_interval`.
///
/// # Errors
///
/// Returns an error when a fetch fails with anything other than
/// `BlockNotFound` (retries and rate-limit backoff are handled inside
/// `fetch_block_and_updates`).
pub async fn l2_fetch_task(
    first_block: u64,
    fetch_stream_sender: mpsc::Sender<L2BlockAndUpdates>,
    provider: Arc<SequencerGatewayProvider>,
    pending_polling_interval: Duration,
) -> Result<(), L2SyncError> {
    // First, catch up with the chain.
    //
    // `next_block` is the number of the first block that has NOT yet been sent
    // downstream. The catch-up loop breaks with the block number the gateway
    // could not find, which is exactly the next block to poll for — the
    // previous version stored it as `last_block` and polled `last_block + 1`,
    // permanently skipping one block at the transition.
    let mut next_block = {
        // Fetch blocks and updates in parallel before looping.
        let fetch_stream = (first_block..).map(|block_n| {
            let provider = Arc::clone(&provider);
            async move { (block_n, fetch_block_and_updates(block_n, provider).await) }
        });

        // Have 10 fetches in parallel at once, using futures Buffered.
        // Buffered yields results in block order even though fetches overlap.
        let mut fetch_stream = stream::iter(fetch_stream).buffered(10);
        loop {
            let (block_n, val) = pin!(fetch_stream.next()).await.expect("fetch stream is infinite");
            log::debug!("got {:?}", block_n);

            match val {
                // `block_n` does not exist yet: we caught up with the tip.
                Err(L2SyncError::Provider(ProviderError::StarknetError(StarknetError::BlockNotFound))) => {
                    break block_n;
                }
                val => fetch_stream_sender.send(val?).await.expect("receiver task is closed"),
            }
        }
    };

    log::debug!("We caught up with the chain yay");

    // Polling: periodically check whether new blocks appeared at the tip.

    let mut interval = tokio::time::interval(pending_polling_interval);
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    loop {
        interval.tick().await;

        // Drain every block that appeared since the last tick.
        loop {
            match fetch_block_and_updates(next_block, Arc::clone(&provider)).await {
                Err(L2SyncError::Provider(ProviderError::StarknetError(StarknetError::BlockNotFound))) => {
                    break;
                }
                val => {
                    fetch_stream_sender.send(val?).await.expect("receiver task is closed");
                    // Only advance once the block was fetched and forwarded.
                    next_block += 1;
                }
            }
        }
    }
}
Loading

0 comments on commit fd93db0

Please sign in to comment.