Skip to content

Commit 484cb52

Browse files
KR-bluejayADD-SP
authored andcommitted
sync: return TryRecvError::Disconnected from Receiver::try_recv after Receiver::close (#7686)
(cherry picked from commit d060401)
1 parent de6ef21 commit 484cb52

File tree

4 files changed

+20
-3
lines changed

4 files changed

+20
-3
lines changed

tokio/src/sync/mpsc/chan.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,10 @@ impl<T, S: Semaphore> Rx<T, S> {
439439
return Ok(value);
440440
}
441441
TryPopResult::Closed => return Err(TryRecvError::Disconnected),
442+
// If close() was called, an empty queue should report Disconnected.
443+
TryPopResult::Empty if rx_fields.rx_closed => {
444+
return Err(TryRecvError::Disconnected)
445+
}
442446
TryPopResult::Empty => return Err(TryRecvError::Empty),
443447
TryPopResult::Busy => {} // fall through
444448
}

tokio/src/sync/mpsc/list.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,14 @@ pub(crate) enum TryPopResult<T> {
3535
/// Successfully popped a value.
3636
Ok(T),
3737
/// The channel is empty.
38+
///
39+
/// Note that `list.rs` only tracks the close state set by senders. If the
40+
/// channel is closed by `Rx::close()`, then `TryPopResult::Empty` is still
41+
/// returned, and the close state needs to be handled by `chan.rs`.
3842
Empty,
3943
/// The channel is empty and closed.
44+
///
45+
/// Returned when the send half is closed (all senders dropped).
4046
Closed,
4147
/// The channel is not empty, but the first value is being written.
4248
Busy,

tokio/tests/sync_broadcast.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@ use wasm_bindgen_test::wasm_bindgen_test as test;
77

88
use tokio::sync::broadcast;
99
use tokio_test::task;
10-
use tokio_test::{
11-
assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
12-
};
10+
use tokio_test::{assert_err, assert_ok, assert_pending, assert_ready_err, assert_ready_ok};
1311

1412
use std::sync::Arc;
1513

tokio/tests/sync_mpsc.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -966,6 +966,15 @@ fn try_recv_unbounded() {
966966
}
967967
}
968968

969+
#[test]
970+
fn try_recv_after_receiver_close() {
971+
let (_tx, mut rx) = mpsc::channel::<()>(5);
972+
973+
assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
974+
rx.close();
975+
assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
976+
}
977+
969978
#[test]
970979
fn try_recv_close_while_empty_bounded() {
971980
let (tx, mut rx) = mpsc::channel::<()>(5);

0 commit comments

Comments
 (0)