feat: DENO_SERVE_ADDRESS duplicate option (#29109)

if `duplicate` is passed in DENO_SERVE_ADDRESS, use its value to create
a second listener instead of overriding the options the user passed.
This commit is contained in:
snek 2025-04-30 16:04:19 +02:00 committed by GitHub
parent 8773b5f5b0
commit 78758f257b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 218 additions and 86 deletions

View file

@ -40,6 +40,7 @@ const {
PromisePrototypeCatch, PromisePrototypeCatch,
SafeArrayIterator, SafeArrayIterator,
SafePromisePrototypeFinally, SafePromisePrototypeFinally,
SafePromiseAll,
PromisePrototypeThen, PromisePrototypeThen,
StringPrototypeIncludes, StringPrototypeIncludes,
Symbol, Symbol,
@ -787,43 +788,88 @@ function serve(arg1, arg2) {
options = { __proto__: null }; options = { __proto__: null };
} }
const { 0: overrideKind, 1: overrideHost, 2: overridePort } = const {
op_http_serve_address_override(); 0: overrideKind,
switch (overrideKind) { 1: overrideHost,
case 1: { 2: overridePort,
// TCP 3: duplicateListener,
options = { } = op_http_serve_address_override();
...options, if (overrideKind) {
hostname: overrideHost, let envOptions = duplicateListener ? { __proto__: null } : options;
port: overridePort,
}; switch (overrideKind) {
delete options.path; case 1: {
delete options.cid; // TCP
break; envOptions = {
...envOptions,
hostname: overrideHost,
port: overridePort,
};
delete envOptions.path;
delete envOptions.cid;
break;
}
case 2: {
// Unix
envOptions = {
...envOptions,
path: overrideHost,
};
delete envOptions.hostname;
delete envOptions.cid;
delete envOptions.port;
break;
}
case 3: {
// Vsock
envOptions = {
...envOptions,
cid: Number(overrideHost),
port: overridePort,
};
delete envOptions.hostname;
delete envOptions.path;
break;
}
} }
case 2: {
// Unix if (duplicateListener) {
options = { envOptions.onListen = () => {
...options, // override default console.log behavior
path: overrideHost,
}; };
delete options.hostname; const envListener = serveInner(envOptions, handler);
delete options.port; const userListener = serveInner(options, handler);
break;
} return {
case 3: { addr: userListener.addr,
// Vsock finished: SafePromiseAll([envListener.finished, userListener.finished]),
options = { shutdown() {
...options, return SafePromiseAll([
cid: Number(overrideHost), envListener.shutdown(),
port: overridePort, userListener.shutdown(),
]);
},
ref() {
envListener.ref();
userListener.ref();
},
unref() {
envListener.unref();
userListener.unref();
},
[SymbolAsyncDispose]() {
return this.shutdown();
},
}; };
delete options.hostname;
delete options.path;
break;
} }
options = envOptions;
} }
return serveInner(options, handler);
}
function serveInner(options, handler) {
const wantsHttps = hasTlsKeyPairOptions(options); const wantsHttps = hasTlsKeyPairOptions(options);
const wantsUnix = ObjectHasOwn(options, "path"); const wantsUnix = ObjectHasOwn(options, "path");
const wantsVsock = ObjectHasOwn(options, "cid"); const wantsVsock = ObjectHasOwn(options, "cid");
@ -1071,41 +1117,52 @@ internals.serveHttpOnListener = serveHttpOnListener;
internals.serveHttpOnConnection = serveHttpOnConnection; internals.serveHttpOnConnection = serveHttpOnConnection;
function registerDeclarativeServer(exports) { function registerDeclarativeServer(exports) {
if (ObjectHasOwn(exports, "fetch")) { if (!ObjectHasOwn(exports, "fetch")) return;
if (typeof exports.fetch !== "function") {
throw new TypeError(
"Invalid type for fetch: must be a function with a single or no parameter",
);
}
return ({ servePort, serveHost, serveIsMain, serveWorkerCount }) => {
Deno.serve({
port: servePort,
hostname: serveHost,
[kLoadBalanced]: (serveIsMain && serveWorkerCount > 1) ||
serveWorkerCount !== null,
onListen: ({ port, hostname }) => {
if (serveIsMain) {
const nThreads = serveWorkerCount > 1
? ` with ${serveWorkerCount} threads`
: "";
const host = formatHostName(hostname);
import.meta.log( if (typeof exports.fetch !== "function") {
"info", throw new TypeError("Invalid type for fetch: must be a function");
`%cdeno serve%c: Listening on %chttp://${host}:${port}/%c${nThreads}`,
"color: green",
"color: inherit",
"color: yellow",
"color: inherit",
);
}
},
handler: (req, connInfo) => {
return exports.fetch(req, connInfo);
},
});
};
} }
return ({ servePort, serveHost, serveIsMain, serveWorkerCount }) => {
Deno.serve({
port: servePort,
hostname: serveHost,
[kLoadBalanced]: (serveIsMain && serveWorkerCount > 1) ||
serveWorkerCount !== null,
onListen: ({ transport, port, hostname, path, cid }) => {
if (serveIsMain) {
const nThreads = serveWorkerCount > 1
? ` with ${serveWorkerCount} threads`
: "";
let target;
switch (transport) {
case "tcp":
target = `http://${formatHostName(hostname)}:${port}/`;
break;
case "unix":
target = path;
break;
case "vsock":
target = `vsock:${cid}:${port}`;
break;
}
import.meta.log(
"info",
`%cdeno serve%c: Listening on %c${target}%c${nThreads}`,
"color: green",
"color: inherit",
"color: yellow",
"color: inherit",
);
}
},
handler: (req, connInfo) => {
return exports.fetch(req, connInfo);
},
});
};
} }
export { export {

View file

@ -1693,14 +1693,18 @@ fn extract_network_stream<U: CanDowncastUpgrade>(
#[op2] #[op2]
#[serde] #[serde]
pub fn op_http_serve_address_override() -> (u8, String, u32) { pub fn op_http_serve_address_override() -> (u8, String, u32, bool) {
match std::env::var("DENO_SERVE_ADDRESS") { match std::env::var("DENO_SERVE_ADDRESS") {
Ok(val) => parse_serve_address(&val), Ok(val) => parse_serve_address(&val),
Err(_) => (0, String::new(), 0), Err(_) => (0, String::new(), 0, false),
} }
} }
fn parse_serve_address(input: &str) -> (u8, String, u32) { fn parse_serve_address(input: &str) -> (u8, String, u32, bool) {
let (input, duplicate) = match input.strip_prefix("duplicate,") {
Some(input) => (input, true),
None => (input, false),
};
match input.split_once(':') { match input.split_once(':') {
Some(("tcp", addr)) => { Some(("tcp", addr)) => {
// TCP address // TCP address
@ -1710,11 +1714,11 @@ fn parse_serve_address(input: &str) -> (u8, String, u32) {
SocketAddr::V4(v4) => v4.ip().to_string(), SocketAddr::V4(v4) => v4.ip().to_string(),
SocketAddr::V6(v6) => format!("[{}]", v6.ip()), SocketAddr::V6(v6) => format!("[{}]", v6.ip()),
}; };
(1, hostname, addr.port() as u32) (1, hostname, addr.port() as u32, duplicate)
} }
Err(_) => { Err(_) => {
log::error!("DENO_SERVE_ADDRESS: invalid TCP address: {}", addr); log::error!("DENO_SERVE_ADDRESS: invalid TCP address: {}", addr);
(0, String::new(), 0) (0, String::new(), 0, false)
} }
} }
} }
@ -1722,9 +1726,9 @@ fn parse_serve_address(input: &str) -> (u8, String, u32) {
// Unix socket path // Unix socket path
if addr.is_empty() { if addr.is_empty() {
log::error!("DENO_SERVE_ADDRESS: empty unix socket path"); log::error!("DENO_SERVE_ADDRESS: empty unix socket path");
return (0, String::new(), 0); return (0, String::new(), 0, duplicate);
} }
(2, addr.to_string(), 0) (2, addr.to_string(), 0, duplicate)
} }
Some(("vsock", addr)) => { Some(("vsock", addr)) => {
// Vsock address // Vsock address
@ -1737,7 +1741,7 @@ fn parse_serve_address(input: &str) -> (u8, String, u32) {
Ok(cid) => cid.to_string(), Ok(cid) => cid.to_string(),
Err(_) => { Err(_) => {
log::error!("DENO_SERVE_ADDRESS: invalid vsock CID: {}", cid); log::error!("DENO_SERVE_ADDRESS: invalid vsock CID: {}", cid);
return (0, String::new(), 0); return (0, String::new(), 0, false);
} }
} }
}; };
@ -1745,17 +1749,17 @@ fn parse_serve_address(input: &str) -> (u8, String, u32) {
Ok(port) => port, Ok(port) => port,
Err(_) => { Err(_) => {
log::error!("DENO_SERVE_ADDRESS: invalid vsock port: {}", port); log::error!("DENO_SERVE_ADDRESS: invalid vsock port: {}", port);
return (0, String::new(), 0); return (0, String::new(), 0, false);
} }
}; };
(3, cid, port) (3, cid, port, duplicate)
} }
None => (0, String::new(), 0), None => (0, String::new(), 0, false),
} }
} }
Some((_, _)) | None => { Some((_, _)) | None => {
log::error!("DENO_SERVE_ADDRESS: invalid address format: {}", input); log::error!("DENO_SERVE_ADDRESS: invalid address format: {}", input);
(0, String::new(), 0) (0, String::new(), 0, false)
} }
} }
} }
@ -1768,31 +1772,43 @@ mod tests {
fn test_parse_serve_address() { fn test_parse_serve_address() {
assert_eq!( assert_eq!(
parse_serve_address("tcp:127.0.0.1:8080"), parse_serve_address("tcp:127.0.0.1:8080"),
(1, "127.0.0.1".to_string(), 8080) (1, "127.0.0.1".to_string(), 8080, false)
); );
assert_eq!( assert_eq!(
parse_serve_address("tcp:[::1]:9000"), parse_serve_address("tcp:[::1]:9000"),
(1, "[::1]".to_string(), 9000) (1, "[::1]".to_string(), 9000, false)
);
assert_eq!(
parse_serve_address("duplicate,tcp:[::1]:9000"),
(1, "[::1]".to_string(), 9000, true)
); );
assert_eq!( assert_eq!(
parse_serve_address("unix:/var/run/socket.sock"), parse_serve_address("unix:/var/run/socket.sock"),
(2, "/var/run/socket.sock".to_string(), 0) (2, "/var/run/socket.sock".to_string(), 0, false)
);
assert_eq!(
parse_serve_address("duplicate,unix:/var/run/socket.sock"),
(2, "/var/run/socket.sock".to_string(), 0, true)
); );
assert_eq!( assert_eq!(
parse_serve_address("vsock:1234:5678"), parse_serve_address("vsock:1234:5678"),
(3, "1234".to_string(), 5678) (3, "1234".to_string(), 5678, false)
); );
assert_eq!( assert_eq!(
parse_serve_address("vsock:-1:5678"), parse_serve_address("vsock:-1:5678"),
(3, "-1".to_string(), 5678) (3, "-1".to_string(), 5678, false)
);
assert_eq!(
parse_serve_address("duplicate,vsock:-1:5678"),
(3, "-1".to_string(), 5678, true)
); );
assert_eq!(parse_serve_address("tcp:"), (0, String::new(), 0)); assert_eq!(parse_serve_address("tcp:"), (0, String::new(), 0, false));
assert_eq!(parse_serve_address("unix:"), (0, String::new(), 0)); assert_eq!(parse_serve_address("unix:"), (0, String::new(), 0, false));
assert_eq!(parse_serve_address("vsock:"), (0, String::new(), 0)); assert_eq!(parse_serve_address("vsock:"), (0, String::new(), 0, false));
assert_eq!(parse_serve_address("foo:"), (0, String::new(), 0)); assert_eq!(parse_serve_address("foo:"), (0, String::new(), 0, false));
assert_eq!(parse_serve_address("bar"), (0, String::new(), 0)); assert_eq!(parse_serve_address("bar"), (0, String::new(), 0, false));
} }
} }

View file

@ -340,3 +340,62 @@ async fn deno_run_serve_with_unix_socket_from_env() {
child.kill().unwrap(); child.kill().unwrap();
child.wait().unwrap(); child.wait().unwrap();
} }
#[tokio::test]
#[cfg(unix)]
async fn deno_run_serve_with_duplicate_env_addr() {
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
let dir = tempfile::TempDir::new().unwrap();
let sock = dir.path().join("listen.sock");
let mut child = util::deno_cmd()
.current_dir(util::testdata_path())
.arg("run")
.arg("--allow-net")
.arg(format!("--allow-read={}", sock.display()))
.arg(format!("--allow-write={}", sock.display()))
.arg("./serve/run_serve.ts")
.env(
"DENO_SERVE_ADDRESS",
format!("duplicate,unix:{}", sock.display()),
)
.stderr_piped()
.spawn()
.unwrap();
let stderr = BufReader::new(child.stderr.as_mut().unwrap());
let msg = stderr.lines().next().unwrap().unwrap();
let port_regex = Regex::new(r"https?:[^:]+:(\d+)").unwrap();
let port = port_regex.captures(&msg).unwrap().get(1).unwrap().as_str();
{
let client = reqwest::Client::builder().build().unwrap();
let res = client
.get(format!("http://127.0.0.1:{port}"))
.send()
.await
.unwrap();
assert_eq!(200, res.status());
let body = res.text().await.unwrap();
assert_eq!(body, "Deno.serve() works!");
}
{
// reqwest does not support connecting to unix sockets yet, so here we send the http
// payload directly
let mut conn = UnixStream::connect(dir.path().join("listen.sock"))
.await
.unwrap();
conn.write_all(b"GET / HTTP/1.0\r\n\r\n").await.unwrap();
let mut response = String::new();
conn.read_to_string(&mut response).await.unwrap();
assert!(response.ends_with("\r\nDeno.serve() works!"));
}
child.kill().unwrap();
child.wait().unwrap();
}