Skip to content

Commit

Permalink
Cleanup and handle tags without persisting first
Browse files Browse the repository at this point in the history
  • Loading branch information
TonyGiorgio committed May 8, 2024
1 parent 5248bc8 commit 55a91a1
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 79 deletions.
122 changes: 50 additions & 72 deletions mutiny-core/src/federation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use crate::{
storage::{
delete_transaction_details, get_transaction_details, list_payment_info,
persist_payment_info, persist_transaction_details, MutinyStorage, VersionedValue,
TRANSACTION_DETAILS_PREFIX_KEY,
},
utils::sleep,
HTLCStatus, MutinyInvoice, DEFAULT_PAYMENT_TIMEOUT,
};
use crate::{labels::LabelStorage, storage::TRANSACTION_DETAILS_PREFIX_KEY};
use crate::{
utils::{convert_from_fedimint_invoice, convert_to_fedimint_invoice, now, spawn},
TransactionDetails,
Expand Down Expand Up @@ -387,14 +387,14 @@ impl<S: MutinyStorage> FederationClient<S> {
.map(|(h, _i)| h.0),
);

// pending on chain operations
let pending_wallet_txids = self
// confirmed on chain operations
let confirmed_wallet_txids = self
.storage
.scan::<TransactionDetails>(TRANSACTION_DETAILS_PREFIX_KEY, None)?
.into_iter()
.filter(|(_k, v)| match v.confirmation_time {
ConfirmationTime::Unconfirmed { .. } => true, // return all unconfirmed transactions
_ => false, // skip confirmed transactions
ConfirmationTime::Unconfirmed { .. } => false, // skip unconfirmed transactions
ConfirmationTime::Confirmed { .. } => true, // return all confirmed transactions
})
.map(|(_h, i)| i.internal_id)
.collect::<HashSet<Txid>>();
Expand Down Expand Up @@ -431,7 +431,8 @@ impl<S: MutinyStorage> FederationClient<S> {
.map_err(|_| MutinyError::ChainAccessFailed)
.expect("should convert");

if pending_wallet_txids.contains(&internal_id) {
// if already confirmed, no reason to subscribe
if !confirmed_wallet_txids.contains(&internal_id) {
self.subscribe_operation(entry, key.operation_id);
}
} else {
Expand Down Expand Up @@ -533,23 +534,13 @@ impl<S: MutinyStorage> FederationClient<S> {
.get_deposit_address(fedimint_core::time::now() + PEG_IN_TIMEOUT_YEAR, ())
.await?;

let internal_id = Txid::from_slice(&op_id.0).map_err(|_| MutinyError::ChainAccessFailed)?;

// persist the data we can while we wait for the transaction to come from fedimint
let pending_transaction_details = TransactionDetails {
transaction: None,
txid: None,
internal_id,
received: 0,
sent: 0,
fee: None,
confirmation_time: ConfirmationTime::Unconfirmed {
last_seen: now().as_secs(),
},
labels,
};
let address = Address::from_str(&address.to_string())
.expect("should convert")
.assume_checked();

persist_transaction_details(&self.storage, &pending_transaction_details)?;
// persist the labels
self.storage
.set_address_labels(address.clone(), labels.clone())?;

// subscribe
let operation = self
Expand All @@ -560,9 +551,7 @@ impl<S: MutinyStorage> FederationClient<S> {
.expect("just created it");
self.subscribe_operation(operation, op_id);

Ok(Address::from_str(&address.to_string())
.expect("should convert")
.assume_checked())
Ok(address)
}

/// Get the balance of this federation client in sats
Expand Down Expand Up @@ -691,7 +680,7 @@ impl<S: MutinyStorage> FederationClient<S> {
amount: u64,
labels: Vec<String>,
) -> Result<Txid, MutinyError> {
let address = bitcoin30_to_bitcoin29_address(send_to);
let address = bitcoin30_to_bitcoin29_address(send_to.clone());

let btc_amount = fedimint_ln_common::bitcoin::Amount::from_sat(amount);

Expand Down Expand Up @@ -719,11 +708,14 @@ impl<S: MutinyStorage> FederationClient<S> {
confirmation_time: ConfirmationTime::Unconfirmed {
last_seen: now().as_secs(),
},
labels,
labels: labels.clone(),
};

persist_transaction_details(&self.storage, &pending_transaction_details)?;

// persist the labels
self.storage.set_address_labels(send_to, labels)?;

// subscribe
let operation = self
.fedimint_client
Expand Down Expand Up @@ -1127,24 +1119,25 @@ async fn process_operation_until_timeout<S: MutinyStorage>(
} else if module_type == WalletCommonInit::KIND.as_str() {
let wallet_meta: WalletOperationMeta = entry.meta();
let wallet_module = Arc::new(fedimint_client.get_first_module::<WalletClientModule>());
let internal_id = Txid::from_slice(&operation_id.0)
.map_err(|_| MutinyError::ChainAccessFailed)
.expect("should convert");
let stored_transaction_details = get_transaction_details(&storage, internal_id, &logger);
if stored_transaction_details.is_none() {
log_warn!(logger, "could not find transaction details: {internal_id}")
}

match wallet_meta.variant {
fedimint_wallet_client::WalletOperationMetaVariant::Deposit {
address: _,
address,
expires_at: _,
} => {
match wallet_module.subscribe_deposit_updates(operation_id).await {
Ok(o) => {
let labels = match storage.get_address_labels() {
Ok(l) => l.get(&address.to_string()).cloned(),
Err(e) => {
log_warn!(logger, "could not get labels: {e}");
None
}
};

process_onchain_deposit_outcome(
o,
stored_transaction_details,
labels.unwrap_or_default(),
operation_id,
storage,
esplora,
Expand All @@ -1160,16 +1153,24 @@ async fn process_operation_until_timeout<S: MutinyStorage>(
};
}
fedimint_wallet_client::WalletOperationMetaVariant::Withdraw {
address: _,
address,
amount,
fee,
change: _,
} => {
match wallet_module.subscribe_withdraw_updates(operation_id).await {
Ok(o) => {
let labels = match storage.get_address_labels() {
Ok(l) => l.get(&address.to_string()).cloned(),
Err(e) => {
log_warn!(logger, "could not get labels: {e}");
None
}
};

process_onchain_withdraw_outcome(
o,
stored_transaction_details,
labels.unwrap_or_default(),
amount,
fee.amount(),
operation_id,
Expand Down Expand Up @@ -1305,7 +1306,7 @@ where
#[allow(clippy::too_many_arguments)]
async fn process_onchain_withdraw_outcome<S: MutinyStorage>(
stream_or_outcome: UpdateStreamOrOutcome<fedimint_wallet_client::WithdrawState>,
original_transaction_details: Option<TransactionDetails>,
labels: Vec<String>,
amount: fedimint_ln_common::bitcoin::Amount,
fee: fedimint_ln_common::bitcoin::Amount,
operation_id: OperationId,
Expand All @@ -1315,10 +1316,7 @@ async fn process_onchain_withdraw_outcome<S: MutinyStorage>(
stop: Arc<AtomicBool>,
logger: Arc<MutinyLogger>,
) {
let labels = original_transaction_details
.as_ref()
.map(|o| o.labels.clone())
.unwrap_or_default();
let internal_id = Txid::from_slice(&operation_id.0).expect("should convert");

let mut s = stream_or_outcome.into_stream();

Expand Down Expand Up @@ -1346,7 +1344,6 @@ async fn process_onchain_withdraw_outcome<S: MutinyStorage>(
WithdrawState::Succeeded(txid) => {
log_info!(logger, "Withdraw successful: {txid}");

let internal_id = Txid::from_slice(&operation_id.0).expect("should convert");
let txid = Txid::from_slice(&txid).expect("should convert");
let updated_transaction_details = TransactionDetails {
transaction: None,
Expand Down Expand Up @@ -1376,16 +1373,14 @@ async fn process_onchain_withdraw_outcome<S: MutinyStorage>(
WithdrawState::Failed(e) => {
log_error!(logger, "Transaction failed: {e}");

// if we have the original transaction details, delete it
if let Some(t) = original_transaction_details {
match delete_transaction_details(&storage, &t) {
Ok(_) => {
log_info!(logger, "Transaction deleted");
},
Err(e) => {
log_error!(logger, "Error deleting transaction: {e}");
},
}
// Delete the pending tx if it failed
match delete_transaction_details(&storage, internal_id) {
Ok(_) => {
log_info!(logger, "Transaction deleted");
},
Err(e) => {
log_error!(logger, "Error deleting transaction: {e}");
},
}

break;
Expand Down Expand Up @@ -1458,19 +1453,14 @@ async fn subscribe_onchain_confirmation_check<S: MutinyStorage>(
#[allow(clippy::too_many_arguments)]
async fn process_onchain_deposit_outcome<S: MutinyStorage>(
stream_or_outcome: UpdateStreamOrOutcome<fedimint_wallet_client::DepositState>,
original_transaction_details: Option<TransactionDetails>,
labels: Vec<String>,
operation_id: OperationId,
storage: S,
esplora: Arc<AsyncClient>,
timeout: Option<u64>,
stop: Arc<AtomicBool>,
logger: Arc<MutinyLogger>,
) {
let labels = original_transaction_details
.as_ref()
.map(|o| o.labels.clone())
.unwrap_or_default();

let mut s = stream_or_outcome.into_stream();

// break out after sleep time or check stop signal
Expand Down Expand Up @@ -1560,18 +1550,6 @@ async fn process_onchain_deposit_outcome<S: MutinyStorage>(
fedimint_wallet_client::DepositState::Failed(e) => {
log_error!(logger, "Transaction failed: {e}");

// if we have the original transaction details, delete it
if let Some(t) = original_transaction_details {
match delete_transaction_details(&storage, &t) {
Ok(_) => {
log_info!(logger, "Transaction deleted");
},
Err(e) => {
log_error!(logger, "Error deleting transaction: {e}");
},
}
}

break;
}
}
Expand Down
5 changes: 4 additions & 1 deletion mutiny-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2062,7 +2062,10 @@ impl<S: MutinyStorage> MutinyWallet<S> {
}
}

async fn create_address(&self, labels: Vec<String>) -> Result<bitcoin::Address, MutinyError> {
pub async fn create_address(
&self,
labels: Vec<String>,
) -> Result<bitcoin::Address, MutinyError> {
// Attempt to create federation invoice if available
let federation_ids = self.list_federation_ids().await?;
if !federation_ids.is_empty() {
Expand Down
13 changes: 7 additions & 6 deletions mutiny-core/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,19 +926,20 @@ pub(crate) fn persist_transaction_details<S: MutinyStorage>(
Ok(())
}

// Deletes the transaction detail and removes the pending index if it exists
pub(crate) fn delete_transaction_details<S: MutinyStorage>(
storage: &S,
transaction_details: &TransactionDetails,
txid: Txid,
) -> Result<(), MutinyError> {
let key = transaction_details_key(transaction_details.internal_id);
let key = transaction_details_key(txid);
storage.delete(&[key.clone()])?;

// delete from index
// delete the pending index item, if it exists
let index = storage.activity_index();
let mut index = index.try_write()?;
index.insert(IndexItem {
timestamp: None,
key,
index.remove(&IndexItem {
timestamp: None, // timestamp would be None for Unconfirmed
key: key.clone(),
});

Ok(())
Expand Down

0 comments on commit 55a91a1

Please sign in to comment.