From 95ce856c5bfa818f9a607f8c00ef44322ec0b989 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 10 Mar 2025 15:27:35 +0200 Subject: [PATCH 1/4] Do not use tokio::select! for race2 tokio::select! requires the futures to be fused, see for example https://github.com/tokio-rs/tokio/discussions/3812 --- src/server.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/server.rs b/src/server.rs index 4cb5cf2..3df4db2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -464,10 +464,25 @@ impl Future for UnwrapToPending { } pub(crate) async fn race2, B: Future>(f1: A, f2: B) -> T { - tokio::select! { - x = f1 => x, - x = f2 => x, - } + // Pin the futures on the stack for polling + let mut f1 = std::pin::pin!(f1); + let mut f2 = std::pin::pin!(f2); + + // Create a future that resolves when either f1 or f2 completes + std::future::poll_fn(|cx| { + // Poll both futures + match f1.as_mut().poll(cx) { + Poll::Ready(result) => return Poll::Ready(result), + Poll::Pending => {} + } + match f2.as_mut().poll(cx) { + Poll::Ready(result) => return Poll::Ready(result), + Poll::Pending => {} + } + // If neither is ready, yield back to the executor + Poll::Pending + }) + .await } /// Run a server loop, invoking a handler callback for each request. From 01588c0d6b9e61d20df7337e04cae4d24b4660fc Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 10 Mar 2025 15:43:53 +0200 Subject: [PATCH 2/4] fuse the oneshot channel --- src/server.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/server.rs b/src/server.rs index 3df4db2..a0c1e9b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -329,7 +329,7 @@ where { pub(crate) fn new(recv: C::RecvStream) -> (Self, UnwrapToPending>) { let (error_send, error_recv) = oneshot::channel(); - let error_recv = UnwrapToPending(error_recv); + let error_recv = UnwrapToPending(futures_lite::future::fuse(error_recv)); (Self(recv, Some(error_send), PhantomData), error_recv) } } @@ -449,7 +449,7 @@ impl fmt::Display for RpcServerError { impl error::Error for RpcServerError {} /// Take an oneshot receiver and just return Pending the underlying future returns `Err(oneshot::Canceled)` -pub(crate) struct UnwrapToPending(oneshot::Receiver); +pub(crate) struct UnwrapToPending(futures_lite::future::Fuse>); impl Future for UnwrapToPending { type Output = T; From 2c0e6024c5ee8cc862ee9213d4aa4a3e3bc19931 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 11 Mar 2025 09:36:37 +0200 Subject: [PATCH 3/4] Revert "Do not use tokio::select! for race2" This reverts commit 95ce856c5bfa818f9a607f8c00ef44322ec0b989. --- src/server.rs | 23 ++++------------------- 1 file changed, 4 insertions(+), 19 deletions(-) diff --git a/src/server.rs b/src/server.rs index a0c1e9b..d1af957 100644 --- a/src/server.rs +++ b/src/server.rs @@ -464,25 +464,10 @@ impl Future for UnwrapToPending { } pub(crate) async fn race2, B: Future>(f1: A, f2: B) -> T { - // Pin the futures on the stack for polling - let mut f1 = std::pin::pin!(f1); - let mut f2 = std::pin::pin!(f2); - - // Create a future that resolves when either f1 or f2 completes - std::future::poll_fn(|cx| { - // Poll both futures - match f1.as_mut().poll(cx) { - Poll::Ready(result) => return Poll::Ready(result), - Poll::Pending => {} - } - match f2.as_mut().poll(cx) { - Poll::Ready(result) => return Poll::Ready(result), - Poll::Pending => {} - } - // If neither is ready, yield back to the executor - Poll::Pending - }) - .await + tokio::select! { + x = f1 => x, + x = f2 => x, + } } /// Run a server loop, invoking a handler callback for each request. From 17e56a5fa40843b48321c710f4c9fc81fbb3ea8e Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 11 Mar 2025 09:38:32 +0200 Subject: [PATCH 4/4] Add remark about is_terminated we don't want to update tokio just yet --- src/server.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server.rs b/src/server.rs index d1af957..387d41d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -455,6 +455,7 @@ impl Future for UnwrapToPending { type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + // todo: use is_terminated from tokio 1.44 here to avoid the fused wrapper match Pin::new(&mut self.0).poll(cx) { Poll::Ready(Ok(x)) => Poll::Ready(x), Poll::Ready(Err(_)) => Poll::Pending,