Skip to content

Commit

Permalink
refactor(dev): small refactors (#3003)
Browse files Browse the repository at this point in the history
  • Loading branch information
pi0 authored Jan 15, 2025
1 parent b38da2c commit 749f035
Showing 1 changed file with 106 additions and 97 deletions.
203 changes: 106 additions & 97 deletions src/core/dev-server/server.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import { accessSync, existsSync, promises as fsp } from "node:fs";
import { writeFile } from "node:fs/promises";
import type {
Nitro,
NitroBuildInfo,
NitroDevServer,
NitroWorker,
} from "nitropack/types";
import type { IncomingMessage, OutgoingMessage } from "node:http";
import type { Duplex } from "node:stream";
import type { TLSSocket } from "node:tls";
import { accessSync, existsSync } from "node:fs";
import { writeFile, rm } from "node:fs/promises";
import { resolve, join } from "pathe";
import { Worker } from "node:worker_threads";
import { type FSWatcher, watch } from "chokidar";
import {
Expand All @@ -15,15 +22,8 @@ import {
toNodeListener,
} from "h3";
import { type ProxyServerOptions, createProxyServer } from "httpxy";
import { type Listener, listen } from "listhen";
import { type Listener, listen as listhen } from "listhen";
import { version as nitroVersion } from "nitropack/meta";
import type {
Nitro,
NitroBuildInfo,
NitroDevServer,
NitroWorker,
} from "nitropack/types";
import { resolve, dirname } from "pathe";
import { debounce } from "perfect-debounce";
import { servePlaceholder } from "serve-placeholder";
import serveStatic from "serve-static";
Expand All @@ -34,78 +34,11 @@ import defaultErrorHandler, {
} from "../../runtime/internal/error/dev";
import { createVFSHandler } from "./vfs";

function initWorker(filename: string): Promise<NitroWorker> | undefined {
if (!existsSync(filename)) {
return;
}
return new Promise((resolve, reject) => {
const worker = new Worker(filename, {
env: {
...process.env,
NITRO_DEV_WORKER_DIR: dirname(filename),
},
});
worker.once("exit", (code) => {
reject(
new Error(
code ? "[worker] exited with code: " + code : "[worker] exited"
)
);
});
worker.once("error", async (error) => {
reject(error);
});
const addressListener = (event: any) => {
if (!event || !event?.address) {
return;
}
worker.off("message", addressListener);
resolve({
worker,
address: event.address,
} as NitroWorker);
};
worker.on("message", addressListener);
});
}

async function killWorker(worker: NitroWorker | undefined, nitro: Nitro) {
if (!worker) {
return;
}
if (worker.worker) {
worker.worker.postMessage({ event: "shutdown" });
const gracefulShutdownTimeout =
Number.parseInt(process.env.NITRO_SHUTDOWN_TIMEOUT || "", 10) || 3;
await new Promise<void>((resolve) => {
const timeout = setTimeout(() => {
nitro.logger.warn(
`[nitro] [dev] Force closing worker after ${gracefulShutdownTimeout} seconds...`
);
resolve();
}, gracefulShutdownTimeout * 1000);
worker.worker?.once("message", (message) => {
if (message.event === "exit") {
clearTimeout(timeout);
resolve();
}
});
});
worker.worker.removeAllListeners();
await worker.worker.terminate();
worker.worker = null;
}
if (worker.address.socketPath && existsSync(worker.address.socketPath)) {
await fsp.rm(worker.address.socketPath).catch(() => {});
}
}

export function createDevServer(nitro: Nitro): NitroDevServer {
// Worker
const workerEntry = resolve(
// Workerdir
const workerDir = resolve(
nitro.options.output.dir,
nitro.options.output.serverDir,
"index.mjs"
nitro.options.output.serverDir
);

// Error handler
Expand All @@ -115,17 +48,20 @@ export function createDevServer(nitro: Nitro): NitroDevServer {
let reloadPromise: Promise<void> | undefined;

let currentWorker: NitroWorker | undefined;
async function _reload() {

const reloadWorker = async () => {
// Kill old worker
const oldWorker = currentWorker;
currentWorker = undefined;
await killWorker(oldWorker, nitro);

// Create a new worker
currentWorker = await initWorker(workerEntry);
currentWorker = await initWorker(workerDir);
if (!currentWorker) {
return;
}
// Write nitro.json

// Write {buildDir}/nitro.json
const buildInfoPath = resolve(nitro.options.buildDir, "nitro.json");
const buildInfo: NitroBuildInfo = {
date: new Date().toJSON(),
Expand All @@ -140,9 +76,10 @@ export function createDevServer(nitro: Nitro): NitroDevServer {
},
};
await writeFile(buildInfoPath, JSON.stringify(buildInfo, null, 2));
}
const reload = debounce(() => {
reloadPromise = _reload()
};

const reloadWorkerDebounced = debounce(() => {
reloadPromise = reloadWorker()
.then(() => {
lastError = undefined;
})
Expand All @@ -156,7 +93,8 @@ export function createDevServer(nitro: Nitro): NitroDevServer {
});
return reloadPromise;
});
nitro.hooks.hook("dev:reload", reload);

nitro.hooks.hook("dev:reload", reloadWorkerDebounced);

// App
const app = createApp();
Expand All @@ -165,6 +103,7 @@ export function createDevServer(nitro: Nitro): NitroDevServer {
for (const handler of nitro.options.devHandlers) {
app.use(handler.route || "/", handler.handler);
}

// Debugging endpoint to view vfs
app.use("/_vfs", createVFSHandler(nitro));

Expand Down Expand Up @@ -198,10 +137,8 @@ export function createDevServer(nitro: Nitro): NitroDevServer {
// Main worker proxy
const proxy = createProxy();
proxy.proxy.on("proxyReq", (proxyReq, req) => {
// TODO: Use httpxy to set these headers
if (!proxyReq.hasHeader("x-forwarded-for")) {
const address = req.socket.remoteAddress;
// #2197
if (address) {
proxyReq.appendHeader("x-forwarded-for", address);
}
Expand Down Expand Up @@ -236,6 +173,7 @@ export function createDevServer(nitro: Nitro): NitroDevServer {
return address;
};

// Main handler
app.use(
eventHandler(async (event) => {
await reloadPromise;
Expand Down Expand Up @@ -265,7 +203,7 @@ export function createDevServer(nitro: Nitro): NitroDevServer {
})
);

// Upgrade handler
// WebSocket handler
const upgrade = (
req: IncomingMessage,
socket: OutgoingMessage<IncomingMessage> | Duplex,
Expand All @@ -285,8 +223,8 @@ export function createDevServer(nitro: Nitro): NitroDevServer {

// Listen
let listeners: Listener[] = [];
const _listen: NitroDevServer["listen"] = async (port, opts?) => {
const listener = await listen(toNodeListener(app), { port, ...opts });
const listen: NitroDevServer["listen"] = async (port, opts?) => {
const listener = await listhen(toNodeListener(app), { port, ...opts });
listener.server.on("upgrade", (req, sock, head) => {
upgrade(req, sock, head);
});
Expand All @@ -298,30 +236,101 @@ export function createDevServer(nitro: Nitro): NitroDevServer {
let watcher: FSWatcher | undefined;
if (nitro.options.devServer.watch.length > 0) {
watcher = watch(nitro.options.devServer.watch, nitro.options.watchOptions);
watcher.on("add", reload).on("change", reload);
watcher
.on("add", reloadWorkerDebounced)
.on("change", reloadWorkerDebounced);
}

// Close handler
async function close() {
const close = async () => {
if (watcher) {
await watcher.close();
}
await killWorker(currentWorker, nitro);
await Promise.all(listeners.map((l) => l.close()));
listeners = [];
}
};
nitro.hooks.hook("close", close);

return {
reload,
listen: _listen,
reload: reloadWorkerDebounced,
listen,
app,
close,
watcher,
upgrade,
};
}

function initWorker(workerDir: string): Promise<NitroWorker> | undefined {
const workerEntry = join(workerDir, "index.mjs");

if (!existsSync(workerEntry)) {
return;
}

return new Promise((resolve, reject) => {
const worker = new Worker(workerEntry, {
env: {
...process.env,
NITRO_DEV_WORKER_DIR: workerDir,
},
});
worker.once("exit", (code) => {
reject(
new Error(
code ? "[worker] exited with code: " + code : "[worker] exited"
)
);
});
worker.once("error", async (error) => {
reject(error);
});
const addressListener = (event: any) => {
if (!event || !event?.address) {
return;
}
worker.off("message", addressListener);
resolve({
worker,
address: event.address,
} as NitroWorker);
};
worker.on("message", addressListener);
});
}

async function killWorker(worker: NitroWorker | undefined, nitro: Nitro) {
if (!worker) {
return;
}
if (worker.worker) {
worker.worker.postMessage({ event: "shutdown" });
const gracefulShutdownTimeout =
Number.parseInt(process.env.NITRO_SHUTDOWN_TIMEOUT || "", 10) || 3;
await new Promise<void>((resolve) => {
const timeout = setTimeout(() => {
nitro.logger.warn(
`[nitro] [dev] Force closing worker after ${gracefulShutdownTimeout} seconds...`
);
resolve();
}, gracefulShutdownTimeout * 1000);
worker.worker?.once("message", (message) => {
if (message.event === "exit") {
clearTimeout(timeout);
resolve();
}
});
});
worker.worker.removeAllListeners();
await worker.worker.terminate();
worker.worker = null;
}
if (worker.address.socketPath) {
await rm(worker.address.socketPath).catch(() => {});
}
}

function createProxy(defaults: ProxyServerOptions = {}) {
const proxy = createProxyServer(defaults);
const handle = async (event: H3Event, opts: ProxyServerOptions = {}) => {
Expand Down

0 comments on commit 749f035

Please sign in to comment.