From ed57451deff0187db41bbe654bfcb7a2901a96fd Mon Sep 17 00:00:00 2001 From: Kenneth Hoang Date: Wed, 11 Dec 2024 14:59:02 +0700 Subject: [PATCH 1/2] feat: add job id for unlock provider remove jobs on complete, failed --- src/modules/crawler/job-unlock.provider.ts | 23 +++++++----------- src/shared/modules/queue/queue.service.ts | 28 +++------------------- src/shared/utils/queue.ts | 9 ++++--- src/shared/utils/time.ts | 4 ++-- 4 files changed, 19 insertions(+), 45 deletions(-) diff --git a/src/modules/crawler/job-unlock.provider.ts b/src/modules/crawler/job-unlock.provider.ts index ba05e35..9cad55c 100644 --- a/src/modules/crawler/job-unlock.provider.ts +++ b/src/modules/crawler/job-unlock.provider.ts @@ -21,9 +21,6 @@ import { IGenerateSignature, IJobUnlockPayload, IUnlockToken } from './interface @Injectable() export class JobUnlockProvider { - private readonly signatureJobBackOff = 5 * 1000; - private readonly sendTxJobBackOff = 60 * 1000; - private readonly jobRemoveDueDate = 30 * 24 * 60 * 60; // 30 days in seconds constructor( private readonly queueService: QueueService, private readonly configService: ConfigService, @@ -37,8 +34,8 @@ export class JobUnlockProvider { public async handleJob() { await Promise.all([ this.getPendingTx(true), - this.getPendingTx(false), - this.tokenPriceCrawler.handleCrawlInterval(), + // this.getPendingTx(false), + // this.tokenPriceCrawler.handleCrawlInterval(), ]); } @@ -144,11 +141,9 @@ export class JobUnlockProvider { eventLogId: data.eventLogId, }, { - attempts: 5, - removeOnComplete: { - age: this.jobRemoveDueDate, - }, - backoff: this.signatureJobBackOff, + jobId: `signature-validate-${data.eventLogId}`, + removeOnComplete: true, + removeOnFail: true, }, ); } @@ -168,11 +163,9 @@ export class JobUnlockProvider { eventLogId: data.eventLogId, }, { - attempts: 5, - removeOnComplete: { - age: this.jobRemoveDueDate, - }, - backoff: this.sendTxJobBackOff, + jobId: `send-unlock-${data.eventLogId}`, + removeOnComplete: true, + removeOnFail: true, }, ); } diff --git a/src/shared/modules/queue/queue.service.ts b/src/shared/modules/queue/queue.service.ts index c1d592c..3171fc2 100644 --- a/src/shared/modules/queue/queue.service.ts +++ b/src/shared/modules/queue/queue.service.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; -import Bull, { DoneCallback, Job, JobOptions, Queue } from 'bull'; +import { DoneCallback, Job, JobOptions, Queue } from 'bull'; import { EEnvKey } from '../../../constants/env.constant.js'; import { BullLib } from '../../../shared/utils/queue.js'; @@ -39,29 +39,7 @@ export class QueueService { } }); } - public async addJobToQueue(queueName: string, job: T, options: JobOptions = { attempts: 3, backoff: 5000 }) { - const queue = this.initQueueOnDemand(queueName); - if (!!options.jobId) { - const canContinue = await this.removeExistedJobIfFailed(options.jobId, queue); - if (!canContinue) { - this.logger.warn('this job is existed in queue and not in failed status'); - return false; - } - } - await queue.add(job, options); - return true; - } - public async removeExistedJobIfFailed(jobId: Bull.JobId, queue: Queue): Promise { - try { - const existedJob = await queue.getJob(jobId); - if (existedJob) { - await existedJob.remove(); - return true; - } - return false; - } catch (error) { - this.logger.error(error); - return false; - } + public async addJobToQueue(queueName: string, job: T, options: JobOptions) { + return this.initQueueOnDemand(queueName).add(job, options); } } diff --git a/src/shared/utils/queue.ts b/src/shared/utils/queue.ts index 59252c2..c8b4e51 100644 --- a/src/shared/utils/queue.ts +++ b/src/shared/utils/queue.ts @@ -2,14 +2,17 @@ import Queue from 'bull'; import * as Redis from 'ioredis'; export class BullLib { - static createNewQueue(queueName: string, redisConfig: Redis.RedisOptions): Queue.Queue { - const defaultLockTime = 1 * 60 * 60 * 1000; + static createNewQueue( + queueName: string, + redisConfig: Redis.RedisOptions, + lockDuration: number = 1 * 60 * 60 * 1000, + ): Queue.Queue { // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore return new Queue(queueName, { redis: redisConfig, settings: { - lockDuration: defaultLockTime, // lock the job for one hours. + lockDuration, // lock the job for one hours. maxStalledCount: 0, }, }); diff --git a/src/shared/utils/time.ts b/src/shared/utils/time.ts index 3d0e9b1..dbbcf65 100644 --- a/src/shared/utils/time.ts +++ b/src/shared/utils/time.ts @@ -19,8 +19,8 @@ export const unixZeroMinuteSecond = (value: number) => dayjs.unix(value).minute( export const unixToDate = (value: number) => dayjs.unix(value).toDate(); -export const startOfDayUnix = (date: Date) => dayjs(date).startOf('day').valueOf() / 1000; +export const startOfDayUnix = (date: Date) => Math.floor(dayjs(date).startOf('day').valueOf() / 1000); -export const endOfDayUnix = (date: Date) => dayjs(date).endOf('day').valueOf() / 1000; +export const endOfDayUnix = (date: Date) => Math.floor(dayjs(date).endOf('day').valueOf() / 1000); export const getTimeInFutureInMinutes = (minutes: number) => dayjs(new Date()).add(minutes, 'minutes').unix(); export const getNextDayInUnix = () => dayjs().add(1, 'days').subtract(dayjs().hour()).unix(); From 1de468b2cb5f1786a3ea4d79e4944bdb8c87439e Mon Sep 17 00:00:00 2001 From: Kenneth Hoang Date: Wed, 11 Dec 2024 16:03:11 +0700 Subject: [PATCH 2/2] refactor: lint --- src/modules/crawler/job-unlock.provider.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/modules/crawler/job-unlock.provider.ts b/src/modules/crawler/job-unlock.provider.ts index 9cad55c..d6d5ac2 100644 --- a/src/modules/crawler/job-unlock.provider.ts +++ b/src/modules/crawler/job-unlock.provider.ts @@ -34,8 +34,8 @@ export class JobUnlockProvider { public async handleJob() { await Promise.all([ this.getPendingTx(true), - // this.getPendingTx(false), - // this.tokenPriceCrawler.handleCrawlInterval(), + this.getPendingTx(false), + this.tokenPriceCrawler.handleCrawlInterval(), ]); }