Skip to content

Commit

Permalink
feat(rpc): implement starknet_subscribeEvents WebSocket endpoint (#466)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbcaron authored Jan 21, 2025
1 parent 344822d commit 5cbb26d
Show file tree
Hide file tree
Showing 24 changed files with 1,096 additions and 408 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/db-version.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ on:
workflow_call:

jobs:
db-check:
runs-on: ubuntu-latest
steps:
- run: echo "Checking DB Version"

update-db-version:
runs-on: ubuntu-latest
if: contains(github.event.pull_request.labels.*.name, 'db-migration')
Expand Down
8 changes: 7 additions & 1 deletion .github/workflows/linters.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@ jobs:
toml-lint:
runs-on: ubuntu-latest
steps:
- name: Download taplo
run: |
curl -L https://github.com/tamasfe/taplo/releases/download/0.9.3/taplo-linux-x86_64.gz -o taplo.gz
gunzip taplo.gz
chmod +x taplo
mv taplo /usr/local/bin/taplo
- name: Checkout toml files
uses: actions/checkout@v4
- name: Run toml check
run: npx @taplo/[email protected] fmt --config ./taplo/taplo.toml --check
run: taplo fmt --config ./taplo/taplo.toml --check
3 changes: 0 additions & 3 deletions .github/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ jobs:
linters:
name: Run linters
needs: update_db_version
if: ${{ github.event.pull_request.draft == false && always() }}
uses: ./.github/workflows/linters.yml

rust_check:
Expand All @@ -39,14 +38,12 @@ jobs:
coverage:
name: Run Coverage
needs: update_db_version
if: ${{ github.event.pull_request.draft == false && always() }}
secrets: inherit
uses: ./.github/workflows/coverage.yml

build:
name: Build Madara
needs: update_db_version
if: ${{ github.event.pull_request.draft == false && always() }}
uses: ./.github/workflows/build.yml

js_test:
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ Here is a list of all the supported methods with their current status:
| ------ | ------------------------------------------------ |
|| `starknet_unsubscribe` (v0.8.0) |
|| `starknet_subscribeNewHeads` (v0.8.0) |
| | `starknet_subscribeEvents` (v0.8.0) |
| | `starknet_subscribeEvents` (v0.8.0) |
|| `starknet_subscribeTransactionStatus` (v0.8.0) |
|| `starknet_subscribePendingTransactions` (v0.8.0) |
|| `starknet_subscriptionReorg` (v0.8.0) |
Expand Down
1 change: 1 addition & 0 deletions crates/madara/client/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mp-utils = { workspace = true }
blockifier = { workspace = true }
bonsai-trie = { workspace = true }
starknet-types-core = { workspace = true }
starknet-types-rpc = { workspace = true }
starknet_api = { workspace = true }

# Other
Expand Down
29 changes: 29 additions & 0 deletions crates/madara/client/db/src/block_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use mp_state_update::StateDiff;
use rocksdb::WriteOptions;
use starknet_api::core::ChainId;
use starknet_types_core::felt::Felt;
use starknet_types_rpc::EmittedEvent;

type Result<T, E = MadaraStorageError> = std::result::Result<T, E>;

Expand Down Expand Up @@ -313,6 +314,29 @@ impl MadaraBackend {
tracing::debug!("Failed to send block info to subscribers: {e}");
}
}
if self.sender_event.receiver_count() > 0 {
let block_number = block.info.header.block_number;
let block_hash = block.info.block_hash;

block
.inner
.receipts
.iter()
.flat_map(|receipt| {
let tx_hash = receipt.transaction_hash();
receipt.events().iter().map(move |event| (tx_hash, event))
})
.for_each(|(transaction_hash, event)| {
if let Err(e) = self.sender_event.publish(EmittedEvent {
event: event.clone().into(),
block_hash: Some(block_hash),
block_number: Some(block_number),
transaction_hash,
}) {
tracing::debug!("Failed to send event to subscribers: {e}");
}
});
}

// clear pending
tx.delete_cf(&meta, ROW_PENDING_INFO);
Expand Down Expand Up @@ -400,6 +424,11 @@ impl MadaraBackend {
self.sender_block_info.subscribe()
}

#[tracing::instrument(skip(self), fields(module = "BlockDB"))]
pub fn subscribe_events(&self, from_address: Option<Felt>) -> tokio::sync::broadcast::Receiver<EmittedEvent<Felt>> {
self.sender_event.subscribe(from_address)
}

#[tracing::instrument(skip(self, id), fields(module = "BlockDB"))]
pub fn get_block_inner(&self, id: &impl DbBlockIdResolvable) -> Result<Option<MadaraBlockInner>> {
let Some(ty) = id.resolve_db_block_id(self)? else { return Ok(None) };
Expand Down
140 changes: 140 additions & 0 deletions crates/madara/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use rocksdb::{
};
use rocksdb_options::rocksdb_global_options;
use snapshots::Snapshots;
use starknet_types_core::felt::Felt;
use starknet_types_core::hash::{Pedersen, Poseidon, StarkHash};
use starknet_types_rpc::EmittedEvent;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{fmt, fs};
Expand Down Expand Up @@ -272,6 +274,141 @@ impl Default for TrieLogConfig {
}
}

/// EventChannels manages a highly efficient and scalable pub/sub system for events with 16 specific channels
/// plus one "all" channel. This architecture provides several key benefits:
///
/// Benefits:
/// - Selective Subscription: Subscribers can choose between receiving all events or filtering for specific
/// senders, optimizing network and processing resources
/// - Memory Efficiency: The fixed number of channels (16) provides a good balance between granularity
/// and memory overhead
/// - Predictable Routing: The XOR-based hash function ensures consistent and fast mapping of sender
/// addresses to channels
///
/// Events are distributed based on the sender's address in the event, where each sender address
/// is mapped to one of the 16 specific channels using a simple XOR-based hash function.
/// Subscribers can choose to either receive all events or only events from specific senders
/// by subscribing to the corresponding channel.
pub struct EventChannels {
/// Broadcast channel that receives all events regardless of their sender's address
all_channels: tokio::sync::broadcast::Sender<EmittedEvent<Felt>>,
/// Array of 16 broadcast channels, each handling events from a subset of sender addresses
/// The target channel for an event is determined by the sender's address mapping
specific_channels: [tokio::sync::broadcast::Sender<EmittedEvent<Felt>>; 16],
}

impl EventChannels {
/// Creates a new EventChannels instance with the specified buffer capacity for each channel.
/// Each channel (both all_channels and specific channels) will be able to buffer up to
/// `capacity` events before older events are dropped.
///
/// # Arguments
/// * `capacity` - The maximum number of events that can be buffered in each channel
///
/// # Returns
/// A new EventChannels instance with initialized broadcast channels
pub fn new(capacity: usize) -> Self {
let (all_channels, _) = tokio::sync::broadcast::channel(capacity);

let mut specific_channels = Vec::with_capacity(16);
for _ in 0..16 {
let (sender, _) = tokio::sync::broadcast::channel(capacity);
specific_channels.push(sender);
}

Self { all_channels, specific_channels: specific_channels.try_into().unwrap() }
}

/// Subscribes to events based on an optional sender address filter
///
/// # Arguments
/// * `from_address` - Optional sender address to filter events:
/// * If `Some(address)`, subscribes only to events from senders whose addresses map
/// to the same channel as the provided address (address % 16)
/// * If `None`, subscribes to all events regardless of sender address
///
/// # Returns
/// A broadcast::Receiver that will receive either:
/// * All events (if from_address is None)
/// * Only events from senders whose addresses map to the same channel as the provided address
///
/// # Warning
/// This method only provides a coarse filtering mechanism based on address mapping.
/// You will still need to implement additional filtering in your receiver logic because:
/// * Multiple sender addresses map to the same channel
/// * You may want to match the exact sender address rather than just its channel mapping
///
/// # Implementation Details
/// When a specific address is provided, the method:
/// 1. Calculates the channel index using the sender's address
/// 2. Subscribes to the corresponding specific channel
///
/// This means you'll receive events from all senders whose addresses map to the same channel
pub fn subscribe(&self, from_address: Option<Felt>) -> tokio::sync::broadcast::Receiver<EmittedEvent<Felt>> {
match from_address {
Some(address) => {
let channel_index = self.calculate_channel_index(&address);
self.specific_channels[channel_index].subscribe()
}
None => self.all_channels.subscribe(),
}
}

/// Publishes an event to both the all_channels and the specific channel determined by the sender's address.
/// The event will only be sent to channels that have active subscribers.
///
/// # Arguments
/// * `event` - The event to publish, containing the sender's address that determines the target specific channel
///
/// # Returns
/// * `Ok(usize)` - The sum of the number of subscribers that received the event across both channels
/// * `Ok(0)` - If no subscribers exist in any channel
/// * `Err` - If the event couldn't be sent
pub fn publish(
&self,
event: EmittedEvent<Felt>,
) -> Result<usize, Box<tokio::sync::broadcast::error::SendError<EmittedEvent<Felt>>>> {
let channel_index = self.calculate_channel_index(&event.event.from_address);
let specific_channel = &self.specific_channels[channel_index];

let mut total = 0;
if self.all_channels.receiver_count() > 0 {
total += self.all_channels.send(event.clone())?;
}
if specific_channel.receiver_count() > 0 {
total += specific_channel.send(event)?;
}
Ok(total)
}

pub fn receiver_count(&self) -> usize {
self.all_channels.receiver_count() + self.specific_channels.iter().map(|c| c.receiver_count()).sum::<usize>()
}

/// Calculates the target channel index for a given sender's address
///
/// # Arguments
/// * `address` - The Felt address of the event sender to calculate the channel index for
///
/// # Returns
/// A channel index between 0 and 15, calculated by XORing the two highest limbs of the address
/// and taking the lowest 4 bits of the result.
///
/// # Implementation Details
/// Rather than using the last byte of the address, this function:
/// 1. Gets the raw limbs representation of the address
/// 2. XORs limbs[0] and limbs[1] (the two lowest limbs)
/// 3. Uses the lowest 4 bits of the XOR result to determine the channel
///
/// This provides a balanced distribution of addresses across channels by
/// incorporating entropy from the address
fn calculate_channel_index(&self, address: &Felt) -> usize {
let limbs = address.to_raw();
let hash = limbs[0] ^ limbs[1];
(hash & 0x0f) as usize
}
}

/// Madara client database backend singleton.
pub struct MadaraBackend {
backup_handle: Option<mpsc::Sender<BackupRequest>>,
Expand All @@ -281,6 +418,7 @@ pub struct MadaraBackend {
snapshots: Arc<Snapshots>,
trie_log_config: TrieLogConfig,
sender_block_info: tokio::sync::broadcast::Sender<mp_block::MadaraBlockInfo>,
sender_event: EventChannels,
write_opt_no_wal: WriteOptions,
#[cfg(any(test, feature = "testing"))]
_temp_dir: Option<tempfile::TempDir>,
Expand Down Expand Up @@ -386,6 +524,7 @@ impl MadaraBackend {
snapshots,
trie_log_config: Default::default(),
sender_block_info: tokio::sync::broadcast::channel(100).0,
sender_event: EventChannels::new(100),
write_opt_no_wal: make_write_opt_no_wal(),
_temp_dir: Some(temp_dir),
})
Expand Down Expand Up @@ -445,6 +584,7 @@ impl MadaraBackend {
snapshots,
trie_log_config,
sender_block_info: tokio::sync::broadcast::channel(100).0,
sender_event: EventChannels::new(100),
write_opt_no_wal: make_write_opt_no_wal(),
#[cfg(any(test, feature = "testing"))]
_temp_dir: None,
Expand Down
2 changes: 1 addition & 1 deletion crates/madara/client/gateway/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ starknet-types-rpc.workspace = true
anyhow.workspace = true
bytes.workspace = true
futures.workspace = true
http-body-util.workspace = true
http.workspace = true
http-body-util.workspace = true
hyper = { workspace = true, features = ["full"] }
hyper-tls.workspace = true
hyper-util.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/madara/client/gateway/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ hyper-util.workspace = true
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
thiserror.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/madara/client/mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ mockall = { workspace = true, optional = true }
reqwest.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tokio-util.workspace = true

#Instrumentation
opentelemetry = { workspace = true, features = ["metrics", "logs"] }
Expand Down
Loading

0 comments on commit 5cbb26d

Please sign in to comment.