diff --git a/migrations/2025.01.30T14.12.03.bundles.add-retry-stats.sql b/migrations/2025.01.30T14.12.03.bundles.add-retry-stats.sql
new file mode 100644
index 00000000..be432fa7
--- /dev/null
+++ b/migrations/2025.01.30T14.12.03.bundles.add-retry-stats.sql
@@ -0,0 +1,7 @@
+ALTER TABLE bundles ADD COLUMN retry_attempt_count INTEGER;
+ALTER TABLE bundles ADD COLUMN first_retried_at INTEGER;
+ALTER TABLE bundles ADD COLUMN last_retried_at INTEGER;
+
+DROP INDEX IF EXISTS import_attempt_last_queued_idx;
+CREATE INDEX IF NOT EXISTS import_attempt_last_retried_idx ON bundles (import_attempt_count, last_retried_at);
+CREATE INDEX IF NOT EXISTS root_transaction_id_idx ON bundles (root_transaction_id);
diff --git a/migrations/down/2025.01.30T14.12.03.bundles.add-retry-stats.sql b/migrations/down/2025.01.30T14.12.03.bundles.add-retry-stats.sql
new file mode 100644
index 00000000..e7a542e4
--- /dev/null
+++ b/migrations/down/2025.01.30T14.12.03.bundles.add-retry-stats.sql
@@ -0,0 +1,7 @@
+ALTER TABLE bundles DROP COLUMN retry_attempt_count;
+ALTER TABLE bundles DROP COLUMN first_retried_at;
+ALTER TABLE bundles DROP COLUMN last_retried_at;
+
+CREATE INDEX IF NOT EXISTS import_attempt_last_queued_idx ON bundles (import_attempt_count, last_queued_at);
+DROP INDEX IF EXISTS import_attempt_last_retried_idx;
+DROP INDEX IF EXISTS root_transaction_id_idx;
diff --git a/src/database/sql/bundles/import.sql b/src/database/sql/bundles/import.sql
index 75fa313a..ab9809c6 100644
--- a/src/database/sql/bundles/import.sql
+++ b/src/database/sql/bundles/import.sql
@@ -54,6 +54,17 @@
 RETURNING previous_unbundle_filter_id, previous_index_filter_id;
 
+-- updateBundleRetry
+UPDATE bundles
+SET
+  retry_attempt_count = COALESCE(retry_attempt_count, 0) + 1,
+  first_retried_at = CASE
+    WHEN first_retried_at IS NULL THEN @current_timestamp
+    ELSE first_retried_at
+  END,
+  last_retried_at = @current_timestamp
+WHERE root_transaction_id = @root_transaction_id;
+
 -- insertOrIgnoreWallet
 INSERT INTO wallets (address, public_modulus)
 VALUES (@address, @public_modulus)
diff --git a/src/database/sql/bundles/repair.sql b/src/database/sql/bundles/repair.sql
index 607105c4..bf135183 100644
--- a/src/database/sql/bundles/repair.sql
+++ b/src/database/sql/bundles/repair.sql
@@ -19,7 +19,7 @@ FROM (
       b.matched_data_item_count IS NULL
       OR b.matched_data_item_count > 0
     )
-  ORDER BY b.import_attempt_count, b.last_queued_at ASC
+  ORDER BY b.retry_attempt_count, b.last_retried_at ASC
   LIMIT @limit
 )
 ORDER BY RANDOM()
diff --git a/src/database/standalone-sqlite.test.ts b/src/database/standalone-sqlite.test.ts
index 03651d6f..662b3957 100644
--- a/src/database/standalone-sqlite.test.ts
+++ b/src/database/standalone-sqlite.test.ts
@@ -46,6 +46,7 @@ import { normalizeAns104DataItem } from '../lib/ans-104.js';
 import log from '../log.js';
 import { BundleRecord } from '../types.js';
 import { processBundleStream } from '../lib/bundles.js';
+import wait from 'wait';
 
 const HEIGHT = 1138;
 const BLOCK_TX_INDEX = 42;
@@ -1203,6 +1204,123 @@ describe('StandaloneSqliteDatabase', () => {
     });
   });
 
+  describe('saveBundleRetries', () => {
+    const rootTxId1 = '1111111111111111111111111111111111111111111';
+    const rootTxId2 = '2222222222222222222222222222222222222222222';
+    const bundleId1 = '3333333333333333333333333333333333333333333';
+    const bundleId2 = '4444444444444444444444444444444444444444444';
+    const bundleId3 = '5555555555555555555555555555555555555555555';
+
+    const sql = `
+      SELECT *
+      FROM bundles
+      WHERE id = @id
+    `;
+
+    beforeEach(async () => {
+      await db.saveBundle({
+        id: bundleId1,
+        format: 'ans-104',
+        dataItemCount: 2,
+        matchedDataItemCount: 2,
+        rootTransactionId: rootTxId1,
+      });
+
+      await db.saveBundle({
+        id: bundleId2,
+        format: 'ans-104',
+        dataItemCount: 2,
+        matchedDataItemCount: 2,
+        rootTransactionId: rootTxId1,
+      });
+
+      await db.saveBundle({
+        id: bundleId3,
+        format: 'ans-104',
+        dataItemCount: 2,
+        matchedDataItemCount: 2,
+        rootTransactionId: rootTxId2,
+      });
+    });
+
+    it('should update all bundles sharing the same root transaction id', async () => {
+      let bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) });
+      let bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) });
+      let bundle3 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId3) });
+
+      assert.equal(bundle1.retry_attempt_count, null);
+      assert.equal(bundle1.first_retried_at, null);
+      assert.equal(bundle1.last_retried_at, null);
+
+      assert.equal(bundle2.retry_attempt_count, null);
+      assert.equal(bundle2.first_retried_at, null);
+      assert.equal(bundle2.last_retried_at, null);
+
+      assert.equal(bundle3.retry_attempt_count, null);
+      assert.equal(bundle3.first_retried_at, null);
+      assert.equal(bundle3.last_retried_at, null);
+
+      await db.saveBundleRetries(rootTxId1);
+
+      bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) });
+      bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) });
+      bundle3 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId3) });
+
+      assert.equal(bundle1.retry_attempt_count, 1);
+      assert.ok(bundle1.first_retried_at !== null);
+      assert.ok(bundle1.last_retried_at !== null);
+
+      assert.equal(bundle2.retry_attempt_count, 1);
+      assert.ok(bundle2.first_retried_at !== null);
+      assert.ok(bundle2.last_retried_at !== null);
+
+      assert.equal(bundle3.retry_attempt_count, null);
+      assert.equal(bundle3.first_retried_at, null);
+      assert.equal(bundle3.last_retried_at, null);
+
+      await wait(1000);
+
+      await db.saveBundleRetries(rootTxId1);
+
+      bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) });
+      bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) });
+      bundle3 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId3) });
+
+      assert.equal(bundle1.retry_attempt_count, 2);
+      assert.ok(bundle1.last_retried_at > bundle1.first_retried_at);
+
+      assert.equal(bundle2.retry_attempt_count, 2);
+      assert.ok(bundle2.last_retried_at > bundle2.first_retried_at);
+
+      assert.equal(bundle3.retry_attempt_count, null);
+      assert.equal(bundle3.first_retried_at, null);
+      assert.equal(bundle3.last_retried_at, null);
+    });
+
+    it('should update timestamps correctly for multiple bundles', async () => {
+      await db.saveBundleRetries(rootTxId1);
+
+      let bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) });
+      let bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) });
+
+      assert.equal(bundle1.first_retried_at, bundle2.first_retried_at);
+      assert.equal(bundle1.last_retried_at, bundle2.last_retried_at);
+
+      await wait(1000);
+
+      await db.saveBundleRetries(rootTxId1);
+
+      bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) });
+      bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) });
+
+      assert.equal(bundle1.first_retried_at, bundle2.first_retried_at);
+
+      assert.equal(bundle1.last_retried_at, bundle2.last_retried_at);
+      assert.ok(bundle1.last_retried_at > bundle1.first_retried_at);
+      assert.ok(bundle2.last_retried_at > bundle2.first_retried_at);
+    });
+  });
+
   describe('getVerifiableDataIds', () => {
     it("should return an empty list if there's no verifiable data ids", async () => {
      const emptyDbIds = await db.getVerifiableDataIds();
diff --git a/src/database/standalone-sqlite.ts b/src/database/standalone-sqlite.ts
index 5862a77a..a86d4a01 100644
--- a/src/database/standalone-sqlite.ts
+++ b/src/database/standalone-sqlite.ts
@@ -911,6 +911,14 @@ export class StandaloneSqliteDatabaseWorker {
     this.insertDataItemFn(item, maybeTxHeight);
   }
 
+  saveBundleRetries(rootTransactionId: string) {
+    const rootTxId = fromB64Url(rootTransactionId);
+    this.stmts.bundles.updateBundleRetry.run({
+      root_transaction_id: rootTxId,
+      current_timestamp: currentUnixTimestamp(),
+    });
+  }
+
   saveBundle({
     id,
     rootTransactionId,
@@ -2934,6 +2942,10 @@ export class StandaloneSqliteDatabase
     return this.queueWrite('bundles', 'saveDataItem', [item]);
   }
 
+  saveBundleRetries(rootTransactionId: string): Promise<void> {
+    return this.queueWrite('bundles', 'saveBundleRetries', [rootTransactionId]);
+  }
+
   saveBundle(bundle: BundleRecord): Promise {
     return this.queueWrite('bundles', 'saveBundle', [bundle]);
   }
@@ -3352,6 +3364,10 @@ if (!isMainThread) {
         worker.saveDataItem(args[0]);
         parentPort?.postMessage(null);
         break;
+      case 'saveBundleRetries':
+        worker.saveBundleRetries(args[0]);
+        parentPort?.postMessage(null);
+        break;
       case 'saveBundle':
         {
           const bundle = worker.saveBundle(args[0]);
diff --git a/src/types.d.ts b/src/types.d.ts
index 04f8b653..758b90ed 100644
--- a/src/types.d.ts
+++ b/src/types.d.ts
@@ -228,6 +228,7 @@ export interface BundleFilterIds {
 
 export interface BundleIndex {
   saveBundle(bundle: BundleRecord): Promise;
+  saveBundleRetries(rootTransactionId: string): Promise<void>;
   getFailedBundleIds(limit: number): Promise;
   updateBundlesFullyIndexedAt(): Promise;
   updateBundlesForFilterChange(
diff --git a/src/workers/bundle-repair-worker.ts b/src/workers/bundle-repair-worker.ts
index a6c905b1..ac8b4ec8 100644
--- a/src/workers/bundle-repair-worker.ts
+++ b/src/workers/bundle-repair-worker.ts
@@ -68,6 +68,7 @@ export class BundleRepairWorker {
       config.BUNDLE_REPAIR_RETRY_INTERVAL_SECONDS * 1000,
     );
     this.intervalIds.push(defaultInterval);
+
     const defaultUpdateInterval = setInterval(
       this.updateBundleTimestamps.bind(this),
       DEFAULT_UPDATE_INTERVAL_MS,
@@ -107,6 +108,7 @@ export class BundleRepairWorker {
       );
       for (const bundleId of bundleIds) {
         this.log.info('Retrying failed bundle', { bundleId });
+        await this.bundleIndex.saveBundleRetries(bundleId);
         await this.txFetcher.queueTxId({ txId: bundleId });
       }
     } catch (error: any) {
diff --git a/test/bundles-schema.sql b/test/bundles-schema.sql
index 23280e7a..713ca02b 100644
--- a/test/bundles-schema.sql
+++ b/test/bundles-schema.sql
@@ -54,7 +54,7 @@ CREATE TABLE bundles (
   last_unbundled_at INTEGER,
   first_fully_indexed_at INTEGER,
   last_fully_indexed_at INTEGER
-, root_transaction_id BLOB, import_attempt_count INTEGER, duplicated_data_item_count INTEGER, previous_unbundle_filter_id INTEGER, previous_index_filter_id INTEGER);
+, root_transaction_id BLOB, import_attempt_count INTEGER, duplicated_data_item_count INTEGER, previous_unbundle_filter_id INTEGER, previous_index_filter_id INTEGER, retry_attempt_count INTEGER, first_retried_at INTEGER, last_retried_at INTEGER);
 
 CREATE INDEX bundles_format_id_idx ON bundles (format_id);
 CREATE INDEX bundles_last_queued_at_idx ON bundles (last_queued_at);
@@ -70,7 +70,6 @@ CREATE INDEX bundles_index_filter_id_idx ON bundles (index_filter_id);
 
 CREATE INDEX bundle_data_items_parent_id_filter_id_idx ON bundle_data_items (parent_id, filter_id);
 
-CREATE INDEX import_attempt_last_queued_idx ON bundles (import_attempt_count, last_queued_at);
 CREATE TABLE new_data_item_tags (
   tag_name_hash BLOB NOT NULL,
   tag_value_hash BLOB NOT NULL,
@@ -157,6 +156,8 @@ CREATE INDEX new_data_items_owner_address_id_idx ON new_data_items (owner_addres
 CREATE INDEX new_data_items_height_indexed_at_idx ON new_data_items (height, indexed_at);
 CREATE INDEX bundle_data_items_root_transaction_id_idx ON bundle_data_items (root_transaction_id);
 CREATE INDEX stable_data_items_indexed_at_idx ON stable_data_items (indexed_at);
+CREATE INDEX import_attempt_last_retried_idx ON bundles (import_attempt_count, last_retried_at);
+CREATE INDEX root_transaction_id_idx ON bundles (root_transaction_id);
 PRAGMA foreign_keys=OFF;
 BEGIN TRANSACTION;
 CREATE TABLE bundle_formats (
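
Note: a quick way to eyeball the new retry stats after a few repair cycles is to query the bundles SQLite database directly. A minimal SQL sketch follows; the table and columns come from the migration above, but the filter, ordering, and limit are illustrative only and not part of the patch:

-- Bundles that have been requeued at least once, most-retried first
-- (retry_attempt_count stays NULL until the first retry per the patch's COALESCE logic)
SELECT
  root_transaction_id,
  retry_attempt_count,
  first_retried_at,
  last_retried_at
FROM bundles
WHERE retry_attempt_count IS NOT NULL
ORDER BY retry_attempt_count DESC, last_retried_at DESC
LIMIT 20;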