Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: process txs as they arrive #810

Merged
merged 5 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions src/new/wallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* LICENSE file in the root directory of this source tree.
*/

import { get } from 'lodash';
import { cloneDeep, get } from 'lodash';
import bitcore, { HDPrivateKey } from 'bitcore-lib';
import EventEmitter from 'events';
import { NATIVE_TOKEN_UID, P2SH_ACCT_PATH, P2PKH_ACCT_PATH } from '../constants';
Expand Down Expand Up @@ -51,6 +51,7 @@
import { prepareNanoSendTransaction } from '../nano_contracts/utils';
import { IHistoryTxSchema } from '../schemas';
import GLL from '../sync/gll';
import { checkTxMetadataChanged } from '../sync/utils';

/**
* @typedef {import('../models/create_token_transaction').default} CreateTokenTransaction
Expand Down Expand Up @@ -698,7 +699,9 @@
throw new WalletError('Not implemented.');
}
const uid = token || this.token.uid;
let tokenData = await this.storage.getToken(uid);
// Using clone deep so the balance returned will not be updated in case
// we change the storage
let tokenData = cloneDeep(await this.storage.getToken(uid));
if (tokenData === null) {
// We don't have the token on storage, so we need to return an empty default response
tokenData = {
Expand Down Expand Up @@ -1334,12 +1337,30 @@

newTx.processingStatus = TxHistoryProcessingStatus.PROCESSING;

// Save the transaction in the storage
// Metadata changed MUST be before addTx because we compare the stored tx with the new one.
// So overwriting the stored tx before this would make the check invalid.
const metadataChanged = await checkTxMetadataChanged(newTx, this.storage);
await this.storage.addTx(newTx);

await this.scanAddressesToLoad();

// set state to processing and save current state.
const previousState = this.state;
this.state = HathorWallet.PROCESSING;
// Process history to update metadatas
await this.storage.processHistory();
if (metadataChanged) {
// Process the full history because
// this tx changed the voided state
// XXX in the future we should be able to handle this
// voidness alone, without processing everything
// but for now it's an isolated case and it's easier
await this.storage.processHistory();

Check warning on line 1356 in src/new/wallet.js

View check run for this annotation

Codecov / codecov/patch

src/new/wallet.js#L1356

Added line #L1356 was not covered by tests
} else if (isNewTx) {
// Process this single transaction.
// Handling new metadatas and deleting utxos that are not available anymore
await this.storage.processNewTx(newTx);
}
// restore previous state
this.state = previousState;
pedroferreira1 marked this conversation as resolved.
Show resolved Hide resolved

newTx.processingStatus = TxHistoryProcessingStatus.FINISHED;
// Save the transaction in the storage
Expand Down
4 changes: 4 additions & 0 deletions src/storage/leveldb/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ export default class LevelDBStore implements IStore {
return this.utxoIndex.saveUtxo(utxo);
}

async deleteUtxo(utxo: IUtxo): Promise<void> {
await this.utxoIndex.deleteUtxo(utxo);
}

/**
* Save a locked utxo to the database.
* Used when a new utxo is received but it is either time locked or height locked.
Expand Down
11 changes: 11 additions & 0 deletions src/storage/leveldb/utxo_index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,17 @@ export default class LevelUtxoIndex implements IKVUtxoIndex {
await this.tokenUtxoDB.put(_token_utxo_key(utxo), utxo);
}

/**
* Remove an utxo from the database.
* @param {IUtxo} utxo utxo to be deleted
* @returns {Promise<void>}
*/
async deleteUtxo(utxo: IUtxo): Promise<void> {
await this.utxoDB.del(_utxo_id(utxo));
await this.tokenAddressUtxoDB.del(_token_address_utxo_key(utxo));
await this.tokenUtxoDB.del(_token_utxo_key(utxo));
}

/**
* Save a locked utxo on the database.
*
Expand Down
4 changes: 4 additions & 0 deletions src/storage/memory_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,10 @@ export class MemoryStore implements IStore {
}
}

async deleteUtxo(utxo: IUtxo): Promise<void> {
this.utxos.delete(`${utxo.txId}:${utxo.index}`);
}

/**
* Fetch utxos based on a selection criteria
* @param {IUtxoFilterOptions} options Options to filter utxos
Expand Down
17 changes: 16 additions & 1 deletion src/storage/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ import {
getDefaultLogger,
} from '../types';
import transactionUtils from '../utils/transaction';
import { processHistory as processHistoryUtil, processUtxoUnlock } from '../utils/storage';
import {
processHistory as processHistoryUtil,
processSingleTx as processSingleTxUtil,
processUtxoUnlock,
} from '../utils/storage';
import config, { Config } from '../config';
import { decryptData, checkPassword } from '../utils/crypto';
import FullNodeConnection from '../new/connection';
Expand Down Expand Up @@ -358,6 +362,17 @@ export class Storage implements IStorage {
await processHistoryUtil(this, { rewardLock: this.version?.reward_spend_min_blocks });
}

/**
* Process the transaction history to calculate the metadata.
* @returns {Promise<void>}
*/
async processNewTx(tx: IHistoryTx): Promise<void> {
// Keep tx-timestamp index sorted
await this.store.preProcessHistory();
// Process the single tx we received
await processSingleTxUtil(this, tx, { rewardLock: this.version?.reward_spend_min_blocks });
}

/**
* Iterate on all tokens on the storage.
*
Expand Down
22 changes: 22 additions & 0 deletions src/sync/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Copyright (c) Hathor Labs and its affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

import { IHistoryTx, IStorage } from '../types';

/**
* Currently we only need to check that a transaction has been voided or un-voided.
*/
export async function checkTxMetadataChanged(tx: IHistoryTx, storage: IStorage): Promise<boolean> {
const txId = tx.tx_id;
const storageTx = await storage.getTx(txId);
if (!storageTx) {
// This is a new tx
return false;
}

return tx.is_voided !== storageTx.is_voided;
}
2 changes: 2 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ export interface IStore {
saveLockedUtxo(lockedUtxo: ILockedUtxo): Promise<void>;
iterateLockedUtxos(): AsyncGenerator<ILockedUtxo>;
unlockUtxo(lockedUtxo: ILockedUtxo): Promise<void>;
deleteUtxo(utxoId: IUtxo): Promise<void>;

// Wallet data
getAccessData(): Promise<IWalletAccessData | null>;
Expand Down Expand Up @@ -502,6 +503,7 @@ export interface IStorage {
getSpentTxs(inputs: Input[]): AsyncGenerator<{ tx: IHistoryTx; input: Input; index: number }>;
addTx(tx: IHistoryTx): Promise<void>;
processHistory(): Promise<void>;
processNewTx(tx: IHistoryTx): Promise<void>;

// Tokens
isTokenRegistered(tokenUid: string): Promise<boolean>;
Expand Down
60 changes: 60 additions & 0 deletions src/utils/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
HistorySyncMode,
HistorySyncFunction,
WalletType,
IUtxo,
} from '../types';
import walletApi from '../api/wallet';
import helpers from './helpers';
Expand Down Expand Up @@ -396,6 +397,65 @@
await updateWalletMetadataFromProcessedTxData(storage, { maxIndexUsed, tokens });
}

export async function processSingleTx(
storage: IStorage,
tx: IHistoryTx,
{ rewardLock }: { rewardLock?: number } = {}
): Promise<void> {
const { store } = storage;
const nowTs = Math.floor(Date.now() / 1000);
const currentHeight = await store.getCurrentHeight();

const tokens = new Set<string>();
const processedData = await processNewTx(storage, tx, { rewardLock, nowTs, currentHeight });
const maxIndexUsed = processedData.maxAddressIndex;
for (const token of processedData.tokens) {
tokens.add(token);
}

for (const input of tx.inputs) {
const origTx = await storage.getTx(input.tx_id);
if (!origTx) {
// The tx being spent is not from the wallet.
continue;
}

if (origTx.outputs.length <= input.index) {
throw new Error('Spending an unexistent output');

Check warning on line 424 in src/utils/storage.ts

View check run for this annotation

Codecov / codecov/patch

src/utils/storage.ts#L424

Added line #L424 was not covered by tests
}

const output = origTx.outputs[input.index];
if (!output.decoded.address) {
// Tx is ours but output is not from an address.
continue;

Check warning on line 430 in src/utils/storage.ts

View check run for this annotation

Codecov / codecov/patch

src/utils/storage.ts#L430

Added line #L430 was not covered by tests
}

if (!(await storage.isAddressMine(output.decoded.address))) {
// Address is not ours.
continue;
}

// Now we get the utxo object to be deleted from the store
const utxo: IUtxo = {
txId: input.tx_id,
index: input.index,
token: output.token,
address: output.decoded.address,
authorities: transactionUtils.isAuthorityOutput(output) ? output.value : 0n,
value: output.value,
timelock: output.decoded?.timelock ?? null,
type: origTx.version,
height: origTx.height ?? null,
};

// Delete utxo
await store.deleteUtxo(utxo);
}

// Update wallet data in the store
await updateWalletMetadataFromProcessedTxData(storage, { maxIndexUsed, tokens });
}

/**
* Fetch and save the data of the token set on the storage
* @param {IStorage} storage - Storage to save the tokens.
Expand Down
Loading