tor_rtmock/
task.rs

1//! Executor for running tests with mocked environment
2//!
3//! See [`MockExecutor`]
4
5use std::any::Any;
6use std::cell::Cell;
7use std::collections::VecDeque;
8use std::fmt::{self, Debug, Display};
9use std::future::Future;
10use std::io::{self, Write as _};
11use std::iter;
12use std::panic::{catch_unwind, panic_any, AssertUnwindSafe};
13use std::pin::{pin, Pin};
14use std::sync::{Arc, Mutex, MutexGuard, Weak};
15use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
16
17use futures::pin_mut;
18use futures::task::{FutureObj, Spawn, SpawnError};
19use futures::FutureExt as _;
20
21use assert_matches::assert_matches;
22use educe::Educe;
23use itertools::Either::{self, *};
24use itertools::{chain, izip};
25use slotmap_careful::DenseSlotMap;
26use std::backtrace::Backtrace;
27use strum::EnumIter;
28
29// NB: when using traced_test, the trace! and error! output here is generally suppressed
30// in tests of other crates.  To see it, you can write something like this
31// (in the dev-dependencies of the crate whose tests you're running):
32//    tracing-test = { version = "0.2.4", features = ["no-env-filter"] }
33use tracing::{error, trace};
34
35use oneshot_fused_workaround::{self as oneshot, Canceled};
36use tor_error::error_report;
37use tor_rtcompat::{Blocking, ToplevelBlockOn};
38
39use Poll::*;
40use TaskState::*;
41
/// Type-erased future, one for each of our (normal) tasks
///
/// Spawned tasks are stored as [`TaskFutureInfo::Normal`] inside [`Data`]`.tasks`,
/// so the future must be `'static`.  Its output is always `()`.
type TaskFuture = FutureObj<'static, ()>;

/// Future for the argument to `block_on`, which is handled specially
///
/// This is a pinned *borrow*, not an owned future:
/// the main future need not be `'static`, so it cannot live in [`Data`].
/// Instead it is passed down the call stack by the executor's methods.
type MainFuture<'m> = Pin<&'m mut dyn Future<Output = ()>>;
47
48//---------- principal data structures ----------
49
/// Executor for running tests with mocked environment
///
/// For test cases which don't actually wait for anything in the real world.
///
/// This is the executor.
/// It implements [`Spawn`] and [`ToplevelBlockOn`]
///
/// It will usually be used as part of a `MockRuntime`.
///
/// To run futures, call [`ToplevelBlockOn::block_on`]
///
/// # Cloning
///
/// A `MockExecutor` is a handle: its only field is an `Arc` of the shared state,
/// so every clone refers to the same set of tasks.
///
/// # Restricted environment
///
/// Tests run with this executor must not attempt to block
/// on anything "outside":
/// every future that anything awaits must (eventually) be woken directly
/// *by some other task* in the same test case.
///
/// (By directly we mean that the [`Waker::wake`] call is made
/// by that waking future, before that future itself awaits anything.)
///
/// # Panics
///
/// The executor will panic
/// if the toplevel future (passed to `block_on`)
/// doesn't complete (without externally blocking),
/// but instead waits for something.
///
/// The executor will malfunction or panic if reentered.
/// (Eg, if `block_on` is reentered.)
#[derive(Clone, Default, Educe)]
#[educe(Debug)]
pub struct MockExecutor {
    /// Mutable state
    ///
    /// Shared between all clones of this executor.
    /// Omitted from the `Debug` output.
    #[educe(Debug(ignore))]
    shared: Arc<Shared>,
}
87
/// Shared state and ancillary information
///
/// This is always within an `Arc`.
///
/// Bundles the mutex-protected [`Data`] with the condition variable
/// that is used alongside it for switching between threads.
#[derive(Default)]
struct Shared {
    /// Shared state
    ///
    /// All of the executor's mutable state lives behind this one mutex.
    data: Mutex<Data>,
    /// Condition variable for thread scheduling
    ///
    /// Signaled when [`Data.thread_to_run`](struct.Data.html#structfield.thread_to_run)
    /// is modified.
    thread_condvar: std::sync::Condvar,
}
101
/// Task id, module to hide `Ti` alias
///
/// The key type is declared under a short name inside this private module
/// and re-imported below under its usual name, `TaskId`.
mod task_id {
    slotmap_careful::new_key_type! {
        /// Task ID, usually called `TaskId`
        ///
        /// Short name in special `task_id` module so that [`Debug`] is nice
        ///
        /// This is the key type of the `tasks` slotmap in `Data`.
        pub(super) struct Ti;
    }
}
// Everywhere else in this file the key type goes by the name `TaskId`.
use task_id::Ti as TaskId;
112
/// Executor's state
///
/// ### Task state machine
///
/// A task is created in `tasks`, `Awake`, so also in `awake`.
///
/// When we poll it, we take it out of `awake` and set it to `Asleep`,
/// and then call `poll()`.
/// Any time after that, it can be made `Awake` again (and put back onto `awake`)
/// by the waker ([`ActualWaker`], wrapped in [`Waker`]).
///
/// The task's future is of course also present here in this data structure.
/// However, during poll we must release the lock,
/// so we cannot borrow the future from `Data`.
/// Instead, we move it out.  So `Task.fut` is an `Option`.
///
/// ### "Main" task - the argument to `block_on`
///
/// The signature of `BlockOn::block_on` accepts a non-`'static` future
/// (and a non-`Send`/`Sync` one).
///
/// So we cannot store that future in `Data` because `Data` is `'static`.
/// Instead, this main task future is passed as an argument down the call stack.
/// In the data structure we simply store a placeholder, `TaskFutureInfo::Main`.
#[derive(Educe, derive_more::Debug)]
#[educe(Default)]
struct Data {
    /// Tasks
    ///
    /// Includes tasks spawned with `spawn`,
    /// and also the future passed to `block_on`.
    ///
    /// A task is removed from here when its future returns `Ready`.
    #[debug("{:?}", DebugTasks(self, || tasks.keys()))]
    tasks: DenseSlotMap<TaskId, Task>,

    /// `awake` lists precisely: tasks that are `Awake`, plus maybe stale `TaskId`s
    ///
    /// Tasks are pushed onto the *back* when woken,
    /// so back is the most recently woken.
    ///
    /// Stale entries (ids no longer in `tasks`) are tolerated:
    /// the executor main loop skips them when it pops them.
    #[debug("{:?}", DebugTasks(self, || awake.iter().cloned()))]
    awake: VecDeque<TaskId>,

    /// If a future from `progress_until_stalled` exists
    ///
    /// Set to `Some` by [`MockExecutor::progress_until_stalled`],
    /// and reset to `None` when the [`ProgressUntilStalledFuture`] is dropped.
    progressing_until_stalled: Option<ProgressingUntilStalled>,

    /// Scheduling policy
    ///
    /// Decides which end of `awake` the next task is taken from.
    scheduling: SchedulingPolicy,

    /// (Sub)thread we want to run now
    ///
    /// At any one time only one thread is meant to be running.
    /// Other threads are blocked in condvar wait, waiting for this to change.
    ///
    /// **Modified only** within
    /// [`thread_context_switch_send_instruction_to_run`](Shared::thread_context_switch_send_instruction_to_run),
    /// which takes responsibility for preserving the following **invariants**:
    ///
    ///  1. no-one but the named thread is allowed to modify this field.
    ///  2. after modifying this field, signal `thread_condvar`
    ///
    /// The default value is `ThreadDescriptor::Executor`.
    #[educe(Default(expression = "ThreadDescriptor::Executor"))]
    thread_to_run: ThreadDescriptor,
}
174
/// How should we schedule?
///
/// Selects which of the currently-awake tasks the executor runs next.
/// Woken tasks are appended to the back of the executor's `awake` list;
/// the policy decides which end of that list is popped.
#[derive(Debug, Clone, Default, EnumIter)]
#[non_exhaustive]
pub enum SchedulingPolicy {
    /// Task *most* recently woken is run
    ///
    /// This is the default.
    ///
    /// It will expose starvation bugs if a task never sleeps.
    /// (Which is a good thing in tests.)
    ///
    /// (The next task is popped from the back of the `awake` list.)
    #[default]
    Stack,
    /// Task *least* recently woken is run.
    ///
    /// (The next task is popped from the front of the `awake` list.)
    Queue,
}
190
/// Record of a single task
///
/// Tracks a spawned task, or the main task (the argument to `block_on`).
///
/// Stored in [`Data`]`.tasks`.
struct Task {
    /// For debugging output
    ///
    /// Supplied by whoever created the task (eg `"main"` for the `block_on` future).
    desc: String,
    /// Has this been woken via a waker?  (And is it in `Data.awake`?)
    ///
    /// **Set to `Awake` only by [`Task::set_awake`]**,
    /// preserving the invariant that
    /// every `Awake` task is in [`Data.awake`](struct.Data.html#structfield.awake).
    state: TaskState,
    /// The actual future (or a placeholder for it)
    ///
    /// May be `None` briefly in the executor main loop, because we've
    /// temporarily moved it out so we can poll it,
    /// or if this is a Subthread task which is currently running sync code
    /// (in which case we're blocked in the executor waiting to be
    /// woken up by [`thread_context_switch`](Shared::thread_context_switch)).
    ///
    /// Note that the `None` can be observed outside the main loop, because
    /// the main loop unlocks while it polls, so other (non-main-loop) code
    /// might see it.
    fut: Option<TaskFutureInfo>,
}
218
/// A future as stored in our record of a [`Task`]
///
/// The executor main loop matches on this to decide how to "poll" a task:
/// poll the stored future, poll the main future it was handed as an argument,
/// or switch to a Subthread.
#[derive(Educe)]
#[educe(Debug)]
enum TaskFutureInfo {
    /// The [`Future`].  All is normal.
    ///
    /// (The future itself is omitted from the `Debug` output.)
    Normal(#[educe(Debug(ignore))] TaskFuture),
    /// The future isn't here because this task is the main future for `block_on`
    Main,
    /// This task is actually a [`Subthread`](MockExecutor::subthread_spawn)
    ///
    /// Instead of polling it, we'll switch to it with
    /// [`thread_context_switch`](Shared::thread_context_switch).
    Subthread,
}
233
/// State of a task - do we think it needs to be polled?
///
/// Stored in [`Task`]`.state`.
#[derive(Debug)]
enum TaskState {
    /// Awake - needs to be polled
    ///
    /// Established by [`waker.wake()`](Waker::wake)
    ///
    /// This is also the state in which every task is first inserted.
    Awake,
    /// Asleep - does *not* need to be polled
    ///
    /// Established each time just before we call the future's [`poll`](Future::poll)
    ///
    /// The executor main loop starts each sleep with an empty list.
    // NOTE(review): the list presumably records where the task is sleeping,
    // for debugging output; `SleepLocation` is defined outside this part of the file.
    Asleep(Vec<SleepLocation>),
}
248
/// Actual implementor of `Wake` for use in a `Waker`
///
/// Futures (eg, channels from [`futures`]) will use this to wake a task
/// when it should be polled.
///
/// This type must not be `Cloned` with the `Data` lock held.
/// Consequently, a `Waker` mustn't either.
struct ActualWaker {
    /// Executor state
    ///
    /// The Waker mustn't hold a strong reference to the executor,
    /// since typically a task holds a future that holds a Waker,
    /// and the executor holds the task - so that would be a cycle.
    ///
    /// If the executor has gone away by the time we are woken,
    /// the wakeup is simply ignored.
    data: Weak<Shared>,

    /// Which task this is
    id: TaskId,
}
267
/// State used for an in-progress call to
/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
///
/// If present in [`Data`], an (async) call to `progress_until_stalled`
/// is in progress.
///
/// The future from `progress_until_stalled`, [`ProgressUntilStalledFuture`]
/// is a normal-ish future.
/// It can be polled in the normal way.
/// When it is polled, it looks here, in `finished`, to see if it's `Ready`.
///
/// The future is made ready, and woken (via `waker`),
/// by bespoke code in the task executor loop.
///
/// When `ProgressUntilStalledFuture` (maybe completes and) is dropped,
/// its `Drop` impl is used to remove this from `Data.progressing_until_stalled`.
#[derive(Debug)]
struct ProgressingUntilStalled {
    /// Have we, in fact, stalled?
    ///
    /// Made `Ready` by special code in the executor loop
    ///
    /// Starts out `Pending`.
    finished: Poll<()>,

    /// Waker
    ///
    /// Signalled by special code in the executor loop
    ///
    /// `None` until the future has been polled for the first time;
    /// each poll stores the waker from its `Context` here.
    waker: Option<Waker>,
}
296
/// Future from
/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
///
/// See [`ProgressingUntilStalled`] for an overview of this aspect of the contraption.
///
/// Existence of this struct implies `Data.progressing_until_stalled` is `Some`.
/// There can only be one at a time.
///
/// The struct itself carries no state of its own, only a handle on the executor.
#[derive(Educe)]
#[educe(Debug)]
struct ProgressUntilStalledFuture {
    /// Executor's state; this future's state is in `.progressing_until_stalled`
    #[educe(Debug(ignore))]
    shared: Arc<Shared>,
}
311
/// Identifies a thread we know about - the executor thread, or a Subthread
///
/// Not related to `std::thread::ThreadId`.
///
/// See [`subthread_spawn`](MockExecutor::subthread_spawn) for definition of a Subthread.
///
/// This being a thread-local and not scoped by which `MockExecutor` we're talking about
/// means that we can't cope if there are multiple `MockExecutor`s involved in the same thread.
/// That's OK (and documented).
#[derive(Copy, Clone, Eq, PartialEq, derive_more::Debug)]
enum ThreadDescriptor {
    /// Foreign - neither the (running) executor, nor a Subthread
    ///
    /// This is the initial value of the thread-local.
    #[debug("FOREIGN")]
    Foreign,
    /// The executor.
    #[debug("Exe")]
    Executor,
    /// This task, which is a Subthread.
    ///
    /// (Shown in `Debug` output as just the task id.)
    #[debug("{_0:?}")]
    Subthread(TaskId),
}
333
/// Marker indicating that this task is a Subthread, not an async task.
///
/// See [`subthread_spawn`](MockExecutor::subthread_spawn) for definition of a Subthread.
///
/// The executor main loop produces this instead of a `Poll`
/// when the task it selected is a Subthread.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
struct IsSubthread;
339
/// [`Shared::subthread_yield`] should set our task awake before switching to the executor
///
/// A unit marker type; it carries no data.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
struct SetAwake;
343
thread_local! {
    /// Identifies this thread.
    ///
    /// Every thread starts out as `Foreign`.
    /// While the executor main loop runs on a thread, that thread is marked `Executor`;
    /// the marking is reset to `Foreign` when the loop returns or panics.
    pub static THREAD_DESCRIPTOR: Cell<ThreadDescriptor> = const {
        Cell::new(ThreadDescriptor::Foreign)
    };
}
350
351//---------- creation ----------
352
353impl MockExecutor {
354    /// Make a `MockExecutor` with default parameters
355    pub fn new() -> Self {
356        Self::default()
357    }
358
359    /// Make a `MockExecutor` with a specific `SchedulingPolicy`
360    pub fn with_scheduling(scheduling: SchedulingPolicy) -> Self {
361        Data {
362            scheduling,
363            ..Default::default()
364        }
365        .into()
366    }
367}
368
369impl From<Data> for MockExecutor {
370    fn from(data: Data) -> MockExecutor {
371        let shared = Shared {
372            data: Mutex::new(data),
373            thread_condvar: std::sync::Condvar::new(),
374        };
375        MockExecutor {
376            shared: Arc::new(shared),
377        }
378    }
379}
380
381//---------- spawning ----------
382
383impl MockExecutor {
384    /// Spawn a task and return something to identify it
385    ///
386    /// `desc` should `Display` as some kind of short string (ideally without spaces)
387    /// and will be used in the `Debug` impl and trace log messages from `MockExecutor`.
388    ///
389    /// The returned value is an opaque task identifier which is very cheap to clone
390    /// and which can be used by the caller in debug logging,
391    /// if it's desired to correlate with the debug output from `MockExecutor`.
392    /// Most callers will want to ignore it.
393    ///
394    /// This method is infallible.  (The `MockExecutor` cannot be shut down.)
395    pub fn spawn_identified(
396        &self,
397        desc: impl Display,
398        fut: impl Future<Output = ()> + Send + 'static,
399    ) -> impl Debug + Clone + Send + 'static {
400        self.spawn_internal(desc.to_string(), FutureObj::from(Box::new(fut)))
401    }
402
403    /// Spawn a task and return its output for further usage
404    ///
405    /// `desc` should `Display` as some kind of short string (ideally without spaces)
406    /// and will be used in the `Debug` impl and trace log messages from `MockExecutor`.
407    pub fn spawn_join<T: Debug + Send + 'static>(
408        &self,
409        desc: impl Display,
410        fut: impl Future<Output = T> + Send + 'static,
411    ) -> impl Future<Output = T> {
412        let (tx, rx) = oneshot::channel();
413        self.spawn_identified(desc, async move {
414            let res = fut.await;
415            tx.send(res)
416                .expect("Failed to send future's output, did future panic?");
417        });
418        rx.map(|m| m.expect("Failed to receive future's output"))
419    }
420
421    /// Spawn a task and return its `TaskId`
422    ///
423    /// Convenience method for use by `spawn_identified` and `spawn_obj`.
424    /// The future passed to `block_on` is not handled here.
425    fn spawn_internal(&self, desc: String, fut: TaskFuture) -> TaskId {
426        let mut data = self.shared.lock();
427        data.insert_task(desc, TaskFutureInfo::Normal(fut))
428    }
429}
430
431impl Data {
432    /// Insert a task given its `TaskFutureInfo` and return its `TaskId`.
433    fn insert_task(&mut self, desc: String, fut: TaskFutureInfo) -> TaskId {
434        let state = Awake;
435        let id = self.tasks.insert(Task {
436            state,
437            desc,
438            fut: Some(fut),
439        });
440        self.awake.push_back(id);
441        trace!("MockExecutor spawned {:?}={:?}", id, self.tasks[id]);
442        id
443    }
444}
445
446impl Spawn for MockExecutor {
447    fn spawn_obj(&self, future: TaskFuture) -> Result<(), SpawnError> {
448        self.spawn_internal("spawn_obj".into(), future);
449        Ok(())
450    }
451}
452
453impl Blocking for MockExecutor {
454    type ThreadHandle<T: Send + 'static> = Pin<Box<dyn Future<Output = T>>>;
455
456    fn spawn_blocking<F, T>(&self, f: F) -> Self::ThreadHandle<T>
457    where
458        F: FnOnce() -> T + Send + 'static,
459        T: Send + 'static,
460    {
461        assert_matches!(
462            THREAD_DESCRIPTOR.get(),
463            ThreadDescriptor::Executor | ThreadDescriptor::Subthread(_),
464 "MockExecutor::spawn_blocking_io only allowed from future or subthread, being run by this executor"
465        );
466        Box::pin(
467            self.subthread_spawn("spawn_blocking", f)
468                .map(|x| x.expect("Error in spawn_blocking subthread.")),
469        )
470    }
471
472    fn reenter_block_on<F>(&self, future: F) -> F::Output
473    where
474        F: Future,
475        F::Output: Send + 'static,
476    {
477        self.subthread_block_on_future(future)
478    }
479}
480
481//---------- block_on ----------
482
impl ToplevelBlockOn for MockExecutor {
    /// Run `input_fut`, and all spawned tasks, until `input_fut` completes
    ///
    /// Returns the output of `input_fut`.
    ///
    /// # Panics
    ///
    /// If the executor runs out of things to do before `input_fut` has produced
    /// its output, prints a message and a debug dump, and then panics.
    fn block_on<F>(&self, input_fut: F) -> F::Output
    where
        F: Future,
    {
        // Where the wrapper future below will deposit the output of `input_fut`.
        let mut value: Option<F::Output> = None;

        // Box this just so that we can conveniently control precisely when it's dropped.
        // (We could do this with Option and Pin::set but that seems clumsier.)
        let mut input_fut = Box::pin(input_fut);

        // Wrapper with output `()`, as the executor requires:
        // it awaits the real future and stores its output in `value`.
        let run_store_fut = {
            let value = &mut value;
            let input_fut = &mut input_fut;
            async {
                trace!("MockExecutor block_on future...");
                let t = input_fut.await;
                trace!("MockExecutor block_on future returned...");
                *value = Some(t);
                trace!("MockExecutor block_on future exiting.");
            }
        };

        {
            pin_mut!(run_store_fut);

            // Only a placeholder goes into the task table;
            // the future itself is passed to the executor as an argument.
            let main_id = self
                .shared
                .lock()
                .insert_task("main".into(), TaskFutureInfo::Main);
            trace!("MockExecutor {main_id:?} is task for block_on");
            self.execute_to_completion(run_store_fut);
        }

        // If `value` is still `None`, everything stalled before the main future finished.
        #[allow(clippy::let_and_return)] // clarity
        let value = value.take().unwrap_or_else(|| {
            // eprintln can be captured by libtest, but the debug_dump goes to io::stderr.
            // use the latter, so that the debug dump is prefixed by this message.
            let _: io::Result<()> = writeln!(io::stderr(), "all futures blocked, crashing...");
            // write to tracing too, so the tracing log is clear about when we crashed
            error!("all futures blocked, crashing...");

            // Sequencing here is subtle.
            //
            // We should do the dump before dropping the input future, because the input
            // future is likely to own things that, when dropped, wake up other tasks,
            // rendering the dump inaccurate.
            //
            // But also, dropping the input future may well drop a ProgressUntilStalledFuture
            // which then reenters us.  More generally, we mustn't call user code
            // with the lock held.
            //
            // And, we mustn't panic with the data lock held.
            //
            // If value was Some, then this closure is dropped without being called,
            // which drops the future after it has yielded the value, which is correct.
            {
                // The lock is held only for the duration of the dump.
                let mut data = self.shared.lock();
                data.debug_dump();
            }
            drop(input_fut);

            panic!(
                r"
all futures blocked. waiting for the real world? or deadlocked (waiting for each other) ?
"
            );
        });

        value
    }
}
555
556//---------- execution - core implementation ----------
557
impl MockExecutor {
    /// Keep polling tasks until nothing more can be done
    ///
    /// Ie, stop when `awake` is empty and `progressing_until_stalled` is `None`.
    ///
    /// Each time the tasks stall while a `progress_until_stalled` call is outstanding,
    /// that call is marked finished and its waker is woken, and then we go round again.
    ///
    /// # Panics
    ///
    /// Panics if the outstanding `progress_until_stalled` was already finished,
    /// or if its future has never been polled (so there is no waker to wake).
    fn execute_to_completion(&self, mut main_fut: MainFuture) {
        trace!("MockExecutor execute_to_completion...");
        loop {
            self.execute_until_first_stall(main_fut.as_mut());

            // Handle `progressing_until_stalled`
            let pus_waker = {
                let mut data = self.shared.lock();
                let pus = &mut data.progressing_until_stalled;
                trace!("MockExecutor execute_to_completion PUS={:?}", &pus);
                let Some(pus) = pus else {
                    // No progressing_until_stalled, we're actually done.
                    break;
                };
                assert_eq!(
                    pus.finished, Pending,
                    "ProgressingUntilStalled finished twice?!"
                );
                pus.finished = Ready(());

                // Release the lock temporarily so that ActualWaker::clone doesn't deadlock
                let waker = pus
                    .waker
                    .take()
                    .expect("ProgressUntilStalledFuture not ever polled!");
                drop(data);
                let waker_copy = waker.clone();
                let mut data = self.shared.lock();

                // Put the original waker back where we found it.
                // Something may have happened while the lock was released, so re-check.
                let pus = &mut data.progressing_until_stalled;
                if let Some(double) = pus
                    .as_mut()
                    .expect("progressing_until_stalled updated under our feet!")
                    .waker
                    .replace(waker)
                {
                    panic!("double progressing_until_stalled.waker! {double:?}");
                }

                waker_copy
            };
            // The lock guard went out of scope with the block above,
            // so the wakeup happens without the lock held.
            pus_waker.wake();
        }
        trace!("MockExecutor execute_to_completion done");
    }

    /// Keep polling tasks until `awake` is empty
    ///
    /// (Ignores `progressing_until_stalled` - so if one is active,
    /// will return when all other tasks have blocked.)
    ///
    /// Marks the current thread as the executor for the duration of the main loop.
    ///
    /// # Panics
    ///
    /// Might malfunction or panic if called reentrantly
    ///
    /// A panic from the main loop (or from a task polled by it) is propagated,
    /// after the thread's marking has been reset.
    fn execute_until_first_stall(&self, main_fut: MainFuture) {
        trace!("MockExecutor execute_until_first_stall ...");

        assert_eq!(
            THREAD_DESCRIPTOR.get(),
            ThreadDescriptor::Foreign,
            "MockExecutor executor re-entered"
        );
        THREAD_DESCRIPTOR.set(ThreadDescriptor::Executor);

        // Catch panics so that the thread-local is always reset, below.
        let r = catch_unwind(AssertUnwindSafe(|| self.executor_main_loop(main_fut)));

        THREAD_DESCRIPTOR.set(ThreadDescriptor::Foreign);

        match r {
            Ok(()) => trace!("MockExecutor execute_until_first_stall done."),
            Err(e) => {
                trace!("MockExecutor executor, or async task, panicked!");
                panic_any(e)
            }
        }
    }

    /// Keep polling tasks until `awake` is empty (inner, executor main loop)
    ///
    /// This is only called from [`MockExecutor::execute_until_first_stall`],
    /// so it could also be called `execute_until_first_stall_inner`.
    ///
    /// Each iteration: pick a task according to the scheduling policy,
    /// mark it `Asleep`, "poll" it without the lock held,
    /// and then record the outcome under the lock.
    #[allow(clippy::cognitive_complexity)]
    fn executor_main_loop(&self, mut main_fut: MainFuture) {
        'outer: loop {
            // Take an `Awake` task off `awake` and make it `Asleep`
            let (id, mut fut) = 'inner: loop {
                let mut data = self.shared.lock();
                let Some(id) = data.schedule() else {
                    // Nothing is awake: we have stalled.
                    break 'outer;
                };
                let Some(task) = data.tasks.get_mut(id) else {
                    // Stale entry in `awake`; try the next one.
                    trace!("MockExecutor {id:?} vanished");
                    continue;
                };
                task.state = Asleep(vec![]);
                let fut = task.fut.take().expect("future missing from task!");
                break 'inner (id, fut);
            };

            // Poll the selected task
            trace!("MockExecutor {id:?} polling...");
            let waker = ActualWaker::make_waker(&self.shared, id);
            let mut cx = Context::from_waker(&waker);
            let r: Either<Poll<()>, IsSubthread> = match &mut fut {
                TaskFutureInfo::Normal(fut) => Left(fut.poll_unpin(&mut cx)),
                TaskFutureInfo::Main => Left(main_fut.as_mut().poll(&mut cx)),
                TaskFutureInfo::Subthread => Right(IsSubthread),
            };

            // Deal with the returned `Poll`
            //
            // Declared before the lock is taken, so that it is dropped after the lock
            // is released.
            let _fut_drop_late;
            {
                let mut data = self.shared.lock();
                let task = data
                    .tasks
                    .get_mut(id)
                    .expect("task vanished while we were polling it");

                match r {
                    Left(Pending) => {
                        trace!("MockExecutor {id:?} -> Pending");
                        if task.fut.is_some() {
                            panic!("task reinserted while we polled it?!");
                        }
                        // The task might have been woken *by its own poll method*.
                        // That's why we set it to `Asleep` *earlier* rather than here.
                        // All we need to do is put the future back.
                        task.fut = Some(fut);
                    }
                    Left(Ready(())) => {
                        trace!("MockExecutor {id:?} -> Ready");
                        // Oh, it finished!
                        // It might be in `awake`, but that's allowed to contain stale tasks,
                        // so we *don't* need to scan that list and remove it.
                        data.tasks.remove(id);
                        // It is important that we don't drop `fut` until we have released
                        // the data lock, since it is an external type and might try to reenter
                        // us (eg by calling spawn).  If we do that here, we risk deadlock.
                        // So, move `fut` to a variable with scope outside the block with `data`.
                        _fut_drop_late = fut;
                    }
                    Right(IsSubthread) => {
                        trace!("MockExecutor {id:?} -> Ready, waking Subthread");
                        // Task is a subthread, which has called thread_context_switch
                        // to switch to us.  We "poll" it by switching back.

                        // Put back `TFI::Subthread`, which was moved out temporarily, above.
                        task.fut = Some(fut);

                        // The lock guard is handed over to `thread_context_switch`.
                        self.shared.thread_context_switch(
                            data,
                            ThreadDescriptor::Executor,
                            ThreadDescriptor::Subthread(id),
                        );

                        // Now, if the Subthread still exists, that's because it's switched
                        // back to us, and is waiting in subthread_block_on_future again.
                        // Or it might have ended, in which case it's not in `tasks` any more.
                        // In any case we can go back to scheduling futures.
                    }
                }
            }
        }
    }
}
727
728impl Data {
729    /// Return the next task to run
730    ///
731    /// The task is removed from `awake`, but **`state` is not set to `Asleep`**.
732    /// The caller must restore the invariant!
733    fn schedule(&mut self) -> Option<TaskId> {
734        use SchedulingPolicy as SP;
735        match self.scheduling {
736            SP::Stack => self.awake.pop_back(),
737            SP::Queue => self.awake.pop_front(),
738        }
739    }
740}
741
742impl ActualWaker {
743    /// Obtain a strong reference to the executor's data
744    fn upgrade_data(&self) -> Option<Arc<Shared>> {
745        self.data.upgrade()
746    }
747
748    /// Wake the task corresponding to this `ActualWaker`
749    ///
750    /// This is like `<Self as std::task::Wake>::wake()` but takes `&self`, not `Arc`
751    fn wake(&self) {
752        let Some(data) = self.upgrade_data() else {
753            // The executor is gone!  Don't try to wake.
754            return;
755        };
756        let mut data = data.lock();
757        let data = &mut *data;
758        trace!("MockExecutor {:?} wake", &self.id);
759        let Some(task) = data.tasks.get_mut(self.id) else {
760            return;
761        };
762        task.set_awake(self.id, &mut data.awake);
763    }
764
765    /// Create and return a `Waker` for task `id`
766    fn make_waker(shared: &Arc<Shared>, id: TaskId) -> Waker {
767        ActualWaker {
768            data: Arc::downgrade(shared),
769            id,
770        }
771        .new_waker()
772    }
773}
774
775//---------- "progress until stalled" functionality ----------
776
777impl MockExecutor {
778    /// Run tasks in the current executor until every other task is waiting
779    ///
780    /// # Panics
781    ///
782    /// Might malfunction or panic if more than one such call is running at once.
783    ///
784    /// (Ie, you must `.await` or drop the returned `Future`
785    /// before calling this method again.)
786    ///
787    /// Must be called and awaited within a future being run by `self`.
788    pub fn progress_until_stalled(&self) -> impl Future<Output = ()> {
789        let mut data = self.shared.lock();
790        assert!(
791            data.progressing_until_stalled.is_none(),
792            "progress_until_stalled called more than once"
793        );
794        trace!("MockExecutor progress_until_stalled...");
795        data.progressing_until_stalled = Some(ProgressingUntilStalled {
796            finished: Pending,
797            waker: None,
798        });
799        ProgressUntilStalledFuture {
800            shared: self.shared.clone(),
801        }
802    }
803}
804
805impl Future for ProgressUntilStalledFuture {
806    type Output = ();
807
808    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
809        let waker = cx.waker().clone();
810        let mut data = self.shared.lock();
811        let pus = data.progressing_until_stalled.as_mut();
812        trace!("MockExecutor progress_until_stalled polling... {:?}", &pus);
813        let pus = pus.expect("ProgressingUntilStalled missing");
814        pus.waker = Some(waker);
815        pus.finished
816    }
817}
818
819impl Drop for ProgressUntilStalledFuture {
820    fn drop(&mut self) {
821        self.shared.lock().progressing_until_stalled = None;
822    }
823}
824
825//---------- (sub)threads ----------
826
827impl MockExecutor {
828    /// Spawn a "Subthread", for processing in a sync context
829    ///
830    /// `call` will be run on a separate thread, called a "Subthread".
831    ///
832    /// But it will **not run simultaneously** with the executor,
833    /// nor with other Subthreads.
834    /// So Subthreads are somewhat like coroutines.
835    ///
836    /// `call` must be capable of making progress without waiting for any other Subthreads.
837    /// `call` may wait for async futures, using
838    /// [`subthread_block_on_future`](MockExecutor::subthread_block_on_future).
839    ///
840    /// Subthreads may be used for cpubound activity,
841    /// or synchronous IO (such as large volumes of disk activity),
842    /// provided that the synchronous code will reliably make progress,
843    /// without waiting (directly or indirectly) for any async task or Subthread -
844    /// except via `subthread_block_on_future`.
845    ///
846    /// # Subthreads vs raw `std::thread` threads
847    ///
848    /// Programs using `MockExecutor` may use `std::thread` threads directly.
849    /// However, this is not recommended.  There are severe limitations:
850    ///
851    ///  * Only a Subthread can re-enter the async context from sync code:
    ///    this must be done using
    ///    [`subthread_block_on_future`](MockExecutor::subthread_block_on_future).
854    ///    (Re-entering the executor with
855    ///    [`block_on`](tor_rtcompat::ToplevelBlockOn::block_on)
856    ///    is not allowed.)
857    ///  * If async tasks want to suspend waiting for synchronous code,
858    ///    the synchronous code must run on a Subthread.
859    ///    This allows the `MockExecutor` to know when
860    ///    that synchronous code is still making progress.
861    ///    (This is needed for
862    ///    [`progress_until_stalled`](MockExecutor::progress_until_stalled)
863    ///    and the facilities which use it, such as
864    ///    [`MockRuntime::advance_until_stalled`](crate::MockRuntime::advance_until_stalled).)
865    ///  * Subthreads never run in parallel -
866    ///    they only run as scheduled deterministically by the `MockExecutor`.
    ///    So using Subthreads eliminates a source of test nondeterminism.
868    ///    (Execution order is still varied due to explicitly varying the scheduling policy.)
869    ///
870    /// # Panics, abuse, and malfunctions
871    ///
    /// If `call` panics and unwinds, the future returned by `subthread_spawn` yields `Err`.
    /// The application code should do something about it if this happens,
    /// typically, logging errors, tearing things down, or failing a test case.
875    ///
876    /// If the executor doesn't run, the subthread will not run either, and will remain stuck.
877    /// (So, typically, if the thread supposed to run the executor panics,
878    /// for example because a future or the executor itself panics,
879    /// all the subthreads will become stuck - effectively, they'll be leaked.)
880    ///
    /// `subthread_spawn` panics if OS thread spawning fails.
882    /// (Like `std::thread::spawn()` does.)
883    ///
884    /// `MockExecutor`s will malfunction or panic if
885    /// any executor invocation method (eg `block_on`) is called on a Subthread.
886    pub fn subthread_spawn<T: Send + 'static>(
887        &self,
888        desc: impl Display,
889        call: impl FnOnce() -> T + Send + 'static,
890    ) -> impl Future<Output = Result<T, Box<dyn Any + Send>>> + Unpin + Send + Sync + 'static {
891        let desc = desc.to_string();
892        let (output_tx, output_rx) = oneshot::channel();
893
894        // NB: we don't know which thread we're on!
895        // In principle we might be on another Subthread.
896        // So we can't context switch here.  That would be very confusing.
897        //
898        // Instead, we prepare the new Subthread as follows:
899        //   - There is a task in the executor
900        //   - The task is ready to be polled, whenever the executor decides to
901        //   - The thread starts running right away, but immediately waits until it is scheduled
902        // See `subthread_entrypoint`.
903
904        {
905            let mut data = self.shared.lock();
906            let id = data.insert_task(desc.clone(), TaskFutureInfo::Subthread);
907
908            let _: std::thread::JoinHandle<()> = std::thread::Builder::new()
909                .name(desc)
910                .spawn({
911                    let shared = self.shared.clone();
912                    move || shared.subthread_entrypoint(id, call, output_tx)
913                })
914                .expect("spawn failed");
915        }
916
917        output_rx.map(|r| {
918            r.unwrap_or_else(|_: Canceled| panic!("Subthread cancelled but should be impossible!"))
919        })
920    }
921
922    /// Call an async `Future` from a Subthread
923    ///
924    /// Blocks the Subthread, and arranges to run async tasks,
925    /// including `fut`, until `fut` completes.
926    ///
    /// `fut` is polled on the calling Subthread itself,
    /// at times when the executor has scheduled that Subthread to run.
    /// (So `fut` is not required to be `Send`.)
929    ///
930    /// # Panics, abuse, and malfunctions
931    ///
932    /// `subthread_block_on_future` will malfunction or panic
933    /// if called on a thread that isn't a Subthread from the same `MockExecutor`
    /// (ie a thread made with [`subthread_spawn`](MockExecutor::subthread_spawn)).
935    ///
936    /// If `fut` itself panics, the executor will panic.
937    ///
938    /// If the executor isn't running, `subthread_block_on_future` will hang indefinitely.
    /// See [`subthread_spawn`](MockExecutor::subthread_spawn).
940    #[allow(clippy::cognitive_complexity)] // Splitting this up would be worse
941    pub fn subthread_block_on_future<T: Send + 'static>(&self, fut: impl Future<Output = T>) -> T {
942        let id = match THREAD_DESCRIPTOR.get() {
943            ThreadDescriptor::Subthread(id) => id,
944            ThreadDescriptor::Executor => {
945                panic!("subthread_block_on_future called from MockExecutor thread (async task?)")
946            }
947            ThreadDescriptor::Foreign => panic!(
948    "subthread_block_on_future called on foreign thread (not spawned with spawn_subthread)"
949            ),
950        };
951        trace!("MockExecutor thread {id:?}, subthread_block_on_future...");
952        let mut fut = pin!(fut);
953
954        // We yield once before the first poll, and once after Ready, to shake up the
955        // execution order a bit, depending on the scheduling policy.
956        let yield_ = |set_awake| self.shared.subthread_yield(id, set_awake);
957        yield_(Some(SetAwake));
958
959        let ret = loop {
960            // Poll the provided future
961            trace!("MockExecutor thread {id:?}, s.t._block_on_future polling...");
962            let waker = ActualWaker::make_waker(&self.shared, id);
963            let mut cx = Context::from_waker(&waker);
964            let r: Poll<T> = fut.as_mut().poll(&mut cx);
965
966            if let Ready(r) = r {
967                trace!("MockExecutor thread {id:?}, s.t._block_on_future poll -> Ready");
968                break r;
969            }
970
971            // Pending.  Switch back to the exeuctor thread.
972            // When the future becomes ready, the Waker will be woken, waking the task,
973            // so that the executor will "poll" us again.
974            trace!("MockExecutor thread {id:?}, s.t._block_on_future poll -> Pending");
975
976            yield_(None);
977        };
978
979        yield_(Some(SetAwake));
980
981        trace!("MockExecutor thread {id:?}, subthread_block_on_future complete.");
982
983        ret
984    }
985}
986
987impl Shared {
988    /// Main entrypoint function for a Subthread
989    ///
990    /// Entered on a new `std::thread` thread created by
991    /// [`subthread_spawn`](MockExecutor::subthread_spawn).
992    ///
993    /// When `call` completes, sends its returned value `T` to `output_tx`.
    fn subthread_entrypoint<T: Send + 'static>(
        self: Arc<Self>,
        id: TaskId,
        call: impl FnOnce() -> T + Send + 'static,
        output_tx: oneshot::Sender<Result<T, Box<dyn Any + Send>>>,
    ) {
        // Record which Subthread this OS thread is, for `subthread_block_on_future`.
        THREAD_DESCRIPTOR.set(ThreadDescriptor::Subthread(id));
        trace!("MockExecutor thread {id:?}, entrypoint");

        // We start out Awake, but we wait for the executor to tell us to run.
        // This will be done the first time the task is "polled".
        {
            let data = self.lock();
            self.thread_context_switch_waitfor_instruction_to_run(
                data,
                ThreadDescriptor::Subthread(id),
            );
        }

        trace!("MockExecutor thread {id:?}, entering user code");

        // Run the user's actual thread function.
        // This will typically reenter us via subthread_block_on_future.
        // A panic in `call` is caught here and becomes the `Err` sent below.
        let ret = catch_unwind(AssertUnwindSafe(call));

        trace!("MockExecutor thread {id:?}, completed user code");

        // This makes the return value from subthread_spawn ready.
        // It will be polled by the executor in due course, presumably.

        output_tx.send(ret).unwrap_or_else(
            #[allow(clippy::unnecessary_lazy_evaluations)]
            |_| {}, // receiver dropped, maybe executor dropped or something?
        );

        {
            let mut data = self.lock();

            // Never poll this task again (so never schedule this thread)
            let _: Task = data.tasks.remove(id).expect("Subthread task vanished!");

            // Tell the executor it is scheduled now.
            // We carry on exiting, in parallel (holding the data lock).
            // (We only *send* the instruction; we do not wait to be scheduled again.)
            self.thread_context_switch_send_instruction_to_run(
                &mut data,
                ThreadDescriptor::Subthread(id),
                ThreadDescriptor::Executor,
            );
        }
    }
1044
1045    /// Yield back to the executor from a subthread
1046    ///
1047    /// Checks that things are in order
    /// (in particular, that this task is in the data structure as a subthread)
1049    /// and switches to the executor thread.
1050    ///
1051    /// The caller must arrange that the task gets woken.
1052    ///
1053    /// With [`SetAwake`], sets our task awake, so that we'll be polled
1054    /// again as soon as we get to the top of the executor's queue.
1055    /// Otherwise, we'll be reentered after someone wakes a [`Waker`] for the task.
1056    fn subthread_yield(&self, us: TaskId, set_awake: Option<SetAwake>) {
1057        let mut data = self.lock();
1058        {
1059            let data = &mut *data;
1060            let task = data.tasks.get_mut(us).expect("Subthread task vanished!");
1061            match &task.fut {
1062                Some(TaskFutureInfo::Subthread) => {}
1063                other => panic!("subthread_block_on_future but TFI {other:?}"),
1064            };
1065            if let Some(SetAwake) = set_awake {
1066                task.set_awake(us, &mut data.awake);
1067            }
1068        }
1069        self.thread_context_switch(
1070            data,
1071            ThreadDescriptor::Subthread(us),
1072            ThreadDescriptor::Executor,
1073        );
1074    }
1075
1076    /// Switch from (sub)thread `us` to (sub)thread `them`
1077    ///
1078    /// Returns when someone calls `thread_context_switch(.., us)`.
    fn thread_context_switch(
        &self,
        mut data: MutexGuard<Data>,
        us: ThreadDescriptor,
        them: ThreadDescriptor,
    ) {
        trace!("MockExecutor thread {us:?}, switching to {them:?}");
        // First tell `them` to run ...
        self.thread_context_switch_send_instruction_to_run(&mut data, us, them);
        // ... then block (giving up the guard to the condvar wait) until it is our turn again.
        self.thread_context_switch_waitfor_instruction_to_run(data, us);
    }
1089
1090    /// Instruct the (sub)thread `them` to run
1091    ///
1092    /// Update `thread_to_run`, which will wake up `them`'s
1093    /// call to `thread_context_switch_waitfor_instruction_to_run`.
1094    ///
1095    /// Must be called from (sub)thread `us`.
1096    /// Part of `thread_context_switch`, not normally called directly.
    fn thread_context_switch_send_instruction_to_run(
        &self,
        data: &mut MutexGuard<Data>,
        us: ThreadDescriptor,
        them: ThreadDescriptor,
    ) {
        // Only the thread that is currently designated to run may hand over.
        assert_eq!(data.thread_to_run, us);
        data.thread_to_run = them;
        // Wake every waiter; each one rechecks `thread_to_run` for itself.
        self.thread_condvar.notify_all();
    }
1107
1108    /// Await an instruction for this thread, `us`, to run
1109    ///
1110    /// Waits for `thread_to_run` to be `us`,
1111    /// waiting for `thread_condvar` as necessary.
1112    ///
1113    /// Part of `thread_context_switch`, not normally called directly.
1114    fn thread_context_switch_waitfor_instruction_to_run(
1115        &self,
1116        data: MutexGuard<Data>,
1117        us: ThreadDescriptor,
1118    ) {
1119        #[allow(let_underscore_lock)]
1120        let _: MutexGuard<_> = self
1121            .thread_condvar
1122            .wait_while(data, |data| {
1123                let live = data.thread_to_run;
1124                let resume = live == us;
1125                if resume {
1126                    trace!("MockExecutor thread {us:?}, resuming");
1127                } else {
1128                    trace!("MockExecutor thread {us:?}, waiting for {live:?}");
1129                }
1130                // We're in `.wait_while`, not `.wait_until`.  Confusing.
1131                !resume
1132            })
1133            .expect("data lock poisoned");
1134    }
1135}
1136
1137//---------- ancillary and convenience functions ----------
1138
/// Trait to let us assert at compile time that something is nicely `Sync` etc.
///
/// It has no items; its supertraits are `Sync + Send + 'static`,
/// so each `impl` below compiles only if the type meets all of those bounds.
#[allow(dead_code)] // yes, we don't *use* anything from this trait
trait EnsureSyncSend: Sync + Send + 'static {}
impl EnsureSyncSend for ActualWaker {}
impl EnsureSyncSend for MockExecutor {}
1144
1145impl MockExecutor {
1146    /// Return the number of tasks running in this executor
1147    ///
1148    /// One possible use is for a test case to check that task(s)
1149    /// that ought to have exited, have indeed done so.
1150    ///
1151    /// In the usual case, the answer will be at least 1,
1152    /// because it counts the future passed to
1153    /// [`block_on`](MockExecutor::block_on)
1154    /// (perhaps via [`MockRuntime::test_with_various`](crate::MockRuntime::test_with_various)).
1155    pub fn n_tasks(&self) -> usize {
1156        self.shared.lock().tasks.len()
1157    }
1158}
1159
1160impl Shared {
1161    /// Lock and obtain the guard
1162    ///
1163    /// Convenience method which panics on poison
1164    fn lock(&self) -> MutexGuard<Data> {
1165        self.data.lock().expect("data lock poisoned")
1166    }
1167}
1168
1169impl Task {
1170    /// Set task `id` to `Awake` and arrange that it will be polled.
1171    fn set_awake(&mut self, id: TaskId, data_awake: &mut VecDeque<TaskId>) {
1172        match self.state {
1173            Awake => {}
1174            Asleep(_) => {
1175                self.state = Awake;
1176                data_awake.push_back(id);
1177            }
1178        }
1179    }
1180}
1181
1182//---------- ActualWaker as RawWaker ----------
1183
1184/// Using [`ActualWaker`] in a [`RawWaker`]
1185///
1186/// We need to make a
1187/// [`Waker`] (the safe, type-erased, waker, used by actual futures)
1188/// which contains an
1189/// [`ActualWaker`] (our actual waker implementation, also safe).
1190///
1191/// `std` offers `Waker::from<Arc<impl Wake>>`.
1192/// But we want a bespoke `Clone` implementation, so we don't want to use `Arc`.
1193///
1194/// So instead, we implement the `RawWaker` API in terms of `ActualWaker`.
1195/// We keep the `ActualWaker` in a `Box`, and actually `clone` it (and the `Box`).
1196///
1197/// SAFETY
1198///
1199///  * The data pointer is `Box::<ActualWaker>::into_raw()`
1200///  * We share these when we clone
1201///  * No-one is allowed `&mut ActualWaker` unless there are no other clones
1202///  * So we may make references `&ActualWaker`
impl ActualWaker {
    /// Wrap up an [`ActualWaker`] as a type-erased [`Waker`] for passing to futures etc.
    fn new_waker(self) -> Waker {
        // SAFETY: `raw_new` pairs a data pointer obtained from `Box::into_raw`
        // with `RAW_WAKER_VTABLE`, whose functions all treat the pointer that way.
        unsafe { Waker::from_raw(self.raw_new()) }
    }

    /// Helper: wrap up an [`ActualWaker`] as a [`RawWaker`].
    ///
    /// Moves `self` into a fresh `Box`; the allocation is reclaimed by `raw_drop`.
    fn raw_new(self) -> RawWaker {
        let self_: Box<ActualWaker> = self.into();
        let self_: *mut ActualWaker = Box::into_raw(self_);
        let self_: *const () = self_ as _;
        RawWaker::new(self_, &RAW_WAKER_VTABLE)
    }

    /// Implementation of [`RawWakerVTable`]'s `clone`
    ///
    /// Makes a new, separately boxed `ActualWaker`, using `<ActualWaker as Clone>`.
    ///
    /// SAFETY: `self_` must be the data pointer of a live waker made by `raw_new`.
    unsafe fn raw_clone(self_: *const ()) -> RawWaker {
        let self_: *const ActualWaker = self_ as _;
        // Pointers made by `raw_new` come from `Box::into_raw`, so they are not null.
        let self_: &ActualWaker = self_.as_ref().unwrap_unchecked();
        let copy: ActualWaker = self_.clone();
        copy.raw_new()
    }

    /// Implementation of [`RawWakerVTable`]'s `wake`
    ///
    /// Wakes by reference, then frees the boxed `ActualWaker`.
    ///
    /// SAFETY: as for `raw_drop`.
    unsafe fn raw_wake(self_: *const ()) {
        Self::raw_wake_by_ref(self_);
        Self::raw_drop(self_);
    }

    /// Implementation of [`RawWakerVTable`]'s `wake_by_ref`
    ///
    /// SAFETY: `self_` must be the data pointer of a live waker made by `raw_new`.
    unsafe fn raw_wake_by_ref(self_: *const ()) {
        let self_: *const ActualWaker = self_ as _;
        // Pointers made by `raw_new` come from `Box::into_raw`, so they are not null.
        let self_: &ActualWaker = self_.as_ref().unwrap_unchecked();
        self_.wake();
    }

    /// Implementation of [`RawWakerVTable`]'s `drop`
    ///
    /// SAFETY: `self_` must be the data pointer of a live waker made by `raw_new`,
    /// and must not be used again afterwards: this frees the `Box`.
    unsafe fn raw_drop(self_: *const ()) {
        let self_: *mut ActualWaker = self_ as _;
        let self_: Box<ActualWaker> = Box::from_raw(self_);
        drop(self_);
    }
}
1245
/// vtable for `Box<ActualWaker>` as `RawWaker`
///
/// The entries are, in order: `clone`, `wake`, `wake_by_ref`, `drop`.
//
// This ought to be in the impl block above, but
//   "associated `static` items are not allowed"
static RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    ActualWaker::raw_clone,
    ActualWaker::raw_wake,
    ActualWaker::raw_wake_by_ref,
    ActualWaker::raw_drop,
);
1256
1257//---------- Sleep location tracking and dumping ----------
1258
/// We record "where a future went to sleep" as (just) a backtrace
///
/// This type alias allows us to mock `Backtrace` for miri.
/// (It also insulates from future choices about sleep location representation.)
#[cfg(not(miri))]
type SleepLocation = Backtrace;
1265
1266impl Data {
1267    /// Dump tasks and their sleep location backtraces
1268    fn dump_backtraces(&self, f: &mut fmt::Formatter) -> fmt::Result {
1269        for (id, task) in self.tasks.iter() {
1270            let prefix = |f: &mut fmt::Formatter| write!(f, "{id:?}={task:?}: ");
1271            match &task.state {
1272                Awake => {
1273                    prefix(f)?;
1274                    writeln!(f, "awake")?;
1275                }
1276                Asleep(locs) => {
1277                    let n = locs.len();
1278                    for (i, loc) in locs.iter().enumerate() {
1279                        prefix(f)?;
1280                        writeln!(f, "asleep, backtrace {i}/{n}:\n{loc}",)?;
1281                    }
1282                    if n == 0 {
1283                        prefix(f)?;
1284                        writeln!(f, "asleep, no backtraces, Waker never cloned, stuck!",)?;
1285                    }
1286                }
1287            }
1288        }
1289        writeln!(
1290            f,
1291            "\nNote: there might be spurious traces, see docs for MockExecutor::debug_dump\n"
1292        )?;
1293        Ok(())
1294    }
1295}
1296
1297/// Track sleep locations via `<Waker as Clone>`.
1298///
1299/// See [`MockExecutor::debug_dump`] for the explanation.
1300impl Clone for ActualWaker {
1301    fn clone(&self) -> Self {
1302        let id = self.id;
1303
1304        if let Some(data) = self.upgrade_data() {
1305            // If the executor is gone, there is nothing to adjust
1306            let mut data = data.lock();
1307            if let Some(task) = data.tasks.get_mut(self.id) {
1308                match &mut task.state {
1309                    Awake => trace!("MockExecutor cloned waker for awake task {id:?}"),
1310                    Asleep(locs) => locs.push(SleepLocation::force_capture()),
1311                }
1312            } else {
1313                trace!("MockExecutor cloned waker for dead task {id:?}");
1314            }
1315        }
1316
1317        ActualWaker {
1318            data: self.data.clone(),
1319            id,
1320        }
1321    }
1322}
1323
1324//---------- API for full debug dump ----------
1325
/// Debugging dump of a `MockExecutor`'s state
///
/// Returned by [`MockExecutor::as_debug_dump`]
//
// Existence implies backtraces have been resolved
//
// We use `Either` so that we can also use this internally when we have &mut Data.
// `Right` holds the lock guard taken by `MockExecutor::as_debug_dump`;
// `Left` holds the plain reference passed by `Data::debug_dump`.
pub struct DebugDump<'a>(Either<&'a Data, MutexGuard<'a, Data>>);
1334
1335impl MockExecutor {
1336    /// Dump the executor's state including backtraces of waiting tasks, to stderr
1337    ///
1338    /// This is considerably more extensive than simply
1339    /// `MockExecutor as Debug`.
1340    ///
1341    /// (This is a convenience method, which wraps
1342    /// [`MockExecutor::as_debug_dump()`].
1343    ///
1344    /// ### Backtrace salience (possible spurious traces)
1345    ///
1346    /// **Summary**
1347    ///
1348    /// The technique used to capture backtraces when futures sleep is not 100% exact.
1349    /// It will usually show all the actual sleeping sites,
1350    /// but it might also show other backtraces which were part of
1351    /// the implementation of some complex relevant future.
1352    ///
1353    /// **Details**
1354    ///
1355    /// When a future's implementation wants to sleep,
1356    /// it needs to record the [`Waker`] (from the [`Context`])
1357    /// so that the "other end" can call `.wake()` on it later,
1358    /// when the future should be woken.
1359    ///
1360    /// Since `Context.waker()` gives `&Waker`, borrowed from the `Context`,
1361    /// the future must clone the `Waker`,
1362    /// and it must do so in within the `poll()` call.
1363    ///
1364    /// A future which is waiting in a `select!` will typically
1365    /// show multiple traces, one for each branch.
1366    /// But,
1367    /// if a future sleeps on one thing, and then when polled again later,
1368    /// sleeps on something different, without waking up in between,
1369    /// both backtrace locations will be shown.
1370    /// And,
1371    /// a complicated future contraption *might* clone the `Waker` more times.
1372    /// So not every backtrace will necessarily be informative.
1373    ///
1374    /// ### Panics
1375    ///
1376    /// Panics on write errors.
1377    pub fn debug_dump(&self) {
1378        self.as_debug_dump().to_stderr();
1379    }
1380
1381    /// Dump the executor's state including backtraces of waiting tasks
1382    ///
1383    /// This is considerably more extensive than simply
1384    /// `MockExecutor as Debug`.
1385    ///
1386    /// Returns an object for formatting with [`Debug`].
1387    /// To simply print the dump to stderr (eg in a test),
1388    /// use [`.debug_dump()`](MockExecutor::debug_dump).
1389    ///
1390    /// **Backtrace salience (possible spurious traces)** -
1391    /// see [`.debug_dump()`](MockExecutor::debug_dump).
1392    pub fn as_debug_dump(&self) -> DebugDump {
1393        let data = self.shared.lock();
1394        DebugDump(Right(data))
1395    }
1396}
1397
1398impl Data {
1399    /// Convenience function: dump including backtraces, to stderr
1400    fn debug_dump(&mut self) {
1401        DebugDump(Left(self)).to_stderr();
1402    }
1403}
1404
1405impl DebugDump<'_> {
1406    /// Convenience function: dump tasks and backtraces to stderr
1407    #[allow(clippy::wrong_self_convention)] // "to_stderr" doesn't mean "convert to stderr"
1408    fn to_stderr(self) {
1409        write!(io::stderr().lock(), "{:?}", self)
1410            .unwrap_or_else(|e| error_report!(e, "failed to write debug dump to stderr"));
1411    }
1412}
1413
1414//---------- bespoke Debug impls ----------
1415
1416impl Debug for DebugDump<'_> {
1417    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1418        let self_: &Data = &self.0;
1419
1420        writeln!(f, "MockExecutor state:\n{self_:#?}")?;
1421        writeln!(f, "MockExecutor task dump:")?;
1422        self_.dump_backtraces(f)?;
1423
1424        Ok(())
1425    }
1426}
1427
1428// See `impl Debug for Data` for notes on the output
1429impl Debug for Task {
1430    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1431        let Task { desc, state, fut } = self;
1432        write!(f, "{:?}", desc)?;
1433        write!(f, "=")?;
1434        match fut {
1435            None => write!(f, "P")?,
1436            Some(TaskFutureInfo::Normal(_)) => write!(f, "f")?,
1437            Some(TaskFutureInfo::Main) => write!(f, "m")?,
1438            Some(TaskFutureInfo::Subthread) => write!(f, "T")?,
1439        }
1440        match state {
1441            Awake => write!(f, "W")?,
1442            Asleep(locs) => write!(f, "s{}", locs.len())?,
1443        };
1444        Ok(())
1445    }
1446}
1447
/// Helper: `Debug`s as a list of tasks, given the `Data` for lookups and a list of the ids
///
/// `Task`s in `Data` are printed as `Ti(ID)"SPEC"=FLAGS`.
///
/// `FLAGS` are:
///
///  * `T`: this task is for a Subthread (from subthread_spawn).
///  * `P`: this task is being polled (its `TaskFutureInfo` is absent)
///  * `f`: this is a normal task with a future and its future is present in `Data`
///  * `m`: this is the main task from `block_on`
///
///  * `W`: the task is awake
///  * `s<n>`: the task is asleep, and `<n>` is the number of recorded sleeping locations
//
// We do it this way because the naive dump from derive is very expansive
// and makes it impossible to see the wood for the trees.
// This very compact representation makes it easier to find a task of interest in the output.
//
// This is implemented in `impl Debug for Task`.
//
//
// rustc doesn't think automatically-derived Debug impls count for whether a thing is used.
// This has caused quite some fallout.  https://github.com/rust-lang/rust/pull/85200
// I think derive_more emits #[automatically_derived], so that even though we use this
// in our Debug impl, that construction is unused.
#[allow(dead_code)]
struct DebugTasks<'d, F>(&'d Data, F);
1475
1476// See `impl Debug for Data` for notes on the output
1477impl<F, I> Debug for DebugTasks<'_, F>
1478where
1479    F: Fn() -> I,
1480    I: Iterator<Item = TaskId>,
1481{
1482    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1483        let DebugTasks(data, ids) = self;
1484        for (id, delim) in izip!(ids(), chain!(iter::once(""), iter::repeat(" ")),) {
1485            write!(f, "{delim}{id:?}")?;
1486            match data.tasks.get(id) {
1487                None => write!(f, "-")?,
1488                Some(task) => write!(f, "={task:?}")?,
1489            }
1490        }
1491        Ok(())
1492    }
1493}
1494
/// Mock `Backtrace` for miri
///
/// See also the not-miri `type SleepLocation`, alias above.
#[cfg(miri)]
mod miri_sleep_location {
    /// Stand-in for a backtrace: carries no data, and displays as `<SleepLocation>`
    #[derive(Debug, derive_more::Display)]
    #[display("<SleepLocation>")]
    pub(super) struct SleepLocation {}

    impl SleepLocation {
        /// Same name as the constructor used on the real type; captures nothing
        pub(super) fn force_capture() -> Self {
            SleepLocation {}
        }
    }
}
// Bring the mock into scope under the name which the rest of the file uses.
#[cfg(miri)]
use miri_sleep_location::SleepLocation;
1512
1513#[cfg(test)]
1514mod test {
1515    // @@ begin test lint list maintained by maint/add_warning @@
1516    #![allow(clippy::bool_assert_comparison)]
1517    #![allow(clippy::clone_on_copy)]
1518    #![allow(clippy::dbg_macro)]
1519    #![allow(clippy::mixed_attributes_style)]
1520    #![allow(clippy::print_stderr)]
1521    #![allow(clippy::print_stdout)]
1522    #![allow(clippy::single_char_pattern)]
1523    #![allow(clippy::unwrap_used)]
1524    #![allow(clippy::unchecked_duration_subtraction)]
1525    #![allow(clippy::useless_vec)]
1526    #![allow(clippy::needless_pass_by_value)]
1527    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1528    use super::*;
1529    use futures::channel::mpsc;
1530    use futures::{SinkExt as _, StreamExt as _};
1531    use strum::IntoEnumIterator;
1532    use tracing::info;
1533
1534    #[cfg(not(miri))] // trace! asks for the time, which miri doesn't support
1535    use tracing_test::traced_test;
1536
1537    fn various_mock_executors() -> impl Iterator<Item = MockExecutor> {
1538        // This duplicates the part of the logic in MockRuntime::test_with_various which
1539        // relates to MockExecutor, because we don't have a MockRuntime::builder.
1540        // The only parameter to MockExecutor is its scheduling policy, so this seems fine.
1541        SchedulingPolicy::iter().map(|scheduling| {
1542            eprintln!("===== MockExecutor::with_scheduling({scheduling:?}) =====");
1543            MockExecutor::with_scheduling(scheduling)
1544        })
1545    }
1546
1547    #[cfg_attr(not(miri), traced_test)]
1548    #[test]
1549    fn simple() {
1550        let runtime = MockExecutor::default();
1551        let val = runtime.block_on(async { 42 });
1552        assert_eq!(val, 42);
1553    }
1554
    // Exercises `progress_until_stalled` with a chain of forwarding tasks:
    // task `i` receives `v` on channel `i` and sends `v + 1` on channel `v + 1`;
    // the main future holds the receiver for the last channel (`rx_n`).
    #[cfg_attr(not(miri), traced_test)]
    #[test]
    fn stall() {
        let runtime = MockExecutor::default();

        runtime.block_on({
            let runtime = runtime.clone();
            async move {
                const N: usize = 3;
                let (mut txs, mut rxs): (Vec<_>, Vec<_>) =
                    (0..N).map(|_| mpsc::channel::<usize>(5)).unzip();

                // The last receiver stays with us; the others each get a forwarding task.
                let mut rx_n = rxs.pop().unwrap();

                for (i, mut rx) in rxs.into_iter().enumerate() {
                    runtime.spawn_identified(i, {
                        let mut txs = txs.clone();
                        async move {
                            loop {
                                eprintln!("task {i} rx...");
                                let v = rx.next().await.unwrap();
                                let nv = v + 1;
                                eprintln!("task {i} rx {v}, tx {nv}");
                                let v = nv;
                                txs[v].send(v).await.unwrap();
                            }
                        }
                    });
                }

                // Nothing has been sent yet, so nothing can have arrived.
                dbg!();
                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();

                dbg!();
                runtime.progress_until_stalled().await;

                // Still nothing: the forwarding tasks had no input to work on.
                dbg!();
                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();

                // Inject a 0 at the start of the chain ...
                dbg!();
                txs[0].send(0).await.unwrap();

                dbg!();
                runtime.progress_until_stalled().await;

                // ... it is incremented at each hop, so it arrives as N - 1.
                dbg!();
                let r = rx_n.next().await;
                assert_eq!(r, Some(N - 1));

                dbg!();
                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();

                // Now a separate task sends a 0 into every channel, including the last one.
                runtime.spawn_identified("tx", {
                    let txs = txs.clone();
                    async {
                        eprintln!("sending task...");
                        for (i, mut tx) in txs.into_iter().enumerate() {
                            eprintln!("sending 0 to {i}...");
                            tx.send(0).await.unwrap();
                        }
                        eprintln!("sending task done");
                    }
                });

                runtime.debug_dump();

                // We expect one message per channel:
                // either the 0 sent directly to the last channel,
                // or a value that went through the chain and arrived as N - 1.
                for i in 0..txs.len() {
                    eprintln!("main {i} wait stall...");
                    runtime.progress_until_stalled().await;
                    eprintln!("main {i} rx wait...");
                    let r = rx_n.next().await;
                    eprintln!("main {i} rx = {r:?}");
                    assert!(r == Some(0) || r == Some(N - 1));
                }

                eprintln!("finishing...");
                runtime.progress_until_stalled().await;
                eprintln!("finished.");
            }
        });
    }
1636
1637    #[cfg_attr(not(miri), traced_test)]
1638    #[test]
1639    fn spawn_blocking() {
1640        let runtime = MockExecutor::default();
1641
1642        runtime.block_on({
1643            let runtime = runtime.clone();
1644            async move {
1645                let thr_1 = runtime.spawn_blocking(|| 42);
1646                let thr_2 = runtime.spawn_blocking(|| 99);
1647
1648                assert_eq!(thr_2.await, 99);
1649                assert_eq!(thr_1.await, 42);
1650            }
1651        });
1652    }
1653
1654    #[cfg_attr(not(miri), traced_test)]
1655    #[test]
1656    fn drop_reentrancy() {
1657        // Check that dropping a completed task future is done *outside* the data lock.
1658        // Involves a contrived future whose Drop impl reenters the executor.
1659        //
1660        // If `_fut_drop_late = fut` in execute_until_first_stall (the main loop)
1661        // is replaced with `drop(fut)` (dropping the future at the wrong moment),
1662        // we do indeed get deadlock, so this test case is working.
1663
1664        struct ReentersOnDrop {
1665            runtime: MockExecutor,
1666        }
1667        impl Future for ReentersOnDrop {
1668            type Output = ();
1669            fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<()> {
1670                Poll::Ready(())
1671            }
1672        }
1673        impl Drop for ReentersOnDrop {
1674            fn drop(&mut self) {
1675                self.runtime
1676                    .spawn_identified("dummy", futures::future::ready(()));
1677            }
1678        }
1679
1680        for runtime in various_mock_executors() {
1681            runtime.block_on(async {
1682                runtime.spawn_identified("trapper", {
1683                    let runtime = runtime.clone();
1684                    ReentersOnDrop { runtime }
1685                });
1686            });
1687        }
1688    }
1689
1690    #[cfg_attr(not(miri), traced_test)]
1691    #[test]
1692    fn subthread_oneshot() {
1693        for runtime in various_mock_executors() {
1694            runtime.block_on(async {
1695                let (tx, rx) = oneshot::channel();
1696                info!("spawning subthread");
1697                let thr = runtime.subthread_spawn("thr1", {
1698                    let runtime = runtime.clone();
1699                    move || {
1700                        info!("subthread_block_on_future...");
1701                        let i = runtime.subthread_block_on_future(rx).unwrap();
1702                        info!("subthread_block_on_future => {i}");
1703                        i + 1
1704                    }
1705                });
1706                info!("main task sending");
1707                tx.send(12).unwrap();
1708                info!("main task sent");
1709                let r = thr.await.unwrap();
1710                info!("main task thr => {r}");
1711                assert_eq!(r, 13);
1712            });
1713        }
1714    }
1715
1716    #[cfg_attr(not(miri), traced_test)]
1717    #[test]
1718    #[allow(clippy::cognitive_complexity)] // It's is not that complicated, really.
1719    fn subthread_pingpong() {
1720        for runtime in various_mock_executors() {
1721            runtime.block_on(async {
1722                let (mut i_tx, mut i_rx) = mpsc::channel(1);
1723                let (mut o_tx, mut o_rx) = mpsc::channel(1);
1724                info!("spawning subthread");
1725                let thr = runtime.subthread_spawn("thr", {
1726                    let runtime = runtime.clone();
1727                    move || {
1728                        while let Some(i) = {
1729                            info!("thread receiving ...");
1730                            runtime.subthread_block_on_future(i_rx.next())
1731                        } {
1732                            let o = i + 12;
1733                            info!("thread received {i}, sending {o}");
1734                            runtime.subthread_block_on_future(o_tx.send(o)).unwrap();
1735                            info!("thread sent {o}");
1736                        }
1737                        info!("thread exiting");
1738                        42
1739                    }
1740                });
1741                for i in 0..2 {
1742                    info!("main task sending {i}");
1743                    i_tx.send(i).await.unwrap();
1744                    info!("main task sent {i}");
1745                    let o = o_rx.next().await.unwrap();
1746                    info!("main task recv => {o}");
1747                    assert_eq!(o, i + 12);
1748                }
1749                info!("main task dropping sender");
1750                drop(i_tx);
1751                info!("main task awaiting thread");
1752                let r = thr.await.unwrap();
1753                info!("main task complete");
1754                assert_eq!(r, 42);
1755            });
1756        }
1757    }
1758}