refactor(ext/node): migrate back to using "Deno.serve" API for HTTP server (#18865)

This commit fixes "node:http" API to properly handle "upgrade"
requests and thus marking Vite work again.

This is done by migrating back to "Deno.serve()" and internal
"upgradeHttpRaw" APIs for "node:http" module polyfill.
This commit is contained in:
Bartek Iwańczuk 2023-04-27 12:45:13 +02:00 committed by GitHub
parent 90a5ef5e34
commit 1e331a4873
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,6 +1,7 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import { TextEncoder } from "ext:deno_web/08_text_encoding.js"; import { TextEncoder } from "ext:deno_web/08_text_encoding.js";
import { type Deferred, deferred } from "ext:deno_node/_util/async.ts";
import { _normalizeArgs, ListenOptions, Socket } from "ext:deno_node/net.ts"; import { _normalizeArgs, ListenOptions, Socket } from "ext:deno_node/net.ts";
import { Buffer } from "ext:deno_node/buffer.ts"; import { Buffer } from "ext:deno_node/buffer.ts";
import { ERR_SERVER_NOT_RUNNING } from "ext:deno_node/internal/errors.ts"; import { ERR_SERVER_NOT_RUNNING } from "ext:deno_node/internal/errors.ts";
@ -16,9 +17,8 @@ import { Agent } from "ext:deno_node/_http_agent.mjs";
import { chunkExpression as RE_TE_CHUNKED } from "ext:deno_node/_http_common.ts"; import { chunkExpression as RE_TE_CHUNKED } from "ext:deno_node/_http_common.ts";
import { urlToHttpOptions } from "ext:deno_node/internal/url.ts"; import { urlToHttpOptions } from "ext:deno_node/internal/url.ts";
import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts"; import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts";
import { upgradeHttpRaw } from "ext:deno_http/00_serve.js";
import * as httpRuntime from "ext:runtime/40_http.js";
import { connResetException } from "ext:deno_node/internal/errors.ts"; import { connResetException } from "ext:deno_node/internal/errors.ts";
import { serve, upgradeHttpRaw } from "ext:deno_http/00_serve.js";
enum STATUS_CODES { enum STATUS_CODES {
/** RFC 7231, 6.2.1 */ /** RFC 7231, 6.2.1 */
@ -427,7 +427,7 @@ export class ServerResponse extends NodeWritable {
finished = false; finished = false;
headersSent = false; headersSent = false;
#firstChunk: Chunk | null = null; #firstChunk: Chunk | null = null;
#reqEvent?: Deno.RequestEvent; #resolve: (value: Response | PromiseLike<Response>) => void;
static #enqueue(controller: ReadableStreamDefaultController, chunk: Chunk) { static #enqueue(controller: ReadableStreamDefaultController, chunk: Chunk) {
if (typeof chunk === "string") { if (typeof chunk === "string") {
@ -443,7 +443,7 @@ export class ServerResponse extends NodeWritable {
return status === 101 || status === 204 || status === 205 || status === 304; return status === 101 || status === 204 || status === 205 || status === 304;
} }
constructor(reqEvent: undefined | Deno.RequestEvent) { constructor(resolve: (value: Response | PromiseLike<Response>) => void) {
let controller: ReadableByteStreamController; let controller: ReadableByteStreamController;
const readable = new ReadableStream({ const readable = new ReadableStream({
start(c) { start(c) {
@ -485,7 +485,7 @@ export class ServerResponse extends NodeWritable {
}, },
}); });
this.#readable = readable; this.#readable = readable;
this.#reqEvent = reqEvent; this.#resolve = resolve;
} }
setHeader(name: string, value: string) { setHeader(name: string, value: string) {
@ -536,16 +536,13 @@ export class ServerResponse extends NodeWritable {
if (ServerResponse.#bodyShouldBeNull(this.statusCode!)) { if (ServerResponse.#bodyShouldBeNull(this.statusCode!)) {
body = null; body = null;
} }
this.#reqEvent!.respondWith( this.#resolve(
new Response(body, { new Response(body, {
headers: this.#headers, headers: this.#headers,
status: this.statusCode, status: this.statusCode,
statusText: this.statusMessage, statusText: this.statusMessage,
}), }),
).catch(() => { );
// TODO(bartlomieju): this error should be handled somehow
// ignore this error
});
} }
// deno-lint-ignore no-explicit-any // deno-lint-ignore no-explicit-any
@ -577,7 +574,7 @@ export class IncomingMessageForServer extends NodeReadable {
// These properties are used by `npm:forwarded` for example. // These properties are used by `npm:forwarded` for example.
socket: { remoteAddress: string; remotePort: number }; socket: { remoteAddress: string; remotePort: number };
constructor(req: Request, conn: Deno.Conn) { constructor(req: Request, remoteAddr: { hostname: string; port: number }) {
// Check if no body (GET/HEAD/OPTIONS/...) // Check if no body (GET/HEAD/OPTIONS/...)
const reader = req.body?.getReader(); const reader = req.body?.getReader();
super({ super({
@ -605,8 +602,8 @@ export class IncomingMessageForServer extends NodeReadable {
this.url = req.url?.slice(req.url.indexOf("/", 8)); this.url = req.url?.slice(req.url.indexOf("/", 8));
this.method = req.method; this.method = req.method;
this.socket = { this.socket = {
remoteAddress: conn.remoteAddr.hostname, remoteAddress: remoteAddr.hostname,
remotePort: conn.remoteAddr.port, remotePort: remoteAddr.port,
}; };
this.#req = req; this.#req = req;
} }
@ -648,10 +645,17 @@ export function Server(handler?: ServerHandler): ServerImpl {
class ServerImpl extends EventEmitter { class ServerImpl extends EventEmitter {
#httpConnections: Set<Deno.HttpConn> = new Set(); #httpConnections: Set<Deno.HttpConn> = new Set();
#listener?: Deno.Listener; #listener?: Deno.Listener;
#addr: Deno.NetAddr;
#hasClosed = false;
#ac?: AbortController;
#servePromise: Deferred<void>;
listening = false; listening = false;
constructor(handler?: ServerHandler) { constructor(handler?: ServerHandler) {
super(); super();
this.#servePromise = deferred();
this.#servePromise.then(() => this.emit("close"));
if (handler !== undefined) { if (handler !== undefined) {
this.on("request", handler); this.on("request", handler);
} }
@ -676,70 +680,52 @@ class ServerImpl extends EventEmitter {
// TODO(bnoordhuis) Node prefers [::] when host is omitted, // TODO(bnoordhuis) Node prefers [::] when host is omitted,
// we on the other hand default to 0.0.0.0. // we on the other hand default to 0.0.0.0.
const hostname = options.host ?? "0.0.0.0";
this.#addr = {
hostname,
port,
} as Deno.NetAddr;
this.listening = true; this.listening = true;
const hostname = options.host ?? ""; nextTick(() => this.#serve());
this.#listener = Deno.listen({ port, hostname });
nextTick(() => this.#listenLoop());
return this; return this;
} }
async #listenLoop() { #serve() {
const go = async (tcpConn: Deno.Conn, httpConn: Deno.HttpConn) => { const ac = new AbortController();
try { const handler = (request: Request, info: Deno.ServeHandlerInfo) => {
for (;;) { const req = new IncomingMessageForServer(request, info.remoteAddr);
let reqEvent = null; if (req.upgrade && this.listenerCount("upgrade") > 0) {
try { const { conn, response } = upgradeHttpRaw(request);
// Note: httpConn.nextRequest() calls httpConn.close() on error. const socket = new Socket({
reqEvent = await httpConn.nextRequest(); handle: new TCP(constants.SERVER, conn),
} catch { });
// Connection closed. this.emit("upgrade", req, socket, Buffer.from([]));
// TODO(bnoordhuis) Emit "clientError" event on the http.Server return response;
// instance? Node emits it when request parsing fails and expects } else {
// the listener to send a raw 4xx HTTP response on the underlying return new Promise<Response>((resolve): void => {
// net.Socket but we don't have one to pass to the listener. const res = new ServerResponse(resolve);
} this.emit("request", req, res);
if (reqEvent === null) { });
break;
}
const req = new IncomingMessageForServer(reqEvent.request, tcpConn);
if (req.upgrade && this.listenerCount("upgrade") > 0) {
const conn = await upgradeHttpRaw(
reqEvent.request,
tcpConn,
) as Deno.Conn;
const socket = new Socket({
handle: new TCP(constants.SERVER, conn),
});
this.emit("upgrade", req, socket, Buffer.from([]));
return;
} else {
const res = new ServerResponse(reqEvent);
this.emit("request", req, res);
}
}
} finally {
this.#httpConnections.delete(httpConn);
} }
}; };
const listener = this.#listener; if (this.#hasClosed) {
return;
if (listener !== undefined) {
this.emit("listening");
for await (const conn of listener) {
let httpConn: Deno.HttpConn;
try {
httpConn = httpRuntime.serveHttp(conn);
} catch {
continue; /// Connection closed.
}
this.#httpConnections.add(httpConn);
go(conn, httpConn);
}
} }
this.#ac = ac;
serve(
{
handler: handler as Deno.ServeHandler,
...this.#addr,
signal: ac.signal,
// @ts-ignore Might be any without `--unstable` flag
onListen: ({ port }) => {
this.#addr!.port = port;
this.emit("listening");
},
},
).then(() => this.#servePromise!.resolve());
} }
setTimeout() { setTimeout() {
@ -750,6 +736,7 @@ class ServerImpl extends EventEmitter {
const listening = this.listening; const listening = this.listening;
this.listening = false; this.listening = false;
this.#hasClosed = true;
if (typeof cb === "function") { if (typeof cb === "function") {
if (listening) { if (listening) {
this.once("close", cb); this.once("close", cb);
@ -760,31 +747,20 @@ class ServerImpl extends EventEmitter {
} }
} }
nextTick(() => this.emit("close")); if (listening && this.#ac) {
this.#ac.abort();
if (listening) { this.#ac = undefined;
this.#listener!.close(); } else {
this.#listener = undefined; this.#servePromise!.resolve();
for (const httpConn of this.#httpConnections) {
try {
httpConn.close();
} catch {
// Already closed.
}
}
this.#httpConnections.clear();
} }
return this; return this;
} }
address() { address() {
const addr = this.#listener!.addr as Deno.NetAddr;
return { return {
port: addr.port, port: this.#addr.port,
address: addr.hostname, address: this.#addr.hostname,
}; };
} }
} }