From b7a93c4211e507f3071ad5ecc3005ebf52cae8db Mon Sep 17 00:00:00 2001 From: hatoo Date: Mon, 27 Nov 2023 23:43:51 +0900 Subject: [PATCH] hyper-util --- Cargo.toml | 2 +- src/client.rs | 4 +- src/main.rs | 1 - src/tokiort.rs | 259 ------------------------------------------------- 4 files changed, 3 insertions(+), 263 deletions(-) delete mode 100644 src/tokiort.rs diff --git a/Cargo.toml b/Cargo.toml index 663ae08c..a070b167 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,6 +62,7 @@ regex-syntax = "0.7.4" url = "2.4.0" pin-project-lite = "0.2.13" http-body-util = "0.1.0" +hyper-util = "0.1.1" [target.'cfg(unix)'.dependencies] rlimit = "0.10.0" @@ -71,6 +72,5 @@ jemallocator = "0.5.0" assert_cmd = "2.0.2" axum = { version = "0.7", features = ["http2"] } bytes = "1.0" -hyper-util = "0.1.1" lazy_static = "1.4.0" regex = "1.9.6" diff --git a/src/client.rs b/src/client.rs index d6a2a0c1..3a62afd6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,6 +4,7 @@ use hyper::{ body::{Body, Incoming}, http, }; +use hyper_util::rt::{TokioExecutor, TokioIo}; use rand::prelude::*; use std::{pin::Pin, sync::Arc}; use thiserror::Error; @@ -11,7 +12,6 @@ use tokio::net::TcpStream; use url::{ParseError, Url}; use crate::{ - tokiort::{TokioExecutor, TokioIo}, url_generator::{UrlGenerator, UrlGeneratorError}, ConnectToEntry, }; @@ -238,7 +238,7 @@ impl Stream { } } async fn handshake_http2(self) -> Result { - let builder = hyper::client::conn::http2::Builder::new(TokioExecutor); + let builder = hyper::client::conn::http2::Builder::new(TokioExecutor::new()); match self { Stream::Tcp(stream) => { diff --git a/src/main.rs b/src/main.rs index 15f55d1b..45440052 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,7 +19,6 @@ mod histogram; mod monitor; mod printer; mod timescale; -mod tokiort; mod url_generator; #[cfg(unix)] diff --git a/src/tokiort.rs b/src/tokiort.rs deleted file mode 100644 index 799795ea..00000000 --- a/src/tokiort.rs +++ /dev/null @@ -1,259 +0,0 @@ -// This entire codes are copied from https://github.com/hyperium/hyper/blob/f9f65b7aa67fa3ec0267fe015945973726285bc2/benches/support/tokiort.rs -/* -Copyright (c) 2014-2021 Sean McArthur - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. -*/ - -#![allow(dead_code)] -//! Various runtimes for hyper -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, - time::{Duration, Instant}, -}; - -use hyper::rt::{Sleep, Timer}; -use pin_project_lite::pin_project; - -#[derive(Clone)] -/// An Executor that uses the tokio runtime. -pub struct TokioExecutor; - -impl hyper::rt::Executor for TokioExecutor -where - F: std::future::Future + Send + 'static, - F::Output: Send + 'static, -{ - fn execute(&self, fut: F) { - tokio::task::spawn(fut); - } -} - -/// A Timer that uses the tokio runtime. - -#[derive(Clone, Debug)] -pub struct TokioTimer; - -impl Timer for TokioTimer { - fn sleep(&self, duration: Duration) -> Pin> { - Box::pin(TokioSleep { - inner: tokio::time::sleep(duration), - }) - } - - fn sleep_until(&self, deadline: Instant) -> Pin> { - Box::pin(TokioSleep { - inner: tokio::time::sleep_until(deadline.into()), - }) - } - - fn reset(&self, sleep: &mut Pin>, new_deadline: Instant) { - if let Some(sleep) = sleep.as_mut().downcast_mut_pin::() { - sleep.reset(new_deadline) - } - } -} - -struct TokioTimeout { - inner: Pin>>, -} - -impl Future for TokioTimeout -where - T: Future, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll { - self.inner.as_mut().poll(context) - } -} - -// Use TokioSleep to get tokio::time::Sleep to implement Unpin. -// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html -pin_project! { - pub(crate) struct TokioSleep { - #[pin] - pub(crate) inner: tokio::time::Sleep, - } -} - -impl Future for TokioSleep { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().inner.poll(cx) - } -} - -impl Sleep for TokioSleep {} - -impl TokioSleep { - pub fn reset(self: Pin<&mut Self>, deadline: Instant) { - self.project().inner.as_mut().reset(deadline.into()); - } -} - -pin_project! { - #[derive(Debug)] - pub struct TokioIo { - #[pin] - inner: T, - } -} - -impl TokioIo { - pub fn new(inner: T) -> Self { - Self { inner } - } - - pub fn inner(self) -> T { - self.inner - } -} - -impl hyper::rt::Read for TokioIo -where - T: tokio::io::AsyncRead, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - mut buf: hyper::rt::ReadBufCursor<'_>, - ) -> Poll> { - let n = unsafe { - let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); - match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { - Poll::Ready(Ok(())) => tbuf.filled().len(), - other => return other, - } - }; - - unsafe { - buf.advance(n); - } - Poll::Ready(Ok(())) - } -} - -impl hyper::rt::Write for TokioIo -where - T: tokio::io::AsyncWrite, -{ - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) - } - - fn is_write_vectored(&self) -> bool { - tokio::io::AsyncWrite::is_write_vectored(&self.inner) - } - - fn poll_write_vectored( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[std::io::IoSlice<'_>], - ) -> Poll> { - tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) - } -} - -impl tokio::io::AsyncRead for TokioIo -where - T: hyper::rt::Read, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - tbuf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - //let init = tbuf.initialized().len(); - let filled = tbuf.filled().len(); - let sub_filled = unsafe { - let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); - - match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) { - Poll::Ready(Ok(())) => buf.filled().len(), - other => return other, - } - }; - - let n_filled = filled + sub_filled; - // At least sub_filled bytes had to have been initialized. - let n_init = sub_filled; - unsafe { - tbuf.assume_init(n_init); - tbuf.set_filled(n_filled); - } - - Poll::Ready(Ok(())) - } -} - -impl tokio::io::AsyncWrite for TokioIo -where - T: hyper::rt::Write, -{ - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - hyper::rt::Write::poll_write(self.project().inner, cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - hyper::rt::Write::poll_flush(self.project().inner, cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - hyper::rt::Write::poll_shutdown(self.project().inner, cx) - } - - fn is_write_vectored(&self) -> bool { - hyper::rt::Write::is_write_vectored(&self.inner) - } - - fn poll_write_vectored( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[std::io::IoSlice<'_>], - ) -> Poll> { - hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) - } -}