From fb35b2a3eb6474da379a3caf8a472bfe9a6639aa Mon Sep 17 00:00:00 2001 From: stackinspector Date: Tue, 3 Dec 2024 23:43:46 +0800 Subject: [PATCH 01/15] remove `futures-util` (w/o impl send) --- Cargo.toml | 13 +++++++++++-- src/compat.rs | 24 ++++++++++++------------ src/lib.rs | 23 ++++++++++++++++------- 3 files changed, 39 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c423da1..948df45 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,8 @@ include = ["examples/**/*", "src/**/*", "LICENSE", "README.md", "CHANGELOG.md"] rust-version = "1.63" [features] -default = ["handshake"] +default = ["handshake", "sink"] +sink = ["futures-util"] handshake = ["tungstenite/handshake"] async-std-runtime = ["async-std", "handshake"] tokio-runtime = ["tokio", "handshake"] @@ -37,10 +38,18 @@ features = ["async-std-runtime", "tokio-runtime", "gio-runtime", "async-tls", "a [dependencies] log = "0.4" -futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] } +futures-core = { version = "0.3", default-features = false } +# TODO use this or `futures_core::task::__internal::AtomicWaker`? +atomic-waker = { version = "1.1", default-features = false } futures-io = { version = "0.3", default-features = false, features = ["std"] } pin-project-lite = "0.2" +[dependencies.futures-util] +optional = true +version = "0.3" +default-features = false +features = ["sink"] + [dependencies.tungstenite] version = "0.24" default-features = false diff --git a/src/compat.rs b/src/compat.rs index 2fec932..2b194c2 100644 --- a/src/compat.rs +++ b/src/compat.rs @@ -2,10 +2,10 @@ use log::*; use std::io::{Read, Write}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{Context, Poll, Wake, Waker}; use futures_io::{AsyncRead, AsyncWrite}; -use futures_util::task; +use atomic_waker::AtomicWaker; use std::sync::Arc; use tungstenite::Error as WsError; @@ -50,17 +50,17 @@ pub(crate) struct AllowStd { // // Don't ever use this from multiple tasks at the same time! pub(crate) trait SetWaker { - fn set_waker(&self, waker: &task::Waker); + fn set_waker(&self, waker: &Waker); } impl SetWaker for AllowStd { - fn set_waker(&self, waker: &task::Waker) { + fn set_waker(&self, waker: &Waker) { self.set_waker(ContextWaker::Read, waker); } } impl AllowStd { - pub(crate) fn new(inner: S, waker: &task::Waker) -> Self { + pub(crate) fn new(inner: S, waker: &Waker) -> Self { let res = Self { inner, write_waker_proxy: Default::default(), @@ -83,7 +83,7 @@ impl AllowStd { // // Write: this is only supposde to be called by write operations, i.e. the Sink impl on the // WebSocketStream. - pub(crate) fn set_waker(&self, kind: ContextWaker, waker: &task::Waker) { + pub(crate) fn set_waker(&self, kind: ContextWaker, waker: &Waker) { match kind { ContextWaker::Read => { self.write_waker_proxy.read_waker.register(waker); @@ -103,11 +103,11 @@ impl AllowStd { // reads and writes, and the same for writes. #[derive(Debug, Default)] struct WakerProxy { - read_waker: task::AtomicWaker, - write_waker: task::AtomicWaker, + read_waker: AtomicWaker, + write_waker: AtomicWaker, } -impl std::task::Wake for WakerProxy { +impl Wake for WakerProxy { fn wake(self: Arc) { self.wake_by_ref() } @@ -129,10 +129,10 @@ where #[cfg(feature = "verbose-logging")] trace!("{}:{} AllowStd.with_context", file!(), line!()); let waker = match kind { - ContextWaker::Read => task::Waker::from(self.read_waker_proxy.clone()), - ContextWaker::Write => task::Waker::from(self.write_waker_proxy.clone()), + ContextWaker::Read => Waker::from(self.read_waker_proxy.clone()), + ContextWaker::Write => Waker::from(self.write_waker_proxy.clone()), }; - let mut context = task::Context::from_waker(&waker); + let mut context = Context::from_waker(&waker); f(&mut context, Pin::new(&mut self.inner)) } diff --git a/src/lib.rs b/src/lib.rs index 7c80e7d..95ee046 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -62,13 +62,10 @@ use std::io::{Read, Write}; use compat::{cvt, AllowStd, ContextWaker}; use futures_io::{AsyncRead, AsyncWrite}; -use futures_util::{ - sink::{Sink, SinkExt}, - stream::{FusedStream, Stream}, -}; +use futures_core::stream::{FusedStream, Stream}; use log::*; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{ready, Context, Poll}; #[cfg(feature = "handshake")] use tungstenite::{ @@ -337,7 +334,7 @@ where return Poll::Ready(None); } - match futures_util::ready!(self.with_context(Some((ContextWaker::Read, cx)), |s| { + match ready!(self.with_context(Some((ContextWaker::Read, cx)), |s| { #[cfg(feature = "verbose-logging")] trace!( "{}:{} Stream.with_context poll_next -> read()", @@ -368,7 +365,19 @@ where } } -impl Sink for WebSocketStream +impl WebSocketStream { + /// Simple send method to replace `futures_sink::Sink` (till v0.3). + pub async fn send(&mut self, msg: Message) -> Result<(), WsError> + where + S: AsyncRead + AsyncWrite + Unpin, + { + let _ = msg; + todo!() + } +} + +#[cfg(feature = "sink")] +impl futures_util::Sink for WebSocketStream where T: AsyncRead + AsyncWrite + Unpin, { From dc29f8d9a9784ce869ee0bde626ba0d008b95162 Mon Sep 17 00:00:00 2001 From: Golden_Water Date: Wed, 4 Dec 2024 00:07:59 +0800 Subject: [PATCH 02/15] Add a simple send method to replace SinkExt::send --- src/lib.rs | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 82 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 7c80e7d..40b6fd2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,12 +58,16 @@ mod handshake; ))] pub mod stream; -use std::io::{Read, Write}; +use std::{ + future::Future, + io::{Read, Write}, + task::ready, +}; use compat::{cvt, AllowStd, ContextWaker}; use futures_io::{AsyncRead, AsyncWrite}; use futures_util::{ - sink::{Sink, SinkExt}, + sink::Sink, stream::{FusedStream, Stream}, }; use log::*; @@ -318,6 +322,14 @@ impl WebSocketStream { let msg = msg.map(|msg| msg.into_owned()); self.send(Message::Close(msg)).await } + + /// Simple send method to replace `futures_sink::Sink` (till v0.3). + pub async fn send(&mut self, msg: Message) -> Result<(), WsError> + where + S: AsyncRead + AsyncWrite + Unpin, + { + Send::new(self, msg).await + } } impl Stream for WebSocketStream @@ -446,6 +458,74 @@ where } } +struct Send<'a, S> { + ws: &'a mut WebSocketStream, + msg: Option, +} + +impl<'a, S> Send<'a, S> +where + S: AsyncRead + AsyncWrite + Unpin, +{ + fn new(ws: &'a mut WebSocketStream, msg: Message) -> Self { + Self { ws, msg: Some(msg) } + } +} + +impl Future for Send<'_, S> +where + S: AsyncRead + AsyncWrite + Unpin, +{ + type Output = Result<(), WsError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.msg.is_some() { + if !self.ws.ready { + // Currently blocked so try to flush the blockage away + let polled = self + .ws + .with_context(Some((ContextWaker::Write, cx)), |s| cvt(s.flush())) + .map(|r| { + self.ws.ready = true; + r + }); + ready!(polled)? + } + + let msg = self.msg.take().expect("unreachable"); + match self.ws.with_context(None, |s| s.write(msg)) { + Ok(_) => Ok(()), + Err(WsError::Io(err)) if err.kind() == std::io::ErrorKind::WouldBlock => { + // the message was accepted and queued so not an error + // + // set to false here for cancel safe of *this* Future + self.ws.ready = false; + Ok(()) + } + Err(e) => { + debug!("websocket start_send error: {}", e); + Err(e) + } + }?; + } + + let polled = self + .ws + .with_context(Some((ContextWaker::Write, cx)), |s| cvt(s.flush())) + .map(|r| { + self.ws.ready = true; + match r { + // WebSocket connection has just been closed. Flushing completed, not an error. + Err(WsError::ConnectionClosed) => Ok(()), + other => other, + } + }); + ready!(polled)?; + + Poll::Ready(Ok(())) + } +} + #[cfg(any( feature = "async-tls", feature = "async-std-runtime", From 814112da4ebff0f9227bf8ff0862308175245109 Mon Sep 17 00:00:00 2001 From: stackinspector Date: Wed, 4 Dec 2024 10:22:17 +0800 Subject: [PATCH 03/15] cleanup uses --- src/lib.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index bba0c8e..17e386e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,15 +61,14 @@ pub mod stream; use std::{ future::Future, io::{Read, Write}, - task::ready, + pin::Pin, + task::{ready, Context, Poll}, }; use compat::{cvt, AllowStd, ContextWaker}; -use futures_io::{AsyncRead, AsyncWrite}; use futures_core::stream::{FusedStream, Stream}; +use futures_io::{AsyncRead, AsyncWrite}; use log::*; -use std::pin::Pin; -use std::task::{ready, Context, Poll}; #[cfg(feature = "handshake")] use tungstenite::{ From 9bb9c2b199ca30a162406393dde038d1a1310a2f Mon Sep 17 00:00:00 2001 From: stackinspector Date: Wed, 4 Dec 2024 10:28:33 +0800 Subject: [PATCH 04/15] fix feature & misc --- src/lib.rs | 38 +++++++++++++++++--------------------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 17e386e..8147abc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,7 +59,6 @@ mod handshake; pub mod stream; use std::{ - future::Future, io::{Read, Write}, pin::Pin, task::{ready, Context, Poll}, @@ -68,6 +67,8 @@ use std::{ use compat::{cvt, AllowStd, ContextWaker}; use futures_core::stream::{FusedStream, Stream}; use futures_io::{AsyncRead, AsyncWrite}; +#[cfg(feature = "sink")] +use futures_util::SinkExt; use log::*; #[cfg(feature = "handshake")] @@ -318,14 +319,6 @@ impl WebSocketStream { let msg = msg.map(|msg| msg.into_owned()); self.send(Message::Close(msg)).await } - - /// Simple send method to replace `futures_sink::Sink` (till v0.3). - pub async fn send(&mut self, msg: Message) -> Result<(), WsError> - where - S: AsyncRead + AsyncWrite + Unpin, - { - Send::new(self, msg).await - } } impl Stream for WebSocketStream @@ -376,17 +369,6 @@ where } } -impl WebSocketStream { - /// Simple send method to replace `futures_sink::Sink` (till v0.3). - pub async fn send(&mut self, msg: Message) -> Result<(), WsError> - where - S: AsyncRead + AsyncWrite + Unpin, - { - let _ = msg; - todo!() - } -} - #[cfg(feature = "sink")] impl futures_util::Sink for WebSocketStream where @@ -466,11 +448,24 @@ where } } +#[cfg(not(feature = "sink"))] +impl WebSocketStream { + /// Simple send method to replace `futures_sink::Sink` (till v0.3). + pub async fn send(&mut self, msg: Message) -> Result<(), WsError> + where + S: AsyncRead + AsyncWrite + Unpin, + { + Send::new(self, msg).await + } +} + +#[cfg(not(feature = "sink"))] struct Send<'a, S> { ws: &'a mut WebSocketStream, msg: Option, } +#[cfg(not(feature = "sink"))] impl<'a, S> Send<'a, S> where S: AsyncRead + AsyncWrite + Unpin, @@ -480,7 +475,8 @@ where } } -impl Future for Send<'_, S> +#[cfg(not(feature = "sink"))] +impl std::future::Future for Send<'_, S> where S: AsyncRead + AsyncWrite + Unpin, { From 18a274bb5a83e69c62e2d44db3756d74cb78ddc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=BF=9B=E6=A0=88=E6=A3=80=E7=A5=A8?= Date: Thu, 5 Dec 2024 02:02:13 +0800 Subject: [PATCH 05/15] adjust comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Sebastian Dröge --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 8147abc..50bc4fe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -502,7 +502,7 @@ where Err(WsError::Io(err)) if err.kind() == std::io::ErrorKind::WouldBlock => { // the message was accepted and queued so not an error // - // set to false here for cancel safe of *this* Future + // set to false here for cancellation safety of *this* Future self.ws.ready = false; Ok(()) } From 9ca7965bf9256e8ce1f70912fed4d4f13a984420 Mon Sep 17 00:00:00 2001 From: stackinspector Date: Thu, 5 Dec 2024 02:07:40 +0800 Subject: [PATCH 06/15] update msrv --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 404f277..f5b93a0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -97,7 +97,7 @@ jobs: strategy: matrix: rust: - - 1.63.0 + - 1.64.0 steps: - name: Checkout sources From 6fe059f91b2615cc3ceb86cc151106945bb9839f Mon Sep 17 00:00:00 2001 From: stackinspector Date: Thu, 5 Dec 2024 02:10:22 +0800 Subject: [PATCH 07/15] remove resolved todo --- Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 948df45..9cf22c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,6 @@ features = ["async-std-runtime", "tokio-runtime", "gio-runtime", "async-tls", "a [dependencies] log = "0.4" futures-core = { version = "0.3", default-features = false } -# TODO use this or `futures_core::task::__internal::AtomicWaker`? atomic-waker = { version = "1.1", default-features = false } futures-io = { version = "0.3", default-features = false, features = ["std"] } pin-project-lite = "0.2" From f010d7ac1e4f2cb2dafa0c2a91b64006cdbe9764 Mon Sep 17 00:00:00 2001 From: stackinspector Date: Thu, 5 Dec 2024 02:12:49 +0800 Subject: [PATCH 08/15] let fn send always there (?) --- src/lib.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 50bc4fe..fe192e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -67,8 +67,6 @@ use std::{ use compat::{cvt, AllowStd, ContextWaker}; use futures_core::stream::{FusedStream, Stream}; use futures_io::{AsyncRead, AsyncWrite}; -#[cfg(feature = "sink")] -use futures_util::SinkExt; use log::*; #[cfg(feature = "handshake")] @@ -448,7 +446,6 @@ where } } -#[cfg(not(feature = "sink"))] impl WebSocketStream { /// Simple send method to replace `futures_sink::Sink` (till v0.3). pub async fn send(&mut self, msg: Message) -> Result<(), WsError> @@ -459,13 +456,11 @@ impl WebSocketStream { } } -#[cfg(not(feature = "sink"))] struct Send<'a, S> { ws: &'a mut WebSocketStream, msg: Option, } -#[cfg(not(feature = "sink"))] impl<'a, S> Send<'a, S> where S: AsyncRead + AsyncWrite + Unpin, @@ -475,7 +470,6 @@ where } } -#[cfg(not(feature = "sink"))] impl std::future::Future for Send<'_, S> where S: AsyncRead + AsyncWrite + Unpin, From b0e700a41800c82c14f64bcfb1e91877d70eea82 Mon Sep 17 00:00:00 2001 From: stackinspector Date: Thu, 5 Dec 2024 02:33:27 +0800 Subject: [PATCH 09/15] update msrv (real) --- Cargo.lock.msrv | 2 ++ Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.lock.msrv b/Cargo.lock.msrv index 63187f6..d987867 100644 --- a/Cargo.lock.msrv +++ b/Cargo.lock.msrv @@ -247,9 +247,11 @@ dependencies = [ "async-native-tls", "async-std", "async-tls", + "atomic-waker", "env_logger", "futures", "futures-channel", + "futures-core", "futures-io", "futures-util", "gio", diff --git a/Cargo.toml b/Cargo.toml index 9cf22c7..a455490 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ version = "0.28.0" edition = "2018" readme = "README.md" include = ["examples/**/*", "src/**/*", "LICENSE", "README.md", "CHANGELOG.md"] -rust-version = "1.63" +rust-version = "1.64" [features] default = ["handshake", "sink"] From fb9dcc249b171fb7f80fd025132210ecad2eac0c Mon Sep 17 00:00:00 2001 From: stackinspector Date: Thu, 5 Dec 2024 02:43:26 +0800 Subject: [PATCH 10/15] fix fmt --- src/compat.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/compat.rs b/src/compat.rs index 2b194c2..7d09e80 100644 --- a/src/compat.rs +++ b/src/compat.rs @@ -4,8 +4,8 @@ use std::io::{Read, Write}; use std::pin::Pin; use std::task::{Context, Poll, Wake, Waker}; -use futures_io::{AsyncRead, AsyncWrite}; use atomic_waker::AtomicWaker; +use futures_io::{AsyncRead, AsyncWrite}; use std::sync::Arc; use tungstenite::Error as WsError; From a62670610d888baf7dbb0810ffafc30b92d17ff4 Mon Sep 17 00:00:00 2001 From: stackinspector Date: Thu, 5 Dec 2024 17:08:55 +0800 Subject: [PATCH 11/15] fix unused --- examples/server-headers.rs | 2 +- src/compat.rs | 2 ++ src/lib.rs | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/examples/server-headers.rs b/examples/server-headers.rs index 5dd69a1..71740a4 100644 --- a/examples/server-headers.rs +++ b/examples/server-headers.rs @@ -24,7 +24,7 @@ use async_tungstenite::{ use url::Url; #[macro_use] extern crate log; -use futures_util::{SinkExt, StreamExt}; +use futures_util::StreamExt; #[async_std::main] async fn main() { diff --git a/src/compat.rs b/src/compat.rs index 7d09e80..c8c7f68 100644 --- a/src/compat.rs +++ b/src/compat.rs @@ -49,10 +49,12 @@ pub(crate) struct AllowStd { // read waker slot for this, but any would do. // // Don't ever use this from multiple tasks at the same time! +#[cfg(all(feature = "sink", feature = "handshake"))] pub(crate) trait SetWaker { fn set_waker(&self, waker: &Waker); } +#[cfg(all(feature = "sink", feature = "handshake"))] impl SetWaker for AllowStd { fn set_waker(&self, waker: &Waker) { self.set_waker(ContextWaker::Read, waker); diff --git a/src/lib.rs b/src/lib.rs index fe192e5..38286fa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -226,6 +226,7 @@ where #[derive(Debug)] pub struct WebSocketStream { inner: WebSocket>, + #[cfg(feature = "sink")] closing: bool, ended: bool, /// Tungstenite is probably ready to receive more data. @@ -268,6 +269,7 @@ impl WebSocketStream { pub(crate) fn new(ws: WebSocket>) -> Self { Self { inner: ws, + #[cfg(feature = "sink")] closing: false, ended: false, ready: true, From 8b5762ba7df171bc2f41d4364cd160f140812356 Mon Sep 17 00:00:00 2001 From: stackinspector Date: Thu, 5 Dec 2024 17:14:28 +0800 Subject: [PATCH 12/15] rename feature --- Cargo.toml | 4 ++-- src/compat.rs | 4 ++-- src/lib.rs | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a455490..6b957b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,8 @@ include = ["examples/**/*", "src/**/*", "LICENSE", "README.md", "CHANGELOG.md"] rust-version = "1.64" [features] -default = ["handshake", "sink"] -sink = ["futures-util"] +default = ["handshake", "futures-03-sink"] +futures-03-sink = ["futures-util"] handshake = ["tungstenite/handshake"] async-std-runtime = ["async-std", "handshake"] tokio-runtime = ["tokio", "handshake"] diff --git a/src/compat.rs b/src/compat.rs index c8c7f68..583ad63 100644 --- a/src/compat.rs +++ b/src/compat.rs @@ -49,12 +49,12 @@ pub(crate) struct AllowStd { // read waker slot for this, but any would do. // // Don't ever use this from multiple tasks at the same time! -#[cfg(all(feature = "sink", feature = "handshake"))] +#[cfg(all(feature = "futures-03-sink", feature = "handshake"))] pub(crate) trait SetWaker { fn set_waker(&self, waker: &Waker); } -#[cfg(all(feature = "sink", feature = "handshake"))] +#[cfg(all(feature = "futures-03-sink", feature = "handshake"))] impl SetWaker for AllowStd { fn set_waker(&self, waker: &Waker) { self.set_waker(ContextWaker::Read, waker); diff --git a/src/lib.rs b/src/lib.rs index 38286fa..7114934 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -226,7 +226,7 @@ where #[derive(Debug)] pub struct WebSocketStream { inner: WebSocket>, - #[cfg(feature = "sink")] + #[cfg(feature = "futures-03-sink")] closing: bool, ended: bool, /// Tungstenite is probably ready to receive more data. @@ -269,7 +269,7 @@ impl WebSocketStream { pub(crate) fn new(ws: WebSocket>) -> Self { Self { inner: ws, - #[cfg(feature = "sink")] + #[cfg(feature = "futures-03-sink")] closing: false, ended: false, ready: true, @@ -369,7 +369,7 @@ where } } -#[cfg(feature = "sink")] +#[cfg(feature = "futures-03-sink")] impl futures_util::Sink for WebSocketStream where T: AsyncRead + AsyncWrite + Unpin, From 320298619a02c9c21d0a52078f66f9922c97a51e Mon Sep 17 00:00:00 2001 From: stackinspector Date: Thu, 5 Dec 2024 17:18:15 +0800 Subject: [PATCH 13/15] fix unused --- src/compat.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/compat.rs b/src/compat.rs index 583ad63..b9c5a1a 100644 --- a/src/compat.rs +++ b/src/compat.rs @@ -49,12 +49,12 @@ pub(crate) struct AllowStd { // read waker slot for this, but any would do. // // Don't ever use this from multiple tasks at the same time! -#[cfg(all(feature = "futures-03-sink", feature = "handshake"))] +#[cfg(any(feature = "futures-03-sink", feature = "handshake"))] pub(crate) trait SetWaker { fn set_waker(&self, waker: &Waker); } -#[cfg(all(feature = "futures-03-sink", feature = "handshake"))] +#[cfg(any(feature = "futures-03-sink", feature = "handshake"))] impl SetWaker for AllowStd { fn set_waker(&self, waker: &Waker) { self.set_waker(ContextWaker::Read, waker); From abe97c0730d4f45f44c32b43050d5403acc0bbc2 Mon Sep 17 00:00:00 2001 From: stackinspector Date: Thu, 5 Dec 2024 17:25:51 +0800 Subject: [PATCH 14/15] fix unused --- src/compat.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/compat.rs b/src/compat.rs index b9c5a1a..c2fe19a 100644 --- a/src/compat.rs +++ b/src/compat.rs @@ -49,12 +49,12 @@ pub(crate) struct AllowStd { // read waker slot for this, but any would do. // // Don't ever use this from multiple tasks at the same time! -#[cfg(any(feature = "futures-03-sink", feature = "handshake"))] +#[cfg(feature = "handshake")] pub(crate) trait SetWaker { fn set_waker(&self, waker: &Waker); } -#[cfg(any(feature = "futures-03-sink", feature = "handshake"))] +#[cfg(feature = "handshake")] impl SetWaker for AllowStd { fn set_waker(&self, waker: &Waker) { self.set_waker(ContextWaker::Read, waker); From 2f7c1d61f273515cab7570bad56019391f4ea60e Mon Sep 17 00:00:00 2001 From: Golden_Water Date: Sat, 7 Dec 2024 19:09:30 +0800 Subject: [PATCH 15/15] Use SinkExt for autobahn-server --- examples/autobahn-client.rs | 1 + examples/autobahn-server.rs | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/examples/autobahn-client.rs b/examples/autobahn-client.rs index 3c49b06..389caa7 100644 --- a/examples/autobahn-client.rs +++ b/examples/autobahn-client.rs @@ -32,6 +32,7 @@ async fn run_test(case: u32) -> Result<()> { while let Some(msg) = ws_stream.next().await { let msg = msg?; if msg.is_text() || msg.is_binary() { + // for Sink of futures 0.3, see autobahn-server example ws_stream.send(msg).await?; } } diff --git a/examples/autobahn-server.rs b/examples/autobahn-server.rs index 3f570e8..4f9c965 100644 --- a/examples/autobahn-server.rs +++ b/examples/autobahn-server.rs @@ -23,7 +23,9 @@ async fn handle_connection(peer: SocketAddr, stream: TcpStream) -> Result<()> { while let Some(msg) = ws_stream.next().await { let msg = msg?; if msg.is_text() || msg.is_binary() { - ws_stream.send(msg).await?; + // here we explicitly using futures 0.3's Sink implementation for send message + // for WebSocketStream::send, see autobahn-client example + futures::SinkExt::send(&mut ws_stream, msg).await?; } }