Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[POC] Introduce Task Workers #685

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions lib/build/TaskRunner.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
* @hideconstructor
*/
class TaskRunner {
/**

Check failure on line 12 in lib/build/TaskRunner.js

View workflow job for this annotation

GitHub Actions / General checks, tests and coverage reporting

Missing JSDoc @param "parameters.workDispatcher" declaration
* Constructor
*
* @param {object} parameters

Check failure on line 15 in lib/build/TaskRunner.js

View workflow job for this annotation

GitHub Actions / General checks, tests and coverage reporting

Missing @param "parameters.workDispatcher"
* @param {object} parameters.graph
* @param {object} parameters.project
* @param {@ui5/logger/loggers/ProjectBuild} parameters.log Logger to use
Expand All @@ -21,7 +21,7 @@
* @param {@ui5/project/build/ProjectBuilder~BuildConfiguration} parameters.buildConfig
* Build configuration
*/
constructor({graph, project, log, taskUtil, taskRepository, buildConfig}) {
constructor({graph, project, log, taskUtil, taskRepository, buildConfig, workDispatcher}) {
if (!graph || !project || !log || !taskUtil || !taskRepository || !buildConfig) {
throw new Error("TaskRunner: One or more mandatory parameters not provided");
}
Expand All @@ -31,6 +31,7 @@
this._taskRepository = taskRepository;
this._buildConfig = buildConfig;
this._log = log;
this._workDispatcher = workDispatcher;

this._directDependencies = new Set(this._taskUtil.getDependencies());
}
Expand Down Expand Up @@ -198,7 +199,21 @@
}

if (!taskFunction) {
taskFunction = (await this._taskRepository.getTask(taskName)).task;
const taskInfo = await this._taskRepository.getTask(taskName);
taskFunction = taskInfo.task;
if (taskInfo.processors) {
const workDispatcher = this._workDispatcher;
params.processors = {
execute: async function(processorName, params) {
const processorInfo = taskInfo.processors[processorName];
if (!processorInfo) {
throw new Error(`Unknown processor ${processorName} requested by task ${taskName}`);
}
const processor = workDispatcher.getProcessor(processorInfo.path);
return await processor.execute(params);
}
}

Check failure on line 215 in lib/build/TaskRunner.js

View workflow job for this annotation

GitHub Actions / General checks, tests and coverage reporting

Missing semicolon
}
}
return taskFunction(params);
};
Expand Down
10 changes: 10 additions & 0 deletions lib/build/helpers/BuildContext.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import ProjectBuildContext from "./ProjectBuildContext.js";
import WorkDispatcher from "./WorkDispatcher.js";

/**
* Context of a build process
Expand All @@ -7,6 +8,8 @@ import ProjectBuildContext from "./ProjectBuildContext.js";
* @memberof @ui5/project/build/helpers
*/
class BuildContext {
#workDispatcher = null;

constructor(graph, taskRepository, { // buildConfig
selfContained = false,
cssVariables = false,
Expand Down Expand Up @@ -49,6 +52,8 @@ class BuildContext {
cssVariables: cssVariables
};
this._projectBuildContexts = [];

this.#workDispatcher = WorkDispatcher.getInstance(this);
}

getRootProject() {
Expand All @@ -71,6 +76,10 @@ class BuildContext {
return this._graph;
}

getWorkDispatcher() {
return this.#workDispatcher;
}

createProjectContext({project}) {
const projectBuildContext = new ProjectBuildContext({
buildContext: this,
Expand All @@ -84,6 +93,7 @@ class BuildContext {
await Promise.all(this._projectBuildContexts.map((ctx) => {
return ctx.executeCleanupTasks(force);
}));
await this.#workDispatcher.cleanup(this, force);
}
}

Expand Down
3 changes: 2 additions & 1 deletion lib/build/helpers/ProjectBuildContext.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class ProjectBuildContext {
taskUtil: this.getTaskUtil(),
graph: this._buildContext.getGraph(),
taskRepository: this._buildContext.getTaskRepository(),
buildConfig: this._buildContext.getBuildConfig()
buildConfig: this._buildContext.getBuildConfig(),
workDispatcher: this._buildContext.getWorkDispatcher()
});
}
return this._taskRunner;
Expand Down
153 changes: 153 additions & 0 deletions lib/build/helpers/WorkDispatcher.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import workerpool from "workerpool";
import os from "node:os";
import {fileURLToPath} from "node:url";
import {getLogger} from "@ui5/logger";
import {serializeData, deserializeData, serializeResources, FsMainThreadInterface} from "./threadUtils.js";
import {setTimeout as setTimeoutPromise} from "node:timers/promises";

const MIN_WORKERS = 2;
const MAX_WORKERS = 4;
const osCpus = os.cpus().length || 1;
const maxWorkers = Math.max(Math.min(osCpus - 1, MAX_WORKERS), MIN_WORKERS);

export default class WorkDispatcher {
#log = getLogger("build:helpers:WorkDispatcher");
#activeBuilds = new Set();
#pool;
static #ensureSingleton = false;
static #instance;

#getPool() {
if (!this.#pool) {
this.#log.verbose(
`Creating workerpool with up to ${maxWorkers} workers (available CPU cores: ${osCpus})`
);
const workerPath = fileURLToPath(
new URL("./threadRunner.js", import.meta.url)
);
this.#pool = workerpool.pool(workerPath, {
workerType: "auto",
maxWorkers,
});
}
return this.#pool;
}

constructor() {
if (!WorkDispatcher.#ensureSingleton) {
throw new Error(
"WorkDispatcher is a singleton class. Use WorkDispatcher.getInstance()"
);
}
}

static getInstance(buildRef) {
if (!buildRef) {
throw new Error(`A reference to the calling instance must be provided`);
}
if (!WorkDispatcher.#instance) {
WorkDispatcher.#ensureSingleton = true;
WorkDispatcher.#instance = new WorkDispatcher();
WorkDispatcher.#ensureSingleton = false;
}

WorkDispatcher.#instance.#registerActiveBuild(buildRef);

return WorkDispatcher.#instance;
}

getProcessor(modulePath) {
return {
execute: async ({resources, options, reader}) => {
const buildUpArgs = {modulePath, args: {options: await serializeData(options)}};
let toTransfer;
let threadMessageHandler;
let fsInterfaceMainPort;

if (reader) {
const {port1, port2} = new MessageChannel();
fsInterfaceMainPort = port1;
buildUpArgs.args.fsInterfaceComPort = port2;
toTransfer = {transfer: [port2]};

threadMessageHandler = new FsMainThreadInterface(reader);
threadMessageHandler.startCommunication(fsInterfaceMainPort);
}

if (resources) {
buildUpArgs.args.resources = await serializeResources(resources);
}

const result = await this.#getPool().exec("execProcessor", [buildUpArgs], toTransfer);

if (reader) {
threadMessageHandler.endCommunication(fsInterfaceMainPort);
}

return deserializeData(result);
}
};
}

async cleanup(buildRef, force) {
const attemptPoolTermination = async () => {
if (this.#activeBuilds.size && !force) {
this.#log.verbose(
`Pool termination canceled. There are still ${this.#activeBuilds.size} active builds`
);
return;
}

this.#log.verbose(`Attempting to terminate the workerpool...`);

if (!this.#pool) {
this.#log.verbose(
"Pool termination requested, but a pool has not been initialized or has already been terminated."
);
return;
}

// There are many stats that could be used, but these ones seem the most
// convenient. When all the (available) workers are idle, then it's safe to terminate.
// There are many stats that could be used, but these ones seem the most
// convenient. When all the (available) workers are idle, then it's safe to terminate.
let {idleWorkers, totalWorkers} = this.#pool.stats();
while (idleWorkers !== totalWorkers && !force) {
await setTimeoutPromise(100); // Wait a bit workers to finish and try again
({idleWorkers, totalWorkers} = this.#pool.stats());
}

return await this.terminateTasks(force);
};

if (!buildRef) {
throw new Error(`A reference to the calling instance must be provided`);
}
if (!this.#activeBuilds.has(buildRef)) {
throw new Error(`The provided build reference is unknown`);
}
this.#activeBuilds.delete(buildRef);

return await attemptPoolTermination();
}

async terminateTasks(force) {
if (!this.#pool) {
this.#log.verbose(
"Pool termination requested, but a pool has not been initialized or has already been terminated");
return;
}

this.#activeBuilds = [];
const pool = this.#pool;
this.#pool = null;
return await pool.terminate(force);
}

#registerActiveBuild(instanceRef) {
if (this.#activeBuilds.has(instanceRef)) {
throw new Error(`Build already registered in Work Dispatcher. This should never happen`);
}
this.#activeBuilds.add(instanceRef);
}
}
40 changes: 40 additions & 0 deletions lib/build/helpers/threadRunner.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import workerpool from "workerpool";
import {FsWorkerThreadInterface, deserializeResources, serializeData, deserializeData} from "./threadUtils.js";
import {getLogger} from "@ui5/logger";
import {createResource} from "@ui5/fs/resourceFactory";

export default async function execProcessor({modulePath, args}) {
const {default: moduleToExecute} = await import(modulePath);
if (!moduleToExecute) {
throw new Error(`No default export for module ${modulePath}`);
}
const methodCall = moduleToExecute;
const {options, resources, fsInterfaceComPort} = args;

const buildUpArgs = {
options: await deserializeData(options),
resourceFactory: {createResource},
log: getLogger(`builder:processor:${modulePath}`)
};

if (resources) {
buildUpArgs.resources = await deserializeResources(resources);
}
if (fsInterfaceComPort) {
buildUpArgs.fs = new FsWorkerThreadInterface(fsInterfaceComPort);
}

const result = await methodCall(buildUpArgs);

return serializeData(result);
}

// Test execution via ava is never done on the main thread
/* istanbul ignore else */
if (!workerpool.isMainThread) {
// Script got loaded through workerpool
// => Create a worker and register public functions
workerpool.worker({
execProcessor,
});
}
Loading
Loading