diff --git a/ext/http/00_serve.ts b/ext/http/00_serve.ts index cde00ba558..ab07fac807 100644 --- a/ext/http/00_serve.ts +++ b/ext/http/00_serve.ts @@ -40,6 +40,7 @@ const { PromisePrototypeCatch, SafeArrayIterator, SafePromisePrototypeFinally, + SafePromiseAll, PromisePrototypeThen, StringPrototypeIncludes, Symbol, @@ -787,43 +788,88 @@ function serve(arg1, arg2) { options = { __proto__: null }; } - const { 0: overrideKind, 1: overrideHost, 2: overridePort } = - op_http_serve_address_override(); - switch (overrideKind) { - case 1: { - // TCP - options = { - ...options, - hostname: overrideHost, - port: overridePort, - }; - delete options.path; - delete options.cid; - break; + const { + 0: overrideKind, + 1: overrideHost, + 2: overridePort, + 3: duplicateListener, + } = op_http_serve_address_override(); + if (overrideKind) { + let envOptions = duplicateListener ? { __proto__: null } : options; + + switch (overrideKind) { + case 1: { + // TCP + envOptions = { + ...envOptions, + hostname: overrideHost, + port: overridePort, + }; + delete envOptions.path; + delete envOptions.cid; + break; + } + case 2: { + // Unix + envOptions = { + ...envOptions, + path: overrideHost, + }; + delete envOptions.hostname; + delete envOptions.cid; + delete envOptions.port; + break; + } + case 3: { + // Vsock + envOptions = { + ...envOptions, + cid: Number(overrideHost), + port: overridePort, + }; + delete envOptions.hostname; + delete envOptions.path; + break; + } } - case 2: { - // Unix - options = { - ...options, - path: overrideHost, + + if (duplicateListener) { + envOptions.onListen = () => { + // override default console.log behavior }; - delete options.hostname; - delete options.port; - break; - } - case 3: { - // Vsock - options = { - ...options, - cid: Number(overrideHost), - port: overridePort, + const envListener = serveInner(envOptions, handler); + const userListener = serveInner(options, handler); + + return { + addr: userListener.addr, + finished: SafePromiseAll([envListener.finished, userListener.finished]), + shutdown() { + return SafePromiseAll([ + envListener.shutdown(), + userListener.shutdown(), + ]); + }, + ref() { + envListener.ref(); + userListener.ref(); + }, + unref() { + envListener.unref(); + userListener.unref(); + }, + [SymbolAsyncDispose]() { + return this.shutdown(); + }, }; - delete options.hostname; - delete options.path; - break; } + + options = envOptions; } + return serveInner(options, handler); +} + +function serveInner(options, handler) { const wantsHttps = hasTlsKeyPairOptions(options); const wantsUnix = ObjectHasOwn(options, "path"); const wantsVsock = ObjectHasOwn(options, "cid"); @@ -1071,41 +1117,52 @@ internals.serveHttpOnListener = serveHttpOnListener; internals.serveHttpOnConnection = serveHttpOnConnection; function registerDeclarativeServer(exports) { - if (ObjectHasOwn(exports, "fetch")) { - if (typeof exports.fetch !== "function") { - throw new TypeError( - "Invalid type for fetch: must be a function with a single or no parameter", - ); - } - return ({ servePort, serveHost, serveIsMain, serveWorkerCount }) => { - Deno.serve({ - port: servePort, - hostname: serveHost, - [kLoadBalanced]: (serveIsMain && serveWorkerCount > 1) || - serveWorkerCount !== null, - onListen: ({ port, hostname }) => { - if (serveIsMain) { - const nThreads = serveWorkerCount > 1 - ? ` with ${serveWorkerCount} threads` - : ""; - const host = formatHostName(hostname); + if (!ObjectHasOwn(exports, "fetch")) return; - import.meta.log( - "info", - `%cdeno serve%c: Listening on %chttp://${host}:${port}/%c${nThreads}`, - "color: green", - "color: inherit", - "color: yellow", - "color: inherit", - ); - } - }, - handler: (req, connInfo) => { - return exports.fetch(req, connInfo); - }, - }); - }; + if (typeof exports.fetch !== "function") { + throw new TypeError("Invalid type for fetch: must be a function"); } + + return ({ servePort, serveHost, serveIsMain, serveWorkerCount }) => { + Deno.serve({ + port: servePort, + hostname: serveHost, + [kLoadBalanced]: (serveIsMain && serveWorkerCount > 1) || + serveWorkerCount !== null, + onListen: ({ transport, port, hostname, path, cid }) => { + if (serveIsMain) { + const nThreads = serveWorkerCount > 1 + ? ` with ${serveWorkerCount} threads` + : ""; + + let target; + switch (transport) { + case "tcp": + target = `http://${formatHostName(hostname)}:${port}/`; + break; + case "unix": + target = path; + break; + case "vsock": + target = `vsock:${cid}:${port}`; + break; + } + + import.meta.log( + "info", + `%cdeno serve%c: Listening on %c${target}%c${nThreads}`, + "color: green", + "color: inherit", + "color: yellow", + "color: inherit", + ); + } + }, + handler: (req, connInfo) => { + return exports.fetch(req, connInfo); + }, + }); + }; } export { diff --git a/ext/http/lib.rs b/ext/http/lib.rs index f1f41c410c..df789a88a3 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -1693,14 +1693,18 @@ fn extract_network_stream( #[op2] #[serde] -pub fn op_http_serve_address_override() -> (u8, String, u32) { +pub fn op_http_serve_address_override() -> (u8, String, u32, bool) { match std::env::var("DENO_SERVE_ADDRESS") { Ok(val) => parse_serve_address(&val), - Err(_) => (0, String::new(), 0), + Err(_) => (0, String::new(), 0, false), } } -fn parse_serve_address(input: &str) -> (u8, String, u32) { +fn parse_serve_address(input: &str) -> (u8, String, u32, bool) { + let (input, duplicate) = match input.strip_prefix("duplicate,") { + Some(input) => (input, true), + None => (input, false), + }; match input.split_once(':') { Some(("tcp", addr)) => { // TCP address @@ -1710,11 +1714,11 @@ fn parse_serve_address(input: &str) -> (u8, String, u32) { SocketAddr::V4(v4) => v4.ip().to_string(), SocketAddr::V6(v6) => format!("[{}]", v6.ip()), }; - (1, hostname, addr.port() as u32) + (1, hostname, addr.port() as u32, duplicate) } Err(_) => { log::error!("DENO_SERVE_ADDRESS: invalid TCP address: {}", addr); - (0, String::new(), 0) + (0, String::new(), 0, false) } } } @@ -1722,9 +1726,9 @@ fn parse_serve_address(input: &str) -> (u8, String, u32) { // Unix socket path if addr.is_empty() { log::error!("DENO_SERVE_ADDRESS: empty unix socket path"); - return (0, String::new(), 0); + return (0, String::new(), 0, duplicate); } - (2, addr.to_string(), 0) + (2, addr.to_string(), 0, duplicate) } Some(("vsock", addr)) => { // Vsock address @@ -1737,7 +1741,7 @@ fn parse_serve_address(input: &str) -> (u8, String, u32) { Ok(cid) => cid.to_string(), Err(_) => { log::error!("DENO_SERVE_ADDRESS: invalid vsock CID: {}", cid); - return (0, String::new(), 0); + return (0, String::new(), 0, false); } } }; @@ -1745,17 +1749,17 @@ fn parse_serve_address(input: &str) -> (u8, String, u32) { Ok(port) => port, Err(_) => { log::error!("DENO_SERVE_ADDRESS: invalid vsock port: {}", port); - return (0, String::new(), 0); + return (0, String::new(), 0, false); } }; - (3, cid, port) + (3, cid, port, duplicate) } - None => (0, String::new(), 0), + None => (0, String::new(), 0, false), } } Some((_, _)) | None => { log::error!("DENO_SERVE_ADDRESS: invalid address format: {}", input); - (0, String::new(), 0) + (0, String::new(), 0, false) } } } @@ -1768,31 +1772,43 @@ mod tests { fn test_parse_serve_address() { assert_eq!( parse_serve_address("tcp:127.0.0.1:8080"), - (1, "127.0.0.1".to_string(), 8080) + (1, "127.0.0.1".to_string(), 8080, false) ); assert_eq!( parse_serve_address("tcp:[::1]:9000"), - (1, "[::1]".to_string(), 9000) + (1, "[::1]".to_string(), 9000, false) + ); + assert_eq!( + parse_serve_address("duplicate,tcp:[::1]:9000"), + (1, "[::1]".to_string(), 9000, true) ); assert_eq!( parse_serve_address("unix:/var/run/socket.sock"), - (2, "/var/run/socket.sock".to_string(), 0) + (2, "/var/run/socket.sock".to_string(), 0, false) + ); + assert_eq!( + parse_serve_address("duplicate,unix:/var/run/socket.sock"), + (2, "/var/run/socket.sock".to_string(), 0, true) ); assert_eq!( parse_serve_address("vsock:1234:5678"), - (3, "1234".to_string(), 5678) + (3, "1234".to_string(), 5678, false) ); assert_eq!( parse_serve_address("vsock:-1:5678"), - (3, "-1".to_string(), 5678) + (3, "-1".to_string(), 5678, false) + ); + assert_eq!( + parse_serve_address("duplicate,vsock:-1:5678"), + (3, "-1".to_string(), 5678, true) ); - assert_eq!(parse_serve_address("tcp:"), (0, String::new(), 0)); - assert_eq!(parse_serve_address("unix:"), (0, String::new(), 0)); - assert_eq!(parse_serve_address("vsock:"), (0, String::new(), 0)); - assert_eq!(parse_serve_address("foo:"), (0, String::new(), 0)); - assert_eq!(parse_serve_address("bar"), (0, String::new(), 0)); + assert_eq!(parse_serve_address("tcp:"), (0, String::new(), 0, false)); + assert_eq!(parse_serve_address("unix:"), (0, String::new(), 0, false)); + assert_eq!(parse_serve_address("vsock:"), (0, String::new(), 0, false)); + assert_eq!(parse_serve_address("foo:"), (0, String::new(), 0, false)); + assert_eq!(parse_serve_address("bar"), (0, String::new(), 0, false)); } } diff --git a/tests/integration/serve_tests.rs b/tests/integration/serve_tests.rs index 8d03289f5f..15bffe9d80 100644 --- a/tests/integration/serve_tests.rs +++ b/tests/integration/serve_tests.rs @@ -340,3 +340,62 @@ async fn deno_run_serve_with_unix_socket_from_env() { child.kill().unwrap(); child.wait().unwrap(); } + +#[tokio::test] +#[cfg(unix)] +async fn deno_run_serve_with_duplicate_env_addr() { + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + use tokio::net::UnixStream; + + let dir = tempfile::TempDir::new().unwrap(); + let sock = dir.path().join("listen.sock"); + let mut child = util::deno_cmd() + .current_dir(util::testdata_path()) + .arg("run") + .arg("--allow-net") + .arg(format!("--allow-read={}", sock.display())) + .arg(format!("--allow-write={}", sock.display())) + .arg("./serve/run_serve.ts") + .env( + "DENO_SERVE_ADDRESS", + format!("duplicate,unix:{}", sock.display()), + ) + .stderr_piped() + .spawn() + .unwrap(); + let stderr = BufReader::new(child.stderr.as_mut().unwrap()); + let msg = stderr.lines().next().unwrap().unwrap(); + + let port_regex = Regex::new(r"https?:[^:]+:(\d+)").unwrap(); + let port = port_regex.captures(&msg).unwrap().get(1).unwrap().as_str(); + + { + let client = reqwest::Client::builder().build().unwrap(); + + let res = client + .get(format!("http://127.0.0.1:{port}")) + .send() + .await + .unwrap(); + assert_eq!(200, res.status()); + + let body = res.text().await.unwrap(); + assert_eq!(body, "Deno.serve() works!"); + } + + { + // reqwest does not support connecting to unix sockets yet, so here we send the http + // payload directly + let mut conn = UnixStream::connect(dir.path().join("listen.sock")) + .await + .unwrap(); + conn.write_all(b"GET / HTTP/1.0\r\n\r\n").await.unwrap(); + let mut response = String::new(); + conn.read_to_string(&mut response).await.unwrap(); + assert!(response.ends_with("\r\nDeno.serve() works!")); + } + + child.kill().unwrap(); + child.wait().unwrap(); +}