From f602b2dcd3ea403da36470ca4d4fde1261fdb761 Mon Sep 17 00:00:00 2001 From: Tan Hoang Date: Wed, 2 Oct 2024 00:28:56 +0700 Subject: [PATCH 1/6] refactor: infra --- docker-compose.dev.yaml | 55 ++++- docker-compose.yml | 26 ++- src/app.module.ts | 5 +- src/constants/env.constant.ts | 2 + src/constants/error.constant.ts | 1 + src/constants/queue.constant.ts | 9 +- .../1727799403569-unique-validator-tx.ts | 17 ++ .../repositories/event-log.repository.ts | 1 - src/modules/crawler/crawler.console.ts | 87 +++---- src/modules/crawler/crawler.evmbridge.ts | 8 +- src/modules/crawler/crawler.minabridge.ts | 8 +- src/modules/crawler/crawler.module.ts | 2 + .../crawler/interfaces/job.interface.ts | 22 ++ src/modules/crawler/job-unlock.provider.ts | 135 +++++++++++ src/modules/crawler/sender.evmbridge.ts | 217 ++++++++--------- src/modules/crawler/sender.minabridge.ts | 221 +++++++++--------- src/shared/modules/queue/queue.module.ts | 12 + src/shared/modules/queue/queue.service.ts | 45 ++++ src/shared/utils/queue.ts | 2 +- 19 files changed, 584 insertions(+), 291 deletions(-) create mode 100644 src/database/migrations/1727799403569-unique-validator-tx.ts create mode 100644 src/modules/crawler/interfaces/job.interface.ts create mode 100644 src/modules/crawler/job-unlock.provider.ts create mode 100644 src/shared/modules/queue/queue.module.ts create mode 100644 src/shared/modules/queue/queue.service.ts diff --git a/docker-compose.dev.yaml b/docker-compose.dev.yaml index 240ab06..5b56362 100644 --- a/docker-compose.dev.yaml +++ b/docker-compose.dev.yaml @@ -9,6 +9,7 @@ services: - ${PORT}:${PORT} depends_on: - postgres + - redis networks: - myNetwork user: node @@ -21,6 +22,7 @@ services: depends_on: - postgres + - redis networks: - myNetwork user: node @@ -33,6 +35,7 @@ services: depends_on: - postgres + - redis networks: - myNetwork user: node @@ -45,6 +48,7 @@ services: depends_on: - postgres + - redis networks: - myNetwork user: node @@ -56,6 +60,7 @@ services: restart: always depends_on: - postgres + - redis networks: - myNetwork user: node @@ -67,11 +72,13 @@ services: restart: always depends_on: - postgres + - redis networks: - myNetwork user: node environment: EVM_VALIDATOR_PRIVATE_KEY : ${EVM_VALIDATOR_PRIVATE_KEY_1} + THIS_VALIDATOR_INDEX: 1 validate-evm-signature-2: image: mina-bridge:1.0.0 @@ -81,11 +88,13 @@ services: restart: always depends_on: - postgres + - redis networks: - myNetwork user: node environment: EVM_VALIDATOR_PRIVATE_KEY : ${EVM_VALIDATOR_PRIVATE_KEY_2} + THIS_VALIDATOR_INDEX: 2 validate-evm-signature-3: image: mina-bridge:1.0.0 @@ -95,11 +104,13 @@ services: restart: always depends_on: - postgres + - redis networks: - myNetwork user: node environment: EVM_VALIDATOR_PRIVATE_KEY : ${EVM_VALIDATOR_PRIVATE_KEY_3} + THIS_VALIDATOR_INDEX: 3 validate-mina-signature-1: image: mina-bridge:1.0.0 command: > @@ -108,10 +119,12 @@ services: restart: always depends_on: - postgres + - redis networks: - myNetwork environment: MINA_VALIDATOR_PRIVATE_KEY: ${MINA_VALIDATOR_PRIVATE_KEY_1} + THIS_VALIDATOR_INDEX: 1 user: node validate-mina-signature-2: image: mina-bridge:1.0.0 @@ -121,11 +134,13 @@ services: restart: always depends_on: - postgres + - redis networks: - myNetwork user: node environment: MINA_VALIDATOR_PRIVATE_KEY: ${MINA_VALIDATOR_PRIVATE_KEY_2} + THIS_VALIDATOR_INDEX: 2 validate-mina-signature-3: image: mina-bridge:1.0.0 command: > @@ -134,11 +149,25 @@ services: restart: always depends_on: - postgres + - redis networks: - myNetwork user: node environment: MINA_VALIDATOR_PRIVATE_KEY: ${MINA_VALIDATOR_PRIVATE_KEY_3} + THIS_VALIDATOR_INDEX: 3 + job-unlock-provider: + image: mina-bridge:1.0.0 + command: > + sh -c "npm run console unlock-job-provider" + tty: true + restart: always + depends_on: + - postgres + - redis + networks: + - myNetwork + user: node postgres: container_name: mina-bridge-${NODE_ENV}-postgres image: postgres:15.3-alpine3.18 @@ -153,10 +182,34 @@ services: POSTGRES_DB: mina-bridge networks: myNetwork: - + bull_monitor: + image: hocptit/bull_ui:0.0.2 + container_name: mina_bull_ui + environment: + REDIS_URI: redis://redis:${REDIS_PORT} + USERNAME: admin + PASSWORD: minabridge + QUEUES: UNLOCK_JOB_QUEUE,EVM_SENDER_QUEUE,MINA_SENDER_QUEUE,EVM_VALIDATOR_1,EVM_VALIDATOR_2,EVM_VALIDATOR_2,MINA_VALIDATOR_1,MINA_VALIDATOR_2,MINA_VALIDATOR_3 + ports: + - '${BULL_MONITOR_PORT}:3011' + depends_on: + - redis + networks: + - myNetwork + redis: + image: redis:6.2-alpine + container_name: mereo_ticket_redis + ports: + - '${REDIS_PORT}:${REDIS_PORT}' + command: redis-server --save 20 1 --loglevel warning --port ${REDIS_PORT} + networks: + - myNetwork + volumes: + - redisData:/data volumes: postgresData: + redisData: networks: myNetwork: name: minaBridgeNetwork${NODE_ENV} diff --git a/docker-compose.yml b/docker-compose.yml index 1b7fde3..0e4c89b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,9 +14,33 @@ services: POSTGRES_DB: mina-bridge networks: myNetwork: - + bull_monitor: + image: hocptit/bull_ui:0.0.2 + container_name: mina_bull_ui + environment: + REDIS_URI: redis://redis:${REDIS_PORT} + USERNAME: admin + PASSWORD: minabridge + QUEUES: UNLOCK_JOB_QUEUE,EVM_SENDER_QUEUE,MINA_SENDER_QUEUE,EVM_VALIDATOR_1,EVM_VALIDATOR_2,EVM_VALIDATOR_2,MINA_VALIDATOR_1,MINA_VALIDATOR_2,MINA_VALIDATOR_3 + ports: + - '${BULL_MONITOR_PORT}:3011' + depends_on: + - redis + networks: + - myNetwork + redis: + image: redis:6.2-alpine + container_name: mereo_ticket_redis + ports: + - '${REDIS_PORT}:${REDIS_PORT}' + command: redis-server --save 20 1 --loglevel warning --port ${REDIS_PORT} + networks: + - myNetwork + volumes: + - redisData:/data volumes: postgresData: + redisData: networks: myNetwork: name: minaBridgeNetwork${NODE_ENV} diff --git a/src/app.module.ts b/src/app.module.ts index ee7d022..eccf19c 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -12,7 +12,9 @@ import { MODULES } from './modules/index.js'; import { CustomAuthorizationHeaderMiddleware } from './shared/middleware/custom-authorization-header.middleware.js'; import { LoggerHttpRequestMiddleware } from './shared/middleware/logger-http-request.middleware.js'; import { LoggingModule } from './shared/modules/logger/logger.module.js'; +import { QueueModule } from './shared/modules/queue/queue.module.js'; import { Web3Module } from './shared/modules/web3/web3.module.js'; +import { isDevelopmentEnvironment } from './shared/utils/util.js'; const modules = [ ConfigurationModule, @@ -21,6 +23,7 @@ const modules = [ LoggingModule, GuardModule, Web3Module, + QueueModule, ConsoleModule, ...MODULES, ]; @@ -38,7 +41,7 @@ export class AppModule { }, ]; consumer.apply(CustomAuthorizationHeaderMiddleware).forRoutes(...thirdPartyLoginRoutes); - if (process.env.NODE_ENV !== 'production') { + if (isDevelopmentEnvironment()) { consumer.apply(LoggerHttpRequestMiddleware).forRoutes('*'); } } diff --git a/src/constants/env.constant.ts b/src/constants/env.constant.ts index d538aae..a93dea2 100644 --- a/src/constants/env.constant.ts +++ b/src/constants/env.constant.ts @@ -44,9 +44,11 @@ export enum EEnvKey { GAS_FEE_EVM = 'GAS_FEE_EVM', DECIMAL_TOKEN_EVM = 'DECIMAL_TOKEN_EVM', MINA_VALIDATOR_THRESHHOLD = 'MINA_VALIDATOR_THRESHHOLD', + EVM_VALIDATOR_THRESHHOLD = 'EVM_VALIDATOR_THRESHHOLD', MINA_VALIDATOR_PRIVATE_KEY = 'MINA_VALIDATOR_PRIVATE_KEY', EVM_VALIDATOR_PRIVATE_KEY = 'EVM_VALIDATOR_PRIVATE_KEY', MINA_CRAWL_SAFE_BLOCK = 'MINA_CRAWL_SAFE_BLOCK', + THIS_VALIDATOR_INDEX = 'THIS_VALIDATOR_INDEX', } export enum EEnvironments { diff --git a/src/constants/error.constant.ts b/src/constants/error.constant.ts index 11dc05d..8282f01 100644 --- a/src/constants/error.constant.ts +++ b/src/constants/error.constant.ts @@ -8,4 +8,5 @@ export enum EError { INVALID_SIGNATURE = 'INVALID_SIGNATURE', OVER_DAILY_QUOTA = 'OVER_DAILY_QUOTA', RESOURCE_NOT_FOUND = 'RESOURCE_NOT_FOUND', + NETWORK_FAILED = 'NETWORK_FAILED', } diff --git a/src/constants/queue.constant.ts b/src/constants/queue.constant.ts index 772f91e..a798719 100644 --- a/src/constants/queue.constant.ts +++ b/src/constants/queue.constant.ts @@ -1,2 +1,7 @@ -export const MINA_BRIDGE_CRAWLER_QUEUE = 'MINA_BRIDGE_CRAWLER_QUEUE'; -export const MINA_BRIDGE_SUBMITTER_QUEUE = 'MINA_BRIDGE_SUBMITTER_QUEUE'; +export enum EQueueName { + EVM_SENDER_QUEUE = 'EVM_SENDER_QUEUE', + MINA_SENDER_QUEUE = 'MINA_SENDER_QUEUE', + UNLOCK_JOB_QUEUE = 'UNLOCK_JOB_QUEUE', +} +export const getEvmValidatorQueueName = (index: number) => `EVM_VALIDATOR_${index}`; +export const getMinaValidatorQueueName = (index: number) => `MINA_VALIDATOR_${index}`; diff --git a/src/database/migrations/1727799403569-unique-validator-tx.ts b/src/database/migrations/1727799403569-unique-validator-tx.ts new file mode 100644 index 0000000..c0716f0 --- /dev/null +++ b/src/database/migrations/1727799403569-unique-validator-tx.ts @@ -0,0 +1,17 @@ +import { MigrationInterface, QueryRunner, TableUnique } from 'typeorm'; + +export class UniqueValidatorTx1727799403569 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + return queryRunner.createUniqueConstraint( + 'multi_signature', + new TableUnique({ + name: 'unique_validator_tx', + columnNames: ['validator', 'tx_id'], + }), + ); + } + + public async down(queryRunner: QueryRunner): Promise { + return queryRunner.dropUniqueConstraint('multi_signature','unique_validator_tx') + } +} diff --git a/src/database/repositories/event-log.repository.ts b/src/database/repositories/event-log.repository.ts index 6f15cab..7cf32a4 100644 --- a/src/database/repositories/event-log.repository.ts +++ b/src/database/repositories/event-log.repository.ts @@ -45,7 +45,6 @@ export class EventLogRepository extends BaseRepository { ...updateData }: { id: number; - retry: number; status: EEventStatus; amountReceived?: string; protocolFee?: string; diff --git a/src/modules/crawler/crawler.console.ts b/src/modules/crawler/crawler.console.ts index b512bd7..edaccdf 100644 --- a/src/modules/crawler/crawler.console.ts +++ b/src/modules/crawler/crawler.console.ts @@ -1,32 +1,31 @@ import { ConfigService } from '@nestjs/config'; -import { Logger } from 'log4js'; import { Command, Console } from 'nestjs-console'; import { EEnvKey } from '../../constants/env.constant.js'; +import { EQueueName, getEvmValidatorQueueName, getMinaValidatorQueueName } from '../../constants/queue.constant.js'; import { LoggerService } from '../../shared/modules/logger/logger.service.js'; +import { QueueService } from '../../shared/modules/queue/queue.service.js'; import { sleep } from '../../shared/utils/promise.js'; -import { BatchJobGetPriceToken } from './batch.tokenprice.js'; import { BlockchainEVMCrawler } from './crawler.evmbridge.js'; import { SCBridgeMinaCrawler } from './crawler.minabridge.js'; +import { IGenerateSignature, IUnlockToken } from './interfaces/job.interface.js'; +import { JobUnlockProvider } from './job-unlock.provider.js'; import { SenderEVMBridge } from './sender.evmbridge.js'; import { SenderMinaBridge } from './sender.minabridge.js'; @Console() export class CrawlerConsole { - private readonly numberOfBlockPerJob: number; - private readonly logger: Logger; constructor( private readonly configService: ConfigService, private blockchainEVMCrawler: BlockchainEVMCrawler, private scBridgeMinaCrawler: SCBridgeMinaCrawler, private senderEVMBridge: SenderEVMBridge, private senderMinaBridge: SenderMinaBridge, - private jobGetPrice: BatchJobGetPriceToken, - private loggerService: LoggerService, - ) { - this.numberOfBlockPerJob = +this.configService.get(EEnvKey.NUMBER_OF_BLOCK_PER_JOB); - this.logger = loggerService.getLogger('CRAWLER_CONSOLE'); - } + private readonly loggerService: LoggerService, + private readonly queueService: QueueService, + private readonly unlockProviderService: JobUnlockProvider, + ) {} + private readonly logger = this.loggerService.getLogger('CRAWLER_CONSOLE'); @Command({ command: 'crawl-eth-bridge-contract', @@ -48,14 +47,15 @@ export class CrawlerConsole { description: 'validate ETH Bridge unlock', }) async handleValidateEthLockTx() { - try { - while (true) { - await this.senderEVMBridge.unlockEVMTransaction(); - await sleep(15); - } - } catch (error) { - this.logger.error(error); - } + const thisValidatorIndex = this.configService.get(EEnvKey.THIS_VALIDATOR_INDEX); + this.logger.info(`EVM_VALIDATOR_JOB_${thisValidatorIndex}: started`); + await this.queueService.handleQueueJob( + getEvmValidatorQueueName(thisValidatorIndex), + (data: IGenerateSignature) => { + return this.senderEVMBridge.validateUnlockEVMTransaction(data.eventLogId); + }, + 10, + ); } @Command({ @@ -63,14 +63,15 @@ export class CrawlerConsole { description: 'validate MINA Bridge unlock', }) async handleValidateMinaLockTx() { - try { - while (true) { - await this.senderMinaBridge.handleValidateUnlockTxMina(); - await sleep(1); - } - } catch (error) { - this.logger.error(error); - } + const thisValidatorIndex = this.configService.get(EEnvKey.THIS_VALIDATOR_INDEX); + this.logger.info(`MINA_VALIDATOR_JOB_${thisValidatorIndex}: started`); + await this.queueService.handleQueueJob( + getMinaValidatorQueueName(thisValidatorIndex), + (data: IGenerateSignature) => { + return this.senderMinaBridge.handleValidateUnlockTxMina(data.eventLogId); + }, + 10, + ); } @Command({ @@ -78,14 +79,10 @@ export class CrawlerConsole { description: 'sender ETH Bridge unlock', }) async handleSenderETHBridgeUnlock() { - try { - while (true) { - await this.senderEVMBridge.handleUnlockEVM(); - await sleep(15); - } - } catch (error) { - this.logger.error(error); - } + this.logger.info('MINA_SENDER_JOB: started'); + await this.queueService.handleQueueJob(EQueueName.EVM_SENDER_QUEUE, (data: IUnlockToken) => { + return this.senderEVMBridge.handleUnlockEVM(data.eventLogId); + }); } @Command({ @@ -108,13 +105,19 @@ export class CrawlerConsole { description: 'sender Mina Bridge unlock', }) async handleSenderMinaBridgeUnlock() { - try { - while (true) { - await this.senderMinaBridge.handleUnlockMina(); - await sleep(15); - } - } catch (error) { - this.logger.error(error); - } + this.logger.info('MINA_SENDER_JOB: started'); + await this.queueService.handleQueueJob(EQueueName.MINA_SENDER_QUEUE, (data: IUnlockToken) => { + return this.senderMinaBridge.handleUnlockMina(data.eventLogId); + }); + } + @Command({ + command: 'unlock-job-provider', + description: 'handle all network unlock.', + }) + async handleUnlockJobProvider() { + this.logger.info('UNLOCK_JOB_PROVIDER: started'); + await this.queueService.handleQueueJob(EQueueName.UNLOCK_JOB_QUEUE, data => { + return this.unlockProviderService.handleJob(data); + }); } } diff --git a/src/modules/crawler/crawler.evmbridge.ts b/src/modules/crawler/crawler.evmbridge.ts index 8b43a1c..fd9653d 100644 --- a/src/modules/crawler/crawler.evmbridge.ts +++ b/src/modules/crawler/crawler.evmbridge.ts @@ -1,5 +1,6 @@ import { Injectable } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; +import assert from 'assert'; import { Logger } from 'log4js'; import { DataSource, QueryRunner } from 'typeorm'; import { EventData } from 'web3-eth-contract'; @@ -12,6 +13,7 @@ import { TokenPairRepository } from '../../database/repositories/token-pair.repo import { CrawlContract, EventLog } from '../../modules/crawler/entities/index.js'; import { LoggerService } from '../../shared/modules/logger/logger.service.js'; import { ETHBridgeContract } from '../../shared/modules/web3/web3.service.js'; +import { JobUnlockProvider } from './job-unlock.provider.js'; @Injectable() export class BlockchainEVMCrawler { @@ -24,6 +26,7 @@ export class BlockchainEVMCrawler { private readonly tokenPairRepository: TokenPairRepository, private readonly loggerService: LoggerService, private readonly ethBridgeContract: ETHBridgeContract, + private readonly unlockJobProvider: JobUnlockProvider, ) { this.numberOfBlockPerJob = +this.configService.get(EEnvKey.NUMBER_OF_BLOCK_PER_JOB); this.logger = loggerService.getLogger('BLOCKCHAIN_EVM_CRAWLER'); @@ -104,8 +107,9 @@ export class BlockchainEVMCrawler { eventUnlock.toTokenDecimal = tokenPair.toDecimal; } - await queryRunner.manager.save(EventLog, eventUnlock); - + const result = await queryRunner.manager.save(EventLog, eventUnlock); + assert(!!result.id && !!result.networkReceived, 'Cannot add job to signatures queue.'); + await this.unlockJobProvider.addJobSignatures(result.id, result.networkReceived); return { success: true, }; diff --git a/src/modules/crawler/crawler.minabridge.ts b/src/modules/crawler/crawler.minabridge.ts index 6e05d64..39d7213 100644 --- a/src/modules/crawler/crawler.minabridge.ts +++ b/src/modules/crawler/crawler.minabridge.ts @@ -1,5 +1,6 @@ import { Injectable } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; +import assert from 'assert'; import dayjs from 'dayjs'; import { Logger } from 'log4js'; import { fetchLastBlock, Field, Mina, PublicKey, UInt32 } from 'o1js'; @@ -12,6 +13,7 @@ import { CrawlContractRepository } from '../../database/repositories/crawl-contr import { TokenPairRepository } from '../../database/repositories/token-pair.repository.js'; import { CrawlContract, EventLog } from '../../modules/crawler/entities/index.js'; import { LoggerService } from '../../shared/modules/logger/logger.service.js'; +import { JobUnlockProvider } from './job-unlock.provider.js'; import { Bridge } from './minaSc/Bridge.js'; @Injectable() @@ -23,6 +25,7 @@ export class SCBridgeMinaCrawler { private readonly crawlContractRepository: CrawlContractRepository, private readonly tokenPairRepository: TokenPairRepository, private readonly loggerService: LoggerService, + private readonly unlockJobProvider: JobUnlockProvider, ) { this.logger = this.loggerService.getLogger('SC_BRIDGE_MINA_CRAWLER'); const Network = Mina.Network({ @@ -136,8 +139,9 @@ export class SCBridgeMinaCrawler { this.logger.info({ eventUnlock }); - await queryRunner.manager.save(EventLog, eventUnlock); - + const result = await queryRunner.manager.save(EventLog, eventUnlock); + assert(!!result.id && !!result.networkReceived, 'Cannot add job to signatures queue.'); + await this.unlockJobProvider.addJobSignatures(result.id, result.networkReceived); return { success: true, }; diff --git a/src/modules/crawler/crawler.module.ts b/src/modules/crawler/crawler.module.ts index 795bcc1..49cb7cd 100644 --- a/src/modules/crawler/crawler.module.ts +++ b/src/modules/crawler/crawler.module.ts @@ -12,6 +12,7 @@ import { CrawlerConsole } from './crawler.console.js'; import { BlockchainEVMCrawler } from './crawler.evmbridge.js'; import { SCBridgeMinaCrawler } from './crawler.minabridge.js'; import { CrawlerService } from './crawler.service.js'; +import { JobUnlockProvider } from './job-unlock.provider.js'; import { SenderEVMBridge } from './sender.evmbridge.js'; import { SenderMinaBridge } from './sender.minabridge.js'; @@ -34,6 +35,7 @@ import { SenderMinaBridge } from './sender.minabridge.js'; SCBridgeMinaCrawler, SenderMinaBridge, BatchJobGetPriceToken, + JobUnlockProvider, ], exports: [CrawlerService], }) diff --git a/src/modules/crawler/interfaces/job.interface.ts b/src/modules/crawler/interfaces/job.interface.ts new file mode 100644 index 0000000..c3ea504 --- /dev/null +++ b/src/modules/crawler/interfaces/job.interface.ts @@ -0,0 +1,22 @@ +import { ENetworkName } from '../../../constants/blockchain.constant.js'; + +export interface IReceiveCrawledEvent { + network: ENetworkName; + eventLogId: number; +} +export interface IReceiveVerifiedSignature { + eventLogId: number; + signatureId: number; + network: ENetworkName; +} +export interface IGenerateSignature { + eventLogId: number; +} +export interface IUnlockToken { + eventLogId: number; +} +export interface IJobUnlockPayload { + type: 'need_signatures' | 'need_send_tx'; + eventLogId: number; + network: ENetworkName; +} diff --git a/src/modules/crawler/job-unlock.provider.ts b/src/modules/crawler/job-unlock.provider.ts new file mode 100644 index 0000000..fab1c64 --- /dev/null +++ b/src/modules/crawler/job-unlock.provider.ts @@ -0,0 +1,135 @@ +import { Injectable } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { assert } from 'console'; + +import { ENetworkName } from '../../constants/blockchain.constant.js'; +import { EEnvKey } from '../../constants/env.constant.js'; +import { EQueueName, getEvmValidatorQueueName, getMinaValidatorQueueName } from '../../constants/queue.constant.js'; +import { EventLogRepository } from '../../database/repositories/event-log.repository.js'; +import { LoggerService } from '../../shared/modules/logger/logger.service.js'; +import { QueueService } from '../../shared/modules/queue/queue.service.js'; +import { IGenerateSignature, IJobUnlockPayload, IUnlockToken } from './interfaces/job.interface.js'; + +@Injectable() +export class JobUnlockProvider { + constructor( + private readonly queueService: QueueService, + private readonly configService: ConfigService, + private readonly loggerService: LoggerService, + private readonly eventLogRepository: EventLogRepository, + ) {} + private logger = this.loggerService.getLogger('JOB_UNLOCK_PROVIDER'); + public addJobSignatures(eventLogId: number, network: ENetworkName) { + return this.queueService.addJobToQueue( + EQueueName.UNLOCK_JOB_QUEUE, + { eventLogId, network, type: 'need_signatures' }, + { + attempts: 5, + backoff: 5000, + }, + ); + } + public addJobSendTx(eventLogId: number, network: ENetworkName) { + return this.queueService.addJobToQueue( + EQueueName.UNLOCK_JOB_QUEUE, + { eventLogId, network, type: 'need_send_tx' }, + { + attempts: 5, + backoff: 5000, + }, + ); + } + public async handleJob(data: IJobUnlockPayload) { + switch (data.type) { + case 'need_signatures': + return this.handleSignaturesJobs(data); + case 'need_send_tx': + return this.handleSendTxJobs(data); + default: + this.logger.error('unknown type', data); + return; + } + } + private getNumOfValidators(network: ENetworkName) { + switch (network) { + case ENetworkName.ETH: + return this.configService.get(EEnvKey.EVM_VALIDATOR_THRESHHOLD); + case ENetworkName.MINA: + return this.configService.get(EEnvKey.MINA_VALIDATOR_THRESHHOLD); + default: + this.logger.warn('Unknown network!'); + throw new Error('Unknown network!'); + } + } + private getSenderQueueName(network: ENetworkName) { + switch (network) { + case ENetworkName.ETH: + return EQueueName.EVM_SENDER_QUEUE; + case ENetworkName.MINA: + return EQueueName.MINA_SENDER_QUEUE; + default: + this.logger.warn('Unknown network!'); + throw new Error('Unknown network!'); + } + } + private getValidatorQueueName(network: ENetworkName, index: number) { + switch (network) { + case ENetworkName.ETH: + return getEvmValidatorQueueName(index); + case ENetworkName.MINA: + return getMinaValidatorQueueName(index); + default: + this.logger.warn('Unknown network!'); + throw new Error('Unknown network!'); + } + } + private async handleSignaturesJobs(data: IJobUnlockPayload) { + // create a job for every validators in a network. + this.logger.info(`Handling create validator jobs for tx ${data.eventLogId},network ${data.network}`); + for (let i = 1; i <= this.getNumOfValidators(data.network); i++) { + await this.queueService.addJobToQueue( + this.getValidatorQueueName(data.network, i), + { + eventLogId: data.eventLogId, + }, + { + attempts: 5, + backoff: 5000, + jobId: `validate-signature-${data.eventLogId}-${i}`, + }, + ); + } + this.logger.info('done'); + } + + private async handleSendTxJobs(data: IJobUnlockPayload) { + // check if there is enough threshhold -> then create an unlock job. + const numOfValidators = this.getNumOfValidators(data.network); + + const eventLog = await this.eventLogRepository.findOne({ + where: { + id: data.eventLogId, + }, + relations: { + validator: true, + }, + }); + assert(!!eventLog, 'not found tx with id = ' + data.eventLogId); + this.logger.info(`Found ${eventLog.validator.length} signatures.`); + if (eventLog.validator.length < numOfValidators) { + this.logger.warn('not enough validators tx', data.eventLogId); + return; + } + await this.queueService.addJobToQueue( + this.getSenderQueueName(data.network), + { + eventLogId: data.eventLogId, + }, + { + attempts: 5, + backoff: 5000, + jobId: `send-tx-${data.eventLogId}`, + }, + ); + } +} diff --git a/src/modules/crawler/sender.evmbridge.ts b/src/modules/crawler/sender.evmbridge.ts index f0fde2e..ebb3560 100644 --- a/src/modules/crawler/sender.evmbridge.ts +++ b/src/modules/crawler/sender.evmbridge.ts @@ -1,5 +1,6 @@ import { Injectable } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; +import assert from 'assert'; import { BigNumber } from 'bignumber.js'; // import BigNumber from 'bignumber.js/bignumber.mjs'; import { ethers } from 'ethers'; @@ -18,6 +19,7 @@ import { ETHBridgeContract } from '../../shared/modules/web3/web3.service.js'; import { addDecimal, calculateFee, calculateTip } from '../../shared/utils/bignumber.js'; import { EventLog } from './entities/event-logs.entity.js'; import { MultiSignature } from './entities/multi-signature.entity.js'; +import { JobUnlockProvider } from './job-unlock.provider.js'; @Injectable() export class SenderEVMBridge { @@ -30,108 +32,106 @@ export class SenderEVMBridge { private readonly ethBridgeContract: ETHBridgeContract, private readonly configService: ConfigService, private readonly loggerService: LoggerService, + private readonly unlockJobProvider: JobUnlockProvider, ) { this.logger = loggerService.getLogger('SENDER_EVM_CONSOLE'); } - async handleUnlockEVM() { - let dataLock: EventLog; - try { - const [threshold, configTip] = await Promise.all([ - this.ethBridgeContract.getValidatorThreshold(), - this.commonConfigRepository.getCommonConfig(), - ]); - - dataLock = await this.eventLogRepository.getEventLockWithNetwork(ENetworkName.ETH, threshold); - if (!dataLock) return; - await this.eventLogRepository.updateLockEvenLog(dataLock.id, EEventStatus.PROCESSING); - - const { tokenReceivedAddress, txHashLock, receiveAddress } = dataLock; - - const { tokenPair, amountReceived } = await this.getTokenPairAndAmount(dataLock); - if (!tokenPair) { - await this.updateLogStatusWithRetry(dataLock, EEventStatus.NOTOKENPAIR); - return; - } - - const isPassQuota = await this.isPassDailyQuota(dataLock.senderAddress, tokenPair.fromDecimal); - if (!isPassQuota) { - await this.updateLogStatusWithRetry(dataLock, EEventStatus.FAILED, EError.OVER_DAILY_QUOTA); - return; - } - const gasFeeEvmWithoutDecimals = this.configService.get(EEnvKey.GAS_FEE_EVM); - // fee and received amount. - const gasFeeEth = addDecimal(gasFeeEvmWithoutDecimals, this.configService.get(EEnvKey.DECIMAL_TOKEN_EVM)); - const protocolFee = calculateFee(amountReceived, gasFeeEth, configTip.tip); - // call unlock function - const result = await this.ethBridgeContract.unlock( - tokenReceivedAddress, - BigNumber(amountReceived), - txHashLock, - receiveAddress, - BigNumber(protocolFee), - dataLock.validator.map(e => e.signature), - ); - // Update status eventLog when call function unlock - if (result.success) { - await this.eventLogRepository.updateStatusAndRetryEvenLog({ - id: dataLock.id, - retry: dataLock.retry, - status: EEventStatus.PROCESSING, - errorDetail: null, - protocolFee, - amountReceived: BigNumber(amountReceived).minus(protocolFee).toFixed(0), - gasFee: gasFeeEvmWithoutDecimals, - tip: calculateTip(amountReceived, gasFeeEth, configTip.tip) - .div(BigNumber(DECIMAL_BASE).pow(tokenPair.toDecimal)) - .toString(), - }); - } else { - await this.handleError(result.error, dataLock); - } - return result; - } catch (error) { - await this.handleError(error, dataLock); + async handleUnlockEVM(txId: number) { + const [dataLock, configTip] = await Promise.all([ + this.eventLogRepository.findOneBy({ id: txId, networkReceived: ENetworkName.ETH }), + this.commonConfigRepository.getCommonConfig(), + ]); + + if (!dataLock) { + this.logger.warn('data not found with tx', txId); + return; } - } - async unlockEVMTransaction() { - let dataLock: EventLog; - try { - const wallet = this.getWallet(); - const [validatorData, configTip] = await Promise.all([ - this.eventLogRepository.getValidatorPendingSignature(wallet.address, ENetworkName.ETH), - this.commonConfigRepository.getCommonConfig(), - ]); - dataLock = validatorData; - if (!dataLock) { - return; - } - const { tokenReceivedAddress, txHashLock, receiveAddress } = dataLock; - const { tokenPair, amountReceived } = await this.getTokenPairAndAmount(dataLock); - if (!tokenPair) { - await this.updateLogStatusWithRetry(dataLock, EEventStatus.NOTOKENPAIR); - return; - } - - const protocolFee = calculateFee( - amountReceived, - addDecimal(this.configService.get(EEnvKey.GAS_FEE_EVM), this.configService.get(EEnvKey.DECIMAL_TOKEN_EVM)), - configTip.tip, - ); + const { tokenReceivedAddress, txHashLock, receiveAddress } = dataLock; - const signTx = await this.getSignature(wallet, { - token: tokenReceivedAddress, - amount: amountReceived, - user: receiveAddress, - hash: txHashLock, - fee: protocolFee.toString(), + const { tokenPair, amountReceived } = await this.getTokenPairAndAmount(dataLock); + if (!tokenPair) { + this.logger.warn('No token pair found!'); + await this.updateLogStatusWithRetry(dataLock, EEventStatus.NOTOKENPAIR); + return; + } + + const isPassQuota = await this.isPassDailyQuota(dataLock.senderAddress, tokenPair.fromDecimal); + if (!isPassQuota) { + this.logger.warn('Over daility quota!'); + await this.updateLogStatusWithRetry(dataLock, EEventStatus.FAILED, EError.OVER_DAILY_QUOTA); + return; + } + const gasFeeEvmWithoutDecimals = this.configService.get(EEnvKey.GAS_FEE_EVM); + // fee and received amount. + const gasFeeEth = addDecimal(gasFeeEvmWithoutDecimals, this.configService.get(EEnvKey.DECIMAL_TOKEN_EVM)); + const protocolFee = calculateFee(amountReceived, gasFeeEth, configTip.tip); + // call unlock function + const result = await this.ethBridgeContract.unlock( + tokenReceivedAddress, + BigNumber(amountReceived), + txHashLock, + receiveAddress, + BigNumber(protocolFee), + dataLock.validator.map(e => e.signature), + ); + // Update status eventLog when call function unlock + if (result.success) { + await this.eventLogRepository.updateStatusAndRetryEvenLog({ + id: dataLock.id, + status: EEventStatus.PROCESSING, + errorDetail: null, + protocolFee, + amountReceived: BigNumber(amountReceived).minus(protocolFee).toFixed(0), + gasFee: gasFeeEvmWithoutDecimals, + tip: calculateTip(amountReceived, gasFeeEth, configTip.tip) + .div(BigNumber(DECIMAL_BASE).pow(tokenPair.toDecimal)) + .toString(), }); + } else { + this.logger.error(result.error); + this.updateLogStatusWithRetry(dataLock, EEventStatus.FAILED); + } + + return result; + } - if (signTx.success) await this.saveSignature(wallet.address, signTx.signature, dataLock.id); - } catch (error) { - await this.handleError(error, dataLock, true, this.getWallet().address); + async validateUnlockEVMTransaction(txId: number) { + const wallet = this.getWallet(); + const [dataLock, configTip] = await Promise.all([ + this.eventLogRepository.findOneBy({ id: txId, networkReceived: ENetworkName.ETH }), + this.commonConfigRepository.getCommonConfig(), + ]); + if (!dataLock) { + this.logger.warn('no data found tx', txId); + return; + } + const { tokenReceivedAddress, txHashLock, receiveAddress } = dataLock; + const { tokenPair, amountReceived } = await this.getTokenPairAndAmount(dataLock); + if (!tokenPair) { + this.logger.warn('no token pair found tx', dataLock.tokenReceivedAddress, dataLock.tokenFromAddress); + await this.updateLogStatusWithRetry(dataLock, EEventStatus.NOTOKENPAIR); + return; } + + const protocolFee = calculateFee( + amountReceived, + addDecimal(this.configService.get(EEnvKey.GAS_FEE_EVM), this.configService.get(EEnvKey.DECIMAL_TOKEN_EVM)), + configTip.tip, + ); + + const signTx = await this.getSignature(wallet, { + token: tokenReceivedAddress, + amount: amountReceived, + user: receiveAddress, + hash: txHashLock, + fee: protocolFee.toString(), + }); + assert(signTx.success, `Generate signature failed!`); + await this.saveSignature(wallet.address, signTx.signature, dataLock.id); + await this.unlockJobProvider.addJobSendTx(dataLock.id, dataLock.networkReceived); + // notice job unlock provider about this. } private async isPassDailyQuota(address: string, fromDecimal: number): Promise { @@ -146,22 +146,6 @@ export class SenderEVMBridge { : true; } - private async handleError(error: any, dataLock: EventLog, isMultiSignature = false, wallet?: string) { - this.logger.log(EError.INVALID_SIGNATURE, error); - const retryCount = dataLock ? Number(dataLock.retry + 1) : 1; - - if (isMultiSignature) { - await this.upsertErrorAndRetryMultiSignature(wallet, dataLock.id, error); - } else { - await this.eventLogRepository.updateStatusAndRetryEvenLog({ - id: dataLock.id, - retry: retryCount, - status: EEventStatus.FAILED, - errorDetail: error, - }); - } - } - public async getSignature(wallet: ethers.Wallet, value: Record) { const signature = await wallet._signTypedData( { @@ -185,19 +169,6 @@ export class SenderEVMBridge { return { success: true, signature, payload: { data: value } }; } - public async upsertErrorAndRetryMultiSignature(validator: string, txId: number, errorCode: unknown) { - const validatorSignature = await this.multiSignatureRepository.findOne({ - where: { txId, validator }, - }); - if (!validatorSignature) { - await this.multiSignatureRepository.save( - new MultiSignature({ txId, validator, retry: 1, errorCode, chain: ENetworkName.ETH }), - ); - } else { - await this.multiSignatureRepository.update({ txId, validator }, { retry: ++validatorSignature.retry, errorCode }); - } - } - public async saveSignature(validatorAddress: string, signature: string, txId: number) { const validatorRecord = await this.multiSignatureRepository.findOneBy({ validator: validatorAddress, txId }); @@ -227,13 +198,15 @@ export class SenderEVMBridge { private async updateLogStatusWithRetry(dataLock: EventLog, status: EEventStatus, errorDetail?: EError) { await this.eventLogRepository.updateStatusAndRetryEvenLog({ id: dataLock.id, - retry: dataLock.retry, status, errorDetail, }); } public getWallet(): ethers.Wallet { + assert(!!this.configService.get(EEnvKey.EVM_VALIDATOR_PRIVATE_KEY), 'validator private key invalid'); + assert(!!this.configService.get(EEnvKey.THIS_VALIDATOR_INDEX), 'invalid validator index'); + const privateKey = this.configService.get(EEnvKey.EVM_VALIDATOR_PRIVATE_KEY); return new ethers.Wallet(privateKey); } diff --git a/src/modules/crawler/sender.minabridge.ts b/src/modules/crawler/sender.minabridge.ts index 8dabf1f..836b7df 100644 --- a/src/modules/crawler/sender.minabridge.ts +++ b/src/modules/crawler/sender.minabridge.ts @@ -14,10 +14,12 @@ import { EventLogRepository } from '../../database/repositories/event-log.reposi import { MultiSignatureRepository } from '../../database/repositories/multi-signature.repository.js'; import { TokenPairRepository } from '../../database/repositories/token-pair.repository.js'; import { LoggerService } from '../../shared/modules/logger/logger.service.js'; +import { QueueService } from '../../shared/modules/queue/queue.service.js'; import { addDecimal, calculateFee, calculateTip } from '../../shared/utils/bignumber.js'; import { TokenPair } from '../users/entities/tokenpair.entity.js'; import { CommonConfig } from './entities/common-config.entity.js'; import { MultiSignature } from './entities/multi-signature.entity.js'; +import { JobUnlockProvider } from './job-unlock.provider.js'; import { Bridge } from './minaSc/Bridge.js'; import { Manager } from './minaSc/Manager.js'; import { ValidatorManager } from './minaSc/ValidatorManager.js'; @@ -29,7 +31,6 @@ export class SenderMinaBridge { private readonly feePayerKey: PrivateKey; private readonly bridgeKey: PrivateKey; private readonly tokenPublicKey: PublicKey; - private readonly validatorThreshhold; constructor( private readonly configService: ConfigService, private readonly eventLogRepository: EventLogRepository, @@ -37,12 +38,13 @@ export class SenderMinaBridge { private readonly tokenPairRepository: TokenPairRepository, private readonly multiSignatureRepository: MultiSignatureRepository, private readonly loggerService: LoggerService, + private readonly queueService: QueueService, + private readonly unlockJobProvider: JobUnlockProvider, ) { this.logger = this.loggerService.getLogger('SENDER_MINA_BRIDGE'); this.feePayerKey = PrivateKey.fromBase58(this.configService.get(EEnvKey.SIGNER_MINA_PRIVATE_KEY)); this.bridgeKey = PrivateKey.fromBase58(this.configService.get(EEnvKey.MINA_BRIDGE_SC_PRIVATE_KEY)); this.tokenPublicKey = PublicKey.fromBase58(this.configService.get(EEnvKey.MINA_TOKEN_BRIDGE_ADDRESS)); - this.validatorThreshhold = this.configService.get(EEnvKey.MINA_VALIDATOR_THRESHHOLD); const network = Mina.Network({ mina: this.configService.get(EEnvKey.MINA_BRIDGE_RPC_OPTIONS), @@ -93,78 +95,64 @@ export class SenderMinaBridge { gasFeeMina: this.configService.get(EEnvKey.GASFEEMINA), }; } - public async handleUnlockMina() { - let dataLock, configTip; - try { - [dataLock, configTip] = await Promise.all([ - this.eventLogRepository.getEventLockWithNetwork(ENetworkName.MINA, this.validatorThreshhold), - this.commonConfigRepository.getCommonConfig(), - ]); - if (!dataLock) { - return; - } + public async handleUnlockMina(txId: number) { + const [dataLock, configTip] = await Promise.all([ + this.eventLogRepository.findOneBy({ id: txId, networkReceived: ENetworkName.MINA }), + this.commonConfigRepository.getCommonConfig(), + ]); + if (!dataLock) { + this.logger.warn(`Not found tx with id ${txId}`); + return; + } - await this.eventLogRepository.updateLockEvenLog(dataLock.id, EEventStatus.PROCESSING); - const { tokenReceivedAddress, tokenFromAddress, id, receiveAddress, amountFrom, senderAddress } = dataLock; - const tokenPair = await this.tokenPairRepository.getTokenPair(tokenFromAddress, tokenReceivedAddress); - if (!tokenPair) { - this.logger.warn('Token pair not found.'); - await this.eventLogRepository.updateStatusAndRetryEvenLog({ - id: dataLock.id, - retry: dataLock.retry, - status: EEventStatus.NOTOKENPAIR, - }); - return; - } + await this.eventLogRepository.updateLockEvenLog(dataLock.id, EEventStatus.PROCESSING); + const { tokenReceivedAddress, tokenFromAddress, id, receiveAddress, amountFrom, senderAddress } = dataLock; + const tokenPair = await this.tokenPairRepository.getTokenPair(tokenFromAddress, tokenReceivedAddress); + if (!tokenPair) { + this.logger.warn('Token pair not found.'); + await this.eventLogRepository.updateStatusAndRetryEvenLog({ + id: dataLock.id, + status: EEventStatus.NOTOKENPAIR, + }); + return; + } - const isPassDailyQuota = await this.isPassDailyQuota(senderAddress, tokenPair.fromDecimal); - if (!isPassDailyQuota) { - this.logger.warn('Passed daily quota.'); - await this.eventLogRepository.updateStatusAndRetryEvenLog({ - id: dataLock.id, - retry: dataLock.retry, - status: EEventStatus.FAILED, - errorDetail: EError.OVER_DAILY_QUOTA, - }); - return; - } - const { amountReceived, protocolFeeAmount, gasFeeMina, tipAmount } = this.getAmountReceivedAndFee( - tokenPair, - configTip, - amountFrom, - ); - const result = await this.callUnlockFunction(amountReceived, id, receiveAddress); - // Update status eventLog when call function unlock - if (result.success) { - await this.eventLogRepository.updateStatusAndRetryEvenLog({ - id: dataLock.id, - retry: dataLock.retry, - status: EEventStatus.PROCESSING, - errorDetail: result.error, - txHashUnlock: result.data, - amountReceived, - protocolFee: protocolFeeAmount, - gasFee: gasFeeMina, - tip: tipAmount, - }); - } else { - await this.eventLogRepository.updateStatusAndRetryEvenLog({ - id: dataLock.id, - retry: Number(dataLock.retry + 1), - status: EEventStatus.FAILED, - errorDetail: JSON.stringify(result.error), - }); - } - return result; - } catch (error) { - console.log(error); + const isPassDailyQuota = await this.isPassDailyQuota(senderAddress, tokenPair.fromDecimal); + if (!isPassDailyQuota) { + this.logger.warn('Passed daily quota.'); + await this.eventLogRepository.updateStatusAndRetryEvenLog({ + id: dataLock.id, + status: EEventStatus.FAILED, + errorDetail: EError.OVER_DAILY_QUOTA, + }); + return; + } + const { amountReceived, protocolFeeAmount, gasFeeMina, tipAmount } = this.getAmountReceivedAndFee( + tokenPair, + configTip, + amountFrom, + ); + const result = await this.callUnlockFunction(amountReceived, id, receiveAddress); + // Update status eventLog when call function unlock + if (result.success) { + await this.eventLogRepository.updateStatusAndRetryEvenLog({ + id: dataLock.id, + status: EEventStatus.PROCESSING, + errorDetail: result.error, + txHashUnlock: result.data, + amountReceived, + protocolFee: protocolFeeAmount, + gasFee: gasFeeMina, + tip: tipAmount, + }); + } else { await this.eventLogRepository.updateStatusAndRetryEvenLog({ id: dataLock.id, - retry: Number(dataLock.retry + 1), status: EEventStatus.FAILED, - errorDetail: JSON.stringify(error), + errorDetail: JSON.stringify(result.error), }); } + return result; } private async callUnlockFunction(amount: string, txId: number, receiveAddress: string) { @@ -238,64 +226,65 @@ export class SenderMinaBridge { } return true; } - async handleValidateUnlockTxMina() { - let dataLock, config; - assert(!!this.configService.get(EEnvKey.MINA_VALIDATOR_PRIVATE_KEY)); + + async handleValidateUnlockTxMina(txId: number) { + assert(!!this.configService.get(EEnvKey.MINA_VALIDATOR_PRIVATE_KEY), 'invalid validator private key'); + assert(!!this.configService.get(EEnvKey.THIS_VALIDATOR_INDEX), 'invalid validator index'); + const signerPrivateKey = PrivateKey.fromBase58(this.configService.get(EEnvKey.MINA_VALIDATOR_PRIVATE_KEY)); const signerPublicKey = PublicKey.fromPrivateKey(signerPrivateKey).toBase58(); - try { - [dataLock, config] = await Promise.all([ - this.eventLogRepository.getValidatorPendingSignature(signerPublicKey, ENetworkName.MINA), - this.commonConfigRepository.getCommonConfig(), - ]); - - if (!dataLock) { - return; - } - this.logger.info('Start generating mina signatures.'); + const [dataLock, config] = await Promise.all([ + this.eventLogRepository.findOneBy({ id: txId, networkReceived: ENetworkName.MINA }), + this.commonConfigRepository.getCommonConfig(), + ]); - const { tokenReceivedAddress, tokenFromAddress, receiveAddress, amountFrom } = dataLock; + if (!dataLock) { + this.logger.warn(`Data not found with id ${txId}`); + return; + } + this.logger.info('Start generating mina signatures for tx', txId); - const tokenPair = await this.tokenPairRepository.getTokenPair(tokenFromAddress, tokenReceivedAddress); + const { tokenReceivedAddress, tokenFromAddress, receiveAddress, amountFrom } = dataLock; - if (!tokenPair) { - this.logger.warn('Unknown token pair', tokenFromAddress, tokenReceivedAddress); - await this.eventLogRepository.updateStatusAndRetryEvenLog({ - id: dataLock.id, - retry: dataLock.retry, - status: EEventStatus.NOTOKENPAIR, - }); - return; - } + const tokenPair = await this.tokenPairRepository.getTokenPair(tokenFromAddress, tokenReceivedAddress); - // check if this signature has been tried before. - let multiSignature = await this.multiSignatureRepository.findOneBy({ - txId: dataLock.id, - validator: signerPublicKey, + if (!tokenPair) { + this.logger.warn('Unknown token pair', tokenFromAddress, tokenReceivedAddress); + await this.eventLogRepository.updateStatusAndRetryEvenLog({ + id: dataLock.id, + status: EEventStatus.NOTOKENPAIR, }); - if (!multiSignature) { - multiSignature = new MultiSignature({ - chain: ENetworkName.MINA, - validator: signerPublicKey, - txId: dataLock.id, - }); - } + return; + } - const receiverPublicKey = PublicKey.fromBase58(receiveAddress); - const { amountReceived } = this.getAmountReceivedAndFee(tokenPair, config, amountFrom); + // check if this signature has been tried before. + let multiSignature = await this.multiSignatureRepository.findOneBy({ + txId: dataLock.id, + validator: signerPublicKey, + }); + if (multiSignature) { + this.logger.warn('signature existed'); + return; + } - const msg = [ - ...receiverPublicKey.toFields(), - ...UInt64.from(amountReceived).toFields(), - ...this.tokenPublicKey.toFields(), - ]; - const signature = Signature.create(signerPrivateKey, msg); + const receiverPublicKey = PublicKey.fromBase58(receiveAddress); + const { amountReceived } = this.getAmountReceivedAndFee(tokenPair, config, amountFrom); - multiSignature.signature = signature.toJSON(); - await this.multiSignatureRepository.save(multiSignature); - } catch (error) { - this.logger.log(EError.INVALID_SIGNATURE, error); - await this.multiSignatureRepository.upsertErrorAndRetryMultiSignature(signerPublicKey, dataLock.id, error); - } + const msg = [ + ...receiverPublicKey.toFields(), + ...UInt64.from(amountReceived).toFields(), + ...this.tokenPublicKey.toFields(), + ]; + const signature = Signature.create(signerPrivateKey, msg).toJSON(); + + multiSignature = new MultiSignature({ + chain: ENetworkName.MINA, + validator: signerPublicKey, + txId: dataLock.id, + signature, + }); + await this.multiSignatureRepository.save(multiSignature); + await this.unlockJobProvider.addJobSendTx(dataLock.id, dataLock.networkReceived); + // notice the job unlock provider here } } diff --git a/src/shared/modules/queue/queue.module.ts b/src/shared/modules/queue/queue.module.ts new file mode 100644 index 0000000..9f1e841 --- /dev/null +++ b/src/shared/modules/queue/queue.module.ts @@ -0,0 +1,12 @@ +import { Global, Module } from '@nestjs/common'; +import { ConfigModule } from '@nestjs/config'; + +import { QueueService } from './queue.service.js'; + +@Global() +@Module({ + imports: [ConfigModule], + providers: [QueueService], + exports: [QueueService], +}) +export class QueueModule {} diff --git a/src/shared/modules/queue/queue.service.ts b/src/shared/modules/queue/queue.service.ts new file mode 100644 index 0000000..2839f70 --- /dev/null +++ b/src/shared/modules/queue/queue.service.ts @@ -0,0 +1,45 @@ +import { Injectable } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { DoneCallback, Job, JobOptions, Queue } from 'bull'; + +import { EEnvKey } from '../../../constants/env.constant.js'; +import { BullLib } from '../../../shared/utils/queue.js'; +import { LoggerService } from '../logger/logger.service.js'; + +@Injectable() +export class QueueService { + private queues = new Map(); + constructor( + private readonly configService: ConfigService, + private readonly loggerService: LoggerService, + ) {} + private logger = this.loggerService.getLogger('IN_QUEUE'); + private initQueueOnDemand(queueName: string) { + // reduce connection to redis + const redisConfig = { + host: this.configService.get(EEnvKey.REDIS_HOST), + port: this.configService.get(EEnvKey.REDIS_PORT), + }; + if (!this.queues.has(queueName)) { + this.logger.info('setup Queue', queueName); + this.queues.set(queueName, BullLib.createNewQueue(queueName, redisConfig)); + } + } + public async handleQueueJob(queueName: string, handleFunction: CallableFunction, numOfJobs = 1) { + this.initQueueOnDemand(queueName); + await this.queues.get(queueName).process(numOfJobs, async (job: Job, done: DoneCallback) => { + try { + this.logger.info('Handling job', job.data); + await handleFunction(job.data); + done(); + } catch (error) { + this.logger.warn('Job failed', job.data, error); + done(error); + } + }); + } + public addJobToQueue(queueName: string, job: T, options: JobOptions = { attempts: 3, backoff: 5000 }) { + this.initQueueOnDemand(queueName); + return this.queues.get(queueName).add(job, options); + } +} diff --git a/src/shared/utils/queue.ts b/src/shared/utils/queue.ts index 840c37f..584b029 100644 --- a/src/shared/utils/queue.ts +++ b/src/shared/utils/queue.ts @@ -2,7 +2,7 @@ import Queue from 'bull'; import * as Redis from 'ioredis'; export class BullLib { - static createNewQueue(queueName: string, redisConfig: Redis.RedisOptions): Queue.Queue { + static createNewQueue(queueName: string, redisConfig: Redis.RedisOptions): Queue.Queue { // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore return new Queue(queueName, { From 8490fecf029a2d63d5f4ad23de7b9e43512ea291 Mon Sep 17 00:00:00 2001 From: Tan Hoang Date: Wed, 2 Oct 2024 00:44:18 +0700 Subject: [PATCH 2/6] fix: redis name --- docker-compose.dev.yaml | 2 +- docker-compose.yml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.dev.yaml b/docker-compose.dev.yaml index 5b56362..f9c00e8 100644 --- a/docker-compose.dev.yaml +++ b/docker-compose.dev.yaml @@ -198,7 +198,7 @@ services: - myNetwork redis: image: redis:6.2-alpine - container_name: mereo_ticket_redis + container_name: mina_redis ports: - '${REDIS_PORT}:${REDIS_PORT}' command: redis-server --save 20 1 --loglevel warning --port ${REDIS_PORT} diff --git a/docker-compose.yml b/docker-compose.yml index 0e4c89b..8ed8367 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -30,7 +30,7 @@ services: - myNetwork redis: image: redis:6.2-alpine - container_name: mereo_ticket_redis + container_name: mina_redis ports: - '${REDIS_PORT}:${REDIS_PORT}' command: redis-server --save 20 1 --loglevel warning --port ${REDIS_PORT} From c0d1f8f55fa1ad59a8e63eec7ff061a34f14d86c Mon Sep 17 00:00:00 2001 From: Tan Hoang Date: Wed, 2 Oct 2024 01:13:21 +0700 Subject: [PATCH 3/6] fix: unlock evm --- .github/workflows/auto-deploy-develop.yml | 2 +- .github/workflows/auto-deploy-test.yml | 2 +- src/modules/crawler/crawler.console.ts | 2 +- src/modules/crawler/sender.evmbridge.ts | 10 +++++++++- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/.github/workflows/auto-deploy-develop.yml b/.github/workflows/auto-deploy-develop.yml index 62c71c4..34325dd 100644 --- a/.github/workflows/auto-deploy-develop.yml +++ b/.github/workflows/auto-deploy-develop.yml @@ -15,4 +15,4 @@ jobs: - run: | echo "${{ vars.MINA_BRIDGE_BE_DEV }}" >> .env docker build . -t mina-bridge:1.0.0 - docker compose -f docker-compose.dev.yaml up -d + docker compose -f docker-compose.dev.yaml up -d --remove-orphans diff --git a/.github/workflows/auto-deploy-test.yml b/.github/workflows/auto-deploy-test.yml index a114934..958e677 100644 --- a/.github/workflows/auto-deploy-test.yml +++ b/.github/workflows/auto-deploy-test.yml @@ -15,4 +15,4 @@ jobs: - run: | echo "${{ vars.MINA_BRIDGE_BE_TEST }}" >> .env docker build . -t mina-bridge:1.0.0 - docker compose -f docker-compose.dev.yaml up -d + docker compose -f docker-compose.dev.yaml up -d --remove-orphans diff --git a/src/modules/crawler/crawler.console.ts b/src/modules/crawler/crawler.console.ts index edaccdf..0662094 100644 --- a/src/modules/crawler/crawler.console.ts +++ b/src/modules/crawler/crawler.console.ts @@ -79,7 +79,7 @@ export class CrawlerConsole { description: 'sender ETH Bridge unlock', }) async handleSenderETHBridgeUnlock() { - this.logger.info('MINA_SENDER_JOB: started'); + this.logger.info('ETH_SENDER_JOB: started'); await this.queueService.handleQueueJob(EQueueName.EVM_SENDER_QUEUE, (data: IUnlockToken) => { return this.senderEVMBridge.handleUnlockEVM(data.eventLogId); }); diff --git a/src/modules/crawler/sender.evmbridge.ts b/src/modules/crawler/sender.evmbridge.ts index ebb3560..3532da2 100644 --- a/src/modules/crawler/sender.evmbridge.ts +++ b/src/modules/crawler/sender.evmbridge.ts @@ -39,7 +39,15 @@ export class SenderEVMBridge { async handleUnlockEVM(txId: number) { const [dataLock, configTip] = await Promise.all([ - this.eventLogRepository.findOneBy({ id: txId, networkReceived: ENetworkName.ETH }), + this.eventLogRepository.findOne({ + where: { + id: txId, + networkReceived: ENetworkName.ETH, + }, + relations: { + validator: true, + }, + }), this.commonConfigRepository.getCommonConfig(), ]); From 303aef9c26b0aab1f9345dce4c8aab67af099603 Mon Sep 17 00:00:00 2001 From: Tan Hoang Date: Wed, 2 Oct 2024 11:26:27 +0700 Subject: [PATCH 4/6] fix: update job unlock --- ...38565058-add-timestamp-check-event-logs.ts | 14 +++ .../repositories/event-log.repository.ts | 32 +++++- src/modules/crawler/crawler.console.ts | 6 +- src/modules/crawler/crawler.evmbridge.ts | 3 - src/modules/crawler/crawler.minabridge.ts | 3 - .../crawler/entities/event-logs.entity.ts | 6 + .../crawler/interfaces/job.interface.ts | 1 - src/modules/crawler/job-unlock.provider.ts | 106 ++++++++++-------- src/modules/crawler/sender.evmbridge.ts | 4 +- src/modules/crawler/sender.minabridge.ts | 8 +- src/shared/utils/time.ts | 1 + 11 files changed, 121 insertions(+), 63 deletions(-) create mode 100644 src/database/migrations/1727838565058-add-timestamp-check-event-logs.ts diff --git a/src/database/migrations/1727838565058-add-timestamp-check-event-logs.ts b/src/database/migrations/1727838565058-add-timestamp-check-event-logs.ts new file mode 100644 index 0000000..c14b47c --- /dev/null +++ b/src/database/migrations/1727838565058-add-timestamp-check-event-logs.ts @@ -0,0 +1,14 @@ +import { MigrationInterface, QueryRunner, TableColumn } from 'typeorm'; + +export class AddTimestampCheckEventLogs1727838565058 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + return queryRunner.addColumns('event_logs', [ + new TableColumn({ name: 'next_validate_signature_job_time', type: 'bigint', default: 0 }), + new TableColumn({ name: 'next_send_tx_job_time', type: 'bigint', default: 0 }), + ]); + } + + public async down(queryRunner: QueryRunner): Promise { + return queryRunner.dropColumns('event_logs', ['next_send_tx_job_time', 'next_validate_signature_job_time']); + } +} diff --git a/src/database/repositories/event-log.repository.ts b/src/database/repositories/event-log.repository.ts index 7cf32a4..5cc1c2b 100644 --- a/src/database/repositories/event-log.repository.ts +++ b/src/database/repositories/event-log.repository.ts @@ -1,4 +1,3 @@ -import { GetHistoryDto } from 'modules/users/dto/history-response.dto.js'; import { EntityRepository } from 'nestjs-typeorm-custom-repository'; import { Brackets } from 'typeorm'; @@ -8,7 +7,8 @@ import { ETableName } from '../../constants/entity.constant.js'; import { MAX_RETRIES } from '../../constants/service.constant.js'; import { BaseRepository } from '../../core/base-repository.js'; import { EventLog } from '../../modules/crawler/entities/event-logs.entity.js'; -import { endOfDayUnix, startOfDayUnix } from '../../shared/utils/time.js'; +import { GetHistoryDto } from '../../modules/users/dto/history-response.dto.js'; +import { endOfDayUnix, nowUnix, startOfDayUnix } from '../../shared/utils/time.js'; @EntityRepository(EventLog) export class EventLogRepository extends BaseRepository { @@ -39,6 +39,34 @@ export class EventLogRepository extends BaseRepository { return qb.getOne(); } + public async getPendingTx( + network: ENetworkName, + isSignatureFullFilled: boolean, + numOfSignaturesNeeded: number, + ): Promise> { + const currentUnixTimestamp = nowUnix(); + const qb = this.createQueryBuilder(`${this.alias}`); + qb.select([`${this.alias}.id as "id"`, `${this.alias}.network_received as "networkReceived"`]); + qb.leftJoin(`${this.alias}.validator`, 'signature'); + + qb.where(`${this.alias}.network_received = :network`, { network }); + + qb.andWhere(`${this.alias}.status IN (:...status)`, { + status: [EEventStatus.WAITING], // EEventStatus.PROCESSING add in future + }) + .andWhere(`${this.alias}.retry < :retryNumber`, { retryNumber: MAX_RETRIES }) + .orderBy(`${this.alias}.id`, EDirection.DESC) + .groupBy(`${this.alias}.id`) + .addGroupBy(`${this.alias}.network_received`); + if (isSignatureFullFilled) { + qb.andWhere(`${this.alias}.next_send_tx_job_time < :currentUnixTimestamp`, { currentUnixTimestamp }); + qb.having(`COUNT(signature.id) = :numOfSignaturesNeeded`, { numOfSignaturesNeeded }); + } else { + qb.andWhere(`${this.alias}.next_validate_signature_job_time < :currentUnixTimestamp`, { currentUnixTimestamp }); + qb.having(`COUNT(signature.id) < :numOfSignaturesNeeded`, { numOfSignaturesNeeded }); + } + return qb.getRawMany(); + } public async updateStatusAndRetryEvenLog({ id, diff --git a/src/modules/crawler/crawler.console.ts b/src/modules/crawler/crawler.console.ts index 0662094..35acf47 100644 --- a/src/modules/crawler/crawler.console.ts +++ b/src/modules/crawler/crawler.console.ts @@ -115,9 +115,7 @@ export class CrawlerConsole { description: 'handle all network unlock.', }) async handleUnlockJobProvider() { - this.logger.info('UNLOCK_JOB_PROVIDER: started'); - await this.queueService.handleQueueJob(EQueueName.UNLOCK_JOB_QUEUE, data => { - return this.unlockProviderService.handleJob(data); - }); + this.logger.info('JOB_UNLOCK_PROVIDER: started'); + await this.unlockProviderService.handleJob(); } } diff --git a/src/modules/crawler/crawler.evmbridge.ts b/src/modules/crawler/crawler.evmbridge.ts index fd9653d..6fc8c38 100644 --- a/src/modules/crawler/crawler.evmbridge.ts +++ b/src/modules/crawler/crawler.evmbridge.ts @@ -13,7 +13,6 @@ import { TokenPairRepository } from '../../database/repositories/token-pair.repo import { CrawlContract, EventLog } from '../../modules/crawler/entities/index.js'; import { LoggerService } from '../../shared/modules/logger/logger.service.js'; import { ETHBridgeContract } from '../../shared/modules/web3/web3.service.js'; -import { JobUnlockProvider } from './job-unlock.provider.js'; @Injectable() export class BlockchainEVMCrawler { @@ -26,7 +25,6 @@ export class BlockchainEVMCrawler { private readonly tokenPairRepository: TokenPairRepository, private readonly loggerService: LoggerService, private readonly ethBridgeContract: ETHBridgeContract, - private readonly unlockJobProvider: JobUnlockProvider, ) { this.numberOfBlockPerJob = +this.configService.get(EEnvKey.NUMBER_OF_BLOCK_PER_JOB); this.logger = loggerService.getLogger('BLOCKCHAIN_EVM_CRAWLER'); @@ -109,7 +107,6 @@ export class BlockchainEVMCrawler { const result = await queryRunner.manager.save(EventLog, eventUnlock); assert(!!result.id && !!result.networkReceived, 'Cannot add job to signatures queue.'); - await this.unlockJobProvider.addJobSignatures(result.id, result.networkReceived); return { success: true, }; diff --git a/src/modules/crawler/crawler.minabridge.ts b/src/modules/crawler/crawler.minabridge.ts index 39d7213..4629fc5 100644 --- a/src/modules/crawler/crawler.minabridge.ts +++ b/src/modules/crawler/crawler.minabridge.ts @@ -13,7 +13,6 @@ import { CrawlContractRepository } from '../../database/repositories/crawl-contr import { TokenPairRepository } from '../../database/repositories/token-pair.repository.js'; import { CrawlContract, EventLog } from '../../modules/crawler/entities/index.js'; import { LoggerService } from '../../shared/modules/logger/logger.service.js'; -import { JobUnlockProvider } from './job-unlock.provider.js'; import { Bridge } from './minaSc/Bridge.js'; @Injectable() @@ -25,7 +24,6 @@ export class SCBridgeMinaCrawler { private readonly crawlContractRepository: CrawlContractRepository, private readonly tokenPairRepository: TokenPairRepository, private readonly loggerService: LoggerService, - private readonly unlockJobProvider: JobUnlockProvider, ) { this.logger = this.loggerService.getLogger('SC_BRIDGE_MINA_CRAWLER'); const Network = Mina.Network({ @@ -141,7 +139,6 @@ export class SCBridgeMinaCrawler { const result = await queryRunner.manager.save(EventLog, eventUnlock); assert(!!result.id && !!result.networkReceived, 'Cannot add job to signatures queue.'); - await this.unlockJobProvider.addJobSignatures(result.id, result.networkReceived); return { success: true, }; diff --git a/src/modules/crawler/entities/event-logs.entity.ts b/src/modules/crawler/entities/event-logs.entity.ts index 10eaae7..1059331 100644 --- a/src/modules/crawler/entities/event-logs.entity.ts +++ b/src/modules/crawler/entities/event-logs.entity.ts @@ -84,6 +84,12 @@ export class EventLog extends BaseEntityIncludeTime { @JoinColumn({ name: 'id' }) validator: Relation; + @Column({ name: 'next_validate_signature_job_time', default: 0 }) + nextValidateSignatureTime: string; + + @Column({ name: 'next_send_tx_job_time', default: 0 }) + nextSendTxJobTime: string; + constructor(value: Partial) { super(); Object.assign(this, value); diff --git a/src/modules/crawler/interfaces/job.interface.ts b/src/modules/crawler/interfaces/job.interface.ts index c3ea504..62d5f0d 100644 --- a/src/modules/crawler/interfaces/job.interface.ts +++ b/src/modules/crawler/interfaces/job.interface.ts @@ -16,7 +16,6 @@ export interface IUnlockToken { eventLogId: number; } export interface IJobUnlockPayload { - type: 'need_signatures' | 'need_send_tx'; eventLogId: number; network: ENetworkName; } diff --git a/src/modules/crawler/job-unlock.provider.ts b/src/modules/crawler/job-unlock.provider.ts index fab1c64..764be39 100644 --- a/src/modules/crawler/job-unlock.provider.ts +++ b/src/modules/crawler/job-unlock.provider.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; -import { assert } from 'console'; +import { In } from 'typeorm'; import { ENetworkName } from '../../constants/blockchain.constant.js'; import { EEnvKey } from '../../constants/env.constant.js'; @@ -8,6 +8,9 @@ import { EQueueName, getEvmValidatorQueueName, getMinaValidatorQueueName } from import { EventLogRepository } from '../../database/repositories/event-log.repository.js'; import { LoggerService } from '../../shared/modules/logger/logger.service.js'; import { QueueService } from '../../shared/modules/queue/queue.service.js'; +import { sleep } from '../../shared/utils/promise.js'; +import { getTimeInFutureInSeconds } from '../../shared/utils/time.js'; +import { EventLog } from './entities/event-logs.entity.js'; import { IGenerateSignature, IJobUnlockPayload, IUnlockToken } from './interfaces/job.interface.js'; @Injectable() @@ -19,36 +22,63 @@ export class JobUnlockProvider { private readonly eventLogRepository: EventLogRepository, ) {} private logger = this.loggerService.getLogger('JOB_UNLOCK_PROVIDER'); - public addJobSignatures(eventLogId: number, network: ENetworkName) { - return this.queueService.addJobToQueue( - EQueueName.UNLOCK_JOB_QUEUE, - { eventLogId, network, type: 'need_signatures' }, - { - attempts: 5, - backoff: 5000, - }, - ); + + public async handleJob() { + await Promise.all([this.getPendingTx(true), this.getPendingTx(false)]); } - public addJobSendTx(eventLogId: number, network: ENetworkName) { - return this.queueService.addJobToQueue( - EQueueName.UNLOCK_JOB_QUEUE, - { eventLogId, network, type: 'need_send_tx' }, - { - attempts: 5, - backoff: 5000, - }, - ); + + private async getPendingTx(isSignatureFullFilled: boolean) { + while (true) { + try { + const [pendingSignaturesMinaTx, pendingSignaturesEthTx] = await Promise.all([ + this.eventLogRepository.getPendingTx( + ENetworkName.MINA, + isSignatureFullFilled, + this.getNumOfValidators(ENetworkName.MINA), + ), + this.eventLogRepository.getPendingTx( + ENetworkName.ETH, + isSignatureFullFilled, + this.getNumOfValidators(ENetworkName.ETH), + ), + ]); + const totalTxs = [...pendingSignaturesEthTx, ...pendingSignaturesMinaTx]; + if (totalTxs.length > 0) { + this.logger.info( + `${isSignatureFullFilled ? 'Sending tx' : 'Validating signature'} for ${totalTxs.length}, mina count ${pendingSignaturesMinaTx.length}, eth count ${pendingSignaturesEthTx.length}`, + ); + } else { + this.logger.info('no pending tx'); + return; + } + for (const tx of totalTxs) { + if (isSignatureFullFilled) { + await this.handleSendTxJobs({ eventLogId: tx.id, network: tx.networkReceived }); + } else { + await this.handleSignaturesJobs({ eventLogId: tx.id, network: tx.networkReceived }); + } + } + await this.updateIntervalStatusForTxs( + totalTxs.map(e => e.id), + isSignatureFullFilled, + ); + // update interval status of tx + } catch (error) { + this.logger.error(error); + } finally { + await sleep(5); + } + } } - public async handleJob(data: IJobUnlockPayload) { - switch (data.type) { - case 'need_signatures': - return this.handleSignaturesJobs(data); - case 'need_send_tx': - return this.handleSendTxJobs(data); - default: - this.logger.error('unknown type', data); - return; + // helpers + private updateIntervalStatusForTxs(ids: number[], isSignatureFullFilled: boolean) { + const payload: Partial = {}; + if (isSignatureFullFilled) { + payload.nextSendTxJobTime = getTimeInFutureInSeconds(10).toString(); + } else { + payload.nextValidateSignatureTime = getTimeInFutureInSeconds(10).toString(); } + return this.eventLogRepository.update({ id: In(ids) }, payload); } private getNumOfValidators(network: ENetworkName) { switch (network) { @@ -57,8 +87,8 @@ export class JobUnlockProvider { case ENetworkName.MINA: return this.configService.get(EEnvKey.MINA_VALIDATOR_THRESHHOLD); default: - this.logger.warn('Unknown network!'); - throw new Error('Unknown network!'); + this.logger.warn('Unknown network!', network); + throw new Error('Unknown network!' + network); } } private getSenderQueueName(network: ENetworkName) { @@ -104,22 +134,6 @@ export class JobUnlockProvider { private async handleSendTxJobs(data: IJobUnlockPayload) { // check if there is enough threshhold -> then create an unlock job. - const numOfValidators = this.getNumOfValidators(data.network); - - const eventLog = await this.eventLogRepository.findOne({ - where: { - id: data.eventLogId, - }, - relations: { - validator: true, - }, - }); - assert(!!eventLog, 'not found tx with id = ' + data.eventLogId); - this.logger.info(`Found ${eventLog.validator.length} signatures.`); - if (eventLog.validator.length < numOfValidators) { - this.logger.warn('not enough validators tx', data.eventLogId); - return; - } await this.queueService.addJobToQueue( this.getSenderQueueName(data.network), { diff --git a/src/modules/crawler/sender.evmbridge.ts b/src/modules/crawler/sender.evmbridge.ts index 3532da2..220935a 100644 --- a/src/modules/crawler/sender.evmbridge.ts +++ b/src/modules/crawler/sender.evmbridge.ts @@ -5,6 +5,7 @@ import { BigNumber } from 'bignumber.js'; // import BigNumber from 'bignumber.js/bignumber.mjs'; import { ethers } from 'ethers'; import { Logger } from 'log4js'; +import { Not } from 'typeorm'; import { getEthBridgeAddress } from '../../config/common.config.js'; import { DECIMAL_BASE, EEventStatus, ENetworkName } from '../../constants/blockchain.constant.js'; @@ -43,6 +44,7 @@ export class SenderEVMBridge { where: { id: txId, networkReceived: ENetworkName.ETH, + status: Not(EEventStatus.PROCESSING), }, relations: { validator: true, @@ -138,8 +140,6 @@ export class SenderEVMBridge { }); assert(signTx.success, `Generate signature failed!`); await this.saveSignature(wallet.address, signTx.signature, dataLock.id); - await this.unlockJobProvider.addJobSendTx(dataLock.id, dataLock.networkReceived); - // notice job unlock provider about this. } private async isPassDailyQuota(address: string, fromDecimal: number): Promise { diff --git a/src/modules/crawler/sender.minabridge.ts b/src/modules/crawler/sender.minabridge.ts index 836b7df..8c81ddb 100644 --- a/src/modules/crawler/sender.minabridge.ts +++ b/src/modules/crawler/sender.minabridge.ts @@ -5,6 +5,7 @@ import BigNumber from 'bignumber.js/bignumber.mjs'; import { Logger } from 'log4js'; import { FungibleToken, FungibleTokenAdmin } from 'mina-fungible-token'; import { AccountUpdate, Bool, fetchAccount, Mina, PrivateKey, PublicKey, Signature, UInt64 } from 'o1js'; +import { Not } from 'typeorm'; import { DECIMAL_BASE, EEventStatus, ENetworkName } from '../../constants/blockchain.constant.js'; import { EEnvKey } from '../../constants/env.constant.js'; @@ -234,7 +235,11 @@ export class SenderMinaBridge { const signerPrivateKey = PrivateKey.fromBase58(this.configService.get(EEnvKey.MINA_VALIDATOR_PRIVATE_KEY)); const signerPublicKey = PublicKey.fromPrivateKey(signerPrivateKey).toBase58(); const [dataLock, config] = await Promise.all([ - this.eventLogRepository.findOneBy({ id: txId, networkReceived: ENetworkName.MINA }), + this.eventLogRepository.findOneBy({ + id: txId, + networkReceived: ENetworkName.MINA, + status: Not(EEventStatus.PROCESSING), + }), this.commonConfigRepository.getCommonConfig(), ]); @@ -284,7 +289,6 @@ export class SenderMinaBridge { signature, }); await this.multiSignatureRepository.save(multiSignature); - await this.unlockJobProvider.addJobSendTx(dataLock.id, dataLock.networkReceived); // notice the job unlock provider here } } diff --git a/src/shared/utils/time.ts b/src/shared/utils/time.ts index af96e79..40c35c6 100644 --- a/src/shared/utils/time.ts +++ b/src/shared/utils/time.ts @@ -22,3 +22,4 @@ export const unixToDate = (value: number) => dayjs.unix(value).toDate(); export const startOfDayUnix = (date: Date) => dayjs(date).startOf('day').valueOf() / 1000; export const endOfDayUnix = (date: Date) => dayjs(date).endOf('day').valueOf() / 1000; +export const getTimeInFutureInSeconds = (minutes: number) => dayjs(new Date()).add(minutes, 'minutes').unix(); From 38b5034c73bf776681df273767da5da1ece2dedf Mon Sep 17 00:00:00 2001 From: Tan Hoang Date: Wed, 2 Oct 2024 11:39:04 +0700 Subject: [PATCH 5/6] fix: job unlock --- src/modules/crawler/job-unlock.provider.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/modules/crawler/job-unlock.provider.ts b/src/modules/crawler/job-unlock.provider.ts index 764be39..02aa577 100644 --- a/src/modules/crawler/job-unlock.provider.ts +++ b/src/modules/crawler/job-unlock.provider.ts @@ -49,7 +49,7 @@ export class JobUnlockProvider { ); } else { this.logger.info('no pending tx'); - return; + continue; } for (const tx of totalTxs) { if (isSignatureFullFilled) { From f096f8516e0ac574d0381dbebbeba67aa9d8bd79 Mon Sep 17 00:00:00 2001 From: Tan Hoang Date: Wed, 2 Oct 2024 14:41:05 +0700 Subject: [PATCH 6/6] fix: lock bull queue --- src/modules/crawler/job-unlock.provider.ts | 7 ++++--- src/modules/crawler/sender.evmbridge.ts | 10 ++-------- src/modules/crawler/sender.minabridge.ts | 10 ++-------- src/shared/utils/queue.ts | 6 ++++++ src/shared/utils/time.ts | 2 +- 5 files changed, 15 insertions(+), 20 deletions(-) diff --git a/src/modules/crawler/job-unlock.provider.ts b/src/modules/crawler/job-unlock.provider.ts index 02aa577..c599c09 100644 --- a/src/modules/crawler/job-unlock.provider.ts +++ b/src/modules/crawler/job-unlock.provider.ts @@ -9,7 +9,7 @@ import { EventLogRepository } from '../../database/repositories/event-log.reposi import { LoggerService } from '../../shared/modules/logger/logger.service.js'; import { QueueService } from '../../shared/modules/queue/queue.service.js'; import { sleep } from '../../shared/utils/promise.js'; -import { getTimeInFutureInSeconds } from '../../shared/utils/time.js'; +import { getTimeInFutureInMinutes } from '../../shared/utils/time.js'; import { EventLog } from './entities/event-logs.entity.js'; import { IGenerateSignature, IJobUnlockPayload, IUnlockToken } from './interfaces/job.interface.js'; @@ -73,10 +73,11 @@ export class JobUnlockProvider { // helpers private updateIntervalStatusForTxs(ids: number[], isSignatureFullFilled: boolean) { const payload: Partial = {}; + const nextTime = getTimeInFutureInMinutes(60 * 5).toString(); if (isSignatureFullFilled) { - payload.nextSendTxJobTime = getTimeInFutureInSeconds(10).toString(); + payload.nextSendTxJobTime = nextTime; } else { - payload.nextValidateSignatureTime = getTimeInFutureInSeconds(10).toString(); + payload.nextValidateSignatureTime = nextTime; } return this.eventLogRepository.update({ id: In(ids) }, payload); } diff --git a/src/modules/crawler/sender.evmbridge.ts b/src/modules/crawler/sender.evmbridge.ts index 220935a..c3efb7d 100644 --- a/src/modules/crawler/sender.evmbridge.ts +++ b/src/modules/crawler/sender.evmbridge.ts @@ -2,9 +2,7 @@ import { Injectable } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import assert from 'assert'; import { BigNumber } from 'bignumber.js'; -// import BigNumber from 'bignumber.js/bignumber.mjs'; import { ethers } from 'ethers'; -import { Logger } from 'log4js'; import { Not } from 'typeorm'; import { getEthBridgeAddress } from '../../config/common.config.js'; @@ -20,11 +18,9 @@ import { ETHBridgeContract } from '../../shared/modules/web3/web3.service.js'; import { addDecimal, calculateFee, calculateTip } from '../../shared/utils/bignumber.js'; import { EventLog } from './entities/event-logs.entity.js'; import { MultiSignature } from './entities/multi-signature.entity.js'; -import { JobUnlockProvider } from './job-unlock.provider.js'; @Injectable() export class SenderEVMBridge { - private readonly logger: Logger; constructor( private readonly eventLogRepository: EventLogRepository, private readonly commonConfigRepository: CommonConfigRepository, @@ -33,10 +29,8 @@ export class SenderEVMBridge { private readonly ethBridgeContract: ETHBridgeContract, private readonly configService: ConfigService, private readonly loggerService: LoggerService, - private readonly unlockJobProvider: JobUnlockProvider, - ) { - this.logger = loggerService.getLogger('SENDER_EVM_CONSOLE'); - } + ) {} + private logger = this.loggerService.getLogger('SENDER_EVM_CONSOLE'); async handleUnlockEVM(txId: number) { const [dataLock, configTip] = await Promise.all([ diff --git a/src/modules/crawler/sender.minabridge.ts b/src/modules/crawler/sender.minabridge.ts index 8c81ddb..3c3335c 100644 --- a/src/modules/crawler/sender.minabridge.ts +++ b/src/modules/crawler/sender.minabridge.ts @@ -1,8 +1,7 @@ import { Injectable } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import assert from 'assert'; -import BigNumber from 'bignumber.js/bignumber.mjs'; -import { Logger } from 'log4js'; +import { BigNumber } from 'bignumber.js'; import { FungibleToken, FungibleTokenAdmin } from 'mina-fungible-token'; import { AccountUpdate, Bool, fetchAccount, Mina, PrivateKey, PublicKey, Signature, UInt64 } from 'o1js'; import { Not } from 'typeorm'; @@ -15,19 +14,16 @@ import { EventLogRepository } from '../../database/repositories/event-log.reposi import { MultiSignatureRepository } from '../../database/repositories/multi-signature.repository.js'; import { TokenPairRepository } from '../../database/repositories/token-pair.repository.js'; import { LoggerService } from '../../shared/modules/logger/logger.service.js'; -import { QueueService } from '../../shared/modules/queue/queue.service.js'; import { addDecimal, calculateFee, calculateTip } from '../../shared/utils/bignumber.js'; import { TokenPair } from '../users/entities/tokenpair.entity.js'; import { CommonConfig } from './entities/common-config.entity.js'; import { MultiSignature } from './entities/multi-signature.entity.js'; -import { JobUnlockProvider } from './job-unlock.provider.js'; import { Bridge } from './minaSc/Bridge.js'; import { Manager } from './minaSc/Manager.js'; import { ValidatorManager } from './minaSc/ValidatorManager.js'; @Injectable() export class SenderMinaBridge { - private readonly logger: Logger; private isContractCompiled = false; private readonly feePayerKey: PrivateKey; private readonly bridgeKey: PrivateKey; @@ -39,10 +35,7 @@ export class SenderMinaBridge { private readonly tokenPairRepository: TokenPairRepository, private readonly multiSignatureRepository: MultiSignatureRepository, private readonly loggerService: LoggerService, - private readonly queueService: QueueService, - private readonly unlockJobProvider: JobUnlockProvider, ) { - this.logger = this.loggerService.getLogger('SENDER_MINA_BRIDGE'); this.feePayerKey = PrivateKey.fromBase58(this.configService.get(EEnvKey.SIGNER_MINA_PRIVATE_KEY)); this.bridgeKey = PrivateKey.fromBase58(this.configService.get(EEnvKey.MINA_BRIDGE_SC_PRIVATE_KEY)); this.tokenPublicKey = PublicKey.fromBase58(this.configService.get(EEnvKey.MINA_TOKEN_BRIDGE_ADDRESS)); @@ -53,6 +46,7 @@ export class SenderMinaBridge { }); Mina.setActiveInstance(network); } + private logger = this.loggerService.getLogger('SENDER_MINA_BRIDGE'); private getContractsInfo() { this.logger.log('Bridge', this.bridgeKey.toPublicKey().toBase58()); this.logger.log('FeePayer', this.feePayerKey.toPublicKey().toBase58()); diff --git a/src/shared/utils/queue.ts b/src/shared/utils/queue.ts index 584b029..575a812 100644 --- a/src/shared/utils/queue.ts +++ b/src/shared/utils/queue.ts @@ -3,10 +3,16 @@ import * as Redis from 'ioredis'; export class BullLib { static createNewQueue(queueName: string, redisConfig: Redis.RedisOptions): Queue.Queue { + const defaultLockTime = 1 * 60 * 60 * 1000; // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore return new Queue(queueName, { redis: redisConfig, + settings: { + lockDuration: defaultLockTime, // lock the job for one hours. + lockRenewTime: Math.floor(defaultLockTime / 2), + maxStalledCount: 0, + }, }); } } diff --git a/src/shared/utils/time.ts b/src/shared/utils/time.ts index 40c35c6..9fcafbf 100644 --- a/src/shared/utils/time.ts +++ b/src/shared/utils/time.ts @@ -22,4 +22,4 @@ export const unixToDate = (value: number) => dayjs.unix(value).toDate(); export const startOfDayUnix = (date: Date) => dayjs(date).startOf('day').valueOf() / 1000; export const endOfDayUnix = (date: Date) => dayjs(date).endOf('day').valueOf() / 1000; -export const getTimeInFutureInSeconds = (minutes: number) => dayjs(new Date()).add(minutes, 'minutes').unix(); +export const getTimeInFutureInMinutes = (minutes: number) => dayjs(new Date()).add(minutes, 'minutes').unix();