tor_proto/util/
oneshot_broadcast.rs

1//! A oneshot broadcast channel.
2//!
3//! The motivation for this channel type was to allow multiple
4//! receivers to either wait for something to finish,
5//! or to have an inexpensive method of checking if it has finished.
6//!
7//! See [`channel()`].
8
9// NOTE: If we decide to make this public in the future (for example through `tor-async-utils`),
10// we should enable the doc tests.
11
12use std::future::{Future, IntoFuture};
13use std::ops::Drop;
14use std::pin::Pin;
15use std::sync::{Arc, Mutex, OnceLock, Weak};
16use std::task::{ready, Context, Poll, Waker};
17
18use slotmap_careful::DenseSlotMap;
19
20slotmap_careful::new_key_type! { struct WakerKey; }
21
22/// A [oneshot broadcast][crate::util::oneshot_broadcast] sender.
23#[derive(Debug)]
24pub(crate) struct Sender<T> {
25    /// State shared with all [`Receiver`]s.
26    shared: Weak<Shared<T>>,
27}
28
29/// A [oneshot broadcast][crate::util::oneshot_broadcast] receiver.
30///
31/// The `Receiver` offers two methods for receiving the message:
32///
33/// 1. [`Receiver::into_future`]
34///     ```rust,ignore
35///     let (tx, rx) = channel();
36///     tx.send(0);
37///     let message: u32 = rx.await.unwrap();
38///     ```
39///
40/// 2. [`Receiver::borrowed`]
41///     ```rust,ignore
42///     let (tx, rx) = channel();
43///     tx.send(0);
44///     let message: &u32 = rx.borrowed().await.unwrap();
45///     ```
46#[derive(Clone, Debug)]
47pub(crate) struct Receiver<T> {
48    /// State shared with the sender and all other receivers.
49    shared: Arc<Shared<T>>,
50}
51
52/// State shared between the sender and receivers.
53/// Correctness:
54///
55/// Sending a message:
56///  - set the message OnceLock (A)
57///  - acquire the wakers Mutex
58///  - take all wakers (B)
59///  - release the wakers Mutex (C)
60///  - wake all wakers
61///
62/// Polling:
63///  - if message was set, return it (fast path)
64///  - acquire the wakers Mutex (D)
65///  - if message was set, return it (E)
66///  - add waker (F)
67///  - release the wakers Mutex
68///
69/// When the wakers Mutex is released at (C), a release-store operation is performed by the Mutex,
70/// which means that the message set at (A) will be seen by all future acquire-load operations by
71/// that same Mutex. More specifically, after (C) has occurred and when the same mutex is acquired at
72/// (D), the message set at (A) is guaranteed to be visible at (E). This means that after the wakers
73/// are taken at (B), no future wakers will be added at (F) and no waker will be "lost".
74#[derive(Debug)]
75struct Shared<T> {
76    /// The message sent from the [`Sender`] to the [`Receiver`]s.
77    msg: OnceLock<Result<T, SenderDropped>>,
78    /// The wakers waiting for a value to be sent.
79    /// Will be set to `Err` after the wakers have been woken.
80    // the `Result` isn't technically needed here,
81    // but we use it to help detect bugs;
82    // see `WakersAlreadyWoken` for details
83    wakers: Mutex<Result<DenseSlotMap<WakerKey, Waker>, WakersAlreadyWoken>>,
84}
85
86/// The future from [`Receiver::borrowed`].
87///
88/// Will be ready, yielding `&'a T`,
89/// when the sender sends a message or is dropped.
90#[derive(Debug)]
91pub(crate) struct BorrowedReceiverFuture<'a, T> {
92    /// State shared with the sender and all other receivers.
93    shared: &'a Shared<T>,
94    /// The key for any waker that we've added to [`Shared::wakers`].
95    waker_key: Option<WakerKey>,
96}
97
98/// The future from [`Receiver::into_future`].
99///
100/// Will be ready, yielding a clone of `T`,
101/// when the sender sends a message or is dropped.
102// Both `ReceiverFuture` and `BorrowedReceiverFuture` have similar fields
103// but there's no nice way to deduplicated them.
104// It would have been nice if we could store a `BorrowedReceiverFuture`
105// holding a reference to our `Arc<Shared>`,
106// but that would be a self-referential struct,
107// so we need to duplicate the fields here instead.
108#[derive(Debug)]
109pub(crate) struct ReceiverFuture<T> {
110    /// State shared with the sender and all other receivers.
111    shared: Arc<Shared<T>>,
112    /// The key for any waker that we've added to [`Shared::wakers`].
113    waker_key: Option<WakerKey>,
114}
115
/// Marker stored in place of the waker map once the wakers have been woken.
///
/// Its purpose is to make it detectable when something touches the wakers after they were
/// already woken, since that most likely indicates a bug. For instance, a receiver that adds a
/// waker after the sender has sent its message and woken the wakers would be buggy, because
/// that new waker would never be woken.
#[derive(Debug, Clone, Copy)]
struct WakersAlreadyWoken;
124
125/// The message has already been set, and we can't set it again.
126#[derive(Copy, Clone, Debug, thiserror::Error)]
127#[error("the message was already set")]
128struct MessageAlreadySet;
129
130/// The sender was dropped, so the channel is closed.
131#[derive(Copy, Clone, Debug, PartialEq, Eq, thiserror::Error)]
132#[error("the sender was dropped")]
133pub(crate) struct SenderDropped;
134
135/// Create a new oneshot broadcast channel.
136///
137/// ```rust,ignore
138/// let (tx, rx) = channel();
139/// let rx_clone = rx.clone();
140/// tx.send(0_u8);
141/// assert_eq!(rx.await, Ok(0));
142/// assert_eq!(rx_clone.await, Ok(0));
143/// ```
144pub(crate) fn channel<T>() -> (Sender<T>, Receiver<T>) {
145    let shared = Arc::new(Shared {
146        msg: OnceLock::new(),
147        wakers: Mutex::new(Ok(DenseSlotMap::with_key())),
148    });
149
150    let sender = Sender {
151        shared: Arc::downgrade(&shared),
152    };
153
154    let receiver = Receiver { shared };
155
156    (sender, receiver)
157}
158
159impl<T> Sender<T> {
160    /// Send the message to the [`Receiver`]s.
161    ///
162    /// The message may be lost if all receivers have been dropped.
163    #[cfg_attr(not(test), allow(dead_code))]
164    pub(crate) fn send(self, msg: T) {
165        // set the message and inform the wakers
166        Self::send_and_wake(&self.shared, Ok(msg))
167            // this 'send()` method takes an owned self,
168            // and we don't send a message outside of here and the drop handler,
169            // so this shouldn't be possible
170            .expect("could not set the message");
171    }
172
173    /// Send the message, and wake and clear all wakers.
174    ///
175    /// If all receivers have been dropped, then always returns `Ok`.
176    ///
177    /// If the message was unable to be set, returns `Err(MessageAlreadySet)`.
178    fn send_and_wake(
179        shared: &Weak<Shared<T>>,
180        msg: Result<T, SenderDropped>,
181    ) -> Result<(), MessageAlreadySet> {
182        // Even if the `Weak` upgrade is successful,
183        // it's possible that the last receiver
184        // will be dropped during this `send_and_wake` method,
185        // in which case we will be holding the last `Arc`.
186        let Some(shared) = shared.upgrade() else {
187            // all receivers have dropped; nothing to do
188            return Ok(());
189        };
190
191        // set the message
192        shared.msg.set(msg).or(Err(MessageAlreadySet))?;
193
194        let mut wakers = {
195            let mut wakers = shared.wakers.lock().expect("poisoned");
196            // Take the wakers and drop the mutex guard, releasing the lock.
197            //
198            // We could just drain the wakers map in-place here, but instead we replace the map with
199            // an explicit `WakersAlreadyWoken` state to help catch bugs if something tries adding a
200            // new waker later after we've already woken the wakers.
201            //
202            // The above `msg.set()` will only ever succeed once,
203            // which means that we should only end up here once.
204            std::mem::replace(&mut *wakers, Err(WakersAlreadyWoken))
205                .expect("wakers were taken more than once")
206        };
207
208        // Once we drop the mutex guard, which does a release-store on its own atomic, any other
209        // code which later acquires the wakers mutex is guaranteed to see the msg as "set".
210        // See comments on `Shared`.
211
212        // Wake while not holding the lock.
213        // Since the lock is used in `ReceiverFuture::poll` and `ReceiverFuture::drop` and
214        // should not block for long periods of time,
215        // we'd prefer not to run third-party waker code here while holding the mutex,
216        // even if `wake` should typically be fast.
217        for (_key, waker) in wakers.drain() {
218            waker.wake();
219        }
220
221        Ok(())
222    }
223
224    /// Returns `true` if all [`Receiver`]s (and all futures created from the receivers) have been
225    /// dropped.
226    ///
227    /// This can be useful to skip doing extra work to generate the message if the message will be
228    /// discarded anyways.
229    // This is for external use.
230    // It is not always valid to call this internally.
231    // For example when we've done a `Weak::upgrade` internally, like in `send_and_wake`,
232    // this won't return the correct value.
233    #[cfg_attr(not(test), allow(dead_code))]
234    pub(crate) fn is_cancelled(&self) -> bool {
235        self.shared.strong_count() == 0
236    }
237}
238
239impl<T> Drop for Sender<T> {
240    fn drop(&mut self) {
241        // set an error message to indicate that the sender was dropped and inform the wakers;
242        // it's fine if setting the message fails since it might have been set previously during a
243        // `send()`
244        let _ = Self::send_and_wake(&self.shared, Err(SenderDropped));
245    }
246}
247
248impl<T> Receiver<T> {
249    /// Receive a borrowed message from the [`Sender`].
250    ///
251    /// This may be more efficient than [`Receiver::into_future`]
252    /// and doesn't require `T: Clone`.
253    ///
254    /// This is cancellation-safe.
255    #[cfg_attr(not(test), allow(dead_code))]
256    pub(crate) fn borrowed(&self) -> BorrowedReceiverFuture<'_, T> {
257        BorrowedReceiverFuture {
258            shared: &self.shared,
259            waker_key: None,
260        }
261    }
262
263    /// The receiver is ready.
264    ///
265    /// If `true`, the [`Sender`] has either sent its message or been dropped.
266    pub(crate) fn is_ready(&self) -> bool {
267        self.shared.msg.get().is_some()
268    }
269}
270
271impl<T: Clone> IntoFuture for Receiver<T> {
272    type Output = Result<T, SenderDropped>;
273    type IntoFuture = ReceiverFuture<T>;
274
275    /// This future is cancellation-safe.
276    fn into_future(self) -> Self::IntoFuture {
277        ReceiverFuture {
278            shared: self.shared,
279            waker_key: None,
280        }
281    }
282}
283
284impl<'a, T> Future for BorrowedReceiverFuture<'a, T> {
285    type Output = Result<&'a T, SenderDropped>;
286
287    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
288        let self_ = self.get_mut();
289        receiver_fut_poll(self_.shared, &mut self_.waker_key, cx.waker())
290    }
291}
292
293impl<T> Drop for BorrowedReceiverFuture<'_, T> {
294    fn drop(&mut self) {
295        receiver_fut_drop(self.shared, &mut self.waker_key);
296    }
297}
298
299impl<T: Clone> Future for ReceiverFuture<T> {
300    type Output = Result<T, SenderDropped>;
301
302    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
303        let self_ = self.get_mut();
304        let poll = receiver_fut_poll(&self_.shared, &mut self_.waker_key, cx.waker());
305        Poll::Ready(ready!(poll)).map_ok(Clone::clone)
306    }
307}
308
309impl<T> Drop for ReceiverFuture<T> {
310    fn drop(&mut self) {
311        receiver_fut_drop(&self.shared, &mut self.waker_key);
312    }
313}
314
315/// The shared poll implementation for receiver futures.
316fn receiver_fut_poll<'a, T>(
317    shared: &'a Shared<T>,
318    waker_key: &mut Option<WakerKey>,
319    new_waker: &Waker,
320) -> Poll<Result<&'a T, SenderDropped>> {
321    // if the message was already set, return it
322    if let Some(msg) = shared.msg.get() {
323        return Poll::Ready(msg.as_ref().or(Err(SenderDropped)));
324    }
325
326    let mut wakers = shared.wakers.lock().expect("poisoned");
327
328    // check again now that we've acquired the mutex
329    if let Some(msg) = shared.msg.get() {
330        return Poll::Ready(msg.as_ref().or(Err(SenderDropped)));
331    }
332
333    // we have acquired the wakers mutex and checked that the message wasn't set,
334    // so we know that wakers have not yet been woken
335    // and it's okay to add our waker to the wakers map
336    let wakers = wakers.as_mut().expect("wakers were already woken");
337
338    match waker_key {
339        // we have added a waker previously
340        Some(waker_key) => {
341            // replace the old entry
342            let waker = wakers
343                .get_mut(*waker_key)
344                // the waker is only removed from the map by our drop handler,
345                // so the waker should never be missing
346                .expect("waker key is missing from map");
347            waker.clone_from(new_waker);
348        }
349        // we have never added a waker
350        None => {
351            // add a new entry
352            let new_key = wakers.insert(new_waker.clone());
353            *waker_key = Some(new_key);
354        }
355    }
356
357    Poll::Pending
358}
359
360/// The shared drop implementation for receiver futures.
361fn receiver_fut_drop<T>(shared: &Shared<T>, waker_key: &mut Option<WakerKey>) {
362    if let Some(waker_key) = waker_key.take() {
363        let mut wakers = shared.wakers.lock().expect("poisoned");
364        if let Ok(wakers) = wakers.as_mut() {
365            let waker = wakers.remove(waker_key);
366            // this is the only place that removes the waker from the map,
367            // so the waker should never be missing
368            debug_assert!(waker.is_some(), "the waker key was not found");
369        }
370    }
371}
372
#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]

    use super::*;

    use futures::future::FutureExt;
    use futures::task::SpawnExt;

    impl<T> Shared<T> {
        /// Number of wakers currently registered (zero once the wakers have been woken).
        fn count_wakers(&self) -> usize {
            let guard = self.wakers.lock().expect("poisoned");
            guard.as_ref().map_or(0, |registered| registered.len())
        }
    }

    #[test]
    fn standard_usage() {
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
            // receive by reference
            let (sender, receiver) = channel();
            sender.send(0_u8);
            assert_eq!(receiver.borrowed().await, Ok(&0));

            // receive by value
            let (sender, receiver) = channel();
            sender.send(0_u8);
            assert_eq!(receiver.await, Ok(0));
        });
    }

    #[test]
    fn immediate_drop() {
        // both halves dropped at once
        let _ = channel::<()>();

        // sender first
        let (sender, receiver) = channel::<()>();
        drop(sender);
        drop(receiver);

        // receiver first
        let (sender, receiver) = channel::<()>();
        drop(receiver);
        drop(sender);
    }

    #[test]
    fn drop_sender() {
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
            let (sender, first) = channel::<u8>();

            // one clone made before the sender is dropped, one made after
            let second = first.clone();
            drop(sender);
            let third = first.clone();
            assert_eq!(first.borrowed().await, Err(SenderDropped));
            assert_eq!(second.borrowed().await, Err(SenderDropped));
            assert_eq!(third.borrowed().await, Err(SenderDropped));
        });
    }

    #[test]
    fn clone_before_send() {
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
            let (sender, first) = channel();

            let second = first.clone();
            sender.send(0_u8);
            assert_eq!(first.borrowed().await, Ok(&0));
            assert_eq!(second.borrowed().await, Ok(&0));
        });
    }

    #[test]
    fn clone_after_send() {
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
            let (sender, first) = channel();

            sender.send(0_u8);
            let second = first.clone();
            assert_eq!(first.borrowed().await, Ok(&0));
            assert_eq!(second.borrowed().await, Ok(&0));
        });
    }

    #[test]
    fn clone_after_borrowed() {
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
            let (sender, first) = channel();

            sender.send(0_u8);
            assert_eq!(first.borrowed().await, Ok(&0));
            let second = first.clone();
            assert_eq!(second.borrowed().await, Ok(&0));
        });
    }

    #[test]
    fn drop_one_receiver() {
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
            let (sender, first) = channel();

            let second = first.clone();
            drop(first);
            sender.send(0_u8);
            assert_eq!(second.borrowed().await, Ok(&0));
        });
    }

    #[test]
    fn drop_all_receivers() {
        let (sender, first) = channel();

        let second = first.clone();
        drop(first);
        drop(second);
        // sending with no receivers left must not panic
        sender.send(0_u8);
    }

    #[test]
    fn drop_fut() {
        // drop without ever polling
        let (_sender, receiver) = channel::<u8>();
        let pending = receiver.borrowed();
        assert_eq!(receiver.shared.count_wakers(), 0);
        drop(pending);
        assert_eq!(receiver.shared.count_wakers(), 0);

        // drop after sending
        let (sender, receiver) = channel();
        sender.send(0_u8);
        let pending = receiver.borrowed();
        assert_eq!(receiver.shared.count_wakers(), 0);
        drop(pending);
        assert_eq!(receiver.shared.count_wakers(), 0);

        // drop after polling once
        let (_sender, receiver) = channel::<u8>();
        let mut pending = Box::pin(receiver.borrowed());
        assert_eq!(receiver.shared.count_wakers(), 0);
        assert_eq!(pending.as_mut().now_or_never(), None);
        assert_eq!(receiver.shared.count_wakers(), 1);
        drop(pending);
        assert_eq!(receiver.shared.count_wakers(), 0);

        // drop after polling once and send
        let (sender, receiver) = channel();
        let mut pending = Box::pin(receiver.borrowed());
        assert_eq!(receiver.shared.count_wakers(), 0);
        assert_eq!(pending.as_mut().now_or_never(), None);
        assert_eq!(receiver.shared.count_wakers(), 1);
        sender.send(0_u8);
        assert_eq!(receiver.shared.count_wakers(), 0);
        drop(pending);
    }

    #[test]
    fn drop_owned_fut() {
        // drop without ever polling
        let (_sender, receiver) = channel::<u8>();
        let pending = receiver.clone().into_future();
        assert_eq!(receiver.shared.count_wakers(), 0);
        drop(pending);
        assert_eq!(receiver.shared.count_wakers(), 0);

        // drop after sending
        let (sender, receiver) = channel();
        sender.send(0_u8);
        let pending = receiver.clone().into_future();
        assert_eq!(receiver.shared.count_wakers(), 0);
        drop(pending);
        assert_eq!(receiver.shared.count_wakers(), 0);

        // drop after polling once
        let (_sender, receiver) = channel::<u8>();
        let mut pending = Box::pin(receiver.clone().into_future());
        assert_eq!(receiver.shared.count_wakers(), 0);
        assert_eq!(pending.as_mut().now_or_never(), None);
        assert_eq!(receiver.shared.count_wakers(), 1);
        drop(pending);
        assert_eq!(receiver.shared.count_wakers(), 0);

        // drop after polling once and send
        let (sender, receiver) = channel();
        let mut pending = Box::pin(receiver.clone().into_future());
        assert_eq!(receiver.shared.count_wakers(), 0);
        assert_eq!(pending.as_mut().now_or_never(), None);
        assert_eq!(receiver.shared.count_wakers(), 1);
        sender.send(0_u8);
        assert_eq!(receiver.shared.count_wakers(), 0);
        drop(pending);
    }

    #[test]
    fn is_ready_after_send() {
        let (sender, first) = channel();
        assert!(!first.is_ready());
        let second = first.clone();
        assert!(!second.is_ready());

        sender.send(0_u8);

        assert!(first.is_ready());
        assert!(second.is_ready());

        // a clone made afterwards is ready as well
        let third = first.clone();
        assert!(third.is_ready());
    }

    #[test]
    fn is_ready_after_drop() {
        let (sender, first) = channel::<u8>();
        assert!(!first.is_ready());
        let second = first.clone();
        assert!(!second.is_ready());

        drop(sender);

        assert!(first.is_ready());
        assert!(second.is_ready());

        // a clone made afterwards is ready as well
        let third = first.clone();
        assert!(third.is_ready());
    }

    #[test]
    fn is_cancelled() {
        // a single receiver
        let (sender, receiver) = channel::<u8>();
        assert!(!sender.is_cancelled());
        drop(receiver);
        assert!(sender.is_cancelled());

        // two receivers: only cancelled once both are gone
        let (sender, first) = channel::<u8>();
        assert!(!sender.is_cancelled());
        let second = first.clone();
        drop(first);
        assert!(!sender.is_cancelled());
        drop(second);
        assert!(sender.is_cancelled());
    }

    #[test]
    fn recv_in_task() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let (sender, receiver) = channel();

            let handle = rt
                .spawn_with_handle(async move {
                    assert_eq!(receiver.borrowed().await, Ok(&0));
                    assert_eq!(receiver.await, Ok(0));
                })
                .unwrap();

            sender.send(0_u8);

            handle.await;
        });
    }

    #[test]
    fn recv_multiple_in_task() {
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let (sender, receiver) = channel();
            let first = receiver.clone();
            let second = receiver.clone();

            let handle_1 = rt
                .spawn_with_handle(async move {
                    assert_eq!(first.borrowed().await, Ok(&0));
                })
                .unwrap();
            let handle_2 = rt
                .spawn_with_handle(async move {
                    assert_eq!(second.await, Ok(0));
                })
                .unwrap();

            sender.send(0_u8);

            handle_1.await;
            handle_2.await;
            assert_eq!(receiver.borrowed().await, Ok(&0));
        });
    }

    #[test]
    fn recv_multiple_times() {
        tor_rtmock::MockRuntime::test_with_various(|_rt| async move {
            let (sender, receiver) = channel();

            sender.send(0_u8);
            assert_eq!(receiver.borrowed().await, Ok(&0));
            assert_eq!(receiver.borrowed().await, Ok(&0));
            assert_eq!(receiver.clone().await, Ok(0));
            assert_eq!(receiver.await, Ok(0));
        });
    }

    #[test]
    fn stress() {
        // We generally can't control the runtime or where/when it schedules tasks,
        // so we do our best to have the message sent while new receivers
        // are simultaneously being created and waited on.
        // Since no specific scheduler behaviour is enforced here,
        // this might turn out to be entirely ineffective,
        // but in the worst case it's still a test with multiple receivers on different tasks,
        // so is useful to have.
        //
        // The `test_with_various` helper uses `MockExecutor` with two different deterministic
        // scheduling policies.
        // At least at the time of writing,
        // when this test uses `MockExecutor` with its "queue" scheduling policy
        // the "send" occurs after 20 of the tasks have begun waiting.
        tor_rtmock::MockRuntime::test_with_various(|rt| async move {
            let (sender, receiver) = channel();

            rt.spawn(async move {
                // hold the send back a little
                // so that some of the receiver tasks get a chance to start
                for _ in 0..20 {
                    tor_rtcompat::task::yield_now().await;
                }
                sender.send(0_u8);
            })
            .unwrap();

            let mut handles = vec![];
            for _ in 0..100 {
                let receiver_clone = receiver.clone();
                let handle = rt
                    .spawn_with_handle(async move { receiver_clone.borrowed().await.cloned() })
                    .unwrap();
                handles.push(handle);
                // lets the send task make progress if single-threaded
                tor_rtcompat::task::yield_now().await;
            }

            for handle in handles {
                assert!(matches!(handle.await, Ok(0)));
            }
        });
    }
}