Skip to content

Commit

Permalink
Merge branch 'main' into fix/rework-analytics
Browse files Browse the repository at this point in the history
  • Loading branch information
heemankv authored Dec 24, 2024
2 parents ef5e630 + 34d629c commit 922f0ca
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 152 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
## Next release

- fix: instrumentation code
- feat: block resource cap removed from the pending tick
- fix: replace class hash issue resolved + gas fees issue resolved
- fix: trim hash of eth state was failing with 0x0
- fix: devnet accounts getting deployed in sequencer mode
- fix(rpc): fix BroadcastedDeclareTxn V3 in starknet-types-rpc
- fix: oracle need condition
Expand Down
2 changes: 1 addition & 1 deletion cairo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
These contracts are used for the genesis block in devnet mode. For real world use, the [madara bootstrapper] is used instead.
We use [OpenZeppelin] contracts.

[OpenZeppelin]: https://docs.openzeppelin.com
[openzeppelin]: https://docs.openzeppelin.com
[madara bootstrapper]: https://github.com/madara-alliance/madara-bootstrapper
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ pub(crate) fn state_map_to_state_diff(
let mut replaced_classes = Vec::new();
for (contract_address, new_class_hash) in diff.class_hashes {
let replaced = if let Some(on_top_of) = on_top_of {
backend.get_contract_class_hash_at(on_top_of, &contract_address.to_felt())?.is_some()
match backend.get_contract_class_hash_at(on_top_of, &contract_address.to_felt())? {
Some(class_hash) => class_hash != new_class_hash.to_felt(),
None => false,
}
} else {
// Executing genesis block: nothing being redefined here
false
Expand Down
234 changes: 130 additions & 104 deletions crates/client/block_production/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use crate::close_block::close_block;
use crate::metrics::BlockProductionMetrics;
use blockifier::blockifier::transaction_executor::{TransactionExecutor, BLOCK_STATE_ACCESS_ERR};
use blockifier::bouncer::{BouncerWeights, BuiltinCount};
use blockifier::bouncer::BouncerWeights;
use blockifier::transaction::errors::TransactionExecutionError;
use finalize_execution_state::StateDiffToStateMapError;
use mc_block_import::{BlockImportError, BlockImporter};
Expand Down Expand Up @@ -77,6 +77,29 @@ pub enum Error {
#[error("State diff error when continuing the pending block: {0:#}")]
PendingStateDiff(#[from] StateDiffToStateMapError),
}

/// Result of a block continuation operation, containing the updated state and execution statistics.
/// This is returned by [`BlockProductionTask::continue_block`] when processing a batch of transactions.
struct ContinueBlockResult {
/// The accumulated state changes from executing transactions in this continuation
state_diff: StateDiff,

/// Tracks which segments of Cairo program code were accessed during transaction execution,
/// organized by class hash. This information is used as input for SNOS (Starknet OS)
/// when generating proofs of execution.
visited_segments: VisitedSegments,

/// The current state of resource consumption tracked by the bouncer
bouncer_weights: BouncerWeights,

/// Statistics about transaction processing during this continuation
stats: ContinueBlockStats,

/// Indicates whether the block reached its resource limits during this continuation.
/// When true, no more transactions can be added to the current block.
block_now_full: bool,
}

/// The block production task consumes transactions from the mempool in batches.
/// This is to allow optimistic concurrency. However, the block may get full during batch execution,
/// and we need to re-add the transactions back into the mempool.
Expand Down Expand Up @@ -165,11 +188,9 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
}

#[tracing::instrument(skip(self), fields(module = "BlockProductionTask"))]
fn continue_block(
&mut self,
bouncer_cap: BouncerWeights,
) -> Result<(StateDiff, VisitedSegments, BouncerWeights, ContinueBlockStats), Error> {
fn continue_block(&mut self, bouncer_cap: BouncerWeights) -> Result<ContinueBlockResult, Error> {
let mut stats = ContinueBlockStats::default();
let mut block_now_full = false;

self.executor.bouncer.bouncer_config.block_max_capacity = bouncer_cap;
let batch_size = self.backend.chain_config().execution_batch_size;
Expand Down Expand Up @@ -201,7 +222,7 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
// Execute the transactions.
let all_results = self.executor.execute_txs(&txs_to_process_blockifier);
// When the bouncer cap is reached, blockifier will return fewer results than what we asked for.
let block_now_full = all_results.len() < txs_to_process_blockifier.len();
block_now_full = all_results.len() < txs_to_process_blockifier.len();

txs_to_process_blockifier.drain(..all_results.len()); // remove the used txs

Expand Down Expand Up @@ -273,54 +294,85 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
stats.n_re_added_to_mempool
);

Ok((state_diff, visited_segments, bouncer_weights, stats))
Ok(ContinueBlockResult { state_diff, visited_segments, bouncer_weights, stats, block_now_full })
}

/// Closes the current block and prepares for the next one
#[tracing::instrument(skip(self), fields(module = "BlockProductionTask"))]
async fn close_and_prepare_next_block(
&mut self,
state_diff: StateDiff,
visited_segments: VisitedSegments,
start_time: Instant,
) -> Result<(), Error> {
let block_n = self.block_n();
// Convert the pending block to a closed block and save to db
let parent_block_hash = Felt::ZERO; // temp parent block hash
let new_empty_block = MadaraPendingBlock::new_empty(make_pending_header(
parent_block_hash,
self.backend.chain_config(),
self.l1_data_provider.as_ref(),
));

let block_to_close = mem::replace(&mut self.block, new_empty_block);
let declared_classes = mem::take(&mut self.declared_classes);

let n_txs = block_to_close.inner.transactions.len();

// Close and import the block
let import_result = close_block(
&self.importer,
block_to_close,
&state_diff,
self.backend.chain_config().chain_id.clone(),
block_n,
declared_classes,
visited_segments,
)
.await?;

// Flush changes to disk
self.backend.flush().map_err(|err| BlockImportError::Internal(format!("DB flushing error: {err:#}").into()))?;

// Update parent hash for new pending block
self.block.info.header.parent_block_hash = import_result.block_hash;

// Prepare executor for next block
self.executor =
ExecutionContext::new_in_block(Arc::clone(&self.backend), &self.block.info.clone().into())?.tx_executor();
self.current_pending_tick = 0;

let end_time = start_time.elapsed();
tracing::info!("⛏️ Closed block #{} with {} transactions - {:?}", block_n, n_txs, end_time);

// Record metrics
let attributes = [
KeyValue::new("transactions_added", n_txs.to_string()),
KeyValue::new("closing_time", end_time.as_secs_f32().to_string()),
];

self.metrics.block_counter.add(1, &[]);
self.metrics.block_gauge.record(block_n, &attributes);
self.metrics.transaction_counter.add(n_txs as u64, &[]);

Ok(())
}

/// Each "tick" of the block time updates the pending block but only with the appropriate fraction of the total bouncer capacity.
#[tracing::instrument(skip(self), fields(module = "BlockProductionTask"))]
pub fn on_pending_time_tick(&mut self) -> Result<(), Error> {
pub async fn on_pending_time_tick(&mut self) -> Result<bool, Error> {
let current_pending_tick = self.current_pending_tick;
let n_pending_ticks_per_block = self.backend.chain_config().n_pending_ticks_per_block();
let config_bouncer = self.backend.chain_config().bouncer_config.block_max_capacity;
if current_pending_tick == 0 {
return Ok(());
return Ok(false);
}

// Reduced bouncer capacity for the current pending tick

// reduced_gas = gas * current_pending_tick/n_pending_ticks_per_block
// - we're dealing with integers here so prefer having the division last
// - use u128 here because the multiplication would overflow
// - div by zero: see [`ChainConfig::precheck_block_production`]
let reduced_cap =
|v: usize| (v as u128 * current_pending_tick as u128 / n_pending_ticks_per_block as u128) as usize;

let gas = reduced_cap(config_bouncer.gas);
let frac = current_pending_tick as f64 / n_pending_ticks_per_block as f64;
tracing::debug!("begin pending tick {current_pending_tick}/{n_pending_ticks_per_block}, proportion for this tick: {frac:.2}, gas limit: {gas}/{}", config_bouncer.gas);

let bouncer_cap = BouncerWeights {
builtin_count: BuiltinCount {
add_mod: reduced_cap(config_bouncer.builtin_count.add_mod),
bitwise: reduced_cap(config_bouncer.builtin_count.bitwise),
ecdsa: reduced_cap(config_bouncer.builtin_count.ecdsa),
ec_op: reduced_cap(config_bouncer.builtin_count.ec_op),
keccak: reduced_cap(config_bouncer.builtin_count.keccak),
mul_mod: reduced_cap(config_bouncer.builtin_count.mul_mod),
pedersen: reduced_cap(config_bouncer.builtin_count.pedersen),
poseidon: reduced_cap(config_bouncer.builtin_count.poseidon),
range_check: reduced_cap(config_bouncer.builtin_count.range_check),
range_check96: reduced_cap(config_bouncer.builtin_count.range_check96),
},
gas,
message_segment_length: reduced_cap(config_bouncer.message_segment_length),
n_events: reduced_cap(config_bouncer.n_events),
n_steps: reduced_cap(config_bouncer.n_steps),
state_diff_size: reduced_cap(config_bouncer.state_diff_size),
};
// Use full bouncer capacity
let bouncer_cap = self.backend.chain_config().bouncer_config.block_max_capacity;

let start_time = Instant::now();
let (state_diff, visited_segments, bouncer_weights, stats) = self.continue_block(bouncer_cap)?;

let ContinueBlockResult { state_diff, visited_segments, bouncer_weights, stats, block_now_full } =
self.continue_block(bouncer_cap)?;

if stats.n_added_to_block > 0 {
tracing::info!(
"🧮 Executed and added {} transaction(s) to the pending block at height {} - {:?}",
Expand All @@ -330,6 +382,13 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
);
}

// Check if block is full
if block_now_full {
tracing::info!("Resource limits reached, closing block early");
self.close_and_prepare_next_block(state_diff, visited_segments, start_time).await?;
return Ok(true);
}

// Store pending block
// todo, prefer using the block import pipeline?
self.backend.store_block(
Expand All @@ -342,7 +401,7 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
// do not forget to flush :)
self.backend.flush().map_err(|err| BlockImportError::Internal(format!("DB flushing error: {err:#}").into()))?;

Ok(())
Ok(false)
}

/// This creates a block, continuing the current pending block state up to the full bouncer limit.
Expand All @@ -351,10 +410,15 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
let block_n = self.block_n();
tracing::debug!("closing block #{}", block_n);

// Complete the block with full bouncer capacity.
// Complete the block with full bouncer capacity
let start_time = Instant::now();
let (mut new_state_diff, visited_segments, _weights, _stats) =
self.continue_block(self.backend.chain_config().bouncer_config.block_max_capacity)?;
let ContinueBlockResult {
state_diff: mut new_state_diff,
visited_segments,
bouncer_weights: _weights,
stats: _stats,
block_now_full: _block_now_full,
} = self.continue_block(self.backend.chain_config().bouncer_config.block_max_capacity)?;

// SNOS requirement: For blocks >= 10, the hash of the block 10 blocks prior
// at address 0x1 with the block number as the key
Expand All @@ -371,62 +435,14 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
.ok_or_else(|| {
Error::Unexpected(format!("No block hash found for block number {prev_block_number}").into())
})?;
let address = Felt::ONE;

new_state_diff.storage_diffs.push(ContractStorageDiffItem {
address,
address: Felt::ONE,
storage_entries: vec![StorageEntry { key: Felt::from(prev_block_number), value: prev_block_hash }],
});
}

// Convert the pending block to a closed block and save to db.

let parent_block_hash = Felt::ZERO; // temp parent block hash
let new_empty_block = MadaraPendingBlock::new_empty(make_pending_header(
parent_block_hash,
self.backend.chain_config(),
self.l1_data_provider.as_ref(),
));

let block_to_close = mem::replace(&mut self.block, new_empty_block);
let declared_classes = mem::take(&mut self.declared_classes);

let n_txs = block_to_close.inner.transactions.len();

// This is compute heavy as it does the commitments and trie computations.
let import_result = close_block(
&self.importer,
block_to_close,
&new_state_diff,
self.backend.chain_config().chain_id.clone(),
block_n,
declared_classes,
visited_segments,
)
.await?;
// do not forget to flush :)
self.backend.flush().map_err(|err| BlockImportError::Internal(format!("DB flushing error: {err:#}").into()))?;

// fix temp parent block hash for new pending :)
self.block.info.header.parent_block_hash = import_result.block_hash;

// Prepare for next block.
self.executor =
ExecutionContext::new_in_block(Arc::clone(&self.backend), &self.block.info.clone().into())?.tx_executor();
self.current_pending_tick = 0;

let end_time = start_time.elapsed();
tracing::info!("⛏️ Closed block #{} with {} transactions - {:?}", block_n, n_txs, end_time);

let attributes = [
KeyValue::new("transactions_added", n_txs.to_string()),
KeyValue::new("closing_time", end_time.as_secs_f32().to_string()),
];

self.metrics.block_counter.add(1, &[]);
self.metrics.block_gauge.record(block_n, &attributes);
self.metrics.transaction_counter.add(n_txs as u64, &[]);

Ok(())
self.close_and_prepare_next_block(new_state_diff, visited_segments, start_time).await
}

#[tracing::instrument(skip(self, ctx), fields(module = "BlockProductionTask"))]
Expand Down Expand Up @@ -463,7 +479,7 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
// ensure the pending block tick and block time match up
interval_pending_block_update.reset_at(instant + interval_pending_block_update.period());
},
_ = interval_pending_block_update.tick() => {
instant = interval_pending_block_update.tick() => {
let n_pending_ticks_per_block = self.backend.chain_config().n_pending_ticks_per_block();

if self.current_pending_tick == 0 || self.current_pending_tick >= n_pending_ticks_per_block {
Expand All @@ -473,10 +489,20 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
continue
}

if let Err(err) = self.on_pending_time_tick() {
tracing::error!("Pending block update task has errored: {err:#}");
match self.on_pending_time_tick().await {
Ok(block_closed) => {
if block_closed {
interval_pending_block_update.reset_at(instant + interval_pending_block_update.period());
interval_block_time.reset_at(instant + interval_block_time.period());
self.current_pending_tick = 0;
} else {
self.current_pending_tick += 1;
}
}
Err(err) => {
tracing::error!("Pending block update task has errored: {err:#}");
}
}
self.current_pending_tick += 1;
},
_ = ctx.cancelled() => break,
}
Expand Down
Loading

0 comments on commit 922f0ca

Please sign in to comment.