Skip to content

Commit 7f58a9e

Browse files
bors[bot]taiki-e
andauthored
Merge #31
31: Fix build failure on latest nightly r=taiki-e a=taiki-e This fixes breakage due to rust-lang/rust#66398. Fixes #30 Co-authored-by: Taiki Endo <[email protected]>
2 parents b82fcc7 + c746957 commit 7f58a9e

File tree

10 files changed

+308
-190
lines changed

10 files changed

+308
-190
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@ jobs:
2323
# This is the minimum supported Rust version of this crate.
2424
# When updating this, the reminder to update the minimum supported
2525
# Rust version in README.md.
26-
#
27-
# Tests are not run as tests may require newer versions of Rust.
2826
- nightly-2019-08-21
27+
- nightly-2019-12-09
2928
- nightly
3029
steps:
3130
- uses: actions/checkout@master
@@ -37,14 +36,14 @@ jobs:
3736
if: matrix.rust == 'nightly'
3837
run: |
3938
cargo install cargo-hack
40-
- name: cargo check
41-
if: matrix.rust == 'nightly-2019-08-21'
42-
run: |
43-
cargo check --all
4439
- name: cargo test
45-
if: matrix.rust != 'nightly-2019-08-21'
40+
if: matrix.rust != 'nightly'
4641
run: |
4742
cargo test --all
43+
- name: cargo test
44+
if: matrix.rust == 'nightly'
45+
run: |
46+
cargo test --all --all-features -- -Zunstable-options --include-ignored
4847
# Refs: https://github.com/rust-lang/cargo/issues/5657
4948
- name: cargo check -Zminimal-versions
5049
if: matrix.rust == 'nightly'

compiletest.sh

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#!/bin/bash
2+
3+
# A script to run compile tests with the same condition of the checks done by CI.
4+
#
5+
# Usage
6+
#
7+
# ```sh
8+
# . ./compiletest.sh
9+
# ```
10+
11+
TRYBUILD=overwrite cargo +nightly test -p futures-async-stream --all-features --test compiletest -- --ignored
12+
# cargo +nightly test -p futures-async-stream --all-features --test compiletest -- --ignored

futures-async-stream-macro/src/visitor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ impl Visitor {
176176
let mut __pinned = #base;
177177
loop {
178178
if let ::futures_async_stream::reexport::task::Poll::Ready(x) =
179-
::futures_async_stream::stream::poll_with_tls_context(unsafe {
179+
::futures_async_stream::future::poll_with_tls_context(unsafe {
180180
::futures_async_stream::reexport::pin::Pin::new_unchecked(&mut __pinned)
181181
})
182182
{

src/lib.rs

Lines changed: 279 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,11 +209,10 @@
209209
no_crate_inject,
210210
attr(deny(warnings, rust_2018_idioms, single_use_lifetimes), allow(dead_code))
211211
))]
212-
#![warn(unsafe_code)]
213212
#![warn(missing_docs)]
214213
#![warn(rust_2018_idioms, single_use_lifetimes, unreachable_pub)]
215214
#![warn(clippy::all)]
216-
#![feature(gen_future, generator_trait)]
215+
#![feature(generator_trait)]
217216

218217
#[doc(inline)]
219218
pub use futures_async_stream_macro::for_await;
@@ -230,13 +229,289 @@ pub use futures_async_stream_macro::async_try_stream;
230229
#[doc(inline)]
231230
pub use futures_async_stream_macro::async_try_stream_block;
232231

232+
use core::{cell::Cell, ptr::NonNull, task::Context};
233+
234+
thread_local! {
235+
static TLS_CX: Cell<Option<NonNull<Context<'static>>>> = Cell::new(None);
236+
}
237+
238+
struct SetOnDrop(Option<NonNull<Context<'static>>>);
239+
240+
impl Drop for SetOnDrop {
241+
fn drop(&mut self) {
242+
TLS_CX.with(|tls_cx| {
243+
tls_cx.set(self.0.take());
244+
});
245+
}
246+
}
247+
248+
// Safety: the returned guard must drop before `cx` is dropped and before
249+
// any previous guard is dropped.
250+
unsafe fn set_task_context(cx: &mut Context<'_>) -> SetOnDrop {
251+
// transmute the context's lifetime to 'static so we can store it.
252+
let cx = core::mem::transmute::<&mut Context<'_>, &mut Context<'static>>(cx);
253+
let old_cx = TLS_CX.with(|tls_cx| tls_cx.replace(Some(NonNull::from(cx))));
254+
SetOnDrop(old_cx)
255+
}
256+
233257
// Not public API.
234258
#[doc(hidden)]
235-
pub mod stream;
259+
pub mod future {
260+
use core::{
261+
future::Future,
262+
ops::{Generator, GeneratorState},
263+
pin::Pin,
264+
task::{Context, Poll},
265+
};
266+
use pin_project::pin_project;
267+
268+
use super::{set_task_context, SetOnDrop, TLS_CX};
269+
270+
// =================================================================================================
271+
// GenFuture
272+
273+
/// Wrap a generator in a future.
274+
///
275+
/// This function returns a `GenFuture` underneath, but hides it in `impl Trait` to give
276+
/// better error messages (`impl Future` rather than `GenFuture<[closure.....]>`).
277+
#[doc(hidden)]
278+
pub fn from_generator<T: Generator<Yield = ()>>(x: T) -> impl Future<Output = T::Return> {
279+
GenFuture(x)
280+
}
281+
282+
/// A wrapper around generators used to implement `Future` for `async`/`await` code.
283+
#[pin_project]
284+
struct GenFuture<T>(#[pin] T);
285+
286+
#[doc(hidden)]
287+
impl<T> Future for GenFuture<T>
288+
where
289+
T: Generator<Yield = ()>,
290+
{
291+
type Output = T::Return;
292+
293+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
294+
let this = self.project();
295+
let _guard = unsafe { set_task_context(cx) };
296+
match this.0.resume() {
297+
GeneratorState::Yielded(()) => Poll::Pending,
298+
GeneratorState::Complete(x) => Poll::Ready(x),
299+
}
300+
}
301+
}
302+
303+
// =================================================================================================
304+
// Poll
305+
306+
/// Polls a future in the current thread-local task waker.
307+
#[doc(hidden)]
308+
pub fn poll_with_tls_context<F>(f: Pin<&mut F>) -> Poll<F::Output>
309+
where
310+
F: Future,
311+
{
312+
let cx_ptr = TLS_CX.with(|tls_cx| {
313+
// Clear the entry so that nested `get_task_waker` calls
314+
// will fail or set their own value.
315+
tls_cx.replace(None)
316+
});
317+
let _reset = SetOnDrop(cx_ptr);
318+
319+
let mut cx_ptr = cx_ptr.expect("TLS Context not set.");
320+
321+
// Safety: we've ensured exclusive access to the context by
322+
// removing the pointer from TLS, only to be replaced once
323+
// we're done with it.
324+
//
325+
// The pointer that was inserted came from an `&mut Context<'_>`,
326+
// so it is safe to treat as mutable.
327+
unsafe { F::poll(f, cx_ptr.as_mut()) }
328+
}
329+
}
330+
331+
// Not public API.
332+
#[doc(hidden)]
333+
pub mod stream {
334+
use core::{
335+
future::Future,
336+
marker::PhantomData,
337+
ops::{Generator, GeneratorState},
338+
pin::Pin,
339+
task::{Context, Poll},
340+
};
341+
use futures_core::stream::Stream;
342+
use pin_project::pin_project;
343+
344+
use super::{set_task_context, SetOnDrop, TLS_CX};
345+
346+
// =================================================================================================
347+
// GenStream
348+
349+
/// Wrap a generator in a stream.
350+
///
351+
/// This function returns a `GenStream` underneath, but hides it in `impl Trait` to give
352+
/// better error messages (`impl Stream` rather than `GenStream<[closure.....]>`).
353+
#[doc(hidden)]
354+
pub fn from_generator<G, T>(gen: G) -> impl Stream<Item = T>
355+
where
356+
G: Generator<Yield = Poll<T>, Return = ()>,
357+
{
358+
GenStream { gen, _phantom: PhantomData }
359+
}
360+
361+
/// A wrapper around generators used to implement `Stream` for `async`/`await` code.
362+
#[pin_project]
363+
struct GenStream<G, T> {
364+
#[pin]
365+
gen: G,
366+
_phantom: PhantomData<T>,
367+
}
368+
369+
impl<G, T> Stream for GenStream<G, T>
370+
where
371+
G: Generator<Yield = Poll<T>, Return = ()>,
372+
{
373+
type Item = T;
374+
375+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
376+
let this = self.project();
377+
let _guard = unsafe { set_task_context(cx) };
378+
match this.gen.resume() {
379+
GeneratorState::Yielded(x) => x.map(Some),
380+
GeneratorState::Complete(()) => Poll::Ready(None),
381+
}
382+
}
383+
}
384+
385+
// =================================================================================================
386+
// Poll
387+
388+
/// Polls a future in the current thread-local task waker.
389+
#[doc(hidden)]
390+
pub fn poll_next_with_tls_context<S>(s: Pin<&mut S>) -> Poll<Option<S::Item>>
391+
where
392+
S: Stream,
393+
{
394+
let cx_ptr = TLS_CX.with(|tls_cx| {
395+
// Clear the entry so that nested `get_task_waker` calls
396+
// will fail or set their own value.
397+
tls_cx.replace(None)
398+
});
399+
let _reset = SetOnDrop(cx_ptr);
400+
401+
let mut cx_ptr = cx_ptr.expect("TLS Context not set.");
402+
403+
// Safety: we've ensured exclusive access to the context by
404+
// removing the pointer from TLS, only to be replaced once
405+
// we're done with it.
406+
//
407+
// The pointer that was inserted came from an `&mut Context<'_>`,
408+
// so it is safe to treat as mutable.
409+
unsafe { S::poll_next(s, cx_ptr.as_mut()) }
410+
}
411+
412+
// =================================================================================================
413+
// Next
414+
415+
// This is equivalent to the `futures::stream::StreamExt::next` method.
416+
// But we want to make this crate dependency as small as possible, so we define our `next` function.
417+
#[doc(hidden)]
418+
pub fn next<S>(stream: &mut S) -> impl Future<Output = Option<S::Item>> + '_
419+
where
420+
S: Stream + Unpin,
421+
{
422+
Next { stream }
423+
}
424+
425+
struct Next<'a, S> {
426+
stream: &'a mut S,
427+
}
428+
429+
impl<S> Future for Next<'_, S>
430+
where
431+
S: Stream + Unpin,
432+
{
433+
type Output = Option<S::Item>;
434+
435+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
436+
Pin::new(&mut self.stream).poll_next(cx)
437+
}
438+
}
439+
}
236440

237441
// Not public API.
238442
#[doc(hidden)]
239-
pub mod try_stream;
443+
pub mod try_stream {
444+
use core::{
445+
marker::PhantomData,
446+
ops::{Generator, GeneratorState},
447+
pin::Pin,
448+
task::{Context, Poll},
449+
};
450+
use futures_core::stream::{FusedStream, Stream};
451+
use pin_project::pin_project;
452+
453+
use super::set_task_context;
454+
455+
// =================================================================================================
456+
// GenStream
457+
458+
/// Wrap a generator in a stream.
459+
///
460+
/// This function returns a `GenStream` underneath, but hides it in `impl Trait` to give
461+
/// better error messages (`impl Stream` rather than `GenStream<[closure.....]>`).
462+
#[doc(hidden)]
463+
pub fn from_generator<G, T, E>(
464+
gen: G,
465+
) -> impl Stream<Item = Result<T, E>> + FusedStream<Item = Result<T, E>>
466+
where
467+
G: Generator<Yield = Poll<T>, Return = Result<(), E>>,
468+
{
469+
GenTryStream { gen, done: false, _phantom: PhantomData }
470+
}
471+
472+
/// A wrapper around generators used to implement `Stream` for `async`/`await` code.
473+
#[pin_project]
474+
struct GenTryStream<G, T, E> {
475+
#[pin]
476+
gen: G,
477+
done: bool,
478+
_phantom: PhantomData<(T, E)>,
479+
}
480+
481+
impl<G, T, E> Stream for GenTryStream<G, T, E>
482+
where
483+
G: Generator<Yield = Poll<T>, Return = Result<(), E>>,
484+
{
485+
type Item = Result<T, E>;
486+
487+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
488+
if self.done {
489+
return Poll::Ready(None);
490+
}
491+
492+
let this = self.project();
493+
let _guard = unsafe { set_task_context(cx) };
494+
let res = match this.gen.resume() {
495+
GeneratorState::Yielded(x) => x.map(|x| Some(Ok(x))),
496+
GeneratorState::Complete(Err(e)) => Poll::Ready(Some(Err(e))),
497+
GeneratorState::Complete(Ok(())) => Poll::Ready(None),
498+
};
499+
if let Poll::Ready(Some(Err(_))) | Poll::Ready(None) = &res {
500+
*this.done = true;
501+
}
502+
res
503+
}
504+
}
505+
506+
impl<G, T, E> FusedStream for GenTryStream<G, T, E>
507+
where
508+
G: Generator<Yield = Poll<T>, Return = Result<(), E>>,
509+
{
510+
fn is_terminated(&self) -> bool {
511+
self.done
512+
}
513+
}
514+
}
240515

241516
// Not public API.
242517
#[doc(hidden)]

0 commit comments

Comments
 (0)