Skip to content

Commit

Permalink
refactor: bullmq (#4261)
Browse files Browse the repository at this point in the history
* chore: update

* chore: update

* feat: support bull board using bullmq (#4259)

Co-authored-by: gqc <[email protected]>

* refactor: bull-board

* fix: case

* test: add more case

* docs: update

* docs: update

* docs: update

* fix: merge config

* fix: try to fix test

* fix: lint

---------

Co-authored-by: harperKKK <[email protected]>
Co-authored-by: gqc <[email protected]>
  • Loading branch information
3 people authored Jan 14, 2025
1 parent d9d8187 commit 1fbf7fb
Show file tree
Hide file tree
Showing 28 changed files with 1,881 additions and 211 deletions.
1 change: 0 additions & 1 deletion packages/bull-board/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import '@midwayjs/bull';
import { BullBoardOption } from './dist/index';
export * from './dist/index';

Expand Down
6 changes: 4 additions & 2 deletions packages/bull-board/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"midway",
"IoC",
"bull",
"bullmq",
"bull-ui",
"plugin"
],
Expand All @@ -27,12 +28,13 @@
"@midwayjs/core": "^3.19.0",
"@midwayjs/express": "^3.19.2",
"@midwayjs/koa": "^3.19.2",
"@midwayjs/mock": "^3.19.2"
"@midwayjs/mock": "^3.19.2",
"@midwayjs/bull": "^3.19.3",
"@midwayjs/bullmq": "^0.0.1"
},
"dependencies": {
"@bull-board/api": "5.23.0",
"@bull-board/ui": "5.23.0",
"@midwayjs/bull": "^3.19.3",
"ejs": "3.1.10"
},
"engines": {
Expand Down
34 changes: 30 additions & 4 deletions packages/bull-board/src/board.middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@ import {
Scope,
ScopeEnum,
MidwayFrameworkType,
ApplicationContext,
IMidwayContainer,
MidwayFrameworkService,
} from '@midwayjs/core';
import { extname } from 'path';
import * as bull from '@midwayjs/bull';
import { createBullBoard } from '@bull-board/api';
import { BullAdapter } from '@bull-board/api/bullAdapter';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { MidwayAdapter } from './adapter';
import { BullBoardOption } from './interface';
import { BullBoardManager } from './board.manager';
import type { Framework as BullFramework } from '@midwayjs/bull';
import type { Framework as BullMQFramework } from '@midwayjs/bullmq';

const MIME_MAP = {
'.html': 'text/html',
Expand All @@ -44,21 +49,42 @@ export class BoardMiddleware
implements IMiddleware<IMidwayContext, NextFunction, unknown>
{
@Inject()
protected framework: bull.Framework;
protected frameworkService: MidwayFrameworkService;

@Config('bullBoard')
protected bullBoardConfig: BullBoardOption;

@Inject()
protected bullBoardManager: BullBoardManager;

@ApplicationContext()
protected applicationContext: IMidwayContainer;

private basePath: string;
private serverAdapter: MidwayAdapter;

@Init()
protected async init() {
const queueList = this.framework.getQueueList();
const wrapQueues = queueList.map(queue => new BullAdapter(queue));
let framework: BullFramework | BullMQFramework =
this.frameworkService.getFramework('bull') as BullFramework;
if (!framework) {
framework = this.frameworkService.getFramework(
'bullmq'
) as BullMQFramework;
}

if (!framework) {
return;
}

const queueList = framework.getQueueList();
const wrapQueues = queueList.map(queue => {
if (this.applicationContext.hasNamespace('bull')) {
return new BullAdapter(queue);
} else if (this.applicationContext.hasNamespace('bullmq')) {
return new BullMQAdapter(queue);
}
});
this.basePath = this.bullBoardConfig.basePath;

this.serverAdapter = new MidwayAdapter();
Expand Down
12 changes: 8 additions & 4 deletions packages/bull-board/src/configuration.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as bull from '@midwayjs/bull';
import {
Configuration,
IMidwayContainer,
Inject,
MidwayApplicationManager,
MidwayConfigService,
Expand All @@ -9,7 +9,6 @@ import { BoardMiddleware } from './board.middleware';

@Configuration({
namespace: 'bull-board',
imports: [bull],
importConfigs: [
{
default: {
Expand All @@ -31,15 +30,20 @@ export class BullBoardConfiguration {
@Inject()
configService: MidwayConfigService;

async onReady() {
async onReady(container: IMidwayContainer) {
const apps = this.applicationManager.getApplications([
'express',
'egg',
'koa',
]);
if (apps.length) {
apps.forEach(app => {
app.useMiddleware(BoardMiddleware);
if (
container.hasNamespace('bull') ||
container.hasNamespace('bullmq')
) {
app.useMiddleware(BoardMiddleware);
}
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/bull-board/src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ export interface BullBoardOption {
basePath?: string;
uiConfig?: UIConfig;
adapterOptions?: QueueAdapterOptions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Configuration } from '@midwayjs/core';
import * as bullBoard from '../../../../src';
import * as bullmq from '@midwayjs/bullmq';
import * as koa from '@midwayjs/koa';

@Configuration({
imports: [
koa,
bullmq,
bullBoard,
],
importConfigs: [
{
default: {
keys: 123,
bullmq: {
defaultConnection: {
host: '127.0.0.1',
port: 6379,
}
},
},
},
],
})
export class AutoConfiguration {

async onReady(){
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { App, Inject } from '@midwayjs/core';
import { Processor, Application } from '../../../../../src';
import { Processor, Application } from '@midwayjs/bullmq';

@Processor('test')
export class QueueTask {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ import { Configuration } from '@midwayjs/core';
import * as bullBoard from '../../../../src';
import { join } from 'path'
import * as express from '@midwayjs/express';
import * as bull from '@midwayjs/bull';

@Configuration({
imports: [express, bullBoard],
imports: [express, bull, bullBoard],
importConfigs: [join(__dirname, 'config')]
})
export class AutoConfiguration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ import { Configuration } from '@midwayjs/core';
import * as bullBoard from '../../../../src';
import { join } from 'path'
import * as koa from '@midwayjs/koa';
import * as bull from '@midwayjs/bull';

@Configuration({
imports: [koa, bullBoard],
imports: [koa, bull, bullBoard],
importConfigs: [join(__dirname, 'config')]
})
export class AutoConfiguration {
Expand Down
22 changes: 22 additions & 0 deletions packages/bull-board/test/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { createApp, close, createHttpRequest, createLightApp } from '@midwayjs/mock';
import { join } from 'path';
import * as bullboard from '../src';
import * as bullmq from '@midwayjs/bullmq';

describe(`/test/index.test.ts`, () => {
it('test ui in koa', async () => {
Expand Down Expand Up @@ -74,4 +75,25 @@ describe(`/test/index.test.ts`, () => {

await close(app);
});

it('test using package bullmq', async () => {
const app = await createApp(join(__dirname, 'fixtures', 'base-app-bullmq'));

const bullFramework = app.getApplicationContext().get(bullmq.Framework);
const testQueue = bullFramework.getQueue('test');
await testQueue?.runJob({name: 'stone-jin'});
// page
let result = await createHttpRequest(app).get('/ui');
expect(result.status).toBe(200);
expect(result.text).toMatch(/doctype html/);
expect(result.headers['content-type']).toMatch(/text\/html/);

result = await createHttpRequest(app).get('/ui/api/queues?activeQueue=test&page=1&jobsPerPage=10');
expect(result.status).toBe(200);
expect(result.body.queues.length).toBe(1);
expect(result.body.queues[0].type).toBe('bullmq');
expect(result.headers['content-type']).toMatch('application/json');

await close(app);
});
});
1 change: 0 additions & 1 deletion packages/bullmq/CHANGELOG.md

This file was deleted.

16 changes: 2 additions & 14 deletions packages/bullmq/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,8 @@
import { ConnectionOptions } from 'bullmq';
import { BullMQConfig } from './dist/index';
export * from './dist/index';
export { Job } from 'bullmq';
import { IQueueOptions, IWorkerOptions } from './dist/index';

declare module '@midwayjs/core/dist/interface' {
// bullmq 新引入了 worker 作为执行任务的实例,一个队列 queue 和 worker 中 connection, prefix 必须一致才能正常执行
// 所以 config 中 connection, prefix 单独配置
// eslint-disable-next-line
interface MidwayConfig {
bullmq?: {
connection: ConnectionOptions;
prefix?: string;
defaultQueueOptions?: IQueueOptions;
defaultWorkerOptions?: IWorkerOptions;
clearRepeatJobWhenStart?: boolean;
contextLoggerFormat?: (info: any) => string;
};
bullmq?: BullMQConfig;
}
}
6 changes: 3 additions & 3 deletions packages/bullmq/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@midwayjs/bullmq",
"version": "1.0.0",
"description": "midway component for bullmq",
"version": "0.0.1",
"description": "midway component for BullMQ",
"main": "dist/index.js",
"typings": "index.d.ts",
"scripts": {
Expand All @@ -13,7 +13,7 @@
"midway",
"IoC",
"task",
"bullmq",
"BullMQ",
"plugin"
],
"author": "guo qicong",
Expand Down
6 changes: 4 additions & 2 deletions packages/bullmq/src/config/config.default.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
export const bullmq = {
prefix: '{midway-bullmq}',
import { BullMQConfig } from '../interface';

export const bullmq: BullMQConfig = {
defaultPrefix: '{midway-bullmq}',
defaultQueueOptions: {
defaultJobOptions: {
removeOnComplete: 3,
Expand Down
30 changes: 29 additions & 1 deletion packages/bullmq/src/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import {
} from '@midwayjs/core';
import * as DefaultConfig from './config/config.default';
import { BullMQFramework } from './framework';
import { BULLMQ_QUEUE_KEY } from './constants';
import {
BULLMQ_FLOW_PRODUCER_KEY,
BULLMQ_QUEUE_KEY,
BULLMQ_WORKER_KEY,
} from './constants';

@Configuration({
namespace: 'bullmq',
Expand Down Expand Up @@ -36,6 +40,30 @@ export class BullConfiguration {
return this.framework.getQueue(meta.queueName);
}
);

this.decoratorService.registerPropertyHandler(
BULLMQ_WORKER_KEY,
(
propertyName,
meta: {
queueName: string;
}
) => {
return this.framework.getWorker(meta.queueName);
}
);

this.decoratorService.registerPropertyHandler(
BULLMQ_FLOW_PRODUCER_KEY,
(
propertyName,
meta: {
producerName: string;
}
) => {
return this.framework.getFlowProducer(meta.producerName);
}
);
}

async onReady() {
Expand Down
2 changes: 2 additions & 0 deletions packages/bullmq/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// task
export const BULLMQ_QUEUE_KEY = 'bullmq:queue';
export const BULLMQ_PROCESSOR_KEY = 'bullmq:processor';
export const BULLMQ_WORKER_KEY = 'bullmq:worker';
export const BULLMQ_FLOW_PRODUCER_KEY = 'bullmq:flow-producer';
28 changes: 22 additions & 6 deletions packages/bullmq/src/decorator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@ import {
Scope,
ScopeEnum,
} from '@midwayjs/core';
import { IQueueOptions, IWorkerOptions } from './interface';
import { BULLMQ_PROCESSOR_KEY, BULLMQ_QUEUE_KEY } from './constants';
import { JobsOptions } from 'bullmq';
import {
BULLMQ_FLOW_PRODUCER_KEY,
BULLMQ_PROCESSOR_KEY,
BULLMQ_QUEUE_KEY,
BULLMQ_WORKER_KEY,
} from './constants';
import { QueueOptions, WorkerOptions, JobsOptions } from 'bullmq';

export function Processor(
queueName: string,
jobOptions?: JobsOptions,
workerOptions?: IWorkerOptions,
queueOptions?: IQueueOptions
workerOptions?: Partial<WorkerOptions>,
queueOptions?: Partial<QueueOptions>
): ClassDecorator {
return function (target: any) {
saveModule(BULLMQ_PROCESSOR_KEY, target);
Expand All @@ -23,8 +27,8 @@ export function Processor(
{
queueName,
jobOptions,
queueOptions,
workerOptions,
queueOptions,
},
target
);
Expand All @@ -38,3 +42,15 @@ export function InjectQueue(queueName: string): PropertyDecorator {
queueName,
});
}

export function InjectWorker(queueName: string): PropertyDecorator {
return createCustomPropertyDecorator(BULLMQ_WORKER_KEY, {
queueName,
});
}

export function InjectFlowProducer(producerName: string): PropertyDecorator {
return createCustomPropertyDecorator(BULLMQ_FLOW_PRODUCER_KEY, {
producerName,
});
}
Loading

0 comments on commit 1fbf7fb

Please sign in to comment.