From 8d796bc3d6bc69b1002e714115b3927c917fb4d8 Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Wed, 29 Jan 2025 13:19:43 -0300 Subject: [PATCH 1/2] feat(bundles): add retry stats db columns --- migrations/2025.01.30T14.12.03.bundles.add-retry-stats.sql | 7 +++++++ .../down/2025.01.30T14.12.03.bundles.add-retry-stats.sql | 7 +++++++ test/bundles-schema.sql | 5 +++-- 3 files changed, 17 insertions(+), 2 deletions(-) create mode 100644 migrations/2025.01.30T14.12.03.bundles.add-retry-stats.sql create mode 100644 migrations/down/2025.01.30T14.12.03.bundles.add-retry-stats.sql diff --git a/migrations/2025.01.30T14.12.03.bundles.add-retry-stats.sql b/migrations/2025.01.30T14.12.03.bundles.add-retry-stats.sql new file mode 100644 index 00000000..be432fa7 --- /dev/null +++ b/migrations/2025.01.30T14.12.03.bundles.add-retry-stats.sql @@ -0,0 +1,7 @@ +ALTER TABLE bundles ADD COLUMN retry_attempt_count INTEGER; +ALTER TABLE bundles ADD COLUMN first_retried_at INTEGER; +ALTER TABLE bundles ADD COLUMN last_retried_at INTEGER; + +DROP INDEX IF EXISTS import_attempt_last_queued_idx; +CREATE INDEX IF NOT EXISTS import_attempt_last_retried_idx ON bundles (import_attempt_count, last_retried_at); +CREATE INDEX IF NOT EXISTS root_transaction_id_idx ON bundles (root_transaction_id); diff --git a/migrations/down/2025.01.30T14.12.03.bundles.add-retry-stats.sql b/migrations/down/2025.01.30T14.12.03.bundles.add-retry-stats.sql new file mode 100644 index 00000000..e7a542e4 --- /dev/null +++ b/migrations/down/2025.01.30T14.12.03.bundles.add-retry-stats.sql @@ -0,0 +1,7 @@ +ALTER TABLE bundles DROP COLUMN retry_attempt_count; +ALTER TABLE bundles DROP COLUMN first_retried_at; +ALTER TABLE bundles DROP COLUMN last_retried_at; + +CREATE INDEX IF NOT EXISTS import_attempt_last_queued_idx ON bundles (import_attempt_count, last_queued_at); +DROP INDEX IF EXISTS import_attempt_last_retried_idx; +DROP INDEX IF EXISTS root_transaction_id_idx; diff --git a/test/bundles-schema.sql b/test/bundles-schema.sql index 23280e7a..713ca02b 100644 --- a/test/bundles-schema.sql +++ b/test/bundles-schema.sql @@ -54,7 +54,7 @@ CREATE TABLE bundles ( last_unbundled_at INTEGER, first_fully_indexed_at INTEGER, last_fully_indexed_at INTEGER -, root_transaction_id BLOB, import_attempt_count INTEGER, duplicated_data_item_count INTEGER, previous_unbundle_filter_id INTEGER, previous_index_filter_id INTEGER); +, root_transaction_id BLOB, import_attempt_count INTEGER, duplicated_data_item_count INTEGER, previous_unbundle_filter_id INTEGER, previous_index_filter_id INTEGER, retry_attempt_count INTEGER, first_retried_at INTEGER, last_retried_at INTEGER); CREATE INDEX bundles_format_id_idx ON bundles (format_id); CREATE INDEX bundles_last_queued_at_idx ON bundles (last_queued_at); @@ -70,7 +70,6 @@ CREATE INDEX bundles_index_filter_id_idx ON bundles (index_filter_id); CREATE INDEX bundle_data_items_parent_id_filter_id_idx ON bundle_data_items (parent_id, filter_id); -CREATE INDEX import_attempt_last_queued_idx ON bundles (import_attempt_count, last_queued_at); CREATE TABLE new_data_item_tags ( tag_name_hash BLOB NOT NULL, tag_value_hash BLOB NOT NULL, @@ -157,6 +156,8 @@ CREATE INDEX new_data_items_owner_address_id_idx ON new_data_items (owner_addres CREATE INDEX new_data_items_height_indexed_at_idx ON new_data_items (height, indexed_at); CREATE INDEX bundle_data_items_root_transaction_id_idx ON bundle_data_items (root_transaction_id); CREATE INDEX stable_data_items_indexed_at_idx ON stable_data_items (indexed_at); +CREATE INDEX import_attempt_last_retried_idx ON bundles (import_attempt_count, last_retried_at); +CREATE INDEX root_transaction_id_idx ON bundles (root_transaction_id); PRAGMA foreign_keys=OFF; BEGIN TRANSACTION; CREATE TABLE bundle_formats ( From a346215298b9ac20f5a8dc0b19172a417cdbbdd2 Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Thu, 30 Jan 2025 14:12:09 -0300 Subject: [PATCH 2/2] feat(bundles): get bundles to retry based on retry attempts --- src/database/sql/bundles/import.sql | 11 +++ src/database/sql/bundles/repair.sql | 2 +- src/database/standalone-sqlite.test.ts | 118 +++++++++++++++++++++++++ src/database/standalone-sqlite.ts | 16 ++++ src/types.d.ts | 1 + src/workers/bundle-repair-worker.ts | 2 + 6 files changed, 149 insertions(+), 1 deletion(-) diff --git a/src/database/sql/bundles/import.sql b/src/database/sql/bundles/import.sql index 75fa313a..ab9809c6 100644 --- a/src/database/sql/bundles/import.sql +++ b/src/database/sql/bundles/import.sql @@ -54,6 +54,17 @@ RETURNING previous_unbundle_filter_id, previous_index_filter_id; +-- updateBundleRetry +UPDATE bundles +SET + retry_attempt_count = COALESCE(retry_attempt_count, 0) + 1, + first_retried_at = CASE + WHEN first_retried_at IS NULL THEN @current_timestamp + ELSE first_retried_at + END, + last_retried_at = @current_timestamp +WHERE root_transaction_id = @root_transaction_id; + -- insertOrIgnoreWallet INSERT INTO wallets (address, public_modulus) VALUES (@address, @public_modulus) diff --git a/src/database/sql/bundles/repair.sql b/src/database/sql/bundles/repair.sql index 607105c4..bf135183 100644 --- a/src/database/sql/bundles/repair.sql +++ b/src/database/sql/bundles/repair.sql @@ -19,7 +19,7 @@ FROM ( b.matched_data_item_count IS NULL OR b.matched_data_item_count > 0 ) - ORDER BY b.import_attempt_count, b.last_queued_at ASC + ORDER BY b.retry_attempt_count, b.last_retried_at ASC LIMIT @limit ) ORDER BY RANDOM() diff --git a/src/database/standalone-sqlite.test.ts b/src/database/standalone-sqlite.test.ts index e4dcaa3d..82d4cc78 100644 --- a/src/database/standalone-sqlite.test.ts +++ b/src/database/standalone-sqlite.test.ts @@ -46,6 +46,7 @@ import { normalizeAns104DataItem } from '../lib/ans-104.js'; import log from '../log.js'; import { BundleRecord } from '../types.js'; import { processBundleStream } from '../lib/bundles.js'; +import wait from 'wait'; const HEIGHT = 1138; const BLOCK_TX_INDEX = 42; @@ -1173,6 +1174,123 @@ describe('StandaloneSqliteDatabase', () => { }); }); + describe('saveBundleRetries', () => { + const rootTxId1 = '1111111111111111111111111111111111111111111'; + const rootTxId2 = '2222222222222222222222222222222222222222222'; + const bundleId1 = '3333333333333333333333333333333333333333333'; + const bundleId2 = '4444444444444444444444444444444444444444444'; + const bundleId3 = '5555555555555555555555555555555555555555555'; + + const sql = ` + SELECT * + FROM bundles + WHERE id = @id + `; + + beforeEach(async () => { + await db.saveBundle({ + id: bundleId1, + format: 'ans-104', + dataItemCount: 2, + matchedDataItemCount: 2, + rootTransactionId: rootTxId1, + }); + + await db.saveBundle({ + id: bundleId2, + format: 'ans-104', + dataItemCount: 2, + matchedDataItemCount: 2, + rootTransactionId: rootTxId1, + }); + + await db.saveBundle({ + id: bundleId3, + format: 'ans-104', + dataItemCount: 2, + matchedDataItemCount: 2, + rootTransactionId: rootTxId2, + }); + }); + + it('should update all bundles sharing the same root transaction id', async () => { + let bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) }); + let bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) }); + let bundle3 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId3) }); + + assert.equal(bundle1.retry_attempt_count, null); + assert.equal(bundle1.first_retried_at, null); + assert.equal(bundle1.last_retried_at, null); + + assert.equal(bundle2.retry_attempt_count, null); + assert.equal(bundle2.first_retried_at, null); + assert.equal(bundle2.last_retried_at, null); + + assert.equal(bundle3.retry_attempt_count, null); + assert.equal(bundle3.first_retried_at, null); + assert.equal(bundle3.last_retried_at, null); + + await db.saveBundleRetries(rootTxId1); + + bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) }); + bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) }); + bundle3 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId3) }); + + assert.equal(bundle1.retry_attempt_count, 1); + assert.ok(bundle1.first_retried_at !== null); + assert.ok(bundle1.last_retried_at !== null); + + assert.equal(bundle2.retry_attempt_count, 1); + assert.ok(bundle2.first_retried_at !== null); + assert.ok(bundle2.last_retried_at !== null); + + assert.equal(bundle3.retry_attempt_count, null); + assert.equal(bundle3.first_retried_at, null); + assert.equal(bundle3.last_retried_at, null); + + await wait(1000); + + await db.saveBundleRetries(rootTxId1); + + bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) }); + bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) }); + bundle3 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId3) }); + + assert.equal(bundle1.retry_attempt_count, 2); + assert.ok(bundle1.last_retried_at > bundle1.first_retried_at); + + assert.equal(bundle2.retry_attempt_count, 2); + assert.ok(bundle2.last_retried_at > bundle2.first_retried_at); + + assert.equal(bundle3.retry_attempt_count, null); + assert.equal(bundle3.first_retried_at, null); + assert.equal(bundle3.last_retried_at, null); + }); + + it('should update timestamps correctly for multiple bundles', async () => { + await db.saveBundleRetries(rootTxId1); + + let bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) }); + let bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) }); + + assert.equal(bundle1.first_retried_at, bundle2.first_retried_at); + assert.equal(bundle1.last_retried_at, bundle2.last_retried_at); + + await wait(1000); + + await db.saveBundleRetries(rootTxId1); + + bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) }); + bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) }); + + assert.equal(bundle1.first_retried_at, bundle2.first_retried_at); + + assert.equal(bundle1.last_retried_at, bundle2.last_retried_at); + assert.ok(bundle1.last_retried_at > bundle1.first_retried_at); + assert.ok(bundle2.last_retried_at > bundle2.first_retried_at); + }); + }); + describe('getUnverifiedDataIds', () => { it("should return an empty list if there's no unverified data ids", async () => { const emptyDbIds = await db.getUnverifiedDataIds(); diff --git a/src/database/standalone-sqlite.ts b/src/database/standalone-sqlite.ts index 9e93fed3..fdd29b59 100644 --- a/src/database/standalone-sqlite.ts +++ b/src/database/standalone-sqlite.ts @@ -911,6 +911,14 @@ export class StandaloneSqliteDatabaseWorker { this.insertDataItemFn(item, maybeTxHeight); } + saveBundleRetries(rootTransactionId: string) { + const rootTxId = fromB64Url(rootTransactionId); + this.stmts.bundles.updateBundleRetry.run({ + root_transaction_id: rootTxId, + current_timestamp: currentUnixTimestamp(), + }); + } + saveBundle({ id, rootTransactionId, @@ -2923,6 +2931,10 @@ export class StandaloneSqliteDatabase return this.queueWrite('bundles', 'saveDataItem', [item]); } + saveBundleRetries(rootTransactionId: string): Promise { + return this.queueWrite('bundles', 'saveBundleRetries', [rootTransactionId]); + } + saveBundle(bundle: BundleRecord): Promise { return this.queueWrite('bundles', 'saveBundle', [bundle]); } @@ -3341,6 +3353,10 @@ if (!isMainThread) { worker.saveDataItem(args[0]); parentPort?.postMessage(null); break; + case 'saveBundleRetries': + worker.saveBundleRetries(args[0]); + parentPort?.postMessage(null); + break; case 'saveBundle': { const bundle = worker.saveBundle(args[0]); diff --git a/src/types.d.ts b/src/types.d.ts index aa0e28b8..fd3e84ba 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -227,6 +227,7 @@ export interface BundleFilterIds { export interface BundleIndex { saveBundle(bundle: BundleRecord): Promise; + saveBundleRetries(rootTransactionId: string): Promise; getFailedBundleIds(limit: number): Promise; updateBundlesFullyIndexedAt(): Promise; updateBundlesForFilterChange( diff --git a/src/workers/bundle-repair-worker.ts b/src/workers/bundle-repair-worker.ts index a6c905b1..ac8b4ec8 100644 --- a/src/workers/bundle-repair-worker.ts +++ b/src/workers/bundle-repair-worker.ts @@ -68,6 +68,7 @@ export class BundleRepairWorker { config.BUNDLE_REPAIR_RETRY_INTERVAL_SECONDS * 1000, ); this.intervalIds.push(defaultInterval); + const defaultUpdateInterval = setInterval( this.updateBundleTimestamps.bind(this), DEFAULT_UPDATE_INTERVAL_MS, @@ -107,6 +108,7 @@ export class BundleRepairWorker { ); for (const bundleId of bundleIds) { this.log.info('Retrying failed bundle', { bundleId }); + await this.bundleIndex.saveBundleRetries(bundleId); await this.txFetcher.queueTxId({ txId: bundleId }); } } catch (error: any) {