diff --git a/src/collector.ts b/src/collector.ts index 121ea75..8100402 100644 --- a/src/collector.ts +++ b/src/collector.ts @@ -38,6 +38,7 @@ import { sleep } from './utils' import RMQCyclesConsumer from './collectors/rmq_cycles' import RMQOriginalTxsConsumer from './collectors/rmq_original_txs' import RMQReceiptsConsumer from './collectors/rmq_receipts' +import * as db from './storage/sqlite3storage' if (process.env.PORT) { CONFIG.port.collector = process.env.PORT @@ -395,6 +396,13 @@ const startLoop = async () => { } } + async function shutdownCollector() { + // Perform any necessary cleanup here + await db.close() // Close the database connection + console.log('Collector shut down complete.') + process.exit(0) // Exit the process + } + while (true) { try { await startServer() @@ -409,12 +417,10 @@ const startLoop = async () => { // starts the syncing process const status = await startPatching(lastKnownCycle) - // TODO: If repair fails, break out of the while loop if (!status) { console.error('Patching process failed, shutting down the collector process.') - break + await shutdownCollector() // Perform graceful shutdown } - await new Promise((resolve) => setTimeout(resolve, 2000)) // Wait for 2 seconds before we restart } } } diff --git a/src/storage/checkpoint.ts b/src/storage/checkpoint.ts index 3f10450..7977012 100644 --- a/src/storage/checkpoint.ts +++ b/src/storage/checkpoint.ts @@ -2,40 +2,48 @@ import { config } from '../config' import * as db from './sqlite3storage' /** - * Inserts or replaces a checkpoint entry in the database with the given cycle number. + * Replaces a checkpoint entry in the database with the given value. * - * @param {number} cycle - The cycle number to be inserted into the checkpoint table. + * @param {number} value - The value to be inserted into the checkpoint table. + * @param {string} [type='cycle'] - The type of the checkpoint (default is 'cycle'). * @returns {Promise} A promise that resolves when the operation is complete. * * @throws Will log an error message if the database operation fails. */ -export async function insertCheckpoint(cycle: number): Promise { +export async function insertCheckpoint(value: number, type: string = 'cycle'): Promise { try { - const sql = 'INSERT OR REPLACE INTO `checkpoint` (cycle) VALUES (?)' - await db.run(sql, [cycle]) - if (config.verbose) console.log(`Successfully inserted checkpoint ${cycle}`) + const sql = 'REPLACE INTO `checkpoint` (type, value) VALUES (?, ?)' + await db.run(sql, [type, value]) + if (config.verbose) console.log(`Successfully replaced checkpoint ${type} with value ${value}`) } catch (e) { - console.log(`Error in insertCheckpoint with cycle ${cycle}: ${e}`) + console.error(`Error in insertCheckpoint with type ${type} and value ${value}:`, e) } } /** - * Fetches the latest checkpoint cycle from the database. + * Fetches the latest checkpoint value for a given type from the database. * - * @returns {Promise} A promise that resolves to the cycle number if found, otherwise null. - * @throws Will log an error message and return null if the database query fails. + * @param {string} [type='cycle'] - The type of the checkpoint (default is 'cycle'). + * @returns {Promise} A promise that resolves to the value if found. + * @throws Will log an error message and throw an error if the database query fails or if the value is not found. */ -export async function fetchCheckpoint(): Promise { +export async function fetchCheckpoint(type: string = 'cycle'): Promise { try { const sql = ` - SELECT cycle + SELECT value FROM checkpoint + WHERE type = ? LIMIT 1 ` - const result: { cycle: number } = await db.get(sql) - return typeof result?.cycle === 'number' ? result.cycle : null + const result: { value: number } = await db.get(sql, [type]) + if (result?.value === undefined) { + // indicating that a checkpoint has not been inserted yet into the dB + console.log(`a checkpoint has not been inserted yet into the dB, returning checkpoint as 0`) + return 0 + } + return result.value } catch (e) { - console.log(`Error in fetchCheckpoint:`, e) - return null + console.error(`Error in fetchCheckpoint with type ${type}:`, e) + throw e } } diff --git a/src/storage/index.ts b/src/storage/index.ts index 1ee4e0b..b6b7661 100644 --- a/src/storage/index.ts +++ b/src/storage/index.ts @@ -118,7 +118,9 @@ export const initializeDB = async (): Promise => { ) // Table for checkpoints - await db.runCreate('CREATE TABLE if not exists `checkpoint` (cycle INTEGER UNIQUE NOT NULL)') + await db.runCreate( + 'CREATE TABLE IF NOT EXISTS `checkpoint` (type TEXT UNIQUE NOT NULL, value INTEGER NOT NULL)' + ) } export const closeDatabase = async (): Promise => { diff --git a/src/storage/sqlite3storage.ts b/src/storage/sqlite3storage.ts index 6bfe36c..5b65fbe 100644 --- a/src/storage/sqlite3storage.ts +++ b/src/storage/sqlite3storage.ts @@ -18,6 +18,7 @@ export interface DbOptions { export async function init(config: DbOptions): Promise { db = new sqlite3.Database(config.defaultDbSqlitePath) await run('PRAGMA journal_mode=WAL') + await run('PRAGMA busy_timeout = 5000') // Set busy timeout to 5 seconds console.log('Database initialized.') if (config.enableShardeumIndexer) { shardeumIndexerDb = new sqlite3.Database(config.shardeumIndexerSqlitePath)