Skip to content

Commit

Permalink
feat(bundles): get bundles to retry based on retry attempts
Browse files Browse the repository at this point in the history
  • Loading branch information
karlprieb committed Jan 30, 2025
1 parent 8d796bc commit a346215
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 1 deletion.
11 changes: 11 additions & 0 deletions src/database/sql/bundles/import.sql
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ RETURNING
previous_unbundle_filter_id,
previous_index_filter_id;

-- updateBundleRetry
UPDATE bundles
SET
retry_attempt_count = COALESCE(retry_attempt_count, 0) + 1,
first_retried_at = CASE
WHEN first_retried_at IS NULL THEN @current_timestamp
ELSE first_retried_at
END,
last_retried_at = @current_timestamp
WHERE root_transaction_id = @root_transaction_id;

-- insertOrIgnoreWallet
INSERT INTO wallets (address, public_modulus)
VALUES (@address, @public_modulus)
Expand Down
2 changes: 1 addition & 1 deletion src/database/sql/bundles/repair.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ FROM (
b.matched_data_item_count IS NULL
OR b.matched_data_item_count > 0
)
ORDER BY b.import_attempt_count, b.last_queued_at ASC
ORDER BY b.retry_attempt_count, b.last_retried_at ASC
LIMIT @limit
)
ORDER BY RANDOM()
Expand Down
118 changes: 118 additions & 0 deletions src/database/standalone-sqlite.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import { normalizeAns104DataItem } from '../lib/ans-104.js';
import log from '../log.js';
import { BundleRecord } from '../types.js';
import { processBundleStream } from '../lib/bundles.js';
import wait from 'wait';

const HEIGHT = 1138;
const BLOCK_TX_INDEX = 42;
Expand Down Expand Up @@ -1173,6 +1174,123 @@ describe('StandaloneSqliteDatabase', () => {
});
});

describe('saveBundleRetries', () => {
const rootTxId1 = '1111111111111111111111111111111111111111111';
const rootTxId2 = '2222222222222222222222222222222222222222222';
const bundleId1 = '3333333333333333333333333333333333333333333';
const bundleId2 = '4444444444444444444444444444444444444444444';
const bundleId3 = '5555555555555555555555555555555555555555555';

const sql = `
SELECT *
FROM bundles
WHERE id = @id
`;

beforeEach(async () => {
await db.saveBundle({
id: bundleId1,
format: 'ans-104',
dataItemCount: 2,
matchedDataItemCount: 2,
rootTransactionId: rootTxId1,
});

await db.saveBundle({
id: bundleId2,
format: 'ans-104',
dataItemCount: 2,
matchedDataItemCount: 2,
rootTransactionId: rootTxId1,
});

await db.saveBundle({
id: bundleId3,
format: 'ans-104',
dataItemCount: 2,
matchedDataItemCount: 2,
rootTransactionId: rootTxId2,
});
});

it('should update all bundles sharing the same root transaction id', async () => {
let bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) });
let bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) });
let bundle3 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId3) });

assert.equal(bundle1.retry_attempt_count, null);
assert.equal(bundle1.first_retried_at, null);
assert.equal(bundle1.last_retried_at, null);

assert.equal(bundle2.retry_attempt_count, null);
assert.equal(bundle2.first_retried_at, null);
assert.equal(bundle2.last_retried_at, null);

assert.equal(bundle3.retry_attempt_count, null);
assert.equal(bundle3.first_retried_at, null);
assert.equal(bundle3.last_retried_at, null);

await db.saveBundleRetries(rootTxId1);

bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) });
bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) });
bundle3 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId3) });

assert.equal(bundle1.retry_attempt_count, 1);
assert.ok(bundle1.first_retried_at !== null);
assert.ok(bundle1.last_retried_at !== null);

assert.equal(bundle2.retry_attempt_count, 1);
assert.ok(bundle2.first_retried_at !== null);
assert.ok(bundle2.last_retried_at !== null);

assert.equal(bundle3.retry_attempt_count, null);
assert.equal(bundle3.first_retried_at, null);
assert.equal(bundle3.last_retried_at, null);

await wait(1000);

await db.saveBundleRetries(rootTxId1);

bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) });
bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) });
bundle3 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId3) });

assert.equal(bundle1.retry_attempt_count, 2);
assert.ok(bundle1.last_retried_at > bundle1.first_retried_at);

assert.equal(bundle2.retry_attempt_count, 2);
assert.ok(bundle2.last_retried_at > bundle2.first_retried_at);

assert.equal(bundle3.retry_attempt_count, null);
assert.equal(bundle3.first_retried_at, null);
assert.equal(bundle3.last_retried_at, null);
});

it('should update timestamps correctly for multiple bundles', async () => {
await db.saveBundleRetries(rootTxId1);

let bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) });
let bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) });

assert.equal(bundle1.first_retried_at, bundle2.first_retried_at);
assert.equal(bundle1.last_retried_at, bundle2.last_retried_at);

await wait(1000);

await db.saveBundleRetries(rootTxId1);

bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) });
bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) });

assert.equal(bundle1.first_retried_at, bundle2.first_retried_at);

assert.equal(bundle1.last_retried_at, bundle2.last_retried_at);
assert.ok(bundle1.last_retried_at > bundle1.first_retried_at);
assert.ok(bundle2.last_retried_at > bundle2.first_retried_at);
});
});

describe('getUnverifiedDataIds', () => {
it("should return an empty list if there's no unverified data ids", async () => {
const emptyDbIds = await db.getUnverifiedDataIds();
Expand Down
16 changes: 16 additions & 0 deletions src/database/standalone-sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,14 @@ export class StandaloneSqliteDatabaseWorker {
this.insertDataItemFn(item, maybeTxHeight);
}

saveBundleRetries(rootTransactionId: string) {
const rootTxId = fromB64Url(rootTransactionId);
this.stmts.bundles.updateBundleRetry.run({
root_transaction_id: rootTxId,
current_timestamp: currentUnixTimestamp(),
});
}

saveBundle({
id,
rootTransactionId,
Expand Down Expand Up @@ -2923,6 +2931,10 @@ export class StandaloneSqliteDatabase
return this.queueWrite('bundles', 'saveDataItem', [item]);
}

saveBundleRetries(rootTransactionId: string): Promise<void> {
return this.queueWrite('bundles', 'saveBundleRetries', [rootTransactionId]);
}

saveBundle(bundle: BundleRecord): Promise<BundleFilterIds> {
return this.queueWrite('bundles', 'saveBundle', [bundle]);
}
Expand Down Expand Up @@ -3341,6 +3353,10 @@ if (!isMainThread) {
worker.saveDataItem(args[0]);
parentPort?.postMessage(null);
break;
case 'saveBundleRetries':
worker.saveBundleRetries(args[0]);
parentPort?.postMessage(null);
break;
case 'saveBundle':
{
const bundle = worker.saveBundle(args[0]);
Expand Down
1 change: 1 addition & 0 deletions src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ export interface BundleFilterIds {

export interface BundleIndex {
saveBundle(bundle: BundleRecord): Promise<BundleFilterIds>;
saveBundleRetries(rootTransactionId: string): Promise<void>;
getFailedBundleIds(limit: number): Promise<string[]>;
updateBundlesFullyIndexedAt(): Promise<void>;
updateBundlesForFilterChange(
Expand Down
2 changes: 2 additions & 0 deletions src/workers/bundle-repair-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export class BundleRepairWorker {
config.BUNDLE_REPAIR_RETRY_INTERVAL_SECONDS * 1000,
);
this.intervalIds.push(defaultInterval);

const defaultUpdateInterval = setInterval(
this.updateBundleTimestamps.bind(this),
DEFAULT_UPDATE_INTERVAL_MS,
Expand Down Expand Up @@ -107,6 +108,7 @@ export class BundleRepairWorker {
);
for (const bundleId of bundleIds) {
this.log.info('Retrying failed bundle', { bundleId });
await this.bundleIndex.saveBundleRetries(bundleId);
await this.txFetcher.queueTxId({ txId: bundleId });
}
} catch (error: any) {
Expand Down

0 comments on commit a346215

Please sign in to comment.