Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions futures-async-stream-macro/src/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ impl Visitor {
}
Stream | TryStream => {
quote! {
match unsafe { ::futures_async_stream::stream::poll_next_with_context(
match unsafe { ::futures_async_stream::stream::Stream::poll_next(
::futures_async_stream::reexport::pin::Pin::as_mut(&mut __pinned),
__task_context,
::futures_async_stream::future::get_context(__task_context),
) } {
::futures_async_stream::reexport::task::Poll::Ready(
::futures_async_stream::reexport::option::Option::Some(e),
Expand Down Expand Up @@ -191,19 +191,31 @@ impl Visitor {
return;
}

// Desugar `<base>.await` into:
//
// {
// let mut __pinned = <base>;
// loop {
// if let Poll::Ready(result) = unsafe { Future::poll(
// Pin::new_unchecked(&mut __pinned),
// get_context(__task_context),
// ) } {
// break result;
// }
// __task_context = yield Poll::Pending;
// }
// }
if let Expr::Await(ExprAwait { base, await_token, .. }) = expr {
*expr = syn::parse2(quote_spanned! { await_token.span() => {
let mut __pinned = #base;
loop {
if let ::futures_async_stream::reexport::task::Poll::Ready(x) =
unsafe { ::futures_async_stream::future::poll_with_context(
::futures_async_stream::reexport::pin::Pin::new_unchecked(&mut __pinned),
__task_context,
)}
{
break x;
if let ::futures_async_stream::reexport::task::Poll::Ready(result) =
unsafe { ::futures_async_stream::future::Future::poll(
::futures_async_stream::reexport::pin::Pin::new_unchecked(&mut __pinned),
::futures_async_stream::future::get_context(__task_context),
) } {
break result;
}

__task_context = yield ::futures_async_stream::reexport::task::Poll::Pending;
}
}})
Expand Down
26 changes: 8 additions & 18 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,16 @@ pub use futures_async_stream_macro::async_try_stream_block;
#[doc(hidden)]
pub mod future {
use core::{
future::Future,
ops::{Generator, GeneratorState},
pin::Pin,
ptr::NonNull,
task::{Context, Poll},
};
use pin_project::pin_project;

#[doc(hidden)]
pub use core::future::Future;

/// This type is needed because:
///
/// a) Generators cannot implement `for<'a, 'b> Generator<&'a mut Context<'b>>`, so we need to pass
Expand Down Expand Up @@ -293,11 +295,8 @@ pub mod future {
}

#[doc(hidden)]
pub unsafe fn poll_with_context<F>(f: Pin<&mut F>, mut cx: ResumeTy) -> Poll<F::Output>
where
F: Future,
{
F::poll(f, cx.0.as_mut())
pub unsafe fn get_context<'a, 'b>(cx: ResumeTy) -> &'a mut Context<'b> {
&mut *cx.0.as_ptr().cast()
}
}

Expand All @@ -312,11 +311,13 @@ pub mod stream {
ptr::NonNull,
task::{Context, Poll},
};
use futures_core::stream::Stream;
use pin_project::pin_project;

use super::future::ResumeTy;

#[doc(hidden)]
pub use futures_core::stream::Stream;

/// Wrap a generator in a stream.
///
/// This function returns a `GenStream` underneath, but hides it in `impl Trait` to give
Expand Down Expand Up @@ -352,17 +353,6 @@ pub mod stream {
}
}

#[doc(hidden)]
pub unsafe fn poll_next_with_context<S>(
s: Pin<&mut S>,
mut cx: ResumeTy,
) -> Poll<Option<S::Item>>
where
S: Stream,
{
S::poll_next(s, cx.0.as_mut())
}

// This is equivalent to the `futures::stream::StreamExt::next` method.
// But we want to make this crate dependency as small as possible, so we define our `next` function.
#[doc(hidden)]
Expand Down