feat(ext/websocket): websockets over http2 (#21040)

Implements `WebSocket` over http/2. This requires a conformant http/2
server supporting the extended connect protocol.

Passes approximately 100 new WPT tests (mostly `?wpt_flags=h2` versions
of existing websockets APIs).

This is implemented as a fallback when http/1.1 fails, so a server that
supports both h1 and h2 WebSockets will still end up on the http/1.1
upgrade path.

The patch also cleas up the websockets handshake to split it up into
http, https+http1 and https+http2, making it a little less intertwined.

This uncovered a likely bug in the WPT test server:
https://github.com/web-platform-tests/wpt/issues/42896
This commit is contained in:
Matt Mastracci 2023-11-01 15:11:01 -06:00 committed by GitHub
parent 587f2e0800
commit 42c426e769
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 632 additions and 237 deletions

View file

@ -1,16 +1,19 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::stream::WebSocketStream;
use bytes::Bytes;
use deno_core::anyhow::bail;
use deno_core::error::invalid_hostname;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::TryFutureExt;
use deno_core::op2;
use deno_core::unsync::spawn;
use deno_core::url;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcRef;
@ -19,13 +22,16 @@ use deno_core::ResourceId;
use deno_core::ToJsBuffer;
use deno_net::raw::NetworkStream;
use deno_tls::create_client_config;
use deno_tls::rustls::ClientConfig;
use deno_tls::RootCertStoreProvider;
use deno_tls::SocketUse;
use http::header::CONNECTION;
use http::header::UPGRADE;
use http::HeaderName;
use http::HeaderValue;
use http::Method;
use http::Request;
use http::StatusCode;
use http::Uri;
use hyper::Body;
use once_cell::sync::Lazy;
@ -146,31 +152,231 @@ pub struct CreateResponse {
extensions: String,
}
async fn handshake<S: AsyncRead + AsyncWrite + Send + Unpin + 'static>(
cancel_resource: Option<Rc<CancelHandle>>,
async fn handshake_websocket(
state: &Rc<RefCell<OpState>>,
uri: &Uri,
protocols: &str,
headers: Option<Vec<(ByteString, ByteString)>>,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
let mut request = Request::builder().method(Method::GET).uri(
uri
.path_and_query()
.ok_or(type_error("Missing path in url".to_string()))?
.as_str(),
);
let authority = uri.authority().unwrap().as_str();
let host = authority
.find('@')
.map(|idx| authority.split_at(idx + 1).1)
.unwrap_or_else(|| authority);
request = request
.header("Host", host)
.header(UPGRADE, "websocket")
.header(CONNECTION, "Upgrade")
.header(
"Sec-WebSocket-Key",
fastwebsockets::handshake::generate_key(),
);
let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone();
request =
populate_common_request_headers(request, &user_agent, protocols, &headers)?;
let request = request.body(Body::empty())?;
let domain = &uri.host().unwrap().to_string();
let port = &uri.port_u16().unwrap_or(match uri.scheme_str() {
Some("wss") => 443,
Some("ws") => 80,
_ => unreachable!(),
});
let addr = format!("{domain}:{port}");
let res = match uri.scheme_str() {
Some("ws") => handshake_http1_ws(request, &addr).await?,
Some("wss") => {
match handshake_http1_wss(state, request, domain, &addr).await {
Ok(res) => res,
Err(_) => {
handshake_http2_wss(
state,
uri,
authority,
&user_agent,
protocols,
domain,
&headers,
&addr,
)
.await?
}
}
}
_ => unreachable!(),
};
Ok(res)
}
async fn handshake_http1_ws(
request: Request<Body>,
addr: &String,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
let tcp_socket = TcpStream::connect(addr).await?;
handshake_connection(request, tcp_socket).await
}
async fn handshake_http1_wss(
state: &Rc<RefCell<OpState>>,
request: Request<Body>,
domain: &str,
addr: &str,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
let tcp_socket = TcpStream::connect(addr).await?;
let tls_config = create_ws_client_config(state, SocketUse::Http1Only)?;
let dnsname =
ServerName::try_from(domain).map_err(|_| invalid_hostname(domain))?;
let mut tls_connector = TlsStream::new_client_side(
tcp_socket,
tls_config.into(),
dnsname,
NonZeroUsize::new(65536),
);
// If we can bail on an http/1.1 ALPN mismatch here, we can avoid doing extra work
tls_connector.handshake().await?;
handshake_connection(request, tls_connector).await
}
#[allow(clippy::too_many_arguments)]
async fn handshake_http2_wss(
state: &Rc<RefCell<OpState>>,
uri: &Uri,
authority: &str,
user_agent: &str,
protocols: &str,
domain: &str,
headers: &Option<Vec<(ByteString, ByteString)>>,
addr: &str,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
let tcp_socket = TcpStream::connect(addr).await?;
let tls_config = create_ws_client_config(state, SocketUse::Http2Only)?;
let dnsname =
ServerName::try_from(domain).map_err(|_| invalid_hostname(domain))?;
// We need to better expose the underlying errors here
let mut tls_connector =
TlsStream::new_client_side(tcp_socket, tls_config.into(), dnsname, None);
let handshake = tls_connector.handshake().await?;
if handshake.alpn.is_none() {
bail!("Didn't receive h2 alpn, aborting connection");
}
let h2 = h2::client::Builder::new();
let (mut send, conn) = h2.handshake::<_, Bytes>(tls_connector).await?;
spawn(conn);
let mut request = Request::builder();
request = request.method(Method::CONNECT);
let uri = Uri::builder()
.authority(authority)
.path_and_query(uri.path_and_query().unwrap().as_str())
.scheme("https")
.build()?;
request = request.uri(uri);
request =
populate_common_request_headers(request, user_agent, protocols, headers)?;
request = request.extension(h2::ext::Protocol::from("websocket"));
let (resp, send) = send.send_request(request.body(())?, false)?;
let resp = resp.await?;
if resp.status() != StatusCode::OK {
bail!("Invalid status code: {}", resp.status());
}
let (http::response::Parts { headers, .. }, recv) = resp.into_parts();
let mut stream = WebSocket::after_handshake(
WebSocketStream::new(stream::WsStreamKind::H2(send, recv), None),
Role::Client,
);
// We currently don't support vectored writes in the H2 streams
stream.set_writev(false);
// TODO(mmastrac): we should be able to use a zero masking key over HTTPS
// stream.set_auto_apply_mask(false);
Ok((stream, headers))
}
async fn handshake_connection<
S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
>(
request: Request<Body>,
socket: S,
) -> Result<(WebSocket<WebSocketStream>, http::Response<Body>), AnyError> {
let client =
fastwebsockets::handshake::client(&LocalExecutor, request, socket);
let (upgraded, response) = if let Some(cancel_resource) = cancel_resource {
client.or_cancel(cancel_resource).await?
} else {
client.await
}
.map_err(|err| {
DomExceptionNetworkError::new(&format!(
"failed to connect to WebSocket: {err}"
))
})?;
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
let (upgraded, response) =
fastwebsockets::handshake::client(&LocalExecutor, request, socket).await?;
let upgraded = upgraded.into_inner();
let stream =
WebSocketStream::new(stream::WsStreamKind::Upgraded(upgraded), None);
let stream = WebSocket::after_handshake(stream, Role::Client);
Ok((stream, response))
Ok((stream, response.into_parts().0.headers))
}
pub fn create_ws_client_config(
state: &Rc<RefCell<OpState>>,
socket_use: SocketUse,
) -> Result<ClientConfig, AnyError> {
let unsafely_ignore_certificate_errors: Option<Vec<String>> = state
.borrow()
.try_borrow::<UnsafelyIgnoreCertificateErrors>()
.and_then(|it| it.0.clone());
let root_cert_store = state
.borrow()
.borrow::<WsRootStoreProvider>()
.get_or_try_init()?;
create_client_config(
root_cert_store,
vec![],
unsafely_ignore_certificate_errors,
None,
socket_use,
)
}
/// Headers common to both http/1.1 and h2 requests.
fn populate_common_request_headers(
mut request: http::request::Builder,
user_agent: &str,
protocols: &str,
headers: &Option<Vec<(ByteString, ByteString)>>,
) -> Result<http::request::Builder, AnyError> {
request = request
.header("User-Agent", user_agent)
.header("Sec-WebSocket-Version", "13");
if !protocols.is_empty() {
request = request.header("Sec-WebSocket-Protocol", protocols);
}
if let Some(headers) = headers {
for (key, value) in headers {
let name = HeaderName::from_bytes(key)
.map_err(|err| type_error(err.to_string()))?;
let v = HeaderValue::from_bytes(value)
.map_err(|err| type_error(err.to_string()))?;
let is_disallowed_header = matches!(
name,
http::header::HOST
| http::header::SEC_WEBSOCKET_ACCEPT
| http::header::SEC_WEBSOCKET_EXTENSIONS
| http::header::SEC_WEBSOCKET_KEY
| http::header::SEC_WEBSOCKET_PROTOCOL
| http::header::SEC_WEBSOCKET_VERSION
| http::header::UPGRADE
| http::header::CONNECTION
);
if !is_disallowed_header {
request = request.header(name, v);
}
}
}
Ok(request)
}
#[op2(async)]
@ -205,99 +411,18 @@ where
None
};
let unsafely_ignore_certificate_errors = state
.borrow()
.try_borrow::<UnsafelyIgnoreCertificateErrors>()
.and_then(|it| it.0.clone());
let root_cert_store = state
.borrow()
.borrow::<WsRootStoreProvider>()
.get_or_try_init()?;
let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone();
let uri: Uri = url.parse()?;
let mut request = Request::builder().method(Method::GET).uri(
uri
.path_and_query()
.ok_or(type_error("Missing path in url".to_string()))?
.as_str(),
);
let authority = uri.authority().unwrap().as_str();
let host = authority
.find('@')
.map(|idx| authority.split_at(idx + 1).1)
.unwrap_or_else(|| authority);
request = request
.header("User-Agent", user_agent)
.header("Host", host)
.header(UPGRADE, "websocket")
.header(CONNECTION, "Upgrade")
.header(
"Sec-WebSocket-Key",
fastwebsockets::handshake::generate_key(),
)
.header("Sec-WebSocket-Version", "13");
if !protocols.is_empty() {
request = request.header("Sec-WebSocket-Protocol", protocols);
}
if let Some(headers) = headers {
for (key, value) in headers {
let name = HeaderName::from_bytes(&key)
.map_err(|err| type_error(err.to_string()))?;
let v = HeaderValue::from_bytes(&value)
.map_err(|err| type_error(err.to_string()))?;
let is_disallowed_header = matches!(
name,
http::header::HOST
| http::header::SEC_WEBSOCKET_ACCEPT
| http::header::SEC_WEBSOCKET_EXTENSIONS
| http::header::SEC_WEBSOCKET_KEY
| http::header::SEC_WEBSOCKET_PROTOCOL
| http::header::SEC_WEBSOCKET_VERSION
| http::header::UPGRADE
| http::header::CONNECTION
);
if !is_disallowed_header {
request = request.header(name, v);
}
}
}
let request = request.body(Body::empty())?;
let domain = &uri.host().unwrap().to_string();
let port = &uri.port_u16().unwrap_or(match uri.scheme_str() {
Some("wss") => 443,
Some("ws") => 80,
_ => unreachable!(),
});
let addr = format!("{domain}:{port}");
let tcp_socket = TcpStream::connect(addr).await?;
let (stream, response) = match uri.scheme_str() {
Some("ws") => handshake(cancel_resource, request, tcp_socket).await?,
Some("wss") => {
let tls_config = create_client_config(
root_cert_store,
vec![],
unsafely_ignore_certificate_errors,
None,
)?;
let dnsname = ServerName::try_from(domain.as_str())
.map_err(|_| invalid_hostname(domain))?;
let mut tls_connector = TlsStream::new_client_side(
tcp_socket,
tls_config.into(),
dnsname,
NonZeroUsize::new(65536),
);
let _hs = tls_connector.handshake().await?;
handshake(cancel_resource, request, tls_connector).await?
}
_ => unreachable!(),
};
let handshake = handshake_websocket(&state, &uri, &protocols, headers)
.map_err(|err| {
AnyError::from(DomExceptionNetworkError::new(&format!(
"failed to connect to WebSocket: {err}"
)))
});
let (stream, response) = match cancel_resource {
Some(rc) => handshake.try_or_cancel(rc).await,
None => handshake.await,
}?;
if let Some(cancel_rid) = cancel_handle {
if let Ok(res) = state.borrow_mut().resource_table.take_any(cancel_rid) {
@ -308,12 +433,11 @@ where
let mut state = state.borrow_mut();
let rid = state.resource_table.add(ServerWebSocket::new(stream));
let protocol = match response.headers().get("Sec-WebSocket-Protocol") {
let protocol = match response.get("Sec-WebSocket-Protocol") {
Some(header) => header.to_str().unwrap(),
None => "",
};
let extensions = response
.headers()
.get_all("Sec-WebSocket-Extensions")
.iter()
.map(|header| header.to_str().unwrap())