feat(bundles): retry based on retry attempts #299

Open · wants to merge 2 commits into base: develop
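This PR adds per-bundle retry tracking (retry_attempt_count, first_retried_at, last_retried_at) and orders bundle repair by those fields instead of the import-attempt columns. The sketch below is a simplified illustration of the resulting flow using names from the diff; the txFetcher shape and the surrounding wiring are assumptions, not code from this PR.

// Simplified sketch, not actual PR code: the repair worker records a retry for
// each failed bundle before re-queueing its root transaction for fetching.
async function retryFailedBundles(
  bundleIndex: BundleIndex,
  txFetcher: { queueTxId(args: { txId: string }): Promise<void> }, // assumed shape
  limit = 100,
): Promise<void> {
  const bundleIds = await bundleIndex.getFailedBundleIds(limit);
  for (const bundleId of bundleIds) {
    // Increments retry_attempt_count and updates first/last_retried_at for all
    // bundle rows that share this root transaction id.
    await bundleIndex.saveBundleRetries(bundleId);
    await txFetcher.queueTxId({ txId: bundleId });
  }
}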
7 changes: 7 additions & 0 deletions migrations/2025.01.30T14.12.03.bundles.add-retry-stats.sql
@@ -0,0 +1,7 @@
ALTER TABLE bundles ADD COLUMN retry_attempt_count INTEGER;
ALTER TABLE bundles ADD COLUMN first_retried_at INTEGER;
ALTER TABLE bundles ADD COLUMN last_retried_at INTEGER;

DROP INDEX IF EXISTS import_attempt_last_queued_idx;
CREATE INDEX IF NOT EXISTS import_attempt_last_retried_idx ON bundles (import_attempt_count, last_retried_at);
CREATE INDEX IF NOT EXISTS root_transaction_id_idx ON bundles (root_transaction_id);
7 changes: 7 additions & 0 deletions (corresponding down migration for add-retry-stats; path not shown)
@@ -0,0 +1,7 @@
ALTER TABLE bundles DROP COLUMN retry_attempt_count;
ALTER TABLE bundles DROP COLUMN first_retried_at;
ALTER TABLE bundles DROP COLUMN last_retried_at;

CREATE INDEX IF NOT EXISTS import_attempt_last_queued_idx ON bundles (import_attempt_count, last_queued_at);
DROP INDEX IF EXISTS import_attempt_last_retried_idx;
DROP INDEX IF EXISTS root_transaction_id_idx;
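For context, the new columns make retry history queryable per bundle. A hypothetical inspection query, not part of this PR, using the table and columns from the migration above:

-- Illustrative only: list bundles that have been retried, most recent first.
SELECT root_transaction_id, retry_attempt_count, first_retried_at, last_retried_at
FROM bundles
WHERE retry_attempt_count IS NOT NULL
ORDER BY last_retried_at DESC;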
11 changes: 11 additions & 0 deletions src/database/sql/bundles/import.sql
@@ -54,6 +54,17 @@ RETURNING
previous_unbundle_filter_id,
previous_index_filter_id;

-- updateBundleRetry
UPDATE bundles
SET
retry_attempt_count = COALESCE(retry_attempt_count, 0) + 1,
first_retried_at = CASE
WHEN first_retried_at IS NULL THEN @current_timestamp
ELSE first_retried_at
END,
last_retried_at = @current_timestamp
WHERE root_transaction_id = @root_transaction_id;

-- insertOrIgnoreWallet
INSERT INTO wallets (address, public_modulus)
VALUES (@address, @public_modulus)
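A quick worked example of the updateBundleRetry statement above, with illustrative timestamps: COALESCE starts the counter at 0 on the first retry, and the CASE expression fills first_retried_at only while it is still NULL.

-- Worked example (illustrative values, not from the PR):
--   before any retry:       retry_attempt_count = NULL, first_retried_at = NULL, last_retried_at = NULL
--   after retry at t = 100: retry_attempt_count = 1,    first_retried_at = 100,  last_retried_at = 100
--   after retry at t = 160: retry_attempt_count = 2,    first_retried_at = 100,  last_retried_at = 160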
2 changes: 1 addition & 1 deletion src/database/sql/bundles/repair.sql
@@ -19,7 +19,7 @@ FROM (
b.matched_data_item_count IS NULL
OR b.matched_data_item_count > 0
)
-ORDER BY b.import_attempt_count, b.last_queued_at ASC
+ORDER BY b.retry_attempt_count, b.last_retried_at ASC
LIMIT @limit
)
ORDER BY RANDOM()
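One behavioral note on the new ordering, not stated in the PR: SQLite sorts NULLs before non-NULL values in ascending order, so bundles that have never been retried are selected ahead of previously retried ones. A minimal illustration of the same ordering:

-- Illustrative only: rows with NULL retry_attempt_count come first, then rows
-- ordered by how recently they were retried.
SELECT id, retry_attempt_count, last_retried_at
FROM bundles
ORDER BY retry_attempt_count ASC, last_retried_at ASC
LIMIT 10;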
118 changes: 118 additions & 0 deletions src/database/standalone-sqlite.test.ts
@@ -46,6 +46,7 @@ import { normalizeAns104DataItem } from '../lib/ans-104.js';
import log from '../log.js';
import { BundleRecord } from '../types.js';
import { processBundleStream } from '../lib/bundles.js';
import wait from 'wait';

const HEIGHT = 1138;
const BLOCK_TX_INDEX = 42;
@@ -1173,6 +1174,123 @@ describe('StandaloneSqliteDatabase', () => {
});
});

describe('saveBundleRetries', () => {
const rootTxId1 = '1111111111111111111111111111111111111111111';
const rootTxId2 = '2222222222222222222222222222222222222222222';
const bundleId1 = '3333333333333333333333333333333333333333333';
const bundleId2 = '4444444444444444444444444444444444444444444';
const bundleId3 = '5555555555555555555555555555555555555555555';

const sql = `
SELECT *
FROM bundles
WHERE id = @id
`;

beforeEach(async () => {
await db.saveBundle({
id: bundleId1,
format: 'ans-104',
dataItemCount: 2,
matchedDataItemCount: 2,
rootTransactionId: rootTxId1,
});

await db.saveBundle({
id: bundleId2,
format: 'ans-104',
dataItemCount: 2,
matchedDataItemCount: 2,
rootTransactionId: rootTxId1,
});

await db.saveBundle({
id: bundleId3,
format: 'ans-104',
dataItemCount: 2,
matchedDataItemCount: 2,
rootTransactionId: rootTxId2,
});
});

it('should update all bundles sharing the same root transaction id', async () => {
let bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) });
let bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) });
let bundle3 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId3) });

assert.equal(bundle1.retry_attempt_count, null);
assert.equal(bundle1.first_retried_at, null);
assert.equal(bundle1.last_retried_at, null);

assert.equal(bundle2.retry_attempt_count, null);
assert.equal(bundle2.first_retried_at, null);
assert.equal(bundle2.last_retried_at, null);

assert.equal(bundle3.retry_attempt_count, null);
assert.equal(bundle3.first_retried_at, null);
assert.equal(bundle3.last_retried_at, null);

await db.saveBundleRetries(rootTxId1);

bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) });
bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) });
bundle3 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId3) });

assert.equal(bundle1.retry_attempt_count, 1);
assert.ok(bundle1.first_retried_at !== null);
assert.ok(bundle1.last_retried_at !== null);

assert.equal(bundle2.retry_attempt_count, 1);
assert.ok(bundle2.first_retried_at !== null);
assert.ok(bundle2.last_retried_at !== null);

assert.equal(bundle3.retry_attempt_count, null);
assert.equal(bundle3.first_retried_at, null);
assert.equal(bundle3.last_retried_at, null);

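// Wait ~1s so the next retry gets a strictly later timestamp; this assumes the
// retry timestamps have seconds resolution (see currentUnixTimestamp()).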
await wait(1000);

await db.saveBundleRetries(rootTxId1);

bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) });
bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) });
bundle3 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId3) });

assert.equal(bundle1.retry_attempt_count, 2);
assert.ok(bundle1.last_retried_at > bundle1.first_retried_at);

assert.equal(bundle2.retry_attempt_count, 2);
assert.ok(bundle2.last_retried_at > bundle2.first_retried_at);

assert.equal(bundle3.retry_attempt_count, null);
assert.equal(bundle3.first_retried_at, null);
assert.equal(bundle3.last_retried_at, null);
});

it('should update timestamps correctly for multiple bundles', async () => {
await db.saveBundleRetries(rootTxId1);

let bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) });
let bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) });

assert.equal(bundle1.first_retried_at, bundle2.first_retried_at);
assert.equal(bundle1.last_retried_at, bundle2.last_retried_at);

await wait(1000);

await db.saveBundleRetries(rootTxId1);

bundle1 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId1) });
bundle2 = bundlesDb.prepare(sql).get({ id: fromB64Url(bundleId2) });

assert.equal(bundle1.first_retried_at, bundle2.first_retried_at);

assert.equal(bundle1.last_retried_at, bundle2.last_retried_at);
assert.ok(bundle1.last_retried_at > bundle1.first_retried_at);
assert.ok(bundle2.last_retried_at > bundle2.first_retried_at);
});
});

describe('getUnverifiedDataIds', () => {
it("should return an empty list if there's no unverified data ids", async () => {
const emptyDbIds = await db.getUnverifiedDataIds();
16 changes: 16 additions & 0 deletions src/database/standalone-sqlite.ts
@@ -911,6 +911,14 @@ export class StandaloneSqliteDatabaseWorker {
this.insertDataItemFn(item, maybeTxHeight);
}

saveBundleRetries(rootTransactionId: string) {
const rootTxId = fromB64Url(rootTransactionId);
this.stmts.bundles.updateBundleRetry.run({
root_transaction_id: rootTxId,
current_timestamp: currentUnixTimestamp(),
});
}

saveBundle({
id,
rootTransactionId,
@@ -2923,6 +2931,10 @@ export class StandaloneSqliteDatabase
return this.queueWrite('bundles', 'saveDataItem', [item]);
}

saveBundleRetries(rootTransactionId: string): Promise<void> {
return this.queueWrite('bundles', 'saveBundleRetries', [rootTransactionId]);
}

saveBundle(bundle: BundleRecord): Promise<BundleFilterIds> {
return this.queueWrite('bundles', 'saveBundle', [bundle]);
}
@@ -3341,6 +3353,10 @@ if (!isMainThread) {
worker.saveDataItem(args[0]);
parentPort?.postMessage(null);
break;
case 'saveBundleRetries':
worker.saveBundleRetries(args[0]);
parentPort?.postMessage(null);
break;
case 'saveBundle':
{
const bundle = worker.saveBundle(args[0]);
1 change: 1 addition & 0 deletions src/types.d.ts
@@ -227,6 +227,7 @@ export interface BundleFilterIds {

export interface BundleIndex {
saveBundle(bundle: BundleRecord): Promise<BundleFilterIds>;
saveBundleRetries(rootTransactionId: string): Promise<void>;
getFailedBundleIds(limit: number): Promise<string[]>;
updateBundlesFullyIndexedAt(): Promise<void>;
updateBundlesForFilterChange(
2 changes: 2 additions & 0 deletions src/workers/bundle-repair-worker.ts
@@ -68,6 +68,7 @@ export class BundleRepairWorker {
config.BUNDLE_REPAIR_RETRY_INTERVAL_SECONDS * 1000,
);
this.intervalIds.push(defaultInterval);

const defaultUpdateInterval = setInterval(
this.updateBundleTimestamps.bind(this),
DEFAULT_UPDATE_INTERVAL_MS,
@@ -107,6 +108,7 @@
);
for (const bundleId of bundleIds) {
this.log.info('Retrying failed bundle', { bundleId });
await this.bundleIndex.saveBundleRetries(bundleId);
Author comment on this line: "@djwhitt I'm awaiting here to make sure we update retry stats"

await this.txFetcher.queueTxId({ txId: bundleId });
}
} catch (error: any) {
5 changes: 3 additions & 2 deletions test/bundles-schema.sql
@@ -54,7 +54,7 @@ CREATE TABLE bundles (
last_unbundled_at INTEGER,
first_fully_indexed_at INTEGER,
last_fully_indexed_at INTEGER
-, root_transaction_id BLOB, import_attempt_count INTEGER, duplicated_data_item_count INTEGER, previous_unbundle_filter_id INTEGER, previous_index_filter_id INTEGER);
+, root_transaction_id BLOB, import_attempt_count INTEGER, duplicated_data_item_count INTEGER, previous_unbundle_filter_id INTEGER, previous_index_filter_id INTEGER, retry_attempt_count INTEGER, first_retried_at INTEGER, last_retried_at INTEGER);
CREATE INDEX bundles_format_id_idx ON bundles (format_id);
CREATE INDEX bundles_last_queued_at_idx
ON bundles (last_queued_at);
@@ -70,7 +70,6 @@ CREATE INDEX bundles_index_filter_id_idx
ON bundles (index_filter_id);
CREATE INDEX bundle_data_items_parent_id_filter_id_idx
ON bundle_data_items (parent_id, filter_id);
CREATE INDEX import_attempt_last_queued_idx ON bundles (import_attempt_count, last_queued_at);
CREATE TABLE new_data_item_tags (
tag_name_hash BLOB NOT NULL,
tag_value_hash BLOB NOT NULL,
@@ -157,6 +156,8 @@ CREATE INDEX new_data_items_owner_address_id_idx ON new_data_items (owner_addres
CREATE INDEX new_data_items_height_indexed_at_idx ON new_data_items (height, indexed_at);
CREATE INDEX bundle_data_items_root_transaction_id_idx ON bundle_data_items (root_transaction_id);
CREATE INDEX stable_data_items_indexed_at_idx ON stable_data_items (indexed_at);
CREATE INDEX import_attempt_last_retried_idx ON bundles (import_attempt_count, last_retried_at);
CREATE INDEX root_transaction_id_idx ON bundles (root_transaction_id);
PRAGMA foreign_keys=OFF;
BEGIN TRANSACTION;
CREATE TABLE bundle_formats (