Skip to content

Commit

Permalink
feat(chain)!: force Persisted<T> to mutate and persist atomically
Browse files Browse the repository at this point in the history
  • Loading branch information
evanlinjin committed Aug 8, 2024
1 parent 6123778 commit 8d58e7a
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 66 deletions.
45 changes: 29 additions & 16 deletions crates/chain/src/persist.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use core::{
future::Future,
ops::{Deref, DerefMut},
pin::Pin,
};
use core::{fmt::Debug, future::Future, ops::Deref, pin::Pin};

use alloc::boxed::Box;

Expand Down Expand Up @@ -121,7 +117,7 @@ impl<T> Persisted<T> {
/// Persist staged changes of `T` into `Db`.
///
/// If the database errors, the staged changes will not be cleared.
pub fn persist<Db>(&mut self, db: &mut Db) -> Result<bool, T::PersistError>
fn persist<Db>(&mut self, db: &mut Db) -> Result<bool, T::PersistError>
where
T: PersistWith<Db>,
{
Expand All @@ -134,13 +130,21 @@ impl<T> Persisted<T> {
Ok(true)
}

/// Mutate `T` and persist changes into `Db` atomically.
pub fn mutate<Db, F, R>(&mut self, db: &mut Db, mut mutate: F) -> Result<R, T::PersistError>
where
T: PersistWith<Db>,
F: FnMut(&mut T) -> R,
{
let r = mutate(&mut self.inner);
self.persist(db)?;
Ok(r)
}

/// Persist staged changes of `T` into an async `Db`.
///
/// If the database errors, the staged changes will not be cleared.
pub async fn persist_async<'a, Db>(
&'a mut self,
db: &'a mut Db,
) -> Result<bool, T::PersistError>
async fn persist_async<'a, Db>(&'a mut self, db: &'a mut Db) -> Result<bool, T::PersistError>
where
T: PersistAsyncWith<Db>,
{
Expand All @@ -152,6 +156,21 @@ impl<T> Persisted<T> {
stage.take();
Ok(true)
}

/// Mutate `T` and persist changes into async `Db` atomically.
pub async fn mutate_async<'a, Db, F, R>(
&'a mut self,
db: &'a mut Db,
mut mutate: F,
) -> Result<R, T::PersistError>
where
T: PersistAsyncWith<Db>,
F: FnMut(&mut T) -> R,
{
let r = mutate(&mut self.inner);
self.persist_async(db).await?;
Ok(r)
}
}

impl<T> Deref for Persisted<T> {
Expand All @@ -161,9 +180,3 @@ impl<T> Deref for Persisted<T> {
&self.inner
}
}

impl<T> DerefMut for Persisted<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
1 change: 1 addition & 0 deletions crates/chain/src/spk_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ impl<K: Ord + Clone> FullScanRequest<K> {
/// Data returned from a spk-based blockchain client full scan.
///
/// See also [`FullScanRequest`].
#[derive(Debug, Clone)]
pub struct FullScanResult<K, A = ConfirmationBlockTime> {
/// The update to apply to the receiving [`LocalChain`](crate::local_chain::LocalChain).
pub graph_update: TxGraph<A>,
Expand Down
8 changes: 4 additions & 4 deletions crates/wallet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ let mut wallet = match wallet_opt {
.expect("wallet"),
};
// Get a new address to receive bitcoin.
let receive_address = wallet.reveal_next_address(KeychainKind::External);
// Persist staged wallet data changes to the file store.
wallet.persist(&mut db).expect("persist");
// Get a new address to receive bitcoin and persist staged wallet data changes to the file store.
let receive_address = wallet
.mutate(&mut db, |w| w.reveal_next_address(KeychainKind::External))
.expect("persist");
println!("Your new receive address is: {}", receive_address.address);
```

Expand Down
9 changes: 3 additions & 6 deletions crates/wallet/src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,9 +581,6 @@ impl Wallet {
/// index defined in [BIP32](https://github.com/bitcoin/bips/blob/master/bip-0032.mediawiki),
/// then the last revealed address will be returned.
///
/// **WARNING**: To avoid address reuse you must persist the changes resulting from one or more
/// calls to this method before closing the wallet. For example:
///
/// ```rust,no_run
/// # use bdk_wallet::{LoadParams, ChangeSet, KeychainKind};
/// use bdk_chain::rusqlite::Connection;
Expand All @@ -592,10 +589,10 @@ impl Wallet {
/// .load_wallet(&mut conn)
/// .expect("database is okay")
/// .expect("database has data");
/// let next_address = wallet.reveal_next_address(KeychainKind::External);
/// wallet.persist(&mut conn).expect("write is okay");
/// let next_address = wallet
/// .mutate(&mut conn, |w| w.reveal_next_address(KeychainKind::External))
/// .expect("write is okay");
///
/// // Now it's safe to show the user their next address!
/// println!("Next address: {}", next_address.address);
/// # Ok::<(), anyhow::Error>(())
/// ```
Expand Down
7 changes: 3 additions & 4 deletions crates/wallet/tests/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,9 @@ fn wallet_is_persisted() -> anyhow::Result<()> {
let mut wallet = Wallet::create(external_desc, internal_desc)
.network(Network::Testnet)
.create_wallet(&mut db)?;
wallet.reveal_next_address(KeychainKind::External);

// persist new wallet changes
assert!(wallet.persist(&mut db)?, "must write");
wallet
.mutate(&mut db, |w| w.reveal_next_address(KeychainKind::External))
.expect("must write changes");
wallet.spk_index().clone()
};

Expand Down
19 changes: 9 additions & 10 deletions example-crates/wallet_electrum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ fn main() -> Result<(), anyhow::Error> {
.create_wallet(&mut db)?,
};

let address = wallet.next_unused_address(KeychainKind::External);
wallet.persist(&mut db)?;
let address = wallet.mutate(&mut db, |w| w.next_unused_address(KeychainKind::External))?;
println!("Generated Address: {}", address);

let balance = wallet.balance();
Expand Down Expand Up @@ -71,8 +70,7 @@ fn main() -> Result<(), anyhow::Error> {

println!();

wallet.apply_update(update)?;
wallet.persist(&mut db)?;
wallet.mutate(&mut db, |w| w.apply_update(update.clone()))??;

let balance = wallet.balance();
println!("Wallet balance after syncing: {} sats", balance.total());
Expand All @@ -88,12 +86,13 @@ fn main() -> Result<(), anyhow::Error> {
let faucet_address = Address::from_str("mkHS9ne12qx9pS9VojpwU5xtRd4T7X7ZUt")?
.require_network(Network::Testnet)?;

let mut tx_builder = wallet.build_tx();
tx_builder
.add_recipient(faucet_address.script_pubkey(), SEND_AMOUNT)
.enable_rbf();

let mut psbt = tx_builder.finish()?;
let mut psbt = wallet.mutate(&mut db, |w| -> anyhow::Result<_> {
let mut tx_builder = w.build_tx();
tx_builder
.add_recipient(faucet_address.script_pubkey(), SEND_AMOUNT)
.enable_rbf();
Ok(tx_builder.finish()?)
})??;
let finalized = wallet.sign(&mut psbt, SignOptions::default())?;
assert!(finalized);

Expand Down
19 changes: 9 additions & 10 deletions example-crates/wallet_esplora_async/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ async fn main() -> Result<(), anyhow::Error> {
.create_wallet(&mut conn)?,
};

let address = wallet.next_unused_address(KeychainKind::External);
wallet.persist(&mut conn)?;
let address = wallet.mutate(&mut conn, |w| w.next_unused_address(KeychainKind::External))?;
println!("Next unused address: ({}) {}", address.index, address);

let balance = wallet.balance();
Expand All @@ -60,8 +59,7 @@ async fn main() -> Result<(), anyhow::Error> {
let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs();
let _ = update.graph_update.update_last_seen_unconfirmed(now);

wallet.apply_update(update)?;
wallet.persist(&mut conn)?;
wallet.mutate(&mut conn, |w| w.apply_update(update.clone()))??;
println!();

let balance = wallet.balance();
Expand All @@ -75,12 +73,13 @@ async fn main() -> Result<(), anyhow::Error> {
std::process::exit(0);
}

let mut tx_builder = wallet.build_tx();
tx_builder
.add_recipient(address.script_pubkey(), SEND_AMOUNT)
.enable_rbf();

let mut psbt = tx_builder.finish()?;
let mut psbt = wallet.mutate(&mut conn, |w| -> anyhow::Result<_> {
let mut tx_builder = w.build_tx();
tx_builder
.add_recipient(address.script_pubkey(), SEND_AMOUNT)
.enable_rbf();
Ok(tx_builder.finish()?)
})??;
let finalized = wallet.sign(&mut psbt, SignOptions::default())?;
assert!(finalized);

Expand Down
21 changes: 9 additions & 12 deletions example-crates/wallet_esplora_blocking/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ fn main() -> Result<(), anyhow::Error> {
.create_wallet(&mut db)?,
};

let address = wallet.next_unused_address(KeychainKind::External);
wallet.persist(&mut db)?;
let address = wallet.mutate(&mut db, |w| w.next_unused_address(KeychainKind::External))?;
println!(
"Next unused address: ({}) {}",
address.index, address.address
Expand All @@ -60,10 +59,7 @@ fn main() -> Result<(), anyhow::Error> {
let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs();
let _ = update.graph_update.update_last_seen_unconfirmed(now);

wallet.apply_update(update)?;
if let Some(changeset) = wallet.take_staged() {
db.append_changeset(&changeset)?;
}
wallet.mutate(&mut db, |w| w.apply_update(update.clone()))??;
println!();

let balance = wallet.balance();
Expand All @@ -77,12 +73,13 @@ fn main() -> Result<(), anyhow::Error> {
std::process::exit(0);
}

let mut tx_builder = wallet.build_tx();
tx_builder
.add_recipient(address.script_pubkey(), SEND_AMOUNT)
.enable_rbf();

let mut psbt = tx_builder.finish()?;
let mut psbt = wallet.mutate(&mut db, |w| -> anyhow::Result<_> {
let mut tx_builder = w.build_tx();
tx_builder
.add_recipient(address.script_pubkey(), SEND_AMOUNT)
.enable_rbf();
Ok(tx_builder.finish()?)
})??;
let finalized = wallet.sign(&mut psbt, SignOptions::default())?;
assert!(finalized);

Expand Down
10 changes: 6 additions & 4 deletions example-crates/wallet_rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,9 @@ fn main() -> anyhow::Result<()> {
let hash = block_emission.block_hash();
let connected_to = block_emission.connected_to();
let start_apply_block = Instant::now();
wallet.apply_block_connected_to(&block_emission.block, height, connected_to)?;
wallet.persist(&mut db)?;
wallet.mutate(&mut db, |w| {
w.apply_block_connected_to(&block_emission.block, height, connected_to)
})??;
let elapsed = start_apply_block.elapsed().as_secs_f32();
println!(
"Applied block {} at height {} in {}s",
Expand All @@ -155,8 +156,9 @@ fn main() -> anyhow::Result<()> {
}
Emission::Mempool(mempool_emission) => {
let start_apply_mempool = Instant::now();
wallet.apply_unconfirmed_txs(mempool_emission.iter().map(|(tx, time)| (tx, *time)));
wallet.persist(&mut db)?;
wallet.mutate(&mut db, |w| {
w.apply_unconfirmed_txs(mempool_emission.iter().map(|(tx, time)| (tx, *time)))
})?;
println!(
"Applied unconfirmed transactions in {}s",
start_apply_mempool.elapsed().as_secs_f32()
Expand Down

0 comments on commit 8d58e7a

Please sign in to comment.