1
1
//! A single-producer, single-consumer (oneshot) channel.
2
+ //!
3
+ //! This is an experimental module, so the API will likely change.
2
4
3
5
use crate::sync::mpmc;
4
6
use crate::sync::mpsc::{RecvError, SendError};
5
7
use crate::time::{Duration, Instant};
6
8
use crate::{error, fmt};
7
9
8
10
/// Creates a new oneshot channel, returning the sender/receiver halves.
11
+ ///
12
+ /// # Examples
13
+ ///
14
+ /// ```
15
+ /// # #![feature(oneshot_channel)]
16
+ /// # use std::sync::oneshot;
17
+ /// # use std::thread;
18
+ /// #
19
+ /// let (sender, receiver) = oneshot::channel();
20
+ ///
21
+ /// // Spawn off an expensive computation.
22
+ /// thread::spawn(move || {
23
+ /// # fn expensive_computation() -> i32 { 42 }
24
+ /// sender.send(expensive_computation()).unwrap();
25
+ /// // `sender` is consumed by `send`, so we cannot use it anymore.
26
+ /// });
27
+ ///
28
+ /// # fn do_other_work() -> i32 { 42 }
29
+ /// do_other_work();
30
+ ///
31
+ /// // Let's see what that answer was...
32
+ /// println!("{:?}", receiver.recv().unwrap());
33
+ /// // `receiver` is consumed by `recv`, so we cannot use it anymore.
34
+ /// ```
9
35
#[must_use]
10
36
#[unstable(feature = "oneshot_channel", issue = "143674")]
11
37
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
12
38
// Using a `sync_channel` with capacity 1 means that the internal implementation will use the
13
- // `Array`-flavored channel implementtion .
14
- let (tx, rx ) = mpmc::sync_channel(1);
15
- (Sender { inner: tx }, Receiver { inner: rx })
39
+ // `Array`-flavored channel implementation .
40
+ let (sender, receiver ) = mpmc::sync_channel(1);
41
+ (Sender { inner: sender }, Receiver { inner: receiver })
16
42
}
17
43
18
44
////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -23,17 +49,33 @@ pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
23
49
///
24
50
/// # Examples
25
51
///
26
- /// (more examples to come)
52
+ /// ```
53
+ /// # #![feature(oneshot_channel)]
54
+ /// # use std::sync::oneshot;
55
+ /// # use std::thread;
56
+ /// #
57
+ /// let (sender, receiver) = oneshot::channel();
58
+ ///
59
+ /// thread::spawn(move || {
60
+ /// sender.send("Hello from thread!").unwrap();
61
+ /// });
62
+ ///
63
+ /// assert_eq!(receiver.recv().unwrap(), "Hello from thread!");
64
+ /// ```
65
+ ///
66
+ /// `Sender` cannot be sent between threads if it is sending non-`Send` types.
27
67
///
28
68
/// ```compile_fail
29
69
/// # #![feature(oneshot_channel)]
30
70
/// # use std::sync::oneshot;
71
+ /// # use std::thread;
72
+ /// # use std::ptr;
31
73
/// #
32
74
/// let (sender, receiver) = oneshot::channel();
33
75
///
34
76
/// struct NotSend(*mut ());
35
- /// std:: thread::spawn(move || {
36
- /// sender.send(NotSend(std:: ptr::null_mut()));
77
+ /// thread::spawn(move || {
78
+ /// sender.send(NotSend(ptr::null_mut()));
37
79
/// });
38
80
///
39
81
/// let reply = receiver.try_recv().unwrap();
@@ -57,6 +99,24 @@ impl<T> Sender<T> {
57
99
/// [`Receiver<T>`] has been dropped.
58
100
///
59
101
/// This method is non-blocking (wait-free).
102
+ ///
103
+ /// # Examples
104
+ ///
105
+ /// ```
106
+ /// # #![feature(oneshot_channel)]
107
+ /// # use std::sync::oneshot;
108
+ /// # use std::thread;
109
+ /// #
110
+ /// let (tx, rx) = oneshot::channel();
111
+ ///
112
+ /// thread::spawn(move || {
113
+ /// // Perform some computation.
114
+ /// let result = 2 + 2;
115
+ /// tx.send(result).unwrap();
116
+ /// });
117
+ ///
118
+ /// assert_eq!(rx.recv().unwrap(), 4);
119
+ /// ```
60
120
#[unstable(feature = "oneshot_channel", issue = "143674")]
61
121
pub fn send(self, t: T) -> Result<(), SendError<T>> {
62
122
self.inner.send(t)
@@ -78,18 +138,37 @@ impl<T> fmt::Debug for Sender<T> {
78
138
///
79
139
/// # Examples
80
140
///
81
- /// (more examples to come)
141
+ /// ```
142
+ /// # #![feature(oneshot_channel)]
143
+ /// # use std::sync::oneshot;
144
+ /// # use std::thread;
145
+ /// # use std::time::Duration;
146
+ /// #
147
+ /// let (sender, receiver) = oneshot::channel();
148
+ ///
149
+ /// thread::spawn(move || {
150
+ /// thread::sleep(Duration::from_millis(100));
151
+ /// sender.send("Hello after delay!").unwrap();
152
+ /// });
153
+ ///
154
+ /// println!("Waiting for message...");
155
+ /// println!("{}", receiver.recv().unwrap());
156
+ /// ```
157
+ ///
158
+ /// `Receiver` cannot be sent between threads if it is receiving non-`Send` types.
82
159
///
83
160
/// ```compile_fail
84
161
/// # #![feature(oneshot_channel)]
85
162
/// # use std::sync::oneshot;
163
+ /// # use std::thread;
164
+ /// # use std::ptr;
86
165
/// #
87
166
/// let (sender, receiver) = oneshot::channel();
88
167
///
89
168
/// struct NotSend(*mut ());
90
- /// sender.send(NotSend(std:: ptr::null_mut()));
169
+ /// sender.send(NotSend(ptr::null_mut()));
91
170
///
92
- /// std:: thread::spawn(move || {
171
+ /// thread::spawn(move || {
93
172
/// let reply = receiver.try_recv().unwrap();
94
173
/// });
95
174
/// ```
@@ -111,6 +190,25 @@ impl<T> Receiver<T> {
111
190
/// Receives the value from the sending end, blocking the calling thread until it gets it.
112
191
///
113
192
/// Can only fail if the corresponding [`Sender<T>`] has been dropped.
193
+ ///
194
+ /// # Examples
195
+ ///
196
+ /// ```
197
+ /// # #![feature(oneshot_channel)]
198
+ /// # use std::sync::oneshot;
199
+ /// # use std::thread;
200
+ /// # use std::time::Duration;
201
+ /// #
202
+ /// let (tx, rx) = oneshot::channel();
203
+ ///
204
+ /// thread::spawn(move || {
205
+ /// thread::sleep(Duration::from_millis(500));
206
+ /// tx.send("Done!").unwrap();
207
+ /// });
208
+ ///
209
+ /// // This will block until the message arrives.
210
+ /// println!("{}", rx.recv().unwrap());
211
+ /// ```
114
212
#[unstable(feature = "oneshot_channel", issue = "143674")]
115
213
pub fn recv(self) -> Result<T, RecvError> {
116
214
self.inner.recv()
@@ -119,6 +217,39 @@ impl<T> Receiver<T> {
119
217
// Fallible methods.
120
218
121
219
/// Attempts to return a pending value on this receiver without blocking.
220
+ ///
221
+ /// # Examples
222
+ ///
223
+ /// ```
224
+ /// # #![feature(oneshot_channel)]
225
+ /// # use std::sync::oneshot;
226
+ /// # use std::thread;
227
+ /// # use std::time::Duration;
228
+ /// #
229
+ /// let (sender, mut receiver) = oneshot::channel();
230
+ ///
231
+ /// thread::spawn(move || {
232
+ /// thread::sleep(Duration::from_millis(100));
233
+ /// sender.send(42).unwrap();
234
+ /// });
235
+ ///
236
+ /// // Keep trying until we get the message, doing other work in the process.
237
+ /// loop {
238
+ /// match receiver.try_recv() {
239
+ /// Ok(value) => {
240
+ /// assert_eq!(value, 42);
241
+ /// break;
242
+ /// }
243
+ /// Err(oneshot::TryRecvError::Empty(rx)) => {
244
+ /// // Retake ownership of the receiver.
245
+ /// receiver = rx;
246
+ /// # fn do_other_work() { thread::sleep(Duration::from_millis(25)); }
247
+ /// do_other_work();
248
+ /// }
249
+ /// Err(oneshot::TryRecvError::Disconnected) => panic!("Sender disconnected"),
250
+ /// }
251
+ /// }
252
+ /// ```
122
253
#[unstable(feature = "oneshot_channel", issue = "143674")]
123
254
pub fn try_recv(self) -> Result<T, TryRecvError<T>> {
124
255
self.inner.try_recv().map_err(|err| match err {
@@ -129,6 +260,29 @@ impl<T> Receiver<T> {
129
260
130
261
/// Attempts to wait for a value on this receiver, returning an error if the corresponding
131
262
/// [`Sender`] half of this channel has been dropped, or if it waits more than `timeout`.
263
+ ///
264
+ /// # Examples
265
+ ///
266
+ /// ```
267
+ /// # #![feature(oneshot_channel)]
268
+ /// # use std::sync::oneshot;
269
+ /// # use std::thread;
270
+ /// # use std::time::Duration;
271
+ /// #
272
+ /// let (sender, receiver) = oneshot::channel();
273
+ ///
274
+ /// thread::spawn(move || {
275
+ /// thread::sleep(Duration::from_millis(500));
276
+ /// sender.send("Success!").unwrap();
277
+ /// });
278
+ ///
279
+ /// // Wait up to 1 second for the message
280
+ /// match receiver.recv_timeout(Duration::from_secs(1)) {
281
+ /// Ok(msg) => println!("Received: {}", msg),
282
+ /// Err(oneshot::RecvTimeoutError::Timeout(_)) => println!("Timed out!"),
283
+ /// Err(oneshot::RecvTimeoutError::Disconnected) => println!("Sender dropped!"),
284
+ /// }
285
+ /// ```
132
286
#[unstable(feature = "oneshot_channel", issue = "143674")]
133
287
pub fn recv_timeout(self, timeout: Duration) -> Result<T, RecvTimeoutError<T>> {
134
288
self.inner.recv_timeout(timeout).map_err(|err| match err {
@@ -139,6 +293,29 @@ impl<T> Receiver<T> {
139
293
140
294
/// Attempts to wait for a value on this receiver, returning an error if the corresponding
141
295
/// [`Sender`] half of this channel has been dropped, or if `deadline` is reached.
296
+ ///
297
+ /// # Examples
298
+ ///
299
+ /// ```
300
+ /// # #![feature(oneshot_channel)]
301
+ /// # use std::sync::oneshot;
302
+ /// # use std::thread;
303
+ /// # use std::time::{Duration, Instant};
304
+ /// #
305
+ /// let (sender, receiver) = oneshot::channel();
306
+ ///
307
+ /// thread::spawn(move || {
308
+ /// thread::sleep(Duration::from_millis(100));
309
+ /// sender.send("Just in time!").unwrap();
310
+ /// });
311
+ ///
312
+ /// let deadline = Instant::now() + Duration::from_millis(500);
313
+ /// match receiver.recv_deadline(deadline) {
314
+ /// Ok(msg) => println!("Received: {}", msg),
315
+ /// Err(oneshot::RecvTimeoutError::Timeout(_)) => println!("Missed deadline!"),
316
+ /// Err(oneshot::RecvTimeoutError::Disconnected) => println!("Sender dropped!"),
317
+ /// }
318
+ /// ```
142
319
#[unstable(feature = "oneshot_channel", issue = "143674")]
143
320
pub fn recv_deadline(self, deadline: Instant) -> Result<T, RecvTimeoutError<T>> {
144
321
self.inner.recv_deadline(deadline).map_err(|err| match err {
@@ -160,6 +337,10 @@ impl<T> fmt::Debug for Receiver<T> {
160
337
////////////////////////////////////////////////////////////////////////////////////////////////////
161
338
162
339
/// An error returned from the [`try_recv`](Receiver::try_recv) method.
340
+ ///
341
+ /// See the documentation for [`try_recv`] for more information on how to use this error.
342
+ ///
343
+ /// [`try_recv`]: Receiver::try_recv
163
344
#[unstable(feature = "oneshot_channel", issue = "143674")]
164
345
pub enum TryRecvError<T> {
165
346
/// The [`Sender`] has not sent a message yet, but it might in the future (as it has not yet
@@ -173,6 +354,36 @@ pub enum TryRecvError<T> {
173
354
174
355
/// An error returned from the [`recv_timeout`](Receiver::recv_timeout) or
175
356
/// [`recv_deadline`](Receiver::recv_deadline) methods.
357
+ ///
358
+ /// # Examples
359
+ ///
360
+ /// Usage of this error is similar to [`TryRecvError`].
361
+ ///
362
+ /// ```
363
+ /// # #![feature(oneshot_channel)]
364
+ /// # use std::sync::oneshot::{self, RecvTimeoutError};
365
+ /// # use std::thread;
366
+ /// # use std::time::Duration;
367
+ /// #
368
+ /// let (sender, receiver) = oneshot::channel();
369
+ ///
370
+ /// thread::spawn(move || {
371
+ /// // Simulate a long computation that takes longer than our timeout.
372
+ /// thread::sleep(Duration::from_millis(500));
373
+ /// sender.send("Too late!".to_string()).unwrap();
374
+ /// });
375
+ ///
376
+ /// // Try to receive the message with a short timeout.
377
+ /// match receiver.recv_timeout(Duration::from_millis(100)) {
378
+ /// Ok(msg) => println!("Received: {}", msg),
379
+ /// Err(RecvTimeoutError::Timeout(rx)) => {
380
+ /// println!("Timed out waiting for message!");
381
+ /// // You can reuse the receiver if needed.
382
+ /// drop(rx);
383
+ /// }
384
+ /// Err(RecvTimeoutError::Disconnected) => println!("Sender dropped!"),
385
+ /// }
386
+ /// ```
176
387
#[unstable(feature = "oneshot_channel", issue = "143674")]
177
388
pub enum RecvTimeoutError<T> {
178
389
/// The [`Sender`] has not sent a message yet, but it might in the future (as it has not yet
0 commit comments