Skip to content

Commit

Permalink
feat : update table structure, busy_timeout and graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
abdulazeem-tk4vr committed Jan 29, 2025
1 parent c0f523f commit fac3f75
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 20 deletions.
12 changes: 9 additions & 3 deletions src/collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import { sleep } from './utils'
import RMQCyclesConsumer from './collectors/rmq_cycles'
import RMQOriginalTxsConsumer from './collectors/rmq_original_txs'
import RMQReceiptsConsumer from './collectors/rmq_receipts'
import * as db from './storage/sqlite3storage'

if (process.env.PORT) {
CONFIG.port.collector = process.env.PORT
Expand Down Expand Up @@ -395,6 +396,13 @@ const startLoop = async () => {
}
}

async function shutdownCollector() {
// Perform any necessary cleanup here
await db.close() // Close the database connection
console.log('Collector shut down complete.')
process.exit(0) // Exit the process
}

while (true) {
try {
await startServer()
Expand All @@ -409,12 +417,10 @@ const startLoop = async () => {
// starts the syncing process
const status = await startPatching(lastKnownCycle)

// TODO: If repair fails, break out of the while loop
if (!status) {
console.error('Patching process failed, shutting down the collector process.')
break
await shutdownCollector() // Perform graceful shutdown
}
await new Promise((resolve) => setTimeout(resolve, 2000)) // Wait for 2 seconds before we restart
}
}
}
Expand Down
40 changes: 24 additions & 16 deletions src/storage/checkpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,48 @@ import { config } from '../config'
import * as db from './sqlite3storage'

/**
* Inserts or replaces a checkpoint entry in the database with the given cycle number.
* Replaces a checkpoint entry in the database with the given value.
*
* @param {number} cycle - The cycle number to be inserted into the checkpoint table.
* @param {number} value - The value to be inserted into the checkpoint table.
* @param {string} [type='cycle'] - The type of the checkpoint (default is 'cycle').
* @returns {Promise<void>} A promise that resolves when the operation is complete.
*
* @throws Will log an error message if the database operation fails.
*/
export async function insertCheckpoint(cycle: number): Promise<void> {
export async function insertCheckpoint(value: number, type: string = 'cycle'): Promise<void> {
try {
const sql = 'INSERT OR REPLACE INTO `checkpoint` (cycle) VALUES (?)'
await db.run(sql, [cycle])
if (config.verbose) console.log(`Successfully inserted checkpoint ${cycle}`)
const sql = 'REPLACE INTO `checkpoint` (type, value) VALUES (?, ?)'
await db.run(sql, [type, value])
if (config.verbose) console.log(`Successfully replaced checkpoint ${type} with value ${value}`)
} catch (e) {
console.log(`Error in insertCheckpoint with cycle ${cycle}: ${e}`)
console.error(`Error in insertCheckpoint with type ${type} and value ${value}:`, e)
}
}

/**
* Fetches the latest checkpoint cycle from the database.
* Fetches the latest checkpoint value for a given type from the database.
*
* @returns {Promise<number | null>} A promise that resolves to the cycle number if found, otherwise null.
* @throws Will log an error message and return null if the database query fails.
* @param {string} [type='cycle'] - The type of the checkpoint (default is 'cycle').
* @returns {Promise<number>} A promise that resolves to the value if found.
* @throws Will log an error message and throw an error if the database query fails or if the value is not found.
*/
export async function fetchCheckpoint(): Promise<number | null> {
export async function fetchCheckpoint(type: string = 'cycle'): Promise<number> {
try {
const sql = `
SELECT cycle
SELECT value
FROM checkpoint
WHERE type = ?
LIMIT 1
`
const result: { cycle: number } = await db.get(sql)
return typeof result?.cycle === 'number' ? result.cycle : null
const result: { value: number } = await db.get(sql, [type])
if (result?.value === undefined) {
// indicating that a checkpoint has not been inserted yet into the dB
console.log(`a checkpoint has not been inserted yet into the dB, returning checkpoint as 0`)
return 0
}
return result.value
} catch (e) {
console.log(`Error in fetchCheckpoint:`, e)
return null
console.error(`Error in fetchCheckpoint with type ${type}:`, e)
throw e
}
}
4 changes: 3 additions & 1 deletion src/storage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ export const initializeDB = async (): Promise<void> => {
)

// Table for checkpoints
await db.runCreate('CREATE TABLE if not exists `checkpoint` (cycle INTEGER UNIQUE NOT NULL)')
await db.runCreate(
'CREATE TABLE IF NOT EXISTS `checkpoint` (type TEXT UNIQUE NOT NULL, value INTEGER NOT NULL)'
)
}

export const closeDatabase = async (): Promise<void> => {
Expand Down
1 change: 1 addition & 0 deletions src/storage/sqlite3storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface DbOptions {
export async function init(config: DbOptions): Promise<void> {
db = new sqlite3.Database(config.defaultDbSqlitePath)
await run('PRAGMA journal_mode=WAL')
await run('PRAGMA busy_timeout = 5000') // Set busy timeout to 5 seconds
console.log('Database initialized.')
if (config.enableShardeumIndexer) {
shardeumIndexerDb = new sqlite3.Database(config.shardeumIndexerSqlitePath)
Expand Down

0 comments on commit fac3f75

Please sign in to comment.