Skip to content

Commit

Permalink
feat(chain)!: force Persisted<T> to mutate and persist atomically
Browse files Browse the repository at this point in the history
  • Loading branch information
evanlinjin committed Aug 8, 2024
1 parent 6123778 commit f8370ae
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 52 deletions.
45 changes: 29 additions & 16 deletions crates/chain/src/persist.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
use core::{
future::Future,
ops::{Deref, DerefMut},
pin::Pin,
};
use core::{fmt::Debug, future::Future, ops::Deref, pin::Pin};

use alloc::boxed::Box;

Expand Down Expand Up @@ -121,7 +117,7 @@ impl<T> Persisted<T> {
/// Persist staged changes of `T` into `Db`.
///
/// If the database errors, the staged changes will not be cleared.
pub fn persist<Db>(&mut self, db: &mut Db) -> Result<bool, T::PersistError>
fn persist<Db>(&mut self, db: &mut Db) -> Result<bool, T::PersistError>
where
T: PersistWith<Db>,
{
Expand All @@ -134,13 +130,21 @@ impl<T> Persisted<T> {
Ok(true)
}

/// Mutate `T` and persist changes into `Db` atomically.
pub fn mutate<Db, F, R>(&mut self, db: &mut Db, mut mutate: F) -> Result<R, T::PersistError>
where
T: PersistWith<Db>,
F: FnMut(&mut T) -> R,
{
let r = mutate(&mut self.inner);
self.persist(db)?;
Ok(r)
}

/// Persist staged changes of `T` into an async `Db`.
///
/// If the database errors, the staged changes will not be cleared.
pub async fn persist_async<'a, Db>(
&'a mut self,
db: &'a mut Db,
) -> Result<bool, T::PersistError>
async fn persist_async<'a, Db>(&'a mut self, db: &'a mut Db) -> Result<bool, T::PersistError>
where
T: PersistAsyncWith<Db>,
{
Expand All @@ -152,6 +156,21 @@ impl<T> Persisted<T> {
stage.take();
Ok(true)
}

/// Mutate `T` and persist changes into async `Db` atomically.
pub async fn mutate_async<'a, Db, F, R>(
&'a mut self,
db: &'a mut Db,
mut mutate: F,
) -> Result<R, T::PersistError>
where
T: PersistAsyncWith<Db>,
F: FnMut(&mut T) -> R,
{
let r = mutate(&mut self.inner);
self.persist_async(db).await?;
Ok(r)
}
}

impl<T> Deref for Persisted<T> {
Expand All @@ -161,9 +180,3 @@ impl<T> Deref for Persisted<T> {
&self.inner
}
}

impl<T> DerefMut for Persisted<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
1 change: 1 addition & 0 deletions crates/chain/src/spk_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ impl<K: Ord + Clone> FullScanRequest<K> {
/// Data returned from a spk-based blockchain client full scan.
///
/// See also [`FullScanRequest`].
#[derive(Debug, Clone)]
pub struct FullScanResult<K, A = ConfirmationBlockTime> {
/// The update to apply to the receiving [`LocalChain`](crate::local_chain::LocalChain).
pub graph_update: TxGraph<A>,
Expand Down
19 changes: 9 additions & 10 deletions example-crates/wallet_electrum/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ fn main() -> Result<(), anyhow::Error> {
.create_wallet(&mut db)?,
};

let address = wallet.next_unused_address(KeychainKind::External);
wallet.persist(&mut db)?;
let address = wallet.mutate(&mut db, |w| w.next_unused_address(KeychainKind::External))?;
println!("Generated Address: {}", address);

let balance = wallet.balance();
Expand Down Expand Up @@ -71,8 +70,7 @@ fn main() -> Result<(), anyhow::Error> {

println!();

wallet.apply_update(update)?;
wallet.persist(&mut db)?;
wallet.mutate(&mut db, |w| w.apply_update(update.clone()))??;

let balance = wallet.balance();
println!("Wallet balance after syncing: {} sats", balance.total());
Expand All @@ -88,12 +86,13 @@ fn main() -> Result<(), anyhow::Error> {
let faucet_address = Address::from_str("mkHS9ne12qx9pS9VojpwU5xtRd4T7X7ZUt")?
.require_network(Network::Testnet)?;

let mut tx_builder = wallet.build_tx();
tx_builder
.add_recipient(faucet_address.script_pubkey(), SEND_AMOUNT)
.enable_rbf();

let mut psbt = tx_builder.finish()?;
let mut psbt = wallet.mutate(&mut db, |w| -> anyhow::Result<_> {
let mut tx_builder = w.build_tx();
tx_builder
.add_recipient(faucet_address.script_pubkey(), SEND_AMOUNT)
.enable_rbf();
Ok(tx_builder.finish()?)
})??;
let finalized = wallet.sign(&mut psbt, SignOptions::default())?;
assert!(finalized);

Expand Down
19 changes: 9 additions & 10 deletions example-crates/wallet_esplora_async/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ async fn main() -> Result<(), anyhow::Error> {
.create_wallet(&mut conn)?,
};

let address = wallet.next_unused_address(KeychainKind::External);
wallet.persist(&mut conn)?;
let address = wallet.mutate(&mut conn, |w| w.next_unused_address(KeychainKind::External))?;
println!("Next unused address: ({}) {}", address.index, address);

let balance = wallet.balance();
Expand All @@ -60,8 +59,7 @@ async fn main() -> Result<(), anyhow::Error> {
let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs();
let _ = update.graph_update.update_last_seen_unconfirmed(now);

wallet.apply_update(update)?;
wallet.persist(&mut conn)?;
wallet.mutate(&mut conn, |w| w.apply_update(update.clone()))??;
println!();

let balance = wallet.balance();
Expand All @@ -75,12 +73,13 @@ async fn main() -> Result<(), anyhow::Error> {
std::process::exit(0);
}

let mut tx_builder = wallet.build_tx();
tx_builder
.add_recipient(address.script_pubkey(), SEND_AMOUNT)
.enable_rbf();

let mut psbt = tx_builder.finish()?;
let mut psbt = wallet.mutate(&mut conn, |w| -> anyhow::Result<_> {
let mut tx_builder = w.build_tx();
tx_builder
.add_recipient(address.script_pubkey(), SEND_AMOUNT)
.enable_rbf();
Ok(tx_builder.finish()?)
})??;
let finalized = wallet.sign(&mut psbt, SignOptions::default())?;
assert!(finalized);

Expand Down
21 changes: 9 additions & 12 deletions example-crates/wallet_esplora_blocking/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ fn main() -> Result<(), anyhow::Error> {
.create_wallet(&mut db)?,
};

let address = wallet.next_unused_address(KeychainKind::External);
wallet.persist(&mut db)?;
let address = wallet.mutate(&mut db, |w| w.next_unused_address(KeychainKind::External))?;
println!(
"Next unused address: ({}) {}",
address.index, address.address
Expand All @@ -60,10 +59,7 @@ fn main() -> Result<(), anyhow::Error> {
let now = std::time::UNIX_EPOCH.elapsed().unwrap().as_secs();
let _ = update.graph_update.update_last_seen_unconfirmed(now);

wallet.apply_update(update)?;
if let Some(changeset) = wallet.take_staged() {
db.append_changeset(&changeset)?;
}
wallet.mutate(&mut db, |w| w.apply_update(update.clone()))??;
println!();

let balance = wallet.balance();
Expand All @@ -77,12 +73,13 @@ fn main() -> Result<(), anyhow::Error> {
std::process::exit(0);
}

let mut tx_builder = wallet.build_tx();
tx_builder
.add_recipient(address.script_pubkey(), SEND_AMOUNT)
.enable_rbf();

let mut psbt = tx_builder.finish()?;
let mut psbt = wallet.mutate(&mut db, |w| -> anyhow::Result<_> {
let mut tx_builder = w.build_tx();
tx_builder
.add_recipient(address.script_pubkey(), SEND_AMOUNT)
.enable_rbf();
Ok(tx_builder.finish()?)
})??;
let finalized = wallet.sign(&mut psbt, SignOptions::default())?;
assert!(finalized);

Expand Down
10 changes: 6 additions & 4 deletions example-crates/wallet_rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,9 @@ fn main() -> anyhow::Result<()> {
let hash = block_emission.block_hash();
let connected_to = block_emission.connected_to();
let start_apply_block = Instant::now();
wallet.apply_block_connected_to(&block_emission.block, height, connected_to)?;
wallet.persist(&mut db)?;
wallet.mutate(&mut db, |w| {
w.apply_block_connected_to(&block_emission.block, height, connected_to)
})??;
let elapsed = start_apply_block.elapsed().as_secs_f32();
println!(
"Applied block {} at height {} in {}s",
Expand All @@ -155,8 +156,9 @@ fn main() -> anyhow::Result<()> {
}
Emission::Mempool(mempool_emission) => {
let start_apply_mempool = Instant::now();
wallet.apply_unconfirmed_txs(mempool_emission.iter().map(|(tx, time)| (tx, *time)));
wallet.persist(&mut db)?;
wallet.mutate(&mut db, |w| {
w.apply_unconfirmed_txs(mempool_emission.iter().map(|(tx, time)| (tx, *time)))
})?;
println!(
"Applied unconfirmed transactions in {}s",
start_apply_mempool.elapsed().as_secs_f32()
Expand Down

0 comments on commit f8370ae

Please sign in to comment.