1
//! A oneshot broadcast channel.
2
//!
3
//! The motivation for this channel type was to allow multiple
4
//! receivers to either wait for something to finish,
5
//! or to have an inexpensive method of checking if it has finished.
6
//!
7
//! See [`channel()`].
8

            
9
// NOTE: If we decide to make this public in the future (for example through `tor-async-utils`),
10
// we should enable the doc tests.
11

            
12
use std::future::{Future, IntoFuture};
13
use std::ops::Drop;
14
use std::pin::Pin;
15
use std::sync::{Arc, Mutex, OnceLock, Weak};
16
use std::task::{ready, Context, Poll, Waker};
17

            
18
use slotmap_careful::DenseSlotMap;
19

            
20
slotmap_careful::new_key_type! { struct WakerKey; }
21

            
22
/// A [oneshot broadcast][crate::util::oneshot_broadcast] sender.
23
#[derive(Debug)]
24
pub(crate) struct Sender<T> {
25
    /// State shared with all [`Receiver`]s.
26
    shared: Weak<Shared<T>>,
27
}
28

            
29
/// A [oneshot broadcast][crate::util::oneshot_broadcast] receiver.
30
///
31
/// The `Receiver` offers two methods for receiving the message:
32
///
33
/// 1. [`Receiver::into_future`]
34
///     ```rust,ignore
35
///     let (tx, rx) = channel();
36
///     tx.send(0);
37
///     let message: u32 = rx.await.unwrap();
38
///     ```
39
///
40
/// 2. [`Receiver::borrowed`]
41
///     ```rust,ignore
42
///     let (tx, rx) = channel();
43
///     tx.send(0);
44
///     let message: &u32 = rx.borrowed().await.unwrap();
45
///     ```
46
#[derive(Clone, Debug)]
47
pub(crate) struct Receiver<T> {
48
    /// State shared with the sender and all other receivers.
49
    shared: Arc<Shared<T>>,
50
}
51

            
52
/// State shared between the sender and receivers.
53
/// Correctness:
54
///
55
/// Sending a message:
56
///  - set the message OnceLock (A)
57
///  - acquire the wakers Mutex
58
///  - take all wakers (B)
59
///  - release the wakers Mutex (C)
60
///  - wake all wakers
61
///
62
/// Polling:
63
///  - if message was set, return it (fast path)
64
///  - acquire the wakers Mutex (D)
65
///  - if message was set, return it (E)
66
///  - add waker (F)
67
///  - release the wakers Mutex
68
///
69
/// When the wakers Mutex is released at (C), a release-store operation is performed by the Mutex,
70
/// which means that the message set at (A) will be seen by all future acquire-load operations by
71
/// that same Mutex. More specifically, after (C) has occurred and when the same mutex is acquired at
72
/// (D), the message set at (A) is guaranteed to be visible at (E). This means that after the wakers
73
/// are taken at (B), no future wakers will be added at (F) and no waker will be "lost".
74
#[derive(Debug)]
75
struct Shared<T> {
76
    /// The message sent from the [`Sender`] to the [`Receiver`]s.
77
    msg: OnceLock<Result<T, SenderDropped>>,
78
    /// The wakers waiting for a value to be sent.
79
    /// Will be set to `Err` after the wakers have been woken.
80
    // the `Result` isn't technically needed here,
81
    // but we use it to help detect bugs;
82
    // see `WakersAlreadyWoken` for details
83
    wakers: Mutex<Result<DenseSlotMap<WakerKey, Waker>, WakersAlreadyWoken>>,
84
}
85

            
86
/// The future from [`Receiver::borrowed`].
87
///
88
/// Will be ready, yielding `&'a T`,
89
/// when the sender sends a message or is dropped.
90
#[derive(Debug)]
91
pub(crate) struct BorrowedReceiverFuture<'a, T> {
92
    /// State shared with the sender and all other receivers.
93
    shared: &'a Shared<T>,
94
    /// The key for any waker that we've added to [`Shared::wakers`].
95
    waker_key: Option<WakerKey>,
96
}
97

            
98
/// The future from [`Receiver::into_future`].
99
///
100
/// Will be ready, yielding a clone of `T`,
101
/// when the sender sends a message or is dropped.
102
// Both `ReceiverFuture` and `BorrowedReceiverFuture` have similar fields
103
// but there's no nice way to deduplicated them.
104
// It would have been nice if we could store a `BorrowedReceiverFuture`
105
// holding a reference to our `Arc<Shared>`,
106
// but that would be a self-referential struct,
107
// so we need to duplicate the fields here instead.
108
#[derive(Debug)]
109
pub(crate) struct ReceiverFuture<T> {
110
    /// State shared with the sender and all other receivers.
111
    shared: Arc<Shared<T>>,
112
    /// The key for any waker that we've added to [`Shared::wakers`].
113
    waker_key: Option<WakerKey>,
114
}
115

            
116
/// The wakers have already been woken.
117
///
118
/// This is used to help detect if we're trying to access the wakers after they've already been
119
/// woken, which likely indicates a bug. For example, it is a bug if a receiver attempts to add a
120
/// waker after the sender has already sent its message and woken the wakers, since the new waker
121
/// would never be woken.
122
#[derive(Copy, Clone, Debug)]
123
struct WakersAlreadyWoken;
124

            
125
/// The message has already been set, and we can't set it again.
126
#[derive(Copy, Clone, Debug, thiserror::Error)]
127
#[error("the message was already set")]
128
struct MessageAlreadySet;
129

            
130
/// The sender was dropped, so the channel is closed.
131
#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
132
#[error("the sender was dropped")]
133
pub(crate) struct SenderDropped;
134

            
135
/// Create a new oneshot broadcast channel.
136
///
137
/// ```rust,ignore
138
/// let (tx, rx) = channel();
139
/// let rx_clone = rx.clone();
140
/// tx.send(0_u8);
141
/// assert_eq!(rx.await, Ok(0));
142
/// assert_eq!(rx_clone.await, Ok(0));
143
/// ```
144
553
pub(crate) fn channel<T>() -> (Sender<T>, Receiver<T>) {
145
553
    let shared = Arc::new(Shared {
146
553
        msg: OnceLock::new(),
147
553
        wakers: Mutex::new(Ok(DenseSlotMap::with_key())),
148
553
    });
149
553

            
150
553
    let sender = Sender {
151
553
        shared: Arc::downgrade(&shared),
152
553
    };
153
553

            
154
553
    let receiver = Receiver { shared };
155
553

            
156
553
    (sender, receiver)
157
553
}
158

            
159
impl<T> Sender<T> {
160
    /// Send the message to the [`Receiver`]s.
161
    ///
162
    /// The message may be lost if all receivers have been dropped.
163
    #[cfg_attr(not(test), allow(dead_code))]
164
230
    pub(crate) fn send(self, msg: T) {
165
230
        // set the message and inform the wakers
166
230
        Self::send_and_wake(&self.shared, Ok(msg))
167
230
            // this 'send()` method takes an owned self,
168
230
            // and we don't send a message outside of here and the drop handler,
169
230
            // so this shouldn't be possible
170
230
            .expect("could not set the message");
171
230
    }
172

            
173
    /// Send the message, and wake and clear all wakers.
174
    ///
175
    /// If all receivers have been dropped, then always returns `Ok`.
176
    ///
177
    /// If the message was unable to be set, returns `Err(MessageAlreadySet)`.
178
828
    fn send_and_wake(
179
828
        shared: &Weak<Shared<T>>,
180
828
        msg: Result<T, SenderDropped>,
181
828
    ) -> Result<(), MessageAlreadySet> {
182
        // Even if the `Weak` upgrade is successful,
183
        // it's possible that the last receiver
184
        // will be dropped during this `send_and_wake` method,
185
        // in which case we will be holding the last `Arc`.
186
828
        let Some(shared) = shared.upgrade() else {
187
            // all receivers have dropped; nothing to do
188
304
            return Ok(());
189
        };
190

            
191
        // set the message
192
524
        shared.msg.set(msg).or(Err(MessageAlreadySet))?;
193

            
194
394
        let mut wakers = {
195
394
            let mut wakers = shared.wakers.lock().expect("poisoned");
196
394
            // Take the wakers and drop the mutex guard, releasing the lock.
197
394
            //
198
394
            // We could just drain the wakers map in-place here, but instead we replace the map with
199
394
            // an explicit `WakersAlreadyWoken` state to help catch bugs if something tries adding a
200
394
            // new waker later after we've already woken the wakers.
201
394
            //
202
394
            // The above `msg.set()` will only ever succeed once,
203
394
            // which means that we should only end up here once.
204
394
            std::mem::replace(&mut *wakers, Err(WakersAlreadyWoken))
205
394
                .expect("wakers were taken more than once")
206
        };
207

            
208
        // Once we drop the mutex guard, which does a release-store on its own atomic, any other
209
        // code which later acquires the wakers mutex is guaranteed to see the msg as "set".
210
        // See comments on `Shared`.
211

            
212
        // Wake while not holding the lock.
213
        // Since the lock is used in `ReceiverFuture::poll` and `ReceiverFuture::drop` and
214
        // should not block for long periods of time,
215
        // we'd prefer not to run third-party waker code here while holding the mutex,
216
        // even if `wake` should typically be fast.
217
432
        for (_key, waker) in wakers.drain() {
218
244
            waker.wake();
219
244
        }
220

            
221
394
        Ok(())
222
828
    }
223

            
224
    /// Returns `true` if all [`Receiver`]s (and all futures created from the receivers) have been
225
    /// dropped.
226
    ///
227
    /// This can be useful to skip doing extra work to generate the message if the message will be
228
    /// discarded anyways.
229
    // This is for external use.
230
    // It is not always valid to call this internally.
231
    // For example when we've done a `Weak::upgrade` internally, like in `send_and_wake`,
232
    // this won't return the correct value.
233
    #[cfg_attr(not(test), allow(dead_code))]
234
10
    pub(crate) fn is_cancelled(&self) -> bool {
235
10
        self.shared.strong_count() == 0
236
10
    }
237
}
238

            
239
impl<T> Drop for Sender<T> {
240
553
    fn drop(&mut self) {
241
553
        // set an error message to indicate that the sender was dropped and inform the wakers;
242
553
        // it's fine if setting the message fails since it might have been set previously during a
243
553
        // `send()`
244
553
        let _ = Self::send_and_wake(&self.shared, Err(SenderDropped));
245
553
    }
246
}
247

            
248
impl<T> Receiver<T> {
249
    /// Receive a borrowed message from the [`Sender`].
250
    ///
251
    /// This may be more efficient than [`Receiver::into_future`]
252
    /// and doesn't require `T: Clone`.
253
    ///
254
    /// This is cancellation-safe.
255
    #[cfg_attr(not(test), allow(dead_code))]
256
472
    pub(crate) fn borrowed(&self) -> BorrowedReceiverFuture<'_, T> {
257
472
        BorrowedReceiverFuture {
258
472
            shared: &self.shared,
259
472
            waker_key: None,
260
472
        }
261
472
    }
262

            
263
    /// The receiver is ready.
264
    ///
265
    /// If `true`, the [`Sender`] has either sent its message or been dropped.
266
3031
    pub(crate) fn is_ready(&self) -> bool {
267
3031
        self.shared.msg.get().is_some()
268
3031
    }
269
}
270

            
271
impl<T: Clone> IntoFuture for Receiver<T> {
272
    type Output = Result<T, SenderDropped>;
273
    type IntoFuture = ReceiverFuture<T>;
274

            
275
    /// This future is cancellation-safe.
276
52
    fn into_future(self) -> Self::IntoFuture {
277
52
        ReceiverFuture {
278
52
            shared: self.shared,
279
52
            waker_key: None,
280
52
        }
281
52
    }
282
}
283

            
284
impl<'a, T> Future for BorrowedReceiverFuture<'a, T> {
285
    type Output = Result<&'a T, SenderDropped>;
286

            
287
708
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
288
708
        let self_ = self.get_mut();
289
708
        receiver_fut_poll(self_.shared, &mut self_.waker_key, cx.waker())
290
708
    }
291
}
292

            
293
impl<T> Drop for BorrowedReceiverFuture<'_, T> {
294
472
    fn drop(&mut self) {
295
472
        receiver_fut_drop(self.shared, &mut self.waker_key);
296
472
    }
297
}
298

            
299
impl<T: Clone> Future for ReceiverFuture<T> {
300
    type Output = Result<T, SenderDropped>;
301

            
302
48
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
303
48
        let self_ = self.get_mut();
304
48
        let poll = receiver_fut_poll(&self_.shared, &mut self_.waker_key, cx.waker());
305
48
        Poll::Ready(ready!(poll)).map_ok(Clone::clone)
306
48
    }
307
}
308

            
309
impl<T> Drop for ReceiverFuture<T> {
310
52
    fn drop(&mut self) {
311
52
        receiver_fut_drop(&self.shared, &mut self.waker_key);
312
52
    }
313
}
314

            
315
/// The shared poll implementation for receiver futures.
316
756
fn receiver_fut_poll<'a, T>(
317
756
    shared: &'a Shared<T>,
318
756
    waker_key: &mut Option<WakerKey>,
319
756
    new_waker: &Waker,
320
756
) -> Poll<Result<&'a T, SenderDropped>> {
321
    // if the message was already set, return it
322
756
    if let Some(msg) = shared.msg.get() {
323
508
        return Poll::Ready(msg.as_ref().or(Err(SenderDropped)));
324
248
    }
325
248

            
326
248
    let mut wakers = shared.wakers.lock().expect("poisoned");
327

            
328
    // check again now that we've acquired the mutex
329
248
    if let Some(msg) = shared.msg.get() {
330
        return Poll::Ready(msg.as_ref().or(Err(SenderDropped)));
331
248
    }
332
248

            
333
248
    // we have acquired the wakers mutex and checked that the message wasn't set,
334
248
    // so we know that wakers have not yet been woken
335
248
    // and it's okay to add our waker to the wakers map
336
248
    let wakers = wakers.as_mut().expect("wakers were already woken");
337
248

            
338
248
    match waker_key {
339
        // we have added a waker previously
340
        Some(waker_key) => {
341
            // replace the old entry
342
            let waker = wakers
343
                .get_mut(*waker_key)
344
                // the waker is only removed from the map by our drop handler,
345
                // so the waker should never be missing
346
                .expect("waker key is missing from map");
347
            waker.clone_from(new_waker);
348
        }
349
        // we have never added a waker
350
248
        None => {
351
248
            // add a new entry
352
248
            let new_key = wakers.insert(new_waker.clone());
353
248
            *waker_key = Some(new_key);
354
248
        }
355
    }
356

            
357
248
    Poll::Pending
358
756
}
359

            
360
/// The shared drop implementation for receiver futures.
361
524
fn receiver_fut_drop<T>(shared: &Shared<T>, waker_key: &mut Option<WakerKey>) {
362
524
    if let Some(waker_key) = waker_key.take() {
363
248
        let mut wakers = shared.wakers.lock().expect("poisoned");
364
248
        if let Ok(wakers) = wakers.as_mut() {
365
4
            let waker = wakers.remove(waker_key);
366
4
            // this is the only place that removes the waker from the map,
367
4
            // so the waker should never be missing
368
4
            debug_assert!(waker.is_some(), "the waker key was not found");
369
244
        }
370
276
    }
371
524
}
372

            
373
#[cfg(test)]
374
mod test {
375
    #![allow(clippy::unwrap_used)]
376

            
377
    use super::*;
378

            
379
    use futures::future::FutureExt;
380
    use futures::task::SpawnExt;
381

            
382
    impl<T> Shared<T> {
383
        /// Count the number of wakers.
384
        fn count_wakers(&self) -> usize {
385
            self.wakers
386
                .lock()
387
                .expect("poisoned")
388
                .as_ref()
389
                .map(|x| x.len())
390
                .unwrap_or(0)
391
        }
392
    }
393

            
394
    #[test]
395
    fn standard_usage() {
396
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
397
            let (tx, rx) = channel();
398
            tx.send(0_u8);
399
            assert_eq!(rx.borrowed().await, Ok(&0));
400

            
401
            let (tx, rx) = channel();
402
            tx.send(0_u8);
403
            assert_eq!(rx.await, Ok(0));
404
        });
405
    }
406

            
407
    #[test]
408
    fn immediate_drop() {
409
        let _ = channel::<()>();
410

            
411
        let (tx, rx) = channel::<()>();
412
        drop(tx);
413
        drop(rx);
414

            
415
        let (tx, rx) = channel::<()>();
416
        drop(rx);
417
        drop(tx);
418
    }
419

            
420
    #[test]
421
    fn drop_sender() {
422
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
423
            let (tx, rx_1) = channel::<u8>();
424

            
425
            let rx_2 = rx_1.clone();
426
            drop(tx);
427
            let rx_3 = rx_1.clone();
428
            assert_eq!(rx_1.borrowed().await, Err(SenderDropped));
429
            assert_eq!(rx_2.borrowed().await, Err(SenderDropped));
430
            assert_eq!(rx_3.borrowed().await, Err(SenderDropped));
431
        });
432
    }
433

            
434
    #[test]
435
    fn clone_before_send() {
436
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
437
            let (tx, rx_1) = channel();
438

            
439
            let rx_2 = rx_1.clone();
440
            tx.send(0_u8);
441
            assert_eq!(rx_1.borrowed().await, Ok(&0));
442
            assert_eq!(rx_2.borrowed().await, Ok(&0));
443
        });
444
    }
445

            
446
    #[test]
447
    fn clone_after_send() {
448
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
449
            let (tx, rx_1) = channel();
450

            
451
            tx.send(0_u8);
452
            let rx_2 = rx_1.clone();
453
            assert_eq!(rx_1.borrowed().await, Ok(&0));
454
            assert_eq!(rx_2.borrowed().await, Ok(&0));
455
        });
456
    }
457

            
458
    #[test]
459
    fn clone_after_borrowed() {
460
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
461
            let (tx, rx_1) = channel();
462

            
463
            tx.send(0_u8);
464
            assert_eq!(rx_1.borrowed().await, Ok(&0));
465
            let rx_2 = rx_1.clone();
466
            assert_eq!(rx_2.borrowed().await, Ok(&0));
467
        });
468
    }
469

            
470
    #[test]
471
    fn drop_one_receiver() {
472
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
473
            let (tx, rx_1) = channel();
474

            
475
            let rx_2 = rx_1.clone();
476
            drop(rx_1);
477
            tx.send(0_u8);
478
            assert_eq!(rx_2.borrowed().await, Ok(&0));
479
        });
480
    }
481

            
482
    #[test]
483
    fn drop_all_receivers() {
484
        let (tx, rx_1) = channel();
485

            
486
        let rx_2 = rx_1.clone();
487
        drop(rx_1);
488
        drop(rx_2);
489
        tx.send(0_u8);
490
    }
491

            
492
    #[test]
493
    fn drop_fut() {
494
        let (_tx, rx) = channel::<u8>();
495
        let fut = rx.borrowed();
496
        assert_eq!(rx.shared.count_wakers(), 0);
497
        drop(fut);
498
        assert_eq!(rx.shared.count_wakers(), 0);
499

            
500
        // drop after sending
501
        let (tx, rx) = channel();
502
        tx.send(0_u8);
503
        let fut = rx.borrowed();
504
        assert_eq!(rx.shared.count_wakers(), 0);
505
        drop(fut);
506
        assert_eq!(rx.shared.count_wakers(), 0);
507

            
508
        // drop after polling once
509
        let (_tx, rx) = channel::<u8>();
510
        let mut fut = Box::pin(rx.borrowed());
511
        assert_eq!(rx.shared.count_wakers(), 0);
512
        assert_eq!(fut.as_mut().now_or_never(), None);
513
        assert_eq!(rx.shared.count_wakers(), 1);
514
        drop(fut);
515
        assert_eq!(rx.shared.count_wakers(), 0);
516

            
517
        // drop after polling once and send
518
        let (tx, rx) = channel();
519
        let mut fut = Box::pin(rx.borrowed());
520
        assert_eq!(rx.shared.count_wakers(), 0);
521
        assert_eq!(fut.as_mut().now_or_never(), None);
522
        assert_eq!(rx.shared.count_wakers(), 1);
523
        tx.send(0_u8);
524
        assert_eq!(rx.shared.count_wakers(), 0);
525
        drop(fut);
526
    }
527

            
528
    #[test]
529
    fn drop_owned_fut() {
530
        let (_tx, rx) = channel::<u8>();
531
        let fut = rx.clone().into_future();
532
        assert_eq!(rx.shared.count_wakers(), 0);
533
        drop(fut);
534
        assert_eq!(rx.shared.count_wakers(), 0);
535

            
536
        // drop after sending
537
        let (tx, rx) = channel();
538
        tx.send(0_u8);
539
        let fut = rx.clone().into_future();
540
        assert_eq!(rx.shared.count_wakers(), 0);
541
        drop(fut);
542
        assert_eq!(rx.shared.count_wakers(), 0);
543

            
544
        // drop after polling once
545
        let (_tx, rx) = channel::<u8>();
546
        let mut fut = Box::pin(rx.clone().into_future());
547
        assert_eq!(rx.shared.count_wakers(), 0);
548
        assert_eq!(fut.as_mut().now_or_never(), None);
549
        assert_eq!(rx.shared.count_wakers(), 1);
550
        drop(fut);
551
        assert_eq!(rx.shared.count_wakers(), 0);
552

            
553
        // drop after polling once and send
554
        let (tx, rx) = channel();
555
        let mut fut = Box::pin(rx.clone().into_future());
556
        assert_eq!(rx.shared.count_wakers(), 0);
557
        assert_eq!(fut.as_mut().now_or_never(), None);
558
        assert_eq!(rx.shared.count_wakers(), 1);
559
        tx.send(0_u8);
560
        assert_eq!(rx.shared.count_wakers(), 0);
561
        drop(fut);
562
    }
563

            
564
    #[test]
565
    fn is_ready_after_send() {
566
        let (tx, rx_1) = channel();
567
        assert!(!rx_1.is_ready());
568
        let rx_2 = rx_1.clone();
569
        assert!(!rx_2.is_ready());
570

            
571
        tx.send(0_u8);
572

            
573
        assert!(rx_1.is_ready());
574
        assert!(rx_2.is_ready());
575

            
576
        let rx_3 = rx_1.clone();
577
        assert!(rx_3.is_ready());
578
    }
579

            
580
    #[test]
581
    fn is_ready_after_drop() {
582
        let (tx, rx_1) = channel::<u8>();
583
        assert!(!rx_1.is_ready());
584
        let rx_2 = rx_1.clone();
585
        assert!(!rx_2.is_ready());
586

            
587
        drop(tx);
588

            
589
        assert!(rx_1.is_ready());
590
        assert!(rx_2.is_ready());
591

            
592
        let rx_3 = rx_1.clone();
593
        assert!(rx_3.is_ready());
594
    }
595

            
596
    #[test]
597
    fn is_cancelled() {
598
        let (tx, rx) = channel::<u8>();
599
        assert!(!tx.is_cancelled());
600
        drop(rx);
601
        assert!(tx.is_cancelled());
602

            
603
        let (tx, rx_1) = channel::<u8>();
604
        assert!(!tx.is_cancelled());
605
        let rx_2 = rx_1.clone();
606
        drop(rx_1);
607
        assert!(!tx.is_cancelled());
608
        drop(rx_2);
609
        assert!(tx.is_cancelled());
610
    }
611

            
612
    #[test]
613
    fn recv_in_task() {
614
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
615
            let (tx, rx) = channel();
616

            
617
            let join = rt
618
                .spawn_with_handle(async move {
619
                    assert_eq!(rx.borrowed().await, Ok(&0));
620
                    assert_eq!(rx.await, Ok(0));
621
                })
622
                .unwrap();
623

            
624
            tx.send(0_u8);
625

            
626
            join.await;
627
        });
628
    }
629

            
630
    #[test]
631
    fn recv_multiple_in_task() {
632
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
633
            let (tx, rx) = channel();
634
            let rx_1 = rx.clone();
635
            let rx_2 = rx.clone();
636

            
637
            let join_1 = rt
638
                .spawn_with_handle(async move {
639
                    assert_eq!(rx_1.borrowed().await, Ok(&0));
640
                })
641
                .unwrap();
642
            let join_2 = rt
643
                .spawn_with_handle(async move {
644
                    assert_eq!(rx_2.await, Ok(0));
645
                })
646
                .unwrap();
647

            
648
            tx.send(0_u8);
649

            
650
            join_1.await;
651
            join_2.await;
652
            assert_eq!(rx.borrowed().await, Ok(&0));
653
        });
654
    }
655

            
656
    #[test]
657
    fn recv_multiple_times() {
658
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
659
            let (tx, rx) = channel();
660

            
661
            tx.send(0_u8);
662
            assert_eq!(rx.borrowed().await, Ok(&0));
663
            assert_eq!(rx.borrowed().await, Ok(&0));
664
            assert_eq!(rx.clone().await, Ok(0));
665
            assert_eq!(rx.await, Ok(0));
666
        });
667
    }
668

            
669
    #[test]
670
    fn stress() {
671
        // In general we don't have control over the runtime and where/when tasks are scheduled,
672
        // so we try as best as possible to send the message while simultaneously creating new
673
        // receivers and waiting on them.
674
        // It's possible this might be entirely ineffective since we don't enforce any specific
675
        // scheduler behaviour here,
676
        // but in the worst case it's still a test with multiple receivers on different tasks,
677
        // so is useful to have.
678
        //
679
        // The `test_with_various` helper uses `MockExecutor` with two different deterministic
680
        // scheduling policies.
681
        // At least at the time of writing,
682
        // when this test uses `MockExecutor` with its "queue" scheduling policy
683
        // the "send" occurs after 20 of the tasks have begun waiting.
684
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
685
            let (tx, rx) = channel();
686

            
687
            rt.spawn(async move {
688
                // this tries to delay the send a little bit
689
                // to give time for some of the receiver tasks to start
690
                for _ in 0..20 {
691
                    tor_rtcompat::task::yield_now().await;
692
                }
693
                tx.send(0_u8);
694
            })
695
            .unwrap();
696

            
697
            let mut joins = vec![];
698
            for _ in 0..100 {
699
                let rx_clone = rx.clone();
700
                let join = rt
701
                    .spawn_with_handle(async move { rx_clone.borrowed().await.cloned() })
702
                    .unwrap();
703
                joins.push(join);
704
                // allows the send task to make progress if single-threaded
705
                tor_rtcompat::task::yield_now().await;
706
            }
707

            
708
            for join in joins {
709
                assert!(matches!(join.await, Ok(0)));
710
            }
711
        });
712
    }
713
}