diff --git a/Cargo.lock b/Cargo.lock index a6febf9af8..ccef9a39de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4755,6 +4755,7 @@ dependencies = [ "cap-rand", "cap-std", "cap-time-ext", + "env_logger 0.11.5", "fs-set-times", "futures", "io-extras", @@ -4795,6 +4796,7 @@ dependencies = [ "async-trait", "base64", "bytes", + "env_logger 0.11.5", "flate2", "futures", "http", @@ -5217,7 +5219,7 @@ dependencies = [ [[package]] name = "wit-bindgen" version = "0.42.1" -source = "git+https://github.com/bytecodealliance/wit-bindgen#13b0ab0338268e218134c52511ac69b113895849" +source = "git+https://github.com/dicej/wit-bindgen?branch=no-reads-writes-after-dropped#21a41990e1edfa11e313f254d57d4db9340e25a7" dependencies = [ "wit-bindgen-rt 0.42.1", "wit-bindgen-rust-macro", @@ -5226,7 +5228,7 @@ dependencies = [ [[package]] name = "wit-bindgen-core" version = "0.42.1" -source = "git+https://github.com/bytecodealliance/wit-bindgen#13b0ab0338268e218134c52511ac69b113895849" +source = "git+https://github.com/dicej/wit-bindgen?branch=no-reads-writes-after-dropped#21a41990e1edfa11e313f254d57d4db9340e25a7" dependencies = [ "anyhow", "heck 0.5.0", @@ -5254,7 +5256,7 @@ dependencies = [ [[package]] name = "wit-bindgen-rt" version = "0.42.1" -source = "git+https://github.com/bytecodealliance/wit-bindgen#13b0ab0338268e218134c52511ac69b113895849" +source = "git+https://github.com/dicej/wit-bindgen?branch=no-reads-writes-after-dropped#21a41990e1edfa11e313f254d57d4db9340e25a7" dependencies = [ "bitflags 2.6.0", "futures", @@ -5264,7 +5266,7 @@ dependencies = [ [[package]] name = "wit-bindgen-rust" version = "0.42.1" -source = "git+https://github.com/bytecodealliance/wit-bindgen#13b0ab0338268e218134c52511ac69b113895849" +source = "git+https://github.com/dicej/wit-bindgen?branch=no-reads-writes-after-dropped#21a41990e1edfa11e313f254d57d4db9340e25a7" dependencies = [ "anyhow", "heck 0.5.0", @@ -5279,7 +5281,7 @@ dependencies = [ [[package]] name = "wit-bindgen-rust-macro" version = "0.42.1" -source = "git+https://github.com/bytecodealliance/wit-bindgen#13b0ab0338268e218134c52511ac69b113895849" +source = "git+https://github.com/dicej/wit-bindgen?branch=no-reads-writes-after-dropped#21a41990e1edfa11e313f254d57d4db9340e25a7" dependencies = [ "anyhow", "prettyplease", diff --git a/Cargo.toml b/Cargo.toml index c419cd6a68..30ce6b67c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -631,9 +631,12 @@ lto = true # wasm-wave = { git = "https://github.com/bytecodealliance/wasm-tools" } # wasm-compose = { git = "https://github.com/bytecodealliance/wasm-tools" } # wasm-metadata = { git = "https://github.com/bytecodealliance/wasm-tools" } -wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen" } -wit-bindgen-rt = { git = "https://github.com/bytecodealliance/wit-bindgen" } -wit-bindgen-rust-macro = { git = "https://github.com/bytecodealliance/wit-bindgen" } +# wit-bindgen = { git = "https://github.com/bytecodealliance/wit-bindgen" } +# wit-bindgen-rt = { git = "https://github.com/bytecodealliance/wit-bindgen" } +# wit-bindgen-rust-macro = { git = "https://github.com/bytecodealliance/wit-bindgen" } +wit-bindgen = { git = "https://github.com/dicej/wit-bindgen", branch = "no-reads-writes-after-dropped" } +wit-bindgen-rt = { git = "https://github.com/dicej/wit-bindgen", branch = "no-reads-writes-after-dropped" } +wit-bindgen-rust-macro = { git = "https://github.com/dicej/wit-bindgen", branch = "no-reads-writes-after-dropped" } # wasmparser = { path = '../wasm-tools/crates/wasmparser' } # wat = { path = '../wasm-tools/crates/wat' } diff --git a/crates/test-programs/src/bin/p3_http_middleware.rs b/crates/test-programs/src/bin/p3_http_middleware.rs index e71af92242..ca066aa519 100644 --- a/crates/test-programs/src/bin/p3_http_middleware.rs +++ b/crates/test-programs/src/bin/p3_http_middleware.rs @@ -66,6 +66,7 @@ impl Handler for Component { let remaining = pipe_tx.write_all(mem::take(decoder.get_mut())).await; assert!(remaining.is_empty()); *decoder.get_mut() = remaining; + chunk.clear(); (status, chunk) = body.read(chunk).await; } diff --git a/crates/wasi-http/Cargo.toml b/crates/wasi-http/Cargo.toml index a8c4d28b5e..eeedda8f1a 100644 --- a/crates/wasi-http/Cargo.toml +++ b/crates/wasi-http/Cargo.toml @@ -34,6 +34,7 @@ rustls = { workspace = true } webpki-roots = { workspace = true } [dev-dependencies] +env_logger = { workspace = true } test-programs-artifacts = { workspace = true } test-log = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/wasi-http/src/p3/response.rs b/crates/wasi-http/src/p3/response.rs index 5f93280a96..b6e98747d3 100644 --- a/crates/wasi-http/src/p3/response.rs +++ b/crates/wasi-http/src/p3/response.rs @@ -214,14 +214,14 @@ impl Response { let fut = async move { loop { let (tail, mut rx_buffer) = contents.await; - if let Some(tail) = tail { - let buffer = rx_buffer.split(); - if !buffer.is_empty() { - if let Err(..) = contents_tx.send(buffer.freeze()).await { - break; - } - rx_buffer.reserve(DEFAULT_BUFFER_CAPACITY); + let buffer = rx_buffer.split(); + if !buffer.is_empty() { + if let Err(..) = contents_tx.send(buffer.freeze()).await { + break; } + rx_buffer.reserve(DEFAULT_BUFFER_CAPACITY); + } + if let Some(tail) = tail { contents = tail.read(rx_buffer).boxed(); } else { debug_assert!(rx_buffer.is_empty()); diff --git a/crates/wasi-http/tests/all/p3/outgoing.rs b/crates/wasi-http/tests/all/p3/outgoing.rs index dffa2071f1..5f9d5b22b8 100644 --- a/crates/wasi-http/tests/all/p3/outgoing.rs +++ b/crates/wasi-http/tests/all/p3/outgoing.rs @@ -9,6 +9,8 @@ foreach_p3_http!(assert_test_exists); use super::proxy::{p3_http_echo, p3_http_middleware, p3_http_middleware_with_chain}; async fn run(path: &str, server: &Server) -> anyhow::Result<()> { + _ = env_logger::try_init(); + let engine = test_programs_artifacts::engine(|config| { config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); config.async_support(true); diff --git a/crates/wasi-http/tests/all/p3/proxy.rs b/crates/wasi-http/tests/all/p3/proxy.rs index 3c8601ab46..6af11a5dec 100644 --- a/crates/wasi-http/tests/all/p3/proxy.rs +++ b/crates/wasi-http/tests/all/p3/proxy.rs @@ -106,6 +106,8 @@ pub async fn p3_http_middleware_with_chain() -> Result<()> { } async fn test_http_echo(component: &str, use_compression: bool) -> Result<()> { + _ = env_logger::try_init(); + let body = b"And the mome raths outgrabe"; // Prepare the raw body, optionally compressed if that's what we're diff --git a/crates/wasi/Cargo.toml b/crates/wasi/Cargo.toml index 1e669c1e92..ba06f3db09 100644 --- a/crates/wasi/Cargo.toml +++ b/crates/wasi/Cargo.toml @@ -38,6 +38,7 @@ futures = { workspace = true } url = { workspace = true } [dev-dependencies] +env_logger = { workspace = true } tokio = { workspace = true, features = ["time", "sync", "io-std", "io-util", "rt", "rt-multi-thread", "net", "macros", "fs"] } test-log = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/wasi/tests/all/p3/mod.rs b/crates/wasi/tests/all/p3/mod.rs index a6f8fdf64f..d2a14f9008 100644 --- a/crates/wasi/tests/all/p3/mod.rs +++ b/crates/wasi/tests/all/p3/mod.rs @@ -104,6 +104,8 @@ impl WasiSocketsView for Ctx { } async fn run(path: &str) -> anyhow::Result<()> { + _ = env_logger::try_init(); + let path = Path::new(path); let engine = test_programs_artifacts::engine(|config| { config.async_support(true); diff --git a/crates/wasmtime/src/runtime/component/concurrent.rs b/crates/wasmtime/src/runtime/component/concurrent.rs index ababc50716..e63510552e 100644 --- a/crates/wasmtime/src/runtime/component/concurrent.rs +++ b/crates/wasmtime/src/runtime/component/concurrent.rs @@ -4115,18 +4115,18 @@ impl Waitable { assert_eq!(rep, self.rep()); assert_eq!(*state, StreamFutureState::Busy); *state = match event { - Event::FutureRead { .. } => StreamFutureState::Read, - Event::FutureWrite { .. } => StreamFutureState::Write, + Event::FutureRead { .. } => StreamFutureState::Read { done: false }, + Event::FutureWrite { .. } => StreamFutureState::Write { done: false }, _ => unreachable!(), }; } Event::StreamRead { pending: Some((ty, handle)), - .. + code, } | Event::StreamWrite { pending: Some((ty, handle)), - .. + code, } => { let runtime_instance = instance.component().types()[ty].instance; let (rep, WaitableState::Stream(actual_ty, state)) = instance.waitable_tables() @@ -4139,9 +4139,10 @@ impl Waitable { assert_eq!(*actual_ty, ty); assert_eq!(rep, self.rep()); assert_eq!(*state, StreamFutureState::Busy); + let done = matches!(code, ReturnCode::Dropped(_)); *state = match event { - Event::StreamRead { .. } => StreamFutureState::Read, - Event::StreamWrite { .. } => StreamFutureState::Write, + Event::StreamRead { .. } => StreamFutureState::Read { done }, + Event::StreamWrite { .. } => StreamFutureState::Write { done }, _ => unreachable!(), }; } diff --git a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs index d3083049fd..8b52290981 100644 --- a/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs +++ b/crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs @@ -25,6 +25,7 @@ use { }, std::{ boxed::Box, + fmt, future::Future, iter, marker::PhantomData, @@ -338,10 +339,20 @@ fn accept_writer, U>( /// given component instance. #[derive(Debug, Eq, PartialEq)] pub(super) enum StreamFutureState { - /// Only the write end is owned by this component instance. - Write, - /// Only the read end is owned by this component instance. - Read, + /// The write end of the stream or future. + Write { + /// Whether the component instance has been notified that the stream or + /// future is "done" (i.e. the other end has dropped, or, in the case of + /// a future, a value has been transmitted). + done: bool, + }, + /// The read end of the stream or future. + Read { + /// Whether the component instance has been notified that the stream or + /// future is "done" (i.e. the other end has dropped, or, in the case of + /// a future, a value has been transmitted). + done: bool, + }, /// A read or write is in progress. Busy, } @@ -638,13 +649,22 @@ impl HostFuture { get_mut_by_index_from(state_table, TableIndex::Future(src), index)?; match state { - StreamFutureState::Read => { + StreamFutureState::Read { .. } => { state_table.remove_by_index(index)?; } - StreamFutureState::Write => bail!("cannot transfer write end of future"), + StreamFutureState::Write { .. } => bail!("cannot transfer write end of future"), StreamFutureState::Busy => bail!("cannot transfer busy future"), } + let state = cx + .instance_mut() + .get(TableId::::new(rep))? + .state; + + if cx.instance_mut().get(state)?.done { + bail!("cannot lift future after previous read succeeded"); + } + Ok(Self::new(rep, cx.instance_handle())) } _ => func::bad_type_info(), @@ -668,7 +688,10 @@ pub(crate) fn lower_future_to_index( cx.instance_mut() .state_table(TableIndex::Future(dst)) - .insert(rep, WaitableState::Future(dst, StreamFutureState::Read)) + .insert( + rep, + WaitableState::Future(dst, StreamFutureState::Read { done: false }), + ) } _ => func::bad_type_info(), } @@ -1004,10 +1027,13 @@ impl HostStream { get_mut_by_index_from(state_table, TableIndex::Stream(src), index)?; match state { - StreamFutureState::Read => { + StreamFutureState::Read { done: true } => bail!( + "cannot lift stream after being notified that the writable end dropped" + ), + StreamFutureState::Read { done: false } => { state_table.remove_by_index(index)?; } - StreamFutureState::Write => bail!("cannot transfer write end of stream"), + StreamFutureState::Write { .. } => bail!("cannot transfer write end of stream"), StreamFutureState::Busy => bail!("cannot transfer busy stream"), } @@ -1034,7 +1060,10 @@ pub(crate) fn lower_stream_to_index( cx.instance_mut() .state_table(TableIndex::Stream(dst)) - .insert(rep, WaitableState::Stream(dst, StreamFutureState::Read)) + .insert( + rep, + WaitableState::Stream(dst, StreamFutureState::Read { done: false }), + ) } _ => func::bad_type_info(), } @@ -1331,8 +1360,8 @@ struct TransmitState { writer_watcher: Option>, /// Like `writer_watcher`, but for the reverse direction. reader_watcher: Option>, - /// Whether the write end may be dropped or not. - may_drop_writer: bool, + /// Whether futher values may be transmitted via this stream or future. + done: bool, } impl Default for TransmitState { @@ -1344,7 +1373,7 @@ impl Default for TransmitState { write: WriteState::Open, reader_watcher: None, writer_watcher: None, - may_drop_writer: true, + done: false, } } } @@ -1379,6 +1408,17 @@ enum WriteState { Dropped, } +impl fmt::Debug for WriteState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::Open => f.debug_tuple("Open").finish(), + Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(), + Self::HostReady { .. } => f.debug_tuple("HostReady").finish(), + Self::Dropped => f.debug_tuple("Dropped").finish(), + } + } +} + /// Represents the state of the read end of a stream or future. enum ReadState { /// The read end is open, but no read is pending. @@ -1400,6 +1440,17 @@ enum ReadState { Dropped, } +impl fmt::Debug for ReadState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::Open => f.debug_tuple("Open").finish(), + Self::GuestReady { .. } => f.debug_tuple("GuestReady").finish(), + Self::HostReady { .. } => f.debug_tuple("HostReady").finish(), + Self::Dropped => f.debug_tuple("Dropped").finish(), + } + } +} + /// Parameter type to pass to a `ReadState::HostReady` closure. /// /// See also `accept_writer`. @@ -1462,7 +1513,7 @@ impl Instance { default: fn() -> T, mut store: S, ) -> Result<(FutureWriter, FutureReader)> { - let (write, read) = store.as_context_mut()[self.id()].new_transmit(TransmitKind::Future)?; + let (write, read) = store.as_context_mut()[self.id()].new_transmit()?; Ok(( FutureWriter::new( @@ -1498,7 +1549,7 @@ impl Instance { self, mut store: S, ) -> Result<(StreamWriter, StreamReader)> { - let (write, read) = store.as_context_mut()[self.id()].new_transmit(TransmitKind::Stream)?; + let (write, read) = store.as_context_mut()[self.id()].new_transmit()?; Ok(( StreamWriter::new( @@ -1694,7 +1745,7 @@ impl Instance { let transmit = store[self.id()] .get_mut(transmit_id) .with_context(|| format!("retrieving state for transmit [{transmit_rep}]"))?; - transmit.may_drop_writer = true; + log::trace!("host_write state {transmit_id:?}; {:?}", transmit.read); let new_state = if let ReadState::Dropped = &transmit.read { ReadState::Dropped @@ -1728,6 +1779,10 @@ impl Instance { handle, .. } => { + if let TransmitKind::Future = kind { + transmit.done = true; + } + let read_handle = transmit.read_handle; let code = accept_reader::(store.as_context_mut(), buffer, tx, kind)( store.0.traitobj_mut(), @@ -1807,6 +1862,7 @@ impl Instance { let transmit = store[self.id()] .get_mut(transmit_id) .with_context(|| rep.to_string())?; + log::trace!("host_read state {transmit_id:?}; {:?}", transmit.write); let new_state = if let WriteState::Dropped = &transmit.write { WriteState::Dropped @@ -1833,6 +1889,10 @@ impl Instance { post_write, .. } => { + if let TableIndex::Future(_) = ty { + transmit.done = true; + } + let write_handle = transmit.write_handle; let types = store[self.id()].component().types().clone(); let lift = @@ -1917,6 +1977,11 @@ impl Instance { let transmit = instance .get_mut(transmit_id) .with_context(|| format!("error closing reader {transmit_rep}"))?; + log::trace!( + "host_drop_reader state {transmit_id:?}; read state {:?} write state {:?}", + transmit.read, + transmit.write + ); transmit.read = ReadState::Dropped; transmit.reader_watcher = None; @@ -2167,19 +2232,30 @@ impl Instance { }; let instance = &mut store[self.id()]; let (rep, state) = instance.get_mut_by_index(ty, handle)?; - let StreamFutureState::Write = *state else { + let StreamFutureState::Write { done } = *state else { bail!( - "invalid handle {handle}; expected {:?}; got {:?}", - StreamFutureState::Write, + "invalid handle {handle}; expected `Write`; got {:?}", *state ); }; + + if done { + bail!("cannot write to stream after being notified that the readable end dropped"); + } + *state = StreamFutureState::Busy; let transmit_handle = TableId::::new(rep); let transmit_id = instance.get(transmit_handle)?.state; - log::trace!("guest_write {transmit_handle:?} (handle {handle}; state {transmit_id:?})",); let transmit = instance.get_mut(transmit_id)?; - transmit.may_drop_writer = true; + log::trace!( + "guest_write {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}", + transmit.read + ); + + if transmit.done { + bail!("cannot write to future after previous write succeeded or readable end dropped"); + } + let new_state = if let ReadState::Dropped = &transmit.read { ReadState::Dropped } else { @@ -2212,6 +2288,10 @@ impl Instance { } => { assert_eq!(flat_abi, read_flat_abi); + if let TableIndex::Future(_) = ty { + transmit.done = true; + } + // Note that zero-length reads and writes are handling specially // by the spec to allow each end to signal readiness to the // other. Quoting the spec: @@ -2309,6 +2389,10 @@ impl Instance { } ReadState::HostReady { accept } => { + if let TableIndex::Future(_) = ty { + transmit.done = true; + } + let types = instance.component().types().clone(); let lift = &mut LiftContext::new(store.0.store_opaque_mut(), &options, &types, self); @@ -2325,11 +2409,22 @@ impl Instance { ReturnCode::Blocked } - ReadState::Dropped => ReturnCode::Dropped(0), + ReadState::Dropped => { + if let TableIndex::Future(_) = ty { + transmit.done = true; + } + + ReturnCode::Dropped(0) + } }; if result != ReturnCode::Blocked { - *store[self.id()].get_mut_by_index(ty, handle)?.1 = StreamFutureState::Write; + *store[self.id()].get_mut_by_index(ty, handle)?.1 = StreamFutureState::Write { + done: matches!( + (result, ty), + (ReturnCode::Dropped(_), TableIndex::Stream(_)) + ), + }; } Ok(result) @@ -2371,18 +2466,27 @@ impl Instance { }; let instance = &mut store[self.id()]; let (rep, state) = instance.get_mut_by_index(ty, handle)?; - let StreamFutureState::Read = *state else { - bail!( - "invalid handle {handle}; expected {:?}; got {:?}", - StreamFutureState::Read, - *state - ); + let StreamFutureState::Read { done } = *state else { + bail!("invalid handle {handle}; expected `Read`; got {:?}", *state); }; + + if done { + bail!("cannot read from stream after being notified that the writable end dropped"); + } + *state = StreamFutureState::Busy; let transmit_handle = TableId::::new(rep); let transmit_id = instance.get(transmit_handle)?.state; - log::trace!("guest_read {transmit_handle:?} (handle {handle}; state {transmit_id:?})",); let transmit = instance.get_mut(transmit_id)?; + log::trace!( + "guest_read {transmit_handle:?} (handle {handle}; state {transmit_id:?}); {:?}", + transmit.write + ); + + if transmit.done { + bail!("cannot read from future after previous read succeeded"); + } + let new_state = if let WriteState::Dropped = &transmit.write { WriteState::Dropped } else { @@ -2415,6 +2519,10 @@ impl Instance { } => { assert_eq!(flat_abi, write_flat_abi); + if let TableIndex::Future(_) = ty { + transmit.done = true; + } + let write_handle_rep = transmit.write_handle.rep(); // See the comment in `guest_write` for the @@ -2505,6 +2613,10 @@ impl Instance { } WriteState::HostReady { accept, post_write } => { + if let TableIndex::Future(_) = ty { + transmit.done = true; + } + let code = accept( store.0.traitobj_mut(), self, @@ -2532,7 +2644,12 @@ impl Instance { }; if result != ReturnCode::Blocked { - *store[self.id()].get_mut_by_index(ty, handle)?.1 = StreamFutureState::Read; + *store[self.id()].get_mut_by_index(ty, handle)?.1 = StreamFutureState::Read { + done: matches!( + (result, ty), + (ReturnCode::Dropped(_), TableIndex::Stream(_)) + ), + }; } Ok(result) @@ -2555,8 +2672,8 @@ impl Instance { } }; match state { - StreamFutureState::Read => {} - StreamFutureState::Write => { + StreamFutureState::Read { .. } => {} + StreamFutureState::Write { .. } => { bail!("passed write end to `{{stream|future}}.drop-readable`") } StreamFutureState::Busy => bail!("cannot drop busy stream or future"), @@ -2834,10 +2951,7 @@ impl ComponentInstance { /// Allocate a new future or stream, including the `TransmitState` and the /// `TransmitHandle`s corresponding to the read and write ends. - fn new_transmit( - &mut self, - kind: TransmitKind, - ) -> Result<(TableId, TableId)> { + fn new_transmit(&mut self) -> Result<(TableId, TableId)> { let state_id = self.push(TransmitState::default())?; let write = self.push(TransmitHandle::new(state_id))?; @@ -2847,10 +2961,6 @@ impl ComponentInstance { state.write_handle = write; state.read_handle = read; - if let TransmitKind::Future = kind { - state.may_drop_writer = false; - } - log::trace!("new transmit: state {state_id:?}; write {write:?}; read {read:?}",); Ok((write, read)) @@ -2883,16 +2993,15 @@ impl ComponentInstance { /// write ends to the (sub-)component instance to which the specified /// `TableIndex` belongs. fn guest_new(&mut self, ty: TableIndex) -> Result { - let (write, read) = self.new_transmit(match ty { - TableIndex::Future(_) => TransmitKind::Future, - TableIndex::Stream(_) => TransmitKind::Stream, - })?; - let read = self - .state_table(ty) - .insert(read.rep(), waitable_state(ty, StreamFutureState::Read))?; - let write = self - .state_table(ty) - .insert(write.rep(), waitable_state(ty, StreamFutureState::Write))?; + let (write, read) = self.new_transmit()?; + let read = self.state_table(ty).insert( + read.rep(), + waitable_state(ty, StreamFutureState::Read { done: false }), + )?; + let write = self.state_table(ty).insert( + write.rep(), + waitable_state(ty, StreamFutureState::Write { done: false }), + )?; Ok(ResourcePair { write, read }) } @@ -2901,10 +3010,14 @@ impl ComponentInstance { /// # Arguments /// /// * `rep` - The `TransmitState` rep for the stream or future. - /// * `kind` - Whether `rep` is for a stream or a future. - fn host_cancel_write(&mut self, rep: u32, kind: TransmitKind) -> Result { + fn host_cancel_write(&mut self, rep: u32) -> Result { let transmit_id = TableId::::new(rep); let transmit = self.get_mut(transmit_id)?; + log::trace!( + "host_cancel_write state {transmit_id:?}; write state {:?} read state {:?}", + transmit.read, + transmit.write + ); let code = if let Some(event) = Waitable::Transmit(transmit.write_handle).take_event(self)? @@ -2935,10 +3048,6 @@ impl ComponentInstance { log::trace!("cancelled write {transmit_id:?}"); - if let (TransmitKind::Future, ReturnCode::Cancelled(0)) = (kind, code) { - transmit.may_drop_writer = false; - } - Ok(code) } @@ -2950,6 +3059,11 @@ impl ComponentInstance { fn host_cancel_read(&mut self, rep: u32) -> Result { let transmit_id = TableId::::new(rep); let transmit = self.get_mut(transmit_id)?; + log::trace!( + "host_cancel_read state {transmit_id:?}; read state {:?} write state {:?}", + transmit.read, + transmit.write + ); let code = if let Some(event) = Waitable::Transmit(transmit.read_handle).take_event(self)? { let (Event::FutureRead { code, .. } | Event::StreamRead { code, .. }) = event else { @@ -2991,10 +3105,11 @@ impl ComponentInstance { let transmit = self .get_mut(transmit_id) .with_context(|| format!("error closing writer {transmit_rep}"))?; - - if !transmit.may_drop_writer { - bail!("cannot drop future write end without first writing a value") - } + log::trace!( + "host_drop_writer state {transmit_id:?}; write state {:?} read state {:?}", + transmit.read, + transmit.write + ); transmit.writer_watcher = None; @@ -3007,6 +3122,13 @@ impl ComponentInstance { *post_write = PostWrite::Drop; } v @ WriteState::Open => { + if let (TransmitKind::Future, false) = ( + kind, + transmit.done || matches!(transmit.read, ReadState::Dropped), + ) { + bail!("cannot drop future write end without first writing a value") + } + *v = WriteState::Dropped; } WriteState::Dropped => unreachable!("write state is already dropped"), @@ -3087,27 +3209,26 @@ impl ComponentInstance { writer: u32, _async_: bool, ) -> Result { - let (rep, state) = self.state_table(ty).get_mut_by_index(writer)?; - let (state, kind) = match state { - WaitableState::Stream(_, state) => (state, TransmitKind::Stream), - WaitableState::Future(_, state) => (state, TransmitKind::Future), - _ => bail!("invalid stream or future handle"), + let (rep, WaitableState::Stream(_, state) | WaitableState::Future(_, state)) = + self.state_table(ty).get_mut_by_index(writer)? + else { + bail!("invalid stream or future handle"); }; let id = TableId::::new(rep); log::trace!("guest cancel write {id:?} (handle {writer})"); match state { - StreamFutureState::Write => { + StreamFutureState::Write { .. } => { bail!("stream or future write cancelled when no write is pending") } - StreamFutureState::Read => { + StreamFutureState::Read { .. } => { bail!("passed read end to `{{stream|future}}.cancel-write`") } StreamFutureState::Busy => { - *state = StreamFutureState::Write; + *state = StreamFutureState::Write { done: false }; } } let rep = self.get(id)?.state.rep(); - self.host_cancel_write(rep, kind) + self.host_cancel_write(rep) } /// Cancel a pending read for the specified stream or future from the guest. @@ -3125,14 +3246,14 @@ impl ComponentInstance { let id = TableId::::new(rep); log::trace!("guest cancel read {id:?} (handle {reader})"); match state { - StreamFutureState::Read => { + StreamFutureState::Read { .. } => { bail!("stream or future read cancelled when no read is pending") } - StreamFutureState::Write => { + StreamFutureState::Write { .. } => { bail!("passed write end to `{{stream|future}}.cancel-read`") } StreamFutureState::Busy => { - *state = StreamFutureState::Read; + *state = StreamFutureState::Read { done: false }; } } let rep = self.get(id)?.state.rep(); @@ -3153,17 +3274,16 @@ impl ComponentInstance { } }; match state { - StreamFutureState::Write => {} - StreamFutureState::Read => { + StreamFutureState::Write { .. } => {} + StreamFutureState::Read { .. } => { bail!("passed read end to `{{stream|future}}.drop-writable`") } StreamFutureState::Busy => bail!("cannot drop busy stream or future"), } - let transmit_rep = self - .get(TableId::::new(transmit_rep))? - .state - .rep(); + let id = TableId::::new(transmit_rep); + let transmit_rep = self.get(id)?.state.rep(); + log::trace!("guest_drop_writable: drop writer {id:?}"); self.host_drop_writer(transmit_rep, kind) } @@ -3241,13 +3361,21 @@ impl ComponentInstance { let (_, src_state) = match_state(src_state)?; match src_state { - StreamFutureState::Read => { + StreamFutureState::Read { done: true } => { + bail!("cannot lift stream after being notified that the writable end dropped") + } + StreamFutureState::Read { done: false } => { src_table.remove_by_index(src_idx)?; let dst_table = &mut self.waitable_tables()[dst_instance]; - dst_table.insert(rep, make_state(dst, StreamFutureState::Read)) + dst_table.insert( + rep, + make_state(dst, StreamFutureState::Read { done: false }), + ) + } + StreamFutureState::Write { .. } => { + bail!("cannot transfer write end of stream or future") } - StreamFutureState::Write => bail!("cannot transfer write end of stream or future"), StreamFutureState::Busy => bail!("cannot transfer busy stream or future"), } } diff --git a/crates/wasmtime/src/runtime/func.rs b/crates/wasmtime/src/runtime/func.rs index aff565c30e..3c4b07a5dd 100644 --- a/crates/wasmtime/src/runtime/func.rs +++ b/crates/wasmtime/src/runtime/func.rs @@ -1706,7 +1706,7 @@ impl EntryStoreContext { /// This function restores the values stored in this struct. We invoke this /// function through this type's `Drop` implementation. This ensures that we /// even restore the values if we unwind the stack (e.g., because we are - /// panicing out of a Wasm execution). + /// panicking out of a Wasm execution). #[inline] fn exit_wasm(&mut self) { unsafe { diff --git a/tests/misc_testsuite/component-model-async/trap-if-done.wast b/tests/misc_testsuite/component-model-async/trap-if-done.wast new file mode 100644 index 0000000000..b8c832b052 --- /dev/null +++ b/tests/misc_testsuite/component-model-async/trap-if-done.wast @@ -0,0 +1,473 @@ +;;! component_model_async = true + +;; This test has two components $C and $D, where $D imports and calls $C. +;; $C contains utility functions used by $D to create futures/streams, +;; write to them and close them. $D uses these utility functions to test for +;; all the cases where, once a future/stream is "done", further uses of the +;; future/stream trap. +;; +;; $D exports a list of functions, one for each case of trapping. Since traps +;; take out their containing instance, a fresh instance of $Tester is created +;; for each call to a $D export. +;; +;; When testing traps involving the readable end, the exports of $D take a +;; "bool" parameter that toggles whether the trap is triggered by +;; {stream,future}.{read,write} or by lifting, and the top-level commands +;; pass 'false' and 'true'. +;; +;; (Copied from +;; https://github.com/WebAssembly/component-model/blob/fix-future/test/async/trap-if-done.wast) +(component definition $Tester + (component $C + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $CM + (import "" "mem" (memory 1)) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) + (import "" "future.new" (func $future.new (result i64))) + (import "" "future.write" (func $future.write (param i32 i32) (result i32))) + (import "" "future.drop-writable" (func $future.drop-writable (param i32))) + (import "" "stream.new" (func $stream.new (result i64))) + (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) + (import "" "stream.drop-writable" (func $stream.drop-writable (param i32))) + + (global $writable-end (mut i32) (i32.const 0)) + (global $ws (mut i32) (i32.const 0)) + + (func $start (global.set $ws (call $waitable-set.new))) + (start $start) + + (func $start-future (export "start-future") (result i32) + ;; create a new future, return the readable end to the caller + (local $ret64 i64) + (local.set $ret64 (call $future.new)) + (global.set $writable-end (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32)))) + (call $waitable.join (global.get $writable-end) (global.get $ws) ) + (i32.wrap_i64 (local.get $ret64)) + ) + (func $future-write (export "future-write") (result i32) + ;; the caller will assert what they expect the return value to be + (i32.store (i32.const 16) (i32.const 42)) + (call $future.write (global.get $writable-end) (i32.const 16)) + ) + (func $acknowledge-future-write (export "acknowledge-future-write") + ;; confirm we got a FUTURE_WRITE $writable-end COMPLETED event + (local $ret i32) + (local.set $ret (call $waitable-set.wait (global.get $ws) (i32.const 0))) + (if (i32.ne (i32.const 5 (; FUTURE_WRITE ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (global.get $writable-end) (i32.load (i32.const 0))) + (then unreachable)) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (i32.load (i32.const 4))) + (then unreachable)) + ) + (func $future-drop-writable (export "future-drop-writable") + ;; maybe boom + (call $future.drop-writable (global.get $writable-end)) + ) + + (func $start-stream (export "start-stream") (result i32) + ;; create a new stream, return the readable end to the caller + (local $ret64 i64) + (local.set $ret64 (call $stream.new)) + (global.set $writable-end (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32)))) + (call $waitable.join (global.get $writable-end) (global.get $ws) ) + (i32.wrap_i64 (local.get $ret64)) + ) + (func $stream-write (export "stream-write") (result i32) + ;; the caller will assert what they expect the return value to be + (i32.store (i32.const 16) (i32.const 42)) + (call $stream.write (global.get $writable-end) (i32.const 16) (i32.const 1)) + ) + (func $acknowledge-stream-write (export "acknowledge-stream-write") + ;; confirm we got a STREAM_WRITE $writable-end COMPLETED event + (local $ret i32) + (local.set $ret (call $waitable-set.wait (global.get $ws) (i32.const 0))) + (if (i32.ne (i32.const 3 (; STREAM_WRITE ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (global.get $writable-end) (i32.load (i32.const 0))) + (then unreachable)) + (if (i32.ne (i32.const 0x11 (; DROPPED=1 | (1<<4) ;)) (i32.load (i32.const 4))) + (then unreachable)) + ) + (func $stream-drop-writable (export "stream-drop-writable") + ;; maybe boom + (call $stream.drop-writable (global.get $writable-end)) + ) + ) + (type $FT (future u8)) + (type $ST (stream u8)) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon waitable-set.wait (memory $memory "mem") (core func $waitable-set.wait)) + (canon future.new $FT (core func $future.new)) + (canon future.write $FT async (memory $memory "mem") (core func $future.write)) + (canon future.drop-writable $FT (core func $future.drop-writable)) + (canon stream.new $ST (core func $stream.new)) + (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) + (canon stream.drop-writable $ST (core func $stream.drop-writable)) + (core instance $cm (instantiate $CM (with "" (instance + (export "mem" (memory $memory "mem")) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "waitable-set.wait" (func $waitable-set.wait)) + (export "future.new" (func $future.new)) + (export "future.write" (func $future.write)) + (export "future.drop-writable" (func $future.drop-writable)) + (export "stream.new" (func $stream.new)) + (export "stream.write" (func $stream.write)) + (export "stream.drop-writable" (func $stream.drop-writable)) + )))) + (func (export "start-future") (result (future u8)) (canon lift (core func $cm "start-future"))) + (func (export "future-write") (result u32) (canon lift (core func $cm "future-write"))) + (func (export "acknowledge-future-write") (canon lift (core func $cm "acknowledge-future-write"))) + (func (export "future-drop-writable") (canon lift (core func $cm "future-drop-writable"))) + (func (export "start-stream") (result (stream u8)) (canon lift (core func $cm "start-stream"))) + (func (export "stream-write") (result u32) (canon lift (core func $cm "stream-write"))) + (func (export "acknowledge-stream-write") (canon lift (core func $cm "acknowledge-stream-write"))) + (func (export "stream-drop-writable") (canon lift (core func $cm "stream-drop-writable"))) + ) + (component $D + (import "c" (instance $c + (export "start-future" (func (result (future u8)))) + (export "future-write" (func (result u32))) + (export "acknowledge-future-write" (func)) + (export "future-drop-writable" (func)) + (export "start-stream" (func (result (stream u8)))) + (export "stream-write" (func (result u32))) + (export "acknowledge-stream-write" (func)) + (export "stream-drop-writable" (func)) + )) + + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $Core + (import "" "mem" (memory 1)) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) + (import "" "future.read" (func $future.read (param i32 i32) (result i32))) + (import "" "future.drop-readable" (func $future.drop-readable (param i32))) + (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) + (import "" "stream.drop-readable" (func $stream.drop-readable (param i32))) + (import "" "start-future" (func $start-future (result i32))) + (import "" "future-write" (func $future-write (result i32))) + (import "" "acknowledge-future-write" (func $acknowledge-future-write)) + (import "" "future-drop-writable" (func $future-drop-writable)) + (import "" "start-stream" (func $start-stream (result i32))) + (import "" "stream-write" (func $stream-write (result i32))) + (import "" "acknowledge-stream-write" (func $acknowledge-stream-write)) + (import "" "stream-drop-writable" (func $stream-drop-writable)) + + (func $trap-after-future-eager-write (export "trap-after-future-eager-write") + (local $ret i32) + (local $fr i32) + (local.set $fr (call $start-future)) + + ;; start a read on our end so the next write will succeed + (local.set $ret (call $future.read (local.get $fr) (i32.const 16))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; calling future.write in $C should succeed eagerly + (local.set $ret (call $future-write)) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load8_u (i32.const 16))) + (then unreachable)) + + ;; calling future.write in $C now should trap + (drop (call $future-write)) + ) + (func $trap-after-future-async-write (export "trap-after-future-async-write") + (local $ret i32) + (local $fr i32) + (local.set $fr (call $start-future)) + + ;; calling future.write in $C should block + (local.set $ret (call $future-write)) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; our future.read should then succeed eagerly + (local.set $ret (call $future.read (local.get $fr) (i32.const 16))) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load8_u (i32.const 16))) + (then unreachable)) + + ;; let $C see the write completed so the future is 'done' + (call $acknowledge-future-write) + + ;; trying to call future.write again in $C should trap + (drop (call $future-write)) + ) + (func $trap-after-future-reader-dropped (export "trap-after-future-reader-dropped") + (local $ret i32) + (local $fr i32) + (local.set $fr (call $start-future)) + + ;; drop our readable end before writer can write + (call $future.drop-readable (local.get $fr)) + + ;; let $C try to future.write and find out we DROPPED + (local.set $ret (call $future-write)) + (if (i32.ne (i32.const 1 (; DROPPED ;)) (local.get $ret)) + (then unreachable)) + + ;; trying to call future.write again in $C should trap + (drop (call $future-write)) + ) + (func $trap-after-future-eager-read (export "trap-after-future-eager-read") (param $bool i32) (result i32) + (local $ret i32) + (local $fr i32) + (local.set $fr (call $start-future)) + + ;; calling future.write in $C should block + (local.set $ret (call $future-write)) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; our future.read should then succeed eagerly + (local.set $ret (call $future.read (local.get $fr) (i32.const 16))) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load8_u (i32.const 16))) + (then unreachable)) + + (if (i32.eqz (local.get $bool)) (then + ;; calling future.read again should then trap + (drop (call $future.read (local.get $fr) (i32.const 16))) + ) (else + ;; lifting the future by returning it should also trap + (return (local.get $fr)) + )) + unreachable + ) + (func $trap-after-future-async-read (export "trap-after-future-async-read") (param $bool i32) (result i32) + (local $ret i32) (local $ws i32) + (local $fr i32) + (local.set $fr (call $start-future)) + + ;; read first, so it blocks + (local.set $ret (call $future.read (local.get $fr) (i32.const 16))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; calling future.write in $C should then succeed eagerly + (local.set $ret (call $future-write)) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load8_u (i32.const 16))) + (then unreachable)) + + ;; wait to see that our blocked future.read COMPLETED, producing '42' + (local.set $ws (call $waitable-set.new)) + (call $waitable.join (local.get $fr) (local.get $ws)) + (local.set $ret (call $waitable-set.wait (local.get $ws) (i32.const 0))) + (if (i32.ne (i32.const 4 (; FUTURE_READ ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (local.get $fr) (i32.load (i32.const 0))) + (then unreachable)) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (i32.load (i32.const 4))) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load (i32.const 16))) + (then unreachable)) + + (if (i32.eqz (local.get $bool)) (then + ;; calling future.read again should then trap + (drop (call $future.read (local.get $fr) (i32.const 16))) + ) (else + ;; lifting the future by returning it should also trap + (return (local.get $fr)) + )) + unreachable + ) + (func $trap-after-stream-reader-eager-dropped (export "trap-after-stream-reader-eager-dropped") + (local $ret i32) + (local $sr i32) + (local.set $sr (call $start-stream)) + + ;; drop our readable end before writer can write + (call $stream.drop-readable (local.get $sr)) + + ;; let $C try to stream.write and find out we DROPPED + (local.set $ret (call $stream-write)) + (if (i32.ne (i32.const 1 (; DROPPED ;)) (local.get $ret)) + (then unreachable)) + + ;; trying to call stream.write again in $C should trap + (drop (call $stream-write)) + ) + (func $trap-after-stream-reader-async-dropped (export "trap-after-stream-reader-async-dropped") + (local $ret i32) + (local $sr i32) + (local.set $sr (call $start-stream)) + + ;; calling stream.write in $C should block + (local.set $ret (call $stream-write)) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; our stream.read should then succeed eagerly + (local.set $ret (call $stream.read (local.get $sr) (i32.const 16) (i32.const 100))) + (if (i32.ne (i32.const 0x10 (; COMPLETED=0 | (1<<4) ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load8_u (i32.const 16))) + (then unreachable)) + + ;; then drop our readable end + (call $stream.drop-readable (local.get $sr)) + + ;; let $C see that it's stream.write COMPLETED and wrote 1 elem + (call $acknowledge-stream-write) + + ;; now calling stream.write again in $C will trap + (drop (call $stream-write)) + ) + (func $trap-after-stream-writer-eager-dropped (export "trap-after-stream-writer-eager-dropped") (param $bool i32) (result i32) + (local $ret i32) + (local $sr i32) + (local.set $sr (call $start-stream)) + + ;; immediately drop the writable end + (call $stream-drop-writable) + + ;; calling stream.read will see that the writer dropped + (local.set $ret (call $stream.read (local.get $sr) (i32.const 16) (i32.const 100))) + (if (i32.ne (i32.const 0x01 (; DROPPED=1 | (0<<4) ;)) (local.get $ret)) + (then unreachable)) + + (if (i32.eqz (local.get $bool)) (then + ;; calling stream.read again should then trap + (drop (call $stream.read (local.get $sr) (i32.const 16) (i32.const 100))) + ) (else + ;; lifting the stream by returning it should also trap + (return (local.get $sr)) + )) + unreachable + ) + (func $trap-after-stream-writer-async-dropped (export "trap-after-stream-writer-async-dropped") (param $bool i32) (result i32) + (local $ret i32) (local $ws i32) + (local $sr i32) + (local.set $sr (call $start-stream)) + + ;; start a read on our end first which will block + (local.set $ret (call $stream.read (local.get $sr) (i32.const 16) (i32.const 100))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; drop the writable end before writing anything + (call $stream-drop-writable) + + ;; wait to see that our blocked stream.read was DROPPED + (local.set $ws (call $waitable-set.new)) + (call $waitable.join (local.get $sr) (local.get $ws)) + (local.set $ret (call $waitable-set.wait (local.get $ws) (i32.const 0))) + (if (i32.ne (i32.const 2 (; STREAM_READ ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (local.get $sr) (i32.load (i32.const 0))) + (then unreachable)) + (if (i32.ne (i32.const 0x01 (; DROPPED=1 | (0<<4) ;)) (i32.load (i32.const 4))) + (then unreachable)) + + (if (i32.eqz (local.get $bool)) (then + ;; calling stream.read again should then trap + (drop (call $stream.read (local.get $sr) (i32.const 16) (i32.const 100))) + ) (else + ;; lifting the stream by returning it should also trap + (return (local.get $sr)) + )) + unreachable + ) + ) + (type $FT (future u8)) + (type $ST (stream u8)) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon waitable-set.wait (memory $memory "mem") (core func $waitable-set.wait)) + (canon future.new $FT (core func $future.new)) + (canon future.read $FT async (memory $memory "mem") (core func $future.read)) + (canon future.drop-readable $FT (core func $future.drop-readable)) + (canon stream.new $ST (core func $stream.new)) + (canon stream.read $ST async (memory $memory "mem") (core func $stream.read)) + (canon stream.drop-readable $ST (core func $stream.drop-readable)) + (canon lower (func $c "start-future") (core func $start-future')) + (canon lower (func $c "future-write") (core func $future-write')) + (canon lower (func $c "acknowledge-future-write") (core func $acknowledge-future-write')) + (canon lower (func $c "future-drop-writable") (core func $future-drop-writable')) + (canon lower (func $c "start-stream") (core func $start-stream')) + (canon lower (func $c "stream-write") (core func $stream-write')) + (canon lower (func $c "acknowledge-stream-write") (core func $acknowledge-stream-write')) + (canon lower (func $c "stream-drop-writable") (core func $stream-drop-writable')) + (core instance $core (instantiate $Core (with "" (instance + (export "mem" (memory $memory "mem")) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "waitable-set.wait" (func $waitable-set.wait)) + (export "future.new" (func $future.new)) + (export "future.read" (func $future.read)) + (export "future.drop-readable" (func $future.drop-readable)) + (export "stream.new" (func $stream.new)) + (export "stream.read" (func $stream.read)) + (export "stream.drop-readable" (func $stream.drop-readable)) + (export "start-future" (func $start-future')) + (export "future-write" (func $future-write')) + (export "acknowledge-future-write" (func $acknowledge-future-write')) + (export "future-drop-writable" (func $future-drop-writable')) + (export "start-stream" (func $start-stream')) + (export "stream-write" (func $stream-write')) + (export "acknowledge-stream-write" (func $acknowledge-stream-write')) + (export "stream-drop-writable" (func $stream-drop-writable')) + )))) + (func (export "trap-after-future-eager-write") (canon lift (core func $core "trap-after-future-eager-write"))) + (func (export "trap-after-future-async-write") (canon lift (core func $core "trap-after-future-async-write"))) + (func (export "trap-after-future-reader-dropped") (canon lift (core func $core "trap-after-future-reader-dropped"))) + (func (export "trap-after-future-eager-read") (param "bool" bool) (result $FT) (canon lift (core func $core "trap-after-future-eager-read"))) + (func (export "trap-after-future-async-read") (param "bool" bool) (result $FT) (canon lift (core func $core "trap-after-future-async-read"))) + (func (export "trap-after-stream-reader-eager-dropped") (canon lift (core func $core "trap-after-stream-reader-eager-dropped"))) + (func (export "trap-after-stream-reader-async-dropped") (canon lift (core func $core "trap-after-stream-reader-async-dropped"))) + (func (export "trap-after-stream-writer-eager-dropped") (param "bool" bool) (result $ST) (canon lift (core func $core "trap-after-stream-writer-eager-dropped"))) + (func (export "trap-after-stream-writer-async-dropped") (param "bool" bool) (result $ST) (canon lift (core func $core "trap-after-stream-writer-async-dropped"))) + ) + (instance $c (instantiate $C)) + (instance $d (instantiate $D (with "c" (instance $c)))) + (func (export "trap-after-future-eager-write") (alias export $d "trap-after-future-eager-write")) + (func (export "trap-after-future-async-write") (alias export $d "trap-after-future-async-write")) + (func (export "trap-after-future-reader-dropped") (alias export $d "trap-after-future-reader-dropped")) + (func (export "trap-after-future-eager-read") (alias export $d "trap-after-future-eager-read")) + (func (export "trap-after-future-async-read") (alias export $d "trap-after-future-async-read")) + (func (export "trap-after-stream-reader-eager-dropped") (alias export $d "trap-after-stream-reader-eager-dropped")) + (func (export "trap-after-stream-reader-async-dropped") (alias export $d "trap-after-stream-reader-async-dropped")) + (func (export "trap-after-stream-writer-eager-dropped") (alias export $d "trap-after-stream-writer-eager-dropped")) + (func (export "trap-after-stream-writer-async-dropped") (alias export $d "trap-after-stream-writer-async-dropped")) +) + +(component instance $i1 $Tester) +(assert_trap (invoke "trap-after-future-eager-write") "cannot write to future after previous write succeeded or readable end dropped") +(component instance $i2 $Tester) +(assert_trap (invoke "trap-after-future-async-write") "cannot write to future after previous write succeeded or readable end dropped") +(component instance $i3 $Tester) +(assert_trap (invoke "trap-after-future-reader-dropped") "cannot write to future after previous write succeeded or readable end dropped") +(component instance $i4.1 $Tester) +(assert_trap (invoke "trap-after-future-eager-read" (bool.const false)) "cannot read from future after previous read succeeded") +(component instance $i4.2 $Tester) +(assert_trap (invoke "trap-after-future-eager-read" (bool.const true)) "cannot lift future after previous read succeeded") +(component instance $i5.1 $Tester) +(assert_trap (invoke "trap-after-future-async-read" (bool.const false)) "cannot read from future after previous read succeeded") +(component instance $i5.2 $Tester) +(assert_trap (invoke "trap-after-future-async-read" (bool.const true)) "cannot lift future after previous read succeeded") +(component instance $i6 $Tester) +(assert_trap (invoke "trap-after-stream-reader-eager-dropped") "cannot write to stream after being notified that the readable end dropped") +(component instance $i7 $Tester) +(assert_trap (invoke "trap-after-stream-reader-async-dropped") "cannot write to stream after being notified that the readable end dropped") +(component instance $i8.1 $Tester) +(assert_trap (invoke "trap-after-stream-writer-eager-dropped" (bool.const false)) "cannot read from stream after being notified that the writable end dropped") +(component instance $i8.2 $Tester) +(assert_trap (invoke "trap-after-stream-writer-eager-dropped" (bool.const true)) "cannot lift stream after being notified that the writable end dropped") +(component instance $i9.1 $Tester) +(assert_trap (invoke "trap-after-stream-writer-async-dropped" (bool.const false)) "cannot read from stream after being notified that the writable end dropped") +(component instance $i9.2 $Tester) +(assert_trap (invoke "trap-after-stream-writer-async-dropped" (bool.const true)) "cannot lift stream after being notified that the writable end dropped")