fix(ext/fetch): retry some http/2 errors (#27417)

This brings some of the HTTP/2 retry behavior from reqwest to
`ext/fetch`. It will retry very specific HTTP/2 errors once, if the body
is able to be used again.

Closes #27332
This commit is contained in:
Sean McArthur 2024-12-18 17:04:29 -05:00 committed by GitHub
parent ae74407412
commit b1c685f4b7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 209 additions and 45 deletions

View file

@ -10,6 +10,7 @@ use std::borrow::Cow;
use std::cell::RefCell;
use std::cmp::min;
use std::convert::From;
use std::future;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
@ -66,6 +67,7 @@ use http::header::USER_AGENT;
use http::Extensions;
use http::Method;
use http::Uri;
use http_body_util::combinators::BoxBody;
use http_body_util::BodyExt;
use hyper::body::Frame;
use hyper_util::client::legacy::connect::HttpConnector;
@ -75,6 +77,7 @@ use hyper_util::rt::TokioExecutor;
use hyper_util::rt::TokioTimer;
use serde::Deserialize;
use serde::Serialize;
use tower::retry;
use tower::ServiceExt;
use tower_http::decompression::Decompression;
@ -476,9 +479,7 @@ where
// If a body is passed, we use it, and don't return a body for streaming.
con_len = Some(data.len() as u64);
http_body_util::Full::new(data.to_vec().into())
.map_err(|never| match never {})
.boxed()
ReqBody::full(data.to_vec().into())
}
(_, Some(resource)) => {
let resource = state
@ -491,7 +492,7 @@ where
}
_ => {}
}
ReqBody::new(ResourceToBodyAdapter::new(resource))
ReqBody::streaming(ResourceToBodyAdapter::new(resource))
}
(None, None) => unreachable!(),
}
@ -501,9 +502,7 @@ where
if matches!(method, Method::POST | Method::PUT) {
con_len = Some(0);
}
http_body_util::Empty::new()
.map_err(|never| match never {})
.boxed()
ReqBody::empty()
};
let mut request = http::Request::new(body);
@ -1066,7 +1065,8 @@ pub fn create_http_client(
}
let pooled_client = builder.build(connector);
let decompress = Decompression::new(pooled_client).gzip(true).br(true);
let retry_client = retry::Retry::new(FetchRetry, pooled_client);
let decompress = Decompression::new(retry_client).gzip(true).br(true);
Ok(Client {
inner: decompress,
@ -1083,7 +1083,12 @@ pub fn op_utf8_to_byte_string(#[string] input: String) -> ByteString {
#[derive(Clone, Debug)]
pub struct Client {
inner: Decompression<hyper_util::client::legacy::Client<Connector, ReqBody>>,
inner: Decompression<
retry::Retry<
FetchRetry,
hyper_util::client::legacy::Client<Connector, ReqBody>,
>,
>,
// Used to check whether to include a proxy-authorization header
proxies: Arc<proxy::Proxies>,
user_agent: HeaderValue,
@ -1174,10 +1179,70 @@ impl Client {
}
}
pub type ReqBody =
http_body_util::combinators::BoxBody<Bytes, deno_core::error::AnyError>;
pub type ResBody =
http_body_util::combinators::BoxBody<Bytes, deno_core::error::AnyError>;
// This is a custom enum to allow the retry policy to clone the variants that could be retried.
pub enum ReqBody {
Full(http_body_util::Full<Bytes>),
Empty(http_body_util::Empty<Bytes>),
Streaming(BoxBody<Bytes, deno_core::error::AnyError>),
}
pub type ResBody = BoxBody<Bytes, deno_core::error::AnyError>;
impl ReqBody {
pub fn full(bytes: Bytes) -> Self {
ReqBody::Full(http_body_util::Full::new(bytes))
}
pub fn empty() -> Self {
ReqBody::Empty(http_body_util::Empty::new())
}
pub fn streaming<B>(body: B) -> Self
where
B: hyper::body::Body<Data = Bytes, Error = deno_core::error::AnyError>
+ Send
+ Sync
+ 'static,
{
ReqBody::Streaming(BoxBody::new(body))
}
}
impl hyper::body::Body for ReqBody {
type Data = Bytes;
type Error = deno_core::error::AnyError;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match &mut *self {
ReqBody::Full(ref mut b) => {
Pin::new(b).poll_frame(cx).map_err(|never| match never {})
}
ReqBody::Empty(ref mut b) => {
Pin::new(b).poll_frame(cx).map_err(|never| match never {})
}
ReqBody::Streaming(ref mut b) => Pin::new(b).poll_frame(cx),
}
}
fn is_end_stream(&self) -> bool {
match self {
ReqBody::Full(ref b) => b.is_end_stream(),
ReqBody::Empty(ref b) => b.is_end_stream(),
ReqBody::Streaming(ref b) => b.is_end_stream(),
}
}
fn size_hint(&self) -> hyper::body::SizeHint {
match self {
ReqBody::Full(ref b) => b.size_hint(),
ReqBody::Empty(ref b) => b.size_hint(),
ReqBody::Streaming(ref b) => b.size_hint(),
}
}
}
/// Copied from https://github.com/seanmonstar/reqwest/blob/b9d62a0323d96f11672a61a17bf8849baec00275/src/async_impl/request.rs#L572
/// Check the request URL for a "username:password" type authority, and if
@ -1214,3 +1279,102 @@ pub fn extract_authority(url: &mut Url) -> Option<(String, Option<String>)> {
fn op_fetch_promise_is_settled(promise: v8::Local<v8::Promise>) -> bool {
promise.state() != v8::PromiseState::Pending
}
/// Deno.fetch's retry policy.
#[derive(Clone, Debug)]
struct FetchRetry;
/// Marker extension that a request has been retried once.
#[derive(Clone, Debug)]
struct Retried;
impl<ResBody, E>
retry::Policy<http::Request<ReqBody>, http::Response<ResBody>, E>
for FetchRetry
where
E: std::error::Error + 'static,
{
/// Don't delay retries.
type Future = future::Ready<()>;
fn retry(
&mut self,
req: &mut http::Request<ReqBody>,
result: &mut Result<http::Response<ResBody>, E>,
) -> Option<Self::Future> {
if req.extensions().get::<Retried>().is_some() {
// only retry once
return None;
}
match result {
Ok(..) => {
// never retry a Response
None
}
Err(err) => {
if is_error_retryable(&*err) {
req.extensions_mut().insert(Retried);
Some(future::ready(()))
} else {
None
}
}
}
}
fn clone_request(
&mut self,
req: &http::Request<ReqBody>,
) -> Option<http::Request<ReqBody>> {
let body = match req.body() {
ReqBody::Full(b) => ReqBody::Full(b.clone()),
ReqBody::Empty(b) => ReqBody::Empty(*b),
ReqBody::Streaming(..) => return None,
};
let mut clone = http::Request::new(body);
*clone.method_mut() = req.method().clone();
*clone.uri_mut() = req.uri().clone();
*clone.headers_mut() = req.headers().clone();
*clone.extensions_mut() = req.extensions().clone();
Some(clone)
}
}
fn is_error_retryable(err: &(dyn std::error::Error + 'static)) -> bool {
// Note: hyper doesn't promise it will always be this h2 version. Keep up to date.
if let Some(err) = find_source::<h2::Error>(err) {
// They sent us a graceful shutdown, try with a new connection!
if err.is_go_away()
&& err.is_remote()
&& err.reason() == Some(h2::Reason::NO_ERROR)
{
return true;
}
// REFUSED_STREAM was sent from the server, which is safe to retry.
// https://www.rfc-editor.org/rfc/rfc9113.html#section-8.7-3.2
if err.is_reset()
&& err.is_remote()
&& err.reason() == Some(h2::Reason::REFUSED_STREAM)
{
return true;
}
}
false
}
fn find_source<'a, E: std::error::Error + 'static>(
err: &'a (dyn std::error::Error + 'static),
) -> Option<&'a E> {
let mut err = Some(err);
while let Some(src) = err {
if let Some(found) = src.downcast_ref::<E>() {
return Some(found);
}
err = src.source();
}
None
}