diff --git a/lib/build/TaskRunner.js b/lib/build/TaskRunner.js index c4782c6f7..dc595705b 100644 --- a/lib/build/TaskRunner.js +++ b/lib/build/TaskRunner.js @@ -21,7 +21,7 @@ class TaskRunner { * @param {@ui5/project/build/ProjectBuilder~BuildConfiguration} parameters.buildConfig * Build configuration */ - constructor({graph, project, log, taskUtil, taskRepository, buildConfig}) { + constructor({graph, project, log, taskUtil, taskRepository, buildConfig, workDispatcher}) { if (!graph || !project || !log || !taskUtil || !taskRepository || !buildConfig) { throw new Error("TaskRunner: One or more mandatory parameters not provided"); } @@ -31,6 +31,7 @@ class TaskRunner { this._taskRepository = taskRepository; this._buildConfig = buildConfig; this._log = log; + this._workDispatcher = workDispatcher; this._directDependencies = new Set(this._taskUtil.getDependencies()); } @@ -198,7 +199,21 @@ class TaskRunner { } if (!taskFunction) { - taskFunction = (await this._taskRepository.getTask(taskName)).task; + const taskInfo = await this._taskRepository.getTask(taskName); + taskFunction = taskInfo.task; + if (taskInfo.processors) { + const workDispatcher = this._workDispatcher; + params.processors = { + execute: async function(processorName, params) { + const processorInfo = taskInfo.processors[processorName]; + if (!processorInfo) { + throw new Error(`Unknown processor ${processorName} requested by task ${taskName}`); + } + const processor = workDispatcher.getProcessor(processorInfo.path); + return await processor.execute(params); + } + } + } } return taskFunction(params); }; diff --git a/lib/build/helpers/BuildContext.js b/lib/build/helpers/BuildContext.js index f10aa072d..6c0b30014 100644 --- a/lib/build/helpers/BuildContext.js +++ b/lib/build/helpers/BuildContext.js @@ -1,4 +1,5 @@ import ProjectBuildContext from "./ProjectBuildContext.js"; +import WorkDispatcher from "./WorkDispatcher.js"; /** * Context of a build process @@ -7,6 +8,8 @@ import ProjectBuildContext from "./ProjectBuildContext.js"; * @memberof @ui5/project/build/helpers */ class BuildContext { + #workDispatcher = null; + constructor(graph, taskRepository, { // buildConfig selfContained = false, cssVariables = false, @@ -49,6 +52,8 @@ class BuildContext { cssVariables: cssVariables }; this._projectBuildContexts = []; + + this.#workDispatcher = WorkDispatcher.getInstance(this); } getRootProject() { @@ -71,6 +76,10 @@ class BuildContext { return this._graph; } + getWorkDispatcher() { + return this.#workDispatcher; + } + createProjectContext({project}) { const projectBuildContext = new ProjectBuildContext({ buildContext: this, @@ -84,6 +93,7 @@ class BuildContext { await Promise.all(this._projectBuildContexts.map((ctx) => { return ctx.executeCleanupTasks(force); })); + await this.#workDispatcher.cleanup(this, force); } } diff --git a/lib/build/helpers/ProjectBuildContext.js b/lib/build/helpers/ProjectBuildContext.js index 10eb2a67a..bc9112af7 100644 --- a/lib/build/helpers/ProjectBuildContext.js +++ b/lib/build/helpers/ProjectBuildContext.js @@ -114,7 +114,8 @@ class ProjectBuildContext { taskUtil: this.getTaskUtil(), graph: this._buildContext.getGraph(), taskRepository: this._buildContext.getTaskRepository(), - buildConfig: this._buildContext.getBuildConfig() + buildConfig: this._buildContext.getBuildConfig(), + workDispatcher: this._buildContext.getWorkDispatcher() }); } return this._taskRunner; diff --git a/lib/build/helpers/WorkDispatcher.js b/lib/build/helpers/WorkDispatcher.js new file mode 100644 index 000000000..4b51f1383 --- /dev/null +++ b/lib/build/helpers/WorkDispatcher.js @@ -0,0 +1,153 @@ +import workerpool from "workerpool"; +import os from "node:os"; +import {fileURLToPath} from "node:url"; +import {getLogger} from "@ui5/logger"; +import {serializeData, deserializeData, serializeResources, FsMainThreadInterface} from "./threadUtils.js"; +import {setTimeout as setTimeoutPromise} from "node:timers/promises"; + +const MIN_WORKERS = 2; +const MAX_WORKERS = 4; +const osCpus = os.cpus().length || 1; +const maxWorkers = Math.max(Math.min(osCpus - 1, MAX_WORKERS), MIN_WORKERS); + +export default class WorkDispatcher { + #log = getLogger("build:helpers:WorkDispatcher"); + #activeBuilds = new Set(); + #pool; + static #ensureSingleton = false; + static #instance; + + #getPool() { + if (!this.#pool) { + this.#log.verbose( + `Creating workerpool with up to ${maxWorkers} workers (available CPU cores: ${osCpus})` + ); + const workerPath = fileURLToPath( + new URL("./threadRunner.js", import.meta.url) + ); + this.#pool = workerpool.pool(workerPath, { + workerType: "auto", + maxWorkers, + }); + } + return this.#pool; + } + + constructor() { + if (!WorkDispatcher.#ensureSingleton) { + throw new Error( + "WorkDispatcher is a singleton class. Use WorkDispatcher.getInstance()" + ); + } + } + + static getInstance(buildRef) { + if (!buildRef) { + throw new Error(`A reference to the calling instance must be provided`); + } + if (!WorkDispatcher.#instance) { + WorkDispatcher.#ensureSingleton = true; + WorkDispatcher.#instance = new WorkDispatcher(); + WorkDispatcher.#ensureSingleton = false; + } + + WorkDispatcher.#instance.#registerActiveBuild(buildRef); + + return WorkDispatcher.#instance; + } + + getProcessor(modulePath) { + return { + execute: async ({resources, options, reader}) => { + const buildUpArgs = {modulePath, args: {options: await serializeData(options)}}; + let toTransfer; + let threadMessageHandler; + let fsInterfaceMainPort; + + if (reader) { + const {port1, port2} = new MessageChannel(); + fsInterfaceMainPort = port1; + buildUpArgs.args.fsInterfaceComPort = port2; + toTransfer = {transfer: [port2]}; + + threadMessageHandler = new FsMainThreadInterface(reader); + threadMessageHandler.startCommunication(fsInterfaceMainPort); + } + + if (resources) { + buildUpArgs.args.resources = await serializeResources(resources); + } + + const result = await this.#getPool().exec("execProcessor", [buildUpArgs], toTransfer); + + if (reader) { + threadMessageHandler.endCommunication(fsInterfaceMainPort); + } + + return deserializeData(result); + } + }; + } + + async cleanup(buildRef, force) { + const attemptPoolTermination = async () => { + if (this.#activeBuilds.size && !force) { + this.#log.verbose( + `Pool termination canceled. There are still ${this.#activeBuilds.size} active builds` + ); + return; + } + + this.#log.verbose(`Attempting to terminate the workerpool...`); + + if (!this.#pool) { + this.#log.verbose( + "Pool termination requested, but a pool has not been initialized or has already been terminated." + ); + return; + } + + // There are many stats that could be used, but these ones seem the most + // convenient. When all the (available) workers are idle, then it's safe to terminate. + // There are many stats that could be used, but these ones seem the most + // convenient. When all the (available) workers are idle, then it's safe to terminate. + let {idleWorkers, totalWorkers} = this.#pool.stats(); + while (idleWorkers !== totalWorkers && !force) { + await setTimeoutPromise(100); // Wait a bit workers to finish and try again + ({idleWorkers, totalWorkers} = this.#pool.stats()); + } + + return await this.terminateTasks(force); + }; + + if (!buildRef) { + throw new Error(`A reference to the calling instance must be provided`); + } + if (!this.#activeBuilds.has(buildRef)) { + throw new Error(`The provided build reference is unknown`); + } + this.#activeBuilds.delete(buildRef); + + return await attemptPoolTermination(); + } + + async terminateTasks(force) { + if (!this.#pool) { + this.#log.verbose( + "Pool termination requested, but a pool has not been initialized or has already been terminated"); + return; + } + + this.#activeBuilds = []; + const pool = this.#pool; + this.#pool = null; + return await pool.terminate(force); + } + + #registerActiveBuild(instanceRef) { + if (this.#activeBuilds.has(instanceRef)) { + throw new Error(`Build already registered in Work Dispatcher. This should never happen`); + } + this.#activeBuilds.add(instanceRef); + } +} diff --git a/lib/build/helpers/threadRunner.js b/lib/build/helpers/threadRunner.js new file mode 100644 index 000000000..cc5cf05d9 --- /dev/null +++ b/lib/build/helpers/threadRunner.js @@ -0,0 +1,40 @@ +import workerpool from "workerpool"; +import {FsWorkerThreadInterface, deserializeResources, serializeData, deserializeData} from "./threadUtils.js"; +import {getLogger} from "@ui5/logger"; +import {createResource} from "@ui5/fs/resourceFactory"; + +export default async function execProcessor({modulePath, args}) { + const {default: moduleToExecute} = await import(modulePath); + if (!moduleToExecute) { + throw new Error(`No default export for module ${modulePath}`); + } + const methodCall = moduleToExecute; + const {options, resources, fsInterfaceComPort} = args; + + const buildUpArgs = { + options: await deserializeData(options), + resourceFactory: {createResource}, + log: getLogger(`builder:processor:${modulePath}`) + }; + + if (resources) { + buildUpArgs.resources = await deserializeResources(resources); + } + if (fsInterfaceComPort) { + buildUpArgs.fs = new FsWorkerThreadInterface(fsInterfaceComPort); + } + + const result = await methodCall(buildUpArgs); + + return serializeData(result); +} + +// Test execution via ava is never done on the main thread +/* istanbul ignore else */ +if (!workerpool.isMainThread) { + // Script got loaded through workerpool + // => Create a worker and register public functions + workerpool.worker({ + execProcessor, + }); +} diff --git a/lib/build/helpers/threadUtils.js b/lib/build/helpers/threadUtils.js new file mode 100644 index 000000000..3ce2e1906 --- /dev/null +++ b/lib/build/helpers/threadUtils.js @@ -0,0 +1,298 @@ +import {Buffer} from "node:buffer"; +import Resource from "@ui5/fs/Resource"; +import {createResource} from "@ui5/fs/resourceFactory"; +import fsInterface from "@ui5/fs/fsInterface"; + +/** + * Casts @ui5/fs/Resource-s into an Uint8Array transferable object + * + * @param {@ui5/fs/Resource[]} resourceCollection + * @returns {Promise} + */ +export async function serializeResources(resourceCollection) { + return Promise.all( + resourceCollection.map(async (res) => ({ + buffer: await res.getBuffer(), + path: res.getPath() + })) + ); +} + +/** + * Casts Uint8Array into @ui5/fs/Resource-s transferable object + * + * @param {Promise} resources + * @returns {@ui5/fs/Resource[]} + */ +export function deserializeResources(resources) { + return resources.map((res) => { + // res.buffer is an Uint8Array object and needs to be cast + // to a Buffer in order to be read correctly. + return createResource({path: res.path, buffer: Buffer.from(res.buffer)}); + }); +} + +function isPojo(obj) { + const proto = Object.prototype; + const gpo = Object.getPrototypeOf; + + if (obj === null || typeof obj !== "object") { + return false; + } + return gpo(obj) === proto; +} + +function isFsResourceLikeTransfer(input) { + return isPojo(input) && + input["buffer"] && (Buffer.isBuffer(input.buffer) || ArrayBuffer.isView(input.buffer)) && + input["path"] && typeof input["path"] === "string"; +} + +export async function serializeData(input) { + if (Array.isArray(input) || isPojo(input)) { + for (const prop in input) { + if (Object.hasOwn(input, prop)) { + input[prop] = await serializeData(input[prop]); + } + } + } else if (input instanceof Resource) { + return (await serializeResources([input]))[0]; + } + + return input; +} + +export async function deserializeData(input) { + // Resource like transferrable object that could be converted to a @ui5/fs/Resource + if (isFsResourceLikeTransfer(input)) { + return (await deserializeResources([input]))[0]; + } else if (Array.isArray(input) || isPojo(input)) { + for (const prop in input) { + if (Object.hasOwn(input, prop)) { + input[prop] = await deserializeData(input[prop]); + } + } + } + + return input; +} + +class AbstractMain { + #comPorts = new Set(); + #reader = null; + #fs = null; + #cache = Object.create(null); + + /** + * Constructor + * + * @param {object} reader + */ + constructor(reader) { + if (!reader) { + throw new Error("reader is mandatory argument"); + } + + this.#reader = reader; + this.#fs = fsInterface(reader); + } + + getFs() { + return this.#fs; + } + + getReader() { + return this.#reader; + } + + /** + * Adds MessagePort and starts listening for requests on it. + * + * @param {MessagePort} comPort port1 from a {code}MessageChannel{/code} + */ + startCommunication(comPort) { + if (!comPort) { + throw new Error("Communication channel is mandatory argument"); + } + + this.#comPorts.add(comPort); + comPort.on("message", (e) => this.#onMessage(e, comPort)); + comPort.on("close", () => comPort.close()); + } + + /** + * Ends MessagePort communication. + * + * @param {MessagePort} comPort port1 to remove from handling. + */ + endCommunication(comPort) { + comPort.close(); + this.#comPorts.delete(comPort); + } + + /** + * Destroys the FsMainThreadInterface + */ + cleanup() { + this.#comPorts.forEach((comPort) => comPort.close()); + this.#cache = null; + this.#reader = null; + } + + /** + * Handles messages from the MessagePort + * + * @param {object} e data to construct the request + * @param {string} e.action Action to perform. Corresponds to the names of + * the public methods of "@ui5/fs/fsInterface" + * @param {string} e.fsPath Path of the Resource + * @param {object} e.options Options for "readFile" action + * @param {MessagePort} comPort The communication channel + */ + async #onMessage(e, comPort) { + const {action, args, key: cacheKey} = e; + + if (!this._cache[cacheKey]) { + this._cache[cacheKey] = this.get(action, args); + } + + const fromCache = await this._cache[cacheKey]; + comPort.postMessage({action, key: cacheKey, ...fromCache}); + } + + get(method) { + throw new Error(`${method} method's handler has to be implemented`); + } +} + +class AbstractThread { + #comPort = null; + #callbacks = []; + #cache = Object.create(null); + + /** + * Constructor + * + * @param {MessagePort} comPort Communication port + */ + constructor(comPort) { + if (!comPort) { + throw new Error("Communication port is mandatory argument"); + } + + this.#comPort = comPort; + comPort.on("message", this.#onMessage.bind(this)); + comPort.on("close", this.#onClose.bind(this)); + } + + /** + * Handles messages from MessagePort + * + * @param {object} e + * @param {string} e.action Action to perform. Corresponds to the names of + * the public methods of "@ui5/fs/fsInterface" + * @param {string} e.fsPath Path of the Resource + * @param {*} e.result Response from the "action". + * @param {object} e.error Error from the "action". + */ + #onMessage(e) { + const cbObject = this.#callbacks.find((cb) => cb.key === e.key); + + if (cbObject) { + this.#cache[e.key] = { + error: e.error, + result: e.result, + }; + this.#callbacks.splice(this.#callbacks.indexOf(cbObject), 1); + cbObject.callback(e.error, e.result); + } else { + throw new Error( + "No callback found for this message! Possible hang for the thread!", + e + ); + } + } + + /** + * End communication + */ + #onClose() { + this.#comPort.close(); + this.#cache = null; + } + + /** + * Makes a request via the MessagePort + * + * @param {object} parameters + * @param {string} parameters.action Action to perform. Corresponds to the names of + * the public methods. + * @param {string} parameters.key + * @param {object} parameters.args + * @param {Function} callback Callback to call when the "action" is executed and ready. + */ + _doRequest({action, key, args}, callback) { + // fsPath, options + if (this.#cache[key]) { + const {result, error} = this.#cache[key]; + callback(error, result); + } else { + this.#callbacks.push({key, callback}); + this.#comPort.postMessage({action, key, args}); + } + } +} + +/** + * "@ui5/fs/fsInterface" like class that uses internally + * "@ui5/fs/fsInterface", implements its methods, and + * sends the results to a MessagePort. + * + * Used in the main thread in a combination with FsWorkerThreadInterface. + */ +export class FsMainThreadInterface extends AbstractMain { + constructor(fsInterfaceComPort) { + super(fsInterfaceComPort); + } + + #parseResults(method, result) { + // Stats object cannot be sent over postMessage. + // Cast it to simple object that is alike the stats. + if (method === "stat" && !!result) { + return JSON.parse(JSON.stringify(result)); + } else { + return result; + } + } + + get(method, args) { + const {fsPath, options} = args; + const composedArgs = [fsPath, options].filter(($) => $ !== undefined); + + return new Promise((resolve) => { + this.getFs()[method](...composedArgs, (error, result) => { + resolve({error, result: this.#parseResults(method, result)}); + }); + }); + } +} + +/** + * "@ui5/fs/fsInterface" like class that uses internally + * "@ui5/fs/fsInterface", implements its methods, and + * requests resources via MessagePort. + * + * Used in the worker thread in a combination with FsMainThreadInterface. + */ +export class FsWorkerThreadInterface extends AbstractThread { + readFile(fsPath, options, callback) { + const key = `${fsPath}-readFile`; + this._doRequest({action: "readFile", key, args: {fsPath, options}}, callback); + } + + stat(fsPath, callback) { + const key = `${fsPath}-stat`; + this._doRequest({action: "stat", key, args: {fsPath}}, callback); + } +} +