diff --git a/src/modules/crawler/job-unlock.provider.ts b/src/modules/crawler/job-unlock.provider.ts index ba05e35..d6d5ac2 100644 --- a/src/modules/crawler/job-unlock.provider.ts +++ b/src/modules/crawler/job-unlock.provider.ts @@ -21,9 +21,6 @@ import { IGenerateSignature, IJobUnlockPayload, IUnlockToken } from './interface @Injectable() export class JobUnlockProvider { - private readonly signatureJobBackOff = 5 * 1000; - private readonly sendTxJobBackOff = 60 * 1000; - private readonly jobRemoveDueDate = 30 * 24 * 60 * 60; // 30 days in seconds constructor( private readonly queueService: QueueService, private readonly configService: ConfigService, @@ -144,11 +141,9 @@ export class JobUnlockProvider { eventLogId: data.eventLogId, }, { - attempts: 5, - removeOnComplete: { - age: this.jobRemoveDueDate, - }, - backoff: this.signatureJobBackOff, + jobId: `signature-validate-${data.eventLogId}`, + removeOnComplete: true, + removeOnFail: true, }, ); } @@ -168,11 +163,9 @@ export class JobUnlockProvider { eventLogId: data.eventLogId, }, { - attempts: 5, - removeOnComplete: { - age: this.jobRemoveDueDate, - }, - backoff: this.sendTxJobBackOff, + jobId: `send-unlock-${data.eventLogId}`, + removeOnComplete: true, + removeOnFail: true, }, ); } diff --git a/src/shared/modules/queue/queue.service.ts b/src/shared/modules/queue/queue.service.ts index c1d592c..3171fc2 100644 --- a/src/shared/modules/queue/queue.service.ts +++ b/src/shared/modules/queue/queue.service.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; -import Bull, { DoneCallback, Job, JobOptions, Queue } from 'bull'; +import { DoneCallback, Job, JobOptions, Queue } from 'bull'; import { EEnvKey } from '../../../constants/env.constant.js'; import { BullLib } from '../../../shared/utils/queue.js'; @@ -39,29 +39,7 @@ export class QueueService { } }); } - public async addJobToQueue(queueName: string, job: T, options: JobOptions = { attempts: 3, backoff: 5000 }) { - const queue = this.initQueueOnDemand(queueName); - if (!!options.jobId) { - const canContinue = await this.removeExistedJobIfFailed(options.jobId, queue); - if (!canContinue) { - this.logger.warn('this job is existed in queue and not in failed status'); - return false; - } - } - await queue.add(job, options); - return true; - } - public async removeExistedJobIfFailed(jobId: Bull.JobId, queue: Queue): Promise { - try { - const existedJob = await queue.getJob(jobId); - if (existedJob) { - await existedJob.remove(); - return true; - } - return false; - } catch (error) { - this.logger.error(error); - return false; - } + public async addJobToQueue(queueName: string, job: T, options: JobOptions) { + return this.initQueueOnDemand(queueName).add(job, options); } } diff --git a/src/shared/utils/queue.ts b/src/shared/utils/queue.ts index 59252c2..c8b4e51 100644 --- a/src/shared/utils/queue.ts +++ b/src/shared/utils/queue.ts @@ -2,14 +2,17 @@ import Queue from 'bull'; import * as Redis from 'ioredis'; export class BullLib { - static createNewQueue(queueName: string, redisConfig: Redis.RedisOptions): Queue.Queue { - const defaultLockTime = 1 * 60 * 60 * 1000; + static createNewQueue( + queueName: string, + redisConfig: Redis.RedisOptions, + lockDuration: number = 1 * 60 * 60 * 1000, + ): Queue.Queue { // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore return new Queue(queueName, { redis: redisConfig, settings: { - lockDuration: defaultLockTime, // lock the job for one hours. + lockDuration, // lock the job for one hours. maxStalledCount: 0, }, }); diff --git a/src/shared/utils/time.ts b/src/shared/utils/time.ts index 3d0e9b1..dbbcf65 100644 --- a/src/shared/utils/time.ts +++ b/src/shared/utils/time.ts @@ -19,8 +19,8 @@ export const unixZeroMinuteSecond = (value: number) => dayjs.unix(value).minute( export const unixToDate = (value: number) => dayjs.unix(value).toDate(); -export const startOfDayUnix = (date: Date) => dayjs(date).startOf('day').valueOf() / 1000; +export const startOfDayUnix = (date: Date) => Math.floor(dayjs(date).startOf('day').valueOf() / 1000); -export const endOfDayUnix = (date: Date) => dayjs(date).endOf('day').valueOf() / 1000; +export const endOfDayUnix = (date: Date) => Math.floor(dayjs(date).endOf('day').valueOf() / 1000); export const getTimeInFutureInMinutes = (minutes: number) => dayjs(new Date()).add(minutes, 'minutes').unix(); export const getNextDayInUnix = () => dayjs().add(1, 'days').subtract(dayjs().hour()).unix();