upgrade: tokio 0.1 in test_util crate (#8885)

This commit upgrades "tokio" and crates from tokio
ecosystem in "test_util" crate.
This commit is contained in:
Yosi Pramajaya 2021-01-10 19:20:47 +07:00 committed by GitHub
parent 4361895476
commit 9e9e104664
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 285 additions and 79 deletions

View file

@ -5,28 +5,28 @@
#[macro_use]
extern crate lazy_static;
use core::mem::replace;
use futures::FutureExt;
use futures::Stream;
use futures::StreamExt;
use futures::TryStreamExt;
use hyper::header::HeaderValue;
use hyper::server::Server;
use hyper::service::make_service_fn;
use hyper::service::service_fn;
use hyper::Body;
use hyper::Request;
use hyper::Response;
use hyper::Server;
use hyper::StatusCode;
use os_pipe::pipe;
#[cfg(unix)]
pub use pty;
use regex::Regex;
use std::collections::HashMap;
use std::convert::Infallible;
use std::env;
use std::io;
use std::io::Read;
use std::io::Write;
use std::mem::replace;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::pin::Pin;
@ -146,11 +146,9 @@ async fn hyper_hello(port: u16) {
println!("hyper hello");
let addr = SocketAddr::from(([127, 0, 0, 1], port));
let hello_svc = make_service_fn(|_| async move {
Ok::<_, hyper::error::Error>(service_fn(
move |_: Request<Body>| async move {
Ok::<_, hyper::error::Error>(Response::new(Body::from("Hello World!")))
},
))
Ok::<_, Infallible>(service_fn(move |_: Request<Body>| async move {
Ok::<_, Infallible>(Response::new(Body::from("Hello World!")))
}))
});
let server = Server::bind(&addr).serve(hello_svc);
@ -203,7 +201,7 @@ async fn another_redirect(req: Request<Body>) -> hyper::Result<Response<Body>> {
}
async fn run_ws_server(addr: &SocketAddr) {
let mut listener = TcpListener::bind(addr).await.unwrap();
let listener = TcpListener::bind(addr).await.unwrap();
while let Ok((stream, _addr)) = listener.accept().await {
tokio::spawn(async move {
let ws_stream_fut = accept_async(stream);
@ -278,7 +276,7 @@ async fn run_wss_server(addr: &SocketAddr) {
let tls_config = get_tls_config(cert_file, key_file).await.unwrap();
let tls_acceptor = TlsAcceptor::from(tls_config);
let mut listener = TcpListener::bind(addr).await.unwrap();
let listener = TcpListener::bind(addr).await.unwrap();
while let Ok((stream, _addr)) = listener.accept().await {
let acceptor = tls_acceptor.clone();
@ -620,7 +618,7 @@ unsafe impl std::marker::Send for HyperAcceptor<'_> {}
async fn wrap_redirect_server() {
let redirect_svc =
make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(redirect)) });
make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(redirect)) });
let redirect_addr = SocketAddr::from(([127, 0, 0, 1], REDIRECT_PORT));
let redirect_server = Server::bind(&redirect_addr).serve(redirect_svc);
if let Err(e) = redirect_server.await {
@ -630,7 +628,7 @@ async fn wrap_redirect_server() {
async fn wrap_double_redirect_server() {
let double_redirects_svc = make_service_fn(|_| async {
Ok::<_, hyper::Error>(service_fn(double_redirects))
Ok::<_, Infallible>(service_fn(double_redirects))
});
let double_redirects_addr =
SocketAddr::from(([127, 0, 0, 1], DOUBLE_REDIRECTS_PORT));
@ -643,7 +641,7 @@ async fn wrap_double_redirect_server() {
async fn wrap_inf_redirect_server() {
let inf_redirects_svc = make_service_fn(|_| async {
Ok::<_, hyper::Error>(service_fn(inf_redirects))
Ok::<_, Infallible>(service_fn(inf_redirects))
});
let inf_redirects_addr =
SocketAddr::from(([127, 0, 0, 1], INF_REDIRECTS_PORT));
@ -656,7 +654,7 @@ async fn wrap_inf_redirect_server() {
async fn wrap_another_redirect_server() {
let another_redirect_svc = make_service_fn(|_| async {
Ok::<_, hyper::Error>(service_fn(another_redirect))
Ok::<_, Infallible>(service_fn(another_redirect))
});
let another_redirect_addr =
SocketAddr::from(([127, 0, 0, 1], ANOTHER_REDIRECT_PORT));
@ -669,7 +667,7 @@ async fn wrap_another_redirect_server() {
async fn wrap_abs_redirect_server() {
let abs_redirect_svc = make_service_fn(|_| async {
Ok::<_, hyper::Error>(service_fn(absolute_redirect))
Ok::<_, Infallible>(service_fn(absolute_redirect))
});
let abs_redirect_addr =
SocketAddr::from(([127, 0, 0, 1], REDIRECT_ABSOLUTE_PORT));
@ -681,9 +679,8 @@ async fn wrap_abs_redirect_server() {
}
async fn wrap_main_server() {
let main_server_svc = make_service_fn(|_| async {
Ok::<_, hyper::Error>(service_fn(main_server))
});
let main_server_svc =
make_service_fn(|_| async { Ok::<_, Infallible>(service_fn(main_server)) });
let main_server_addr = SocketAddr::from(([127, 0, 0, 1], PORT));
let main_server = Server::bind(&main_server_addr).serve(main_server_svc);
if let Err(e) = main_server.await {
@ -698,29 +695,23 @@ async fn wrap_main_https_server() {
let tls_config = get_tls_config(cert_file, key_file)
.await
.expect("Cannot get TLS config");
let mut tcp = TcpListener::bind(&main_server_https_addr)
.await
.expect("Cannot bind TCP");
loop {
let tcp = TcpListener::bind(&main_server_https_addr)
.await
.expect("Cannot bind TCP");
let tls_acceptor = TlsAcceptor::from(tls_config.clone());
// Prepare a long-running future stream to accept and serve cients.
let incoming_tls_stream = tcp
.incoming()
.map_err(|e| {
eprintln!("Error Incoming: {:?}", e);
io::Error::new(io::ErrorKind::Other, e)
})
.and_then(move |s| {
use futures::TryFutureExt;
tls_acceptor.accept(s).map_err(|e| {
eprintln!("TLS Error {:?}", e);
e
})
})
.boxed();
let incoming_tls_stream = async_stream::stream! {
loop {
let (socket, _) = tcp.accept().await?;
let stream = tls_acceptor.accept(socket);
yield stream.await;
}
}
.boxed();
let main_server_https_svc = make_service_fn(|_| async {
Ok::<_, hyper::Error>(service_fn(main_server))
Ok::<_, Infallible>(service_fn(main_server))
});
let main_server_https = Server::builder(HyperAcceptor {
acceptor: incoming_tls_stream,
@ -737,7 +728,7 @@ async fn wrap_main_https_server() {
// Use the single-threaded scheduler. The hyper server is used as a point of
// comparison for the (single-threaded!) benchmarks in cli/bench. We're not
// comparing apples to apples if we use the default multi-threaded scheduler.
#[tokio::main(basic_scheduler)]
#[tokio::main(flavor = "current_thread")]
pub async fn run_all_servers() {
if let Some(port) = env::args().nth(1) {
return hyper_hello(port.parse::<u16>().unwrap()).await;