deno/ext/http/request_body.rs
林炳权 68297b5f10
Some checks are pending
ci / pre-build (push) Waiting to run
ci / test debug linux-aarch64 (push) Blocked by required conditions
ci / test release linux-aarch64 (push) Blocked by required conditions
ci / test debug macos-aarch64 (push) Blocked by required conditions
ci / test release macos-aarch64 (push) Blocked by required conditions
ci / bench release linux-x86_64 (push) Blocked by required conditions
ci / lint debug linux-x86_64 (push) Blocked by required conditions
ci / lint debug macos-x86_64 (push) Blocked by required conditions
ci / lint debug windows-x86_64 (push) Blocked by required conditions
ci / test debug linux-x86_64 (push) Blocked by required conditions
ci / test release linux-x86_64 (push) Blocked by required conditions
ci / test debug macos-x86_64 (push) Blocked by required conditions
ci / test release macos-x86_64 (push) Blocked by required conditions
ci / test debug windows-x86_64 (push) Blocked by required conditions
ci / test release windows-x86_64 (push) Blocked by required conditions
ci / build libs (push) Blocked by required conditions
ci / publish canary (push) Blocked by required conditions
chore: Rust 1.89.0 (#30364)
Related PR: https://github.com/denoland/deno/pull/30354
2025-08-09 11:11:48 +00:00

96 lines
2.7 KiB
Rust

// Copyright 2018-2025 the Deno authors. MIT license.
use std::borrow::Cow;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Poll;
use std::task::ready;
use bytes::Bytes;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufView;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::futures::TryFutureExt;
use deno_core::futures::stream::Peekable;
use deno_error::JsErrorBox;
use hyper::body::Body;
use hyper::body::Incoming;
use hyper::body::SizeHint;
/// Converts a hyper incoming body stream into a stream of [`Bytes`] that we can use to read in V8.
///
/// Despite the `Future` in its name, this type implements [`Stream`]: each
/// item is either one chunk of body data or the [`hyper::Error`] reported by
/// the wrapped [`Incoming`] body.
struct ReadFuture(Incoming);
impl Stream for ReadFuture {
  type Item = Result<Bytes, hyper::Error>;

  /// Polls the wrapped hyper body until it produces something worth
  /// yielding: a non-empty data chunk, an error, or end-of-stream.
  ///
  /// Frames that carry no data (anything `Frame::into_data` rejects) and
  /// zero-length data frames are silently skipped.
  fn poll_next(
    self: Pin<&mut Self>,
    cx: &mut std::task::Context<'_>,
  ) -> Poll<Option<Self::Item>> {
    let body = &mut self.get_mut().0;
    loop {
      // `ready!` returns `Poll::Pending` to our caller as soon as hyper
      // reports that no frame is available yet.
      match ready!(Pin::new(&mut *body).poll_frame(cx)) {
        None => return Poll::Ready(None),
        Some(Err(err)) => return Poll::Ready(Some(Err(err))),
        Some(Ok(frame)) => match frame.into_data() {
          // Never hand an empty chunk to the consumer.
          Ok(chunk) if !chunk.is_empty() => {
            return Poll::Ready(Some(Ok(chunk)));
          }
          // Nothing usable in this frame. Poll hyper again instead of
          // returning `Pending` ourselves, so that we only ever go pending
          // after hyper has had the chance to register the waker.
          _ => {}
        },
      }
    }
  }
}
/// An HTTP request body that can be read incrementally.
///
/// Field `0` holds the peekable chunk stream built from the hyper body,
/// wrapped in an [`AsyncRefCell`] so that a read can borrow it mutably across
/// await points. Field `1` is the [`SizeHint`] taken from the body when this
/// value was constructed.
pub struct HttpRequestBody(AsyncRefCell<Peekable<ReadFuture>>, SizeHint);
impl HttpRequestBody {
  /// Wraps a hyper [`Incoming`] body, recording its size hint up front.
  pub fn new(body: Incoming) -> Self {
    // The hint has to be captured before `body` is moved into the stream.
    let hint = body.size_hint();
    let stream = ReadFuture(body).peekable();
    Self(AsyncRefCell::new(stream), hint)
  }

  /// Reads at most `limit` bytes from the body.
  ///
  /// * At end of stream an empty [`BufView`] is returned.
  /// * If the next item is an error, it is removed from the stream and
  ///   returned as `Err`.
  /// * If the next chunk fits within `limit`, the whole chunk is removed from
  ///   the stream and returned.
  /// * Otherwise the first `limit` bytes are split off and returned, while
  ///   the rest of the chunk stays at the head of the stream for the next
  ///   call.
  async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, hyper::Error> {
    let cell = RcRef::map(self, |body| &body.0);
    let mut stream = cell.borrow_mut().await;

    // Peek rather than pop: an oversized chunk must remain queued so that
    // its tail can be served by a later read.
    let Some(head) = Pin::new(&mut *stream).peek_mut().await else {
      return Ok(BufView::empty());
    };

    match head {
      Ok(chunk) if chunk.len() > limit => {
        // Hand out the leading `limit` bytes; `chunk` keeps the remainder.
        Ok(BufView::from(chunk.split_to(limit)))
      }
      Ok(_) => {
        // The peek above guarantees that an item is available.
        let chunk = stream.next().await.unwrap()?;
        Ok(BufView::from(chunk))
      }
      // We only hold a reference to the error here, so pop the item to
      // obtain it by value.
      Err(_) => Err(stream.next().await.unwrap().err().unwrap()),
    }
  }
}
impl Resource for HttpRequestBody {
  /// The resource name reported for request bodies.
  fn name(&self) -> Cow<'_, str> {
    Cow::Borrowed("requestBody")
  }

  /// Reads up to `limit` bytes by delegating to the inherent
  /// [`HttpRequestBody::read`], converting any hyper error into a
  /// [`JsErrorBox`] of class `"Http"` that carries the error's message.
  fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
    let pending = HttpRequestBody::read(self, limit);
    let mapped =
      pending.map_err(|err| JsErrorBox::new("Http", err.to_string()));
    Box::pin(mapped)
  }

  /// Returns the `(lower, upper)` bounds of the size hint captured when the
  /// body was wrapped.
  fn size_hint(&self) -> (u64, Option<u64>) {
    let Self(_, hint) = self;
    (hint.lower(), hint.upper())
  }
}