Skip to content

Commit

Permalink
refactor callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
tomyrd committed Jan 31, 2025
1 parent 109fcc6 commit 53601a6
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 45 deletions.
13 changes: 13 additions & 0 deletions crates/rust-client/src/note/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,19 @@ impl NoteUpdates {
self.updated_output_notes.extend(other.updated_output_notes);
}

pub fn insert_updates(
&mut self,
input_note: Option<InputNoteRecord>,
output_note: Option<OutputNoteRecord>,
) {
if let Some(input_note) = input_note {
self.updated_input_notes.insert(input_note.id(), input_note);
}
if let Some(output_note) = output_note {
self.updated_output_notes.insert(output_note.id(), output_note);
}
}

/// Returns a mutable reference to the input note record with the provided nullifier if it
/// exists.
pub fn get_input_note_by_nullifier(
Expand Down
117 changes: 72 additions & 45 deletions crates/rust-client/src/sync/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use miden_objects::{
block::{BlockHeader, BlockNumber},
crypto::merkle::{InOrderIndex, MmrDelta, MmrPeaks, PartialMmr},
note::{NoteId, NoteInclusionProof, NoteTag, Nullifier},
transaction::TransactionId,
Digest,
};
use tracing::info;
Expand All @@ -18,7 +19,7 @@ use crate::{
domain::{note::CommittedNote, nullifier::NullifierUpdate, transaction::TransactionUpdate},
NodeRpcClient,
},
store::{InputNoteRecord, NoteFilter, Store, StoreError},
store::{InputNoteRecord, NoteFilter, OutputNoteRecord, Store, StoreError},
transaction::TransactionUpdates,
ClientError,
};
Expand Down Expand Up @@ -80,16 +81,33 @@ pub type OnNoteReceived = Box<
CommittedNote,
BlockHeader,
Arc<Vec<InputNoteRecord>>,
) -> Pin<Box<dyn Future<Output = Result<(NoteUpdates, bool), ClientError>>>>,
) -> Pin<
Box<
dyn Future<
Output = Result<
(Option<InputNoteRecord>, Option<OutputNoteRecord>, bool),
ClientError,
>,
>,
>,
>,
>;

/// TODO: document
pub type OnNullifierReceived = Box<
dyn Fn(
NullifierUpdate,
Arc<Vec<TransactionUpdate>>,
)
-> Pin<Box<dyn Future<Output = Result<(NoteUpdates, TransactionUpdates), ClientError>>>>,
) -> Pin<
Box<
dyn Future<
Output = Result<
(Option<InputNoteRecord>, Option<OutputNoteRecord>, Option<TransactionId>),
ClientError,
>,
>,
>,
>,
>;

// STATE SYNC
Expand Down Expand Up @@ -379,11 +397,13 @@ impl StateSync {
let new_public_notes =
Arc::new(self.fetch_public_note_details(&public_note_ids, &block_header).await?);
for committed_note in note_inclusions {
let (new_note_updates, note_is_relevant) =
let (updated_input_note, updated_output_note, note_is_relevant) =
(self.on_note_received)(committed_note, block_header, new_public_notes.clone())
.await?;

self.state_sync_update.note_updates.extend(new_note_updates);
self.state_sync_update
.note_updates
.insert_updates(updated_input_note, updated_output_note);
found_relevant_note |= note_is_relevant;
}

Expand All @@ -400,12 +420,17 @@ impl StateSync {
.consumed_externally(nullifier_update.nullifier, nullifier_update.block_num)?;
}

let (new_note_updates, new_transaction_update) =
let (updated_input_note, updated_output_note, discarded_transaction) =
(self.on_nullifier_received)(nullifier_update, committed_transactions.clone())
.await?;

self.state_sync_update.note_updates.extend(new_note_updates);
self.state_sync_update.transaction_updates.extend(new_transaction_update);
self.state_sync_update
.note_updates
.insert_updates(updated_input_note, updated_output_note);

if let Some(transaction_id) = discarded_transaction {
self.state_sync_update.transaction_updates.discarded_transaction(transaction_id);
}
}

self.state_sync_update
Expand Down Expand Up @@ -478,9 +503,9 @@ pub async fn on_note_received(
committed_note: CommittedNote,
block_header: BlockHeader,
new_public_notes: Arc<Vec<InputNoteRecord>>,
) -> Result<(NoteUpdates, bool), ClientError> {
let mut updated_input_notes = vec![];
let mut updated_output_notes = vec![];
) -> Result<(Option<InputNoteRecord>, Option<OutputNoteRecord>, bool), ClientError> {
let mut updated_input_note = None;
let mut updated_output_note = None;

let inclusion_proof = NoteInclusionProof::new(
block_header.block_num(),
Expand All @@ -498,12 +523,14 @@ pub async fn on_note_received(
{
// The note belongs to our locally tracked set of input notes
is_tracked_note = true;
block_is_relevant = true; //TODO: Check if this is always true
input_note_record
block_is_relevant = true;
let inclusion_proof_received = input_note_record
.inclusion_proof_received(inclusion_proof.clone(), committed_note.metadata())?;
input_note_record.block_header_received(block_header)?;
let block_header_received = input_note_record.block_header_received(block_header)?;

updated_input_notes.push(input_note_record); // TODO: Only do this if it actually changed
if inclusion_proof_received || block_header_received {
updated_input_note.replace(input_note_record);
}
}

if let Some(mut output_note_record) = store
Expand All @@ -513,9 +540,9 @@ pub async fn on_note_received(
{
// The note belongs to our locally tracked set of output notes
is_tracked_note = true;
output_note_record.inclusion_proof_received(inclusion_proof.clone())?;

updated_output_notes.push(output_note_record); // TODO: Only do this if it actually changed
if output_note_record.inclusion_proof_received(inclusion_proof.clone())? {
updated_output_note.replace(output_note_record);
}
}

if !is_tracked_note {
Expand All @@ -524,7 +551,7 @@ pub async fn on_note_received(
{
// The note wasn't being tracked but it came in the sync response, it means it matched a
// note tag we are tracking and it needs to be inserted in the store
updated_input_notes.push(public_note.clone());
updated_input_note.replace(public_note.clone());

// If the note isn't consumable by the client then the block isn't relevant
block_is_relevant = !NoteScreener::new(store)
Expand All @@ -536,8 +563,7 @@ pub async fn on_note_received(
}
}

Ok((NoteUpdates::new(updated_input_notes, updated_output_notes), block_is_relevant))
//TODO: add insert note functions to note updates
Ok((updated_input_note, updated_output_note, block_is_relevant))
}

/// Default implementation of the [OnNullifierReceived] callback. It queries the store for the notes
Expand All @@ -547,54 +573,55 @@ pub async fn on_nullifier_received(
store: Arc<dyn Store>,
nullifier_update: NullifierUpdate,
transaction_updates: Arc<Vec<TransactionUpdate>>,
) -> Result<(NoteUpdates, TransactionUpdates), ClientError> {
let mut updated_input_notes = vec![];
let mut updated_output_notes = vec![];

let mut discarded_transactions = vec![];
) -> Result<(Option<InputNoteRecord>, Option<OutputNoteRecord>, Option<TransactionId>), ClientError>
{
let mut updated_input_note = None;
let mut updated_output_note = None;
let mut discarded_transaction = None;

if let Some(mut input_note_record) = store
.get_input_notes(NoteFilter::Nullifiers(vec![nullifier_update.nullifier]))
.await?
.pop()
{
if let Some(consumer_transaction) = transaction_updates.iter().find(|t| {
input_note_record
.consumer_transaction_id()
.map_or(false, |id| id == &t.transaction_id)
}) {
let note_changed = if let Some(consumer_transaction) =
transaction_updates.iter().find(|t| {
input_note_record
.consumer_transaction_id()
.map_or(false, |id| id == &t.transaction_id)
}) {
// The note was being processed by a local transaction that just got committed
input_note_record.transaction_committed(
consumer_transaction.transaction_id,
consumer_transaction.block_num,
)?;
)?
} else {
// The note was consumed by an external transaction
if let Some(id) = input_note_record.consumer_transaction_id() {
// The note was being processed by a local transaction that didn't end up being
// committed so it should be discarded
discarded_transactions.push(*id);
discarded_transaction.replace(*id);
}
input_note_record
.consumed_externally(nullifier_update.nullifier, nullifier_update.block_num)?;
}
.consumed_externally(nullifier_update.nullifier, nullifier_update.block_num)?
};

updated_input_notes.push(input_note_record); // TODO: Only do this if it actually changed
if note_changed {
updated_input_note.replace(input_note_record);
}
}

if let Some(mut output_note_record) = store
.get_output_notes(NoteFilter::Nullifiers(vec![nullifier_update.nullifier]))
.await?
.pop()
{
output_note_record
.nullifier_received(nullifier_update.nullifier, nullifier_update.block_num)?;

updated_output_notes.push(output_note_record); // TODO: Only do this if it actually changed
if output_note_record
.nullifier_received(nullifier_update.nullifier, nullifier_update.block_num)?
{
updated_output_note.replace(output_note_record);
}
}

Ok((
NoteUpdates::new(updated_input_notes, updated_output_notes),
TransactionUpdates::new(vec![], discarded_transactions),
))
Ok((updated_input_note, updated_output_note, discarded_transaction))
}
5 changes: 5 additions & 0 deletions crates/rust-client/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,11 @@ impl TransactionUpdates {
pub fn discarded_transactions(&self) -> &[TransactionId] {
&self.discarded_transactions
}

/// Inserts a committed transaction into the transaction updates.
pub fn discarded_transaction(&mut self, transaction_id: TransactionId) {
self.discarded_transactions.push(transaction_id);
}
}

// TRANSACTION RESULT
Expand Down

0 comments on commit 53601a6

Please sign in to comment.