Skip to content

Commit

Permalink
Merge pull request #1615 from Oscar-Pepper/fixed_output_batching
Browse files Browse the repository at this point in the history
Fixed output batching
  • Loading branch information
dorianvp authored Jan 31, 2025
2 parents 93385a0 + 2fda65b commit 1652bc3
Show file tree
Hide file tree
Showing 10 changed files with 790 additions and 278 deletions.
11 changes: 7 additions & 4 deletions zingo-sync/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ pub enum FetchRequest {
/// Gets the height of the blockchain from the server.
ChainTip(oneshot::Sender<BlockId>),
/// Gets the specified range of compact blocks from the server (end exclusive).
CompactBlockRange(oneshot::Sender<Vec<CompactBlock>>, Range<BlockHeight>),
CompactBlockRange(
oneshot::Sender<tonic::Streaming<CompactBlock>>,
Range<BlockHeight>,
),
/// Gets the tree states for a specified block height.
TreeState(oneshot::Sender<TreeState>, BlockHeight),
/// Get a full transaction by txid.
Expand Down Expand Up @@ -74,14 +77,14 @@ pub async fn get_chain_height(
pub async fn get_compact_block_range(
fetch_request_sender: UnboundedSender<FetchRequest>,
block_range: Range<BlockHeight>,
) -> Result<Vec<CompactBlock>, ()> {
) -> Result<tonic::Streaming<CompactBlock>, ()> {
let (reply_sender, reply_receiver) = oneshot::channel();
fetch_request_sender
.send(FetchRequest::CompactBlockRange(reply_sender, block_range))
.unwrap();
let compact_blocks = reply_receiver.await.unwrap();
let block_stream = reply_receiver.await.unwrap();

Ok(compact_blocks)
Ok(block_stream)
}

/// Gets the stream of shards (subtree roots)
Expand Down
19 changes: 7 additions & 12 deletions zingo-sync/src/client/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ async fn fetch_from_server(
}
FetchRequest::CompactBlockRange(sender, block_range) => {
tracing::debug!("Fetching compact blocks. {:?}", &block_range);
let compact_blocks = get_block_range(client, block_range).await.unwrap();
sender.send(compact_blocks).unwrap();
let block_stream = get_block_range(client, block_range).await.unwrap();
sender.send(block_stream).unwrap();
}
FetchRequest::GetSubtreeRoots(sender, start_index, shielded_protocol, max_entries) => {
tracing::debug!(
Expand Down Expand Up @@ -169,13 +169,11 @@ async fn get_latest_block(

Ok(client.get_latest_block(request).await.unwrap().into_inner())
}

async fn get_block_range(
client: &mut CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
block_range: Range<BlockHeight>,
) -> Result<Vec<CompactBlock>, ()> {
let mut compact_blocks: Vec<CompactBlock> =
Vec::with_capacity(u64::from(block_range.end - block_range.start) as usize);

) -> Result<tonic::Streaming<CompactBlock>, ()> {
let request = tonic::Request::new(BlockRange {
start: Some(BlockId {
height: u64::from(block_range.start),
Expand All @@ -186,13 +184,8 @@ async fn get_block_range(
hash: vec![],
}),
});
let mut block_stream = client.get_block_range(request).await.unwrap().into_inner();

while let Some(compact_block) = block_stream.message().await.unwrap() {
compact_blocks.push(compact_block);
}

Ok(compact_blocks)
Ok(client.get_block_range(request).await.unwrap().into_inner())
}

async fn get_subtree_roots(
Expand All @@ -206,12 +199,14 @@ async fn get_subtree_roots(
shielded_protocol,
max_entries,
};

Ok(client
.get_subtree_roots(request)
.await
.unwrap()
.into_inner())
}

async fn get_tree_state(
client: &mut CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
block_height: BlockHeight,
Expand Down
21 changes: 21 additions & 0 deletions zingo-sync/src/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,28 @@ impl SyncState {
}
}

/// Returns the highest block height that has been scanned.
///
/// If no scan ranges have been scanned, returns the block below the wallet birthday.
/// Will panic if called before scan ranges are updated for the first time.
pub fn highest_scanned_height(&self) -> BlockHeight {
if let Some(last_scanned_range) = self
.scan_ranges()
.iter()
.filter(|scan_range| scan_range.priority() == ScanPriority::Scanned)
.last()
{
last_scanned_range.block_range().end - 1
} else {
self.wallet_birthday()
.expect("scan ranges always non-empty")
- 1
}
}

/// Returns the wallet birthday or `None` if `self.scan_ranges` is empty.
///
/// If the wallet birthday is below the sapling activation height, returns the sapling activation height instead.
pub fn wallet_birthday(&self) -> Option<BlockHeight> {
self.scan_ranges()
.first()
Expand Down
173 changes: 70 additions & 103 deletions zingo-sync/src/scan.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
use std::{
cmp,
collections::{BTreeMap, BTreeSet, HashMap},
};
use std::collections::{BTreeMap, BTreeSet, HashMap};

use orchard::tree::MerkleHashOrchard;
use task::ScanTask;
use tokio::sync::mpsc;

use incrementalmerkletree::Position;
use zcash_client_backend::{data_api::scanning::ScanRange, proto::compact_formats::CompactBlock};
use zcash_client_backend::proto::compact_formats::CompactBlock;
use zcash_keys::keys::UnifiedFullViewingKey;
use zcash_primitives::{
consensus::{BlockHeight, NetworkUpgrade, Parameters},
consensus::{self, BlockHeight},
transaction::TxId,
zip32::AccountId,
};

use crate::{
client::{self, FetchRequest},
keys::transparent::TransparentAddressId,
client::FetchRequest,
primitives::{Locator, NullifierMap, OutPointMap, OutputId, WalletBlock, WalletTransaction},
witness::{self, LocatedTreeData, WitnessData},
};
Expand All @@ -32,7 +29,8 @@ pub(crate) mod task;
pub(crate) mod transactions;

struct InitialScanData {
previous_block: Option<WalletBlock>,
start_seam_block: Option<WalletBlock>,
end_seam_block: Option<WalletBlock>,
sapling_initial_tree_size: u32,
orchard_initial_tree_size: u32,
}
Expand All @@ -42,79 +40,35 @@ impl InitialScanData {
fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
consensus_parameters: &P,
first_block: &CompactBlock,
previous_wallet_block: Option<WalletBlock>,
start_seam_block: Option<WalletBlock>,
end_seam_block: Option<WalletBlock>,
) -> Result<Self, ()>
where
P: Parameters + Sync + Send + 'static,
P: consensus::Parameters + Sync + Send + 'static,
{
// gets initial tree size from previous block if available
// otherwise, from first block if available
// otherwise, fetches frontiers from server
let (sapling_initial_tree_size, orchard_initial_tree_size) = if let Some(prev) =
&previous_wallet_block
{
(
prev.tree_boundaries().sapling_final_tree_size,
prev.tree_boundaries().orchard_final_tree_size,
)
} else if let Some(chain_metadata) = &first_block.chain_metadata {
// calculate initial tree size by subtracting number of outputs in block from the blocks final tree size
let sapling_output_count: u32 = first_block
.vtx
.iter()
.map(|tx| tx.outputs.len())
.sum::<usize>()
.try_into()
.expect("Sapling output count cannot exceed a u32");
let orchard_output_count: u32 = first_block
.vtx
.iter()
.map(|tx| tx.actions.len())
.sum::<usize>()
.try_into()
.expect("Sapling output count cannot exceed a u32");

(
chain_metadata
.sapling_commitment_tree_size
.checked_sub(sapling_output_count)
.unwrap(),
chain_metadata
.orchard_commitment_tree_size
.checked_sub(orchard_output_count)
.unwrap(),
)
} else {
let sapling_activation_height = consensus_parameters
.activation_height(NetworkUpgrade::Sapling)
.expect("should have some sapling activation height");

match first_block.height().cmp(&sapling_activation_height) {
cmp::Ordering::Greater => {
let frontiers =
client::get_frontiers(fetch_request_sender, first_block.height() - 1)
.await
.unwrap();
(
frontiers
.final_sapling_tree()
.tree_size()
.try_into()
.expect("should not be more than 2^32 note commitments in the tree!"),
frontiers
.final_orchard_tree()
.tree_size()
.try_into()
.expect("should not be more than 2^32 note commitments in the tree!"),
)
}
cmp::Ordering::Equal => (0, 0),
cmp::Ordering::Less => panic!("pre-sapling not supported!"),
}
};
let (sapling_initial_tree_size, orchard_initial_tree_size) =
if let Some(prev) = &start_seam_block {
(
prev.tree_boundaries().sapling_final_tree_size,
prev.tree_boundaries().orchard_final_tree_size,
)
} else {
let tree_boundaries = compact_blocks::calculate_block_tree_boundaries(
consensus_parameters,
fetch_request_sender,
first_block,
)
.await;

(
tree_boundaries.sapling_initial_tree_size,
tree_boundaries.orchard_initial_tree_size,
)
};

Ok(InitialScanData {
previous_block: previous_wallet_block,
start_seam_block,
end_seam_block,
sapling_initial_tree_size,
orchard_initial_tree_size,
})
Expand Down Expand Up @@ -154,41 +108,56 @@ impl DecryptedNoteData {

/// Scans a given range and returns all data relevant to the specified keys.
///
/// `previous_wallet_block` is the wallet block with height [scan_range.start - 1].
/// `start_seam_block` and `end_seam_block` are the blocks adjacent to the `scan_range` for verification of continuity.
/// `locators` are the block height and txid of transactions in the `scan_range` that are known to be relevant to the
/// wallet and are appended to during scanning if trial decryption succeeds. If there are no known relevant transctions
/// then `locators` will start empty.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn scan<P>(
fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
parameters: &P,
consensus_parameters: &P,
ufvks: &HashMap<AccountId, UnifiedFullViewingKey>,
scan_range: ScanRange,
previous_wallet_block: Option<WalletBlock>,
mut locators: BTreeSet<Locator>,
transparent_addresses: HashMap<String, TransparentAddressId>,
scan_task: ScanTask,
) -> Result<ScanResults, ScanError>
where
P: Parameters + Sync + Send + 'static,
P: consensus::Parameters + Sync + Send + 'static,
{
let compact_blocks = client::get_compact_block_range(
fetch_request_sender.clone(),
scan_range.block_range().clone(),
)
.await
.unwrap();
let ScanTask {
compact_blocks,
scan_range,
start_seam_block,
end_seam_block,
mut locators,
transparent_addresses,
} = scan_task;

if compact_blocks
.first()
.expect("compacts blocks should not be empty")
.height
!= scan_range.block_range().start.into()
|| compact_blocks
.last()
.expect("compacts blocks should not be empty")
.height
!= (scan_range.block_range().end - 1).into()
{
panic!("compact blocks do not match scan range!")
}

let initial_scan_data = InitialScanData::new(
fetch_request_sender.clone(),
parameters,
consensus_parameters,
compact_blocks
.first()
.expect("compacts blocks should not be empty"),
previous_wallet_block,
start_seam_block,
end_seam_block,
)
.await
.unwrap();

let consensus_parameters_clone = parameters.clone();
let consensus_parameters_clone = consensus_parameters.clone();
let ufvks_clone = ufvks.clone();
let scan_data = tokio::task::spawn_blocking(move || {
scan_compact_blocks(
Expand All @@ -214,7 +183,7 @@ where
let mut outpoints = OutPointMap::new();
let wallet_transactions = scan_transactions(
fetch_request_sender,
parameters,
consensus_parameters,
ufvks,
locators,
decrypted_note_data,
Expand All @@ -232,15 +201,13 @@ where
orchard_leaves_and_retentions,
} = witness_data;

let sapling_located_trees = tokio::task::spawn_blocking(move || {
witness::build_located_trees(sapling_initial_position, sapling_leaves_and_retentions)
.unwrap()
})
.await
.unwrap();
let orchard_located_trees = tokio::task::spawn_blocking(move || {
witness::build_located_trees(orchard_initial_position, orchard_leaves_and_retentions)
.unwrap()
let (sapling_located_trees, orchard_located_trees) = tokio::task::spawn_blocking(move || {
(
witness::build_located_trees(sapling_initial_position, sapling_leaves_and_retentions)
.unwrap(),
witness::build_located_trees(orchard_initial_position, orchard_leaves_and_retentions)
.unwrap(),
)
})
.await
.unwrap();
Expand Down
Loading

0 comments on commit 1652bc3

Please sign in to comment.