tor_rtmock/
task.rs

1//! Executor for running tests with mocked environment
2//!
3//! See [`MockExecutor`]
4
5use std::any::Any;
6use std::cell::Cell;
7use std::collections::VecDeque;
8use std::fmt::{self, Debug, Display};
9use std::future::Future;
10use std::io::{self, Write as _};
11use std::iter;
12use std::mem;
13use std::panic::{catch_unwind, panic_any, AssertUnwindSafe};
14use std::pin::{pin, Pin};
15use std::sync::{Arc, Mutex, MutexGuard, Weak};
16use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
17
18use futures::future::Map;
19use futures::pin_mut;
20use futures::task::{FutureObj, Spawn, SpawnError};
21use futures::FutureExt as _;
22
23use assert_matches::assert_matches;
24use educe::Educe;
25use itertools::Either::{self, *};
26use itertools::{chain, izip};
27use slotmap_careful::DenseSlotMap;
28use std::backtrace::Backtrace;
29use strum::EnumIter;
30
31// NB: when using traced_test, the trace! and error! output here is generally suppressed
32// in tests of other crates.  To see it, you can write something like this
33// (in the dev-dependencies of the crate whose tests you're running):
34//    tracing-test = { version = "0.2.4", features = ["no-env-filter"] }
35use tracing::{error, trace};
36
37use oneshot_fused_workaround::{self as oneshot, Canceled, Receiver};
38use tor_error::error_report;
39use tor_rtcompat::{Blocking, ToplevelBlockOn};
40
41use Poll::*;
42use TaskState::*;
43
44/// Type-erased future, one for each of our (normal) tasks
45type TaskFuture = FutureObj<'static, ()>;
46
47/// Future for the argument to `block_on`, which is handled specially
48type MainFuture<'m> = Pin<&'m mut dyn Future<Output = ()>>;
49
50//---------- principal data structures ----------
51
/// Executor for running tests with mocked environment
///
/// For test cases which don't actually wait for anything in the real world.
///
/// This is the executor.
/// It implements [`Spawn`] and [`ToplevelBlockOn`]
///
/// It will usually be used as part of a `MockRuntime`.
///
/// To run futures, call [`ToplevelBlockOn::block_on`]
///
/// Cloning a `MockExecutor` is cheap and yields another handle onto the
/// *same* executor: all clones share their state via an [`Arc`].
///
/// # Restricted environment
///
/// Tests run with this executor must not attempt to block
/// on anything "outside":
/// every future that anything awaits must (eventually) be woken directly
/// *by some other task* in the same test case.
///
/// (By directly we mean that the [`Waker::wake`] call is made
/// by that waking future, before that future itself awaits anything.)
///
/// # Panics
///
/// The executor will panic
/// if the toplevel future (passed to `block_on`)
/// doesn't complete (without externally blocking),
/// but instead waits for something.
///
/// The executor will malfunction or panic if reentered.
/// (Eg, if `block_on` is reentered.)
#[derive(Clone, Default, Educe)]
#[educe(Debug)]
pub struct MockExecutor {
    /// Mutable state, shared between all clones of this `MockExecutor`
    ///
    /// Omitted from the `Debug` output.
    #[educe(Debug(ignore))]
    shared: Arc<Shared>,
}
89
/// Shared state and ancillary information
///
/// This is always within an `Arc`.
#[derive(Default)]
struct Shared {
    /// Shared state: the executor's [`Data`], protected by a mutex
    data: Mutex<Data>,
    /// Condition variable for thread scheduling
    ///
    /// Signaled when [`Data.thread_to_run`](struct.Data.html#structfield.thread_to_run)
    /// is modified.
    thread_condvar: std::sync::Condvar,
}
103
/// Task id type, in its own module so that the short name `Ti` is hidden
///
/// The rest of this file refers to the type by its `TaskId` alias.
mod task_id {
    slotmap_careful::new_key_type! {
        /// Task ID, usually called `TaskId`
        ///
        /// Short name in special `task_id` module so that [`Debug`] is nice
        pub(super) struct Ti;
    }
}
// The name by which the rest of this file knows the task id type.
use task_id::Ti as TaskId;
114
/// Executor's state
///
/// ### Task state machine
///
/// A task is created in `tasks`, `Awake`, so also in `awake`.
///
/// When we poll it, we take it out of `awake` and set it to `Asleep`,
/// and then call `poll()`.
/// Any time after that, it can be made `Awake` again (and put back onto `awake`)
/// by the waker ([`ActualWaker`], wrapped in [`Waker`]).
///
/// The task's future is of course also present here in this data structure.
/// However, during poll we must release the lock,
/// so we cannot borrow the future from `Data`.
/// Instead, we move it out.  So `Task.fut` is an `Option`.
///
/// ### "Main" task - the argument to `block_on`
///
/// The signature of [`ToplevelBlockOn::block_on`] accepts a non-`'static` future
/// (and a non-`Send`/`Sync` one).
///
/// So we cannot store that future in `Data` because `Data` is `'static`.
/// Instead, this main task future is passed as an argument down the call stack.
/// In the data structure we simply store a placeholder, `TaskFutureInfo::Main`.
#[derive(Educe, derive_more::Debug)]
#[educe(Default)]
struct Data {
    /// Tasks
    ///
    /// Includes tasks spawned with `spawn`,
    /// and also the future passed to `block_on`.
    #[debug("{:?}", DebugTasks(self, || tasks.keys()))]
    tasks: DenseSlotMap<TaskId, Task>,

    /// `awake` lists precisely: tasks that are `Awake`, plus maybe stale `TaskId`s
    ///
    /// Tasks are pushed onto the *back* when woken,
    /// so back is the most recently woken.
    ///
    /// (A stale `TaskId` is one whose task is no longer in `tasks`;
    /// the executor main loop skips those when it encounters them.)
    #[debug("{:?}", DebugTasks(self, || awake.iter().cloned()))]
    awake: VecDeque<TaskId>,

    /// If a future from `progress_until_stalled` exists
    progressing_until_stalled: Option<ProgressingUntilStalled>,

    /// Scheduling policy
    ///
    /// Determines which end of `awake` the next task to run is taken from.
    scheduling: SchedulingPolicy,

    /// (Sub)thread we want to run now
    ///
    /// At any one time only one thread is meant to be running.
    /// Other threads are blocked in condvar wait, waiting for this to change.
    ///
    /// **Modified only** within
    /// [`thread_context_switch_send_instruction_to_run`](Shared::thread_context_switch_send_instruction_to_run),
    /// which takes responsibility for preserving the following **invariants**:
    ///
    ///  1. no-one but the named thread is allowed to modify this field.
    ///  2. after modifying this field, signal `thread_condvar`
    #[educe(Default(expression = "ThreadDescriptor::Executor"))]
    thread_to_run: ThreadDescriptor,
}
176
/// Scheduling policy: which of the currently-awake tasks is run next
///
/// Newly woken (and newly spawned) tasks are always appended to the back of the
/// executor's list of awake tasks; the policy decides which end we take from.
#[derive(Debug, Clone, Default, EnumIter)]
#[non_exhaustive]
pub enum SchedulingPolicy {
    /// Task *most* recently woken is run
    ///
    /// This is the default.
    ///
    /// It will expose starvation bugs if a task never sleeps.
    /// (Which is a good thing in tests.)
    #[default]
    Stack,
    /// Task *least* recently woken is run.
    Queue,
}
192
/// Record of a single task
///
/// Tracks a spawned task, or the main task (the argument to `block_on`).
///
/// Stored in [`Data`]`.tasks`.
struct Task {
    /// For debugging output
    desc: String,
    /// Has this been woken via a waker?  (And is it in `Data.awake`?)
    ///
    /// **Set to `Awake` only by [`Task::set_awake`]**,
    /// preserving the invariant that
    /// every `Awake` task is in [`Data.awake`](struct.Data.html#structfield.awake).
    state: TaskState,
    /// The actual future (or a placeholder for it)
    ///
    /// May be `None` briefly in the executor main loop, because we've
    /// temporarily moved it out so we can poll it,
    /// or if this is a Subthread task which is currently running sync code
    /// (in which case we're blocked in the executor waiting to be
    /// woken up by [`thread_context_switch`](Shared::thread_context_switch)).
    ///
    /// Note that the `None` can be observed outside the main loop, because
    /// the main loop unlocks while it polls, so other (non-main-loop) code
    /// might see it.
    fut: Option<TaskFutureInfo>,
}
220
/// A future as stored in our record of a [`Task`]
///
/// The executor main loop matches on this to decide how to "poll" the task.
#[derive(Educe)]
#[educe(Debug)]
enum TaskFutureInfo {
    /// The [`Future`].  All is normal.
    ///
    /// (The future itself is omitted from the `Debug` output.)
    Normal(#[educe(Debug(ignore))] TaskFuture),
    /// The future isn't here because this task is the main future for `block_on`
    ///
    /// The main future is instead passed down the call stack to the main loop.
    Main,
    /// This task is actually a [`Subthread`](MockExecutor::subthread_spawn)
    ///
    /// Instead of polling it, we'll switch to it with
    /// [`thread_context_switch`](Shared::thread_context_switch).
    Subthread,
}
235
/// State of a task - do we think it needs to be polled?
///
/// Stored in [`Task`]`.state`.
#[derive(Debug)]
enum TaskState {
    /// Awake - needs to be polled
    ///
    /// Established by [`waker.wake()`](Waker::wake).
    /// Newly inserted tasks also start out in this state.
    Awake,
    /// Asleep - does *not* need to be polled
    ///
    /// Established each time just before we call the future's [`poll`](Future::poll)
    ///
    /// At that point the list of `SleepLocation`s is empty.
    Asleep(Vec<SleepLocation>),
}
250
/// Actual implementor of `Wake` for use in a `Waker`
///
/// Futures (eg, channels from [`futures`]) will use this to wake a task
/// when it should be polled.
///
/// This type must not be cloned with the `Data` lock held.
/// Consequently, a `Waker` mustn't be either.
struct ActualWaker {
    /// Executor state
    ///
    /// The Waker mustn't hold a strong reference to the executor,
    /// since typically a task holds a future that holds a Waker,
    /// and the executor holds the task - so that would be a cycle.
    ///
    /// If the executor has gone away by the time we are woken, waking is a no-op.
    data: Weak<Shared>,

    /// Which task this is
    id: TaskId,
}
269
/// State used for an in-progress call to
/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
///
/// If present in [`Data`], an (async) call to `progress_until_stalled`
/// is in progress.
///
/// The future from `progress_until_stalled`, [`ProgressUntilStalledFuture`]
/// is a normal-ish future.
/// It can be polled in the normal way.
/// When it is polled, it looks here, in `finished`, to see if it's `Ready`.
///
/// The future is made ready, and woken (via `waker`),
/// by bespoke code in the task executor loop.
///
/// When `ProgressUntilStalledFuture` (maybe completes and) is dropped,
/// its `Drop` impl is used to remove this from `Data.progressing_until_stalled`.
#[derive(Debug)]
struct ProgressingUntilStalled {
    /// Have we, in fact, stalled?
    ///
    /// Made `Ready` by special code in the executor loop
    finished: Poll<()>,

    /// Waker
    ///
    /// Signalled by special code in the executor loop
    ///
    /// `None` until the `ProgressUntilStalledFuture` has been polled for the
    /// first time; each poll stores the waker from its context here.
    waker: Option<Waker>,
}
298
/// Future from
/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
///
/// See [`ProgressingUntilStalled`] for an overview of this aspect of the contraption.
///
/// Existence of this struct implies `Data.progressing_until_stalled` is `Some`.
/// There can only be one at a time.
///
/// Dropping this future resets `Data.progressing_until_stalled` to `None`.
#[derive(Educe)]
#[educe(Debug)]
struct ProgressUntilStalledFuture {
    /// Executor's state; this future's state is in `.progressing_until_stalled`
    #[educe(Debug(ignore))]
    shared: Arc<Shared>,
}
313
/// Identifies a thread we know about - the executor thread, or a Subthread
///
/// Not related to `std::thread::ThreadId`.
///
/// See [`subthread_spawn`](MockExecutor::subthread_spawn) for definition of a Subthread.
///
/// The current thread's descriptor is kept in the [`THREAD_DESCRIPTOR`] thread-local.
/// This being a thread-local and not scoped by which `MockExecutor` we're talking about
/// means that we can't cope if there are multiple `MockExecutor`s involved in the same thread.
/// That's OK (and documented).
#[derive(Copy, Clone, Eq, PartialEq, derive_more::Debug)]
enum ThreadDescriptor {
    /// Foreign - neither the (running) executor, nor a Subthread
    #[debug("FOREIGN")]
    Foreign,
    /// The executor.
    #[debug("Exe")]
    Executor,
    /// This task, which is a Subthread.
    #[debug("{_0:?}")]
    Subthread(TaskId),
}
335
/// Marker indicating that this task is a Subthread, not an async task.
///
/// See [`subthread_spawn`](MockExecutor::subthread_spawn) for definition of a Subthread.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
struct IsSubthread;
341
/// Marker: [`Shared::subthread_yield`] should set our task awake before switching to the executor
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
struct SetAwake;
345
thread_local! {
    /// Identifies this thread.
    ///
    /// Initially `Foreign`.
    /// The executor sets it to `Executor` for the duration of its main loop,
    /// and back to `Foreign` afterwards.
    pub static THREAD_DESCRIPTOR: Cell<ThreadDescriptor> = const {
        Cell::new(ThreadDescriptor::Foreign)
    };
}
352
353//---------- creation ----------
354
355impl MockExecutor {
356    /// Make a `MockExecutor` with default parameters
357    pub fn new() -> Self {
358        Self::default()
359    }
360
361    /// Make a `MockExecutor` with a specific `SchedulingPolicy`
362    pub fn with_scheduling(scheduling: SchedulingPolicy) -> Self {
363        Data {
364            scheduling,
365            ..Default::default()
366        }
367        .into()
368    }
369}
370
371impl From<Data> for MockExecutor {
372    fn from(data: Data) -> MockExecutor {
373        let shared = Shared {
374            data: Mutex::new(data),
375            thread_condvar: std::sync::Condvar::new(),
376        };
377        MockExecutor {
378            shared: Arc::new(shared),
379        }
380    }
381}
382
383//---------- spawning ----------
384
385impl MockExecutor {
386    /// Spawn a task and return something to identify it
387    ///
388    /// `desc` should `Display` as some kind of short string (ideally without spaces)
389    /// and will be used in the `Debug` impl and trace log messages from `MockExecutor`.
390    ///
391    /// The returned value is an opaque task identifier which is very cheap to clone
392    /// and which can be used by the caller in debug logging,
393    /// if it's desired to correlate with the debug output from `MockExecutor`.
394    /// Most callers will want to ignore it.
395    ///
396    /// This method is infallible.  (The `MockExecutor` cannot be shut down.)
397    pub fn spawn_identified(
398        &self,
399        desc: impl Display,
400        fut: impl Future<Output = ()> + Send + 'static,
401    ) -> impl Debug + Clone + Send + 'static {
402        self.spawn_internal(desc.to_string(), FutureObj::from(Box::new(fut)))
403    }
404
405    /// Spawn a task and return its output for further usage
406    ///
407    /// `desc` should `Display` as some kind of short string (ideally without spaces)
408    /// and will be used in the `Debug` impl and trace log messages from `MockExecutor`.
409    pub fn spawn_join<T: Debug + Send + 'static>(
410        &self,
411        desc: impl Display,
412        fut: impl Future<Output = T> + Send + 'static,
413    ) -> impl Future<Output = T> {
414        let (tx, rx) = oneshot::channel();
415        self.spawn_identified(desc, async move {
416            let res = fut.await;
417            tx.send(res)
418                .expect("Failed to send future's output, did future panic?");
419        });
420        rx.map(|m| m.expect("Failed to receive future's output"))
421    }
422
423    /// Spawn a task and return its `TaskId`
424    ///
425    /// Convenience method for use by `spawn_identified` and `spawn_obj`.
426    /// The future passed to `block_on` is not handled here.
427    fn spawn_internal(&self, desc: String, fut: TaskFuture) -> TaskId {
428        let mut data = self.shared.lock();
429        data.insert_task(desc, TaskFutureInfo::Normal(fut))
430    }
431}
432
433impl Data {
434    /// Insert a task given its `TaskFutureInfo` and return its `TaskId`.
435    fn insert_task(&mut self, desc: String, fut: TaskFutureInfo) -> TaskId {
436        let state = Awake;
437        let id = self.tasks.insert(Task {
438            state,
439            desc,
440            fut: Some(fut),
441        });
442        self.awake.push_back(id);
443        trace!("MockExecutor spawned {:?}={:?}", id, self.tasks[id]);
444        id
445    }
446}
447
448impl Spawn for MockExecutor {
449    fn spawn_obj(&self, future: TaskFuture) -> Result<(), SpawnError> {
450        self.spawn_internal("spawn_obj".into(), future);
451        Ok(())
452    }
453}
454
455impl MockExecutor {
456    /// Implementation of `spawn_blocking` and `blocking_io`
457    fn spawn_thread_inner<F, T>(&self, f: F) -> <Self as Blocking>::ThreadHandle<T>
458    where
459        F: FnOnce() -> T + Send + 'static,
460        T: Send + 'static,
461    {
462        // For the mock executor, everything runs on the same thread.
463        // If we need something more complex in the future, we can change this.
464        let (tx, rx) = oneshot::channel();
465        self.spawn_identified("Blocking".to_string(), async move {
466            match tx.send(f()) {
467                Ok(()) => (),
468                Err(_) => panic!("Failed to send future's output, did future panic?"),
469            }
470        });
471        rx.map(Box::new(|m| m.expect("Failed to receive future's output")))
472    }
473}
474
475impl Blocking for MockExecutor {
476    type ThreadHandle<T: Send + 'static> =
477        Map<Receiver<T>, Box<dyn FnOnce(Result<T, Canceled>) -> T>>;
478
479    fn spawn_blocking<F, T>(&self, f: F) -> Self::ThreadHandle<T>
480    where
481        F: FnOnce() -> T + Send + 'static,
482        T: Send + 'static,
483    {
484        assert_matches!(
485            THREAD_DESCRIPTOR.get(),
486            ThreadDescriptor::Executor | ThreadDescriptor::Subthread(_),
487 "MockExecutor::spawn_blocking_io only allowed from future or subthread, being run by this executor"
488        );
489        self.spawn_thread_inner(f)
490    }
491
492    fn reenter_block_on<F>(&self, future: F) -> F::Output
493    where
494        F: Future,
495        F::Output: Send + 'static,
496    {
497        self.subthread_block_on_future(future)
498    }
499
500    fn blocking_io<F, T>(&self, f: F) -> impl Future<Output = T>
501    where
502        F: FnOnce() -> T + Send + 'static,
503        T: Send + 'static,
504    {
505        assert_eq!(
506            THREAD_DESCRIPTOR.get(),
507            ThreadDescriptor::Executor,
508            "MockExecutor::blocking_io only allowed from future being polled by this executor"
509        );
510        self.spawn_thread_inner(f)
511    }
512}
513
514//---------- block_on ----------
515
impl ToplevelBlockOn for MockExecutor {
    /// Run `input_fut` (and all spawned tasks) until `input_fut` completes; return its output
    ///
    /// # Panics
    ///
    /// Panics (after printing a debug dump to stderr) if the executor runs out of
    /// things to do before `input_fut` has produced its output.
    fn block_on<F>(&self, input_fut: F) -> F::Output
    where
        F: Future,
    {
        // Where the wrapper future below will store the output of `input_fut`
        let mut value: Option<F::Output> = None;

        // Box this just so that we can conveniently control precisely when it's dropped.
        // (We could do this with Option and Pin::set but that seems clumsier.)
        let mut input_fut = Box::pin(input_fut);

        // Wrapper with `Output = ()`, which is what the executor's main-future
        // machinery expects; it stashes the real output in `value`.
        let run_store_fut = {
            let value = &mut value;
            let input_fut = &mut input_fut;
            async {
                trace!("MockExecutor block_on future...");
                let t = input_fut.await;
                trace!("MockExecutor block_on future returned...");
                *value = Some(t);
                trace!("MockExecutor block_on future exiting.");
            }
        };

        {
            pin_mut!(run_store_fut);

            // Register a placeholder task for the main future.  The future itself
            // is not stored in `Data`; it is passed down the call stack instead.
            let main_id = self
                .shared
                .lock()
                .insert_task("main".into(), TaskFutureInfo::Main);
            trace!("MockExecutor {main_id:?} is task for block_on");
            self.execute_to_completion(run_store_fut);
        }

        #[allow(clippy::let_and_return)] // clarity
        let value = value.take().unwrap_or_else(|| {
            // eprintln can be captured by libtest, but the debug_dump goes to io::stderr.
            // use the latter, so that the debug dump is prefixed by this message.
            let _: io::Result<()> = writeln!(io::stderr(), "all futures blocked, crashing...");
            // write to tracing too, so the tracing log is clear about when we crashed
            error!("all futures blocked, crashing...");

            // Sequencing here is subtle.
            //
            // We should do the dump before dropping the input future, because the input
            // future is likely to own things that, when dropped, wake up other tasks,
            // rendering the dump inaccurate.
            //
            // But also, dropping the input future may well drop a ProgressUntilStalledFuture
            // which then reenters us.  More generally, we mustn't call user code
            // with the lock held.
            //
            // And, we mustn't panic with the data lock held.
            //
            // If value was Some, then this closure is dropped without being called,
            // which drops the future after it has yielded the value, which is correct.
            {
                let mut data = self.shared.lock();
                data.debug_dump();
            }
            drop(input_fut);

            panic!(
                r"
all futures blocked. waiting for the real world? or deadlocked (waiting for each other) ?
"
            );
        });

        value
    }
}
588
589//---------- execution - core implementation ----------
590
impl MockExecutor {
    /// Keep polling tasks until nothing more can be done
    ///
    /// Ie, stop when `awake` is empty and `progressing_until_stalled` is `None`.
    ///
    /// Each time the tasks stall while a `progress_until_stalled` call is in progress,
    /// that call's future is marked `Ready` and woken, and we go round again.
    fn execute_to_completion(&self, mut main_fut: MainFuture) {
        trace!("MockExecutor execute_to_completion...");
        loop {
            self.execute_until_first_stall(main_fut.as_mut());

            // Handle `progressing_until_stalled`
            let pus_waker = {
                let mut data = self.shared.lock();
                let pus = &mut data.progressing_until_stalled;
                trace!("MockExecutor execute_to_completion PUS={:?}", &pus);
                let Some(pus) = pus else {
                    // No progressing_until_stalled, we're actually done.
                    break;
                };
                assert_eq!(
                    pus.finished, Pending,
                    "ProgressingUntilStalled finished twice?!"
                );
                pus.finished = Ready(());

                // Release the lock temporarily so that ActualWaker::clone doesn't deadlock
                let waker = pus
                    .waker
                    .take()
                    .expect("ProgressUntilStalledFuture not ever polled!");
                drop(data);
                let waker_copy = waker.clone();
                let mut data = self.shared.lock();

                // Put the original waker back; we keep (and will use) only the copy.
                let pus = &mut data.progressing_until_stalled;
                if let Some(double) = mem::replace(
                    &mut pus
                        .as_mut()
                        .expect("progressing_until_stalled updated under our feet!")
                        .waker,
                    Some(waker),
                ) {
                    panic!("double progressing_until_stalled.waker! {double:?}");
                }

                waker_copy
            };
            // The lock has been released by now, so waking can take it again.
            pus_waker.wake();
        }
        trace!("MockExecutor execute_to_completion done");
    }

    /// Keep polling tasks until `awake` is empty
    ///
    /// (Ignores `progressing_until_stalled` - so if one is active,
    /// will return when all other tasks have blocked.)
    ///
    /// Marks this thread as the executor thread (in `THREAD_DESCRIPTOR`) while running,
    /// and restores `Foreign` afterwards, even if the main loop panicked.
    ///
    /// # Panics
    ///
    /// Might malfunction or panic if called reentrantly
    fn execute_until_first_stall(&self, main_fut: MainFuture) {
        trace!("MockExecutor execute_until_first_stall ...");

        assert_eq!(
            THREAD_DESCRIPTOR.get(),
            ThreadDescriptor::Foreign,
            "MockExecutor executor re-entered"
        );
        THREAD_DESCRIPTOR.set(ThreadDescriptor::Executor);

        // Catch any panic so that we can reset THREAD_DESCRIPTOR before propagating it.
        let r = catch_unwind(AssertUnwindSafe(|| self.executor_main_loop(main_fut)));

        THREAD_DESCRIPTOR.set(ThreadDescriptor::Foreign);

        match r {
            Ok(()) => trace!("MockExecutor execute_until_first_stall done."),
            Err(e) => {
                trace!("MockExecutor executor, or async task, panicked!");
                panic_any(e)
            }
        }
    }

    /// Keep polling tasks until `awake` is empty (inner, executor main loop)
    ///
    /// This is only called from [`MockExecutor::execute_until_first_stall`],
    /// so it could also be called `execute_until_first_stall_inner`.
    #[allow(clippy::cognitive_complexity)]
    fn executor_main_loop(&self, mut main_fut: MainFuture) {
        'outer: loop {
            // Take an `Awake` task off `awake` and make it `Asleep`
            let (id, mut fut) = 'inner: loop {
                let mut data = self.shared.lock();
                let Some(id) = data.schedule() else {
                    // Nothing is awake: we have stalled.
                    break 'outer;
                };
                let Some(task) = data.tasks.get_mut(id) else {
                    // Stale entry in `awake`; try the next one.
                    trace!("MockExecutor {id:?} vanished");
                    continue;
                };
                task.state = Asleep(vec![]);
                let fut = task.fut.take().expect("future missing from task!");
                break 'inner (id, fut);
            };

            // Poll the selected task
            //
            // The data lock is not held here: the guard was local to the loop body above.
            trace!("MockExecutor {id:?} polling...");
            let waker = ActualWaker::make_waker(&self.shared, id);
            let mut cx = Context::from_waker(&waker);
            let r: Either<Poll<()>, IsSubthread> = match &mut fut {
                TaskFutureInfo::Normal(fut) => Left(fut.poll_unpin(&mut cx)),
                TaskFutureInfo::Main => Left(main_fut.as_mut().poll(&mut cx)),
                TaskFutureInfo::Subthread => Right(IsSubthread),
            };

            // Deal with the returned `Poll`
            let _fut_drop_late;
            {
                let mut data = self.shared.lock();
                let task = data
                    .tasks
                    .get_mut(id)
                    .expect("task vanished while we were polling it");

                match r {
                    Left(Pending) => {
                        trace!("MockExecutor {id:?} -> Pending");
                        if task.fut.is_some() {
                            panic!("task reinserted while we polled it?!");
                        }
                        // The task might have been woken *by its own poll method*.
                        // That's why we set it to `Asleep` *earlier* rather than here.
                        // All we need to do is put the future back.
                        task.fut = Some(fut);
                    }
                    Left(Ready(())) => {
                        trace!("MockExecutor {id:?} -> Ready");
                        // Oh, it finished!
                        // It might be in `awake`, but that's allowed to contain stale tasks,
                        // so we *don't* need to scan that list and remove it.
                        data.tasks.remove(id);
                        // It is important that we don't drop `fut` until we have released
                        // the data lock, since it is an external type and might try to reenter
                        // us (eg by calling spawn).  If we do that here, we risk deadlock.
                        // So, move `fut` to a variable with scope outside the block with `data`.
                        _fut_drop_late = fut;
                    }
                    Right(IsSubthread) => {
                        trace!("MockExecutor {id:?} -> Ready, waking Subthread");
                        // Task is a subthread, which has called thread_context_switch
                        // to switch to us.  We "poll" it by switching back.

                        // Put back `TFI::Subthread`, which was moved out temporarily, above.
                        task.fut = Some(fut);

                        self.shared.thread_context_switch(
                            data,
                            ThreadDescriptor::Executor,
                            ThreadDescriptor::Subthread(id),
                        );

                        // Now, if the Subthread still exists, that's because it's switched
                        // back to us, and is waiting in subthread_block_on_future again.
                        // Or it might have ended, in which case it's not in `tasks` any more.
                        // In any case we can go back to scheduling futures.
                    }
                }
            }
        }
    }
}
761
762impl Data {
763    /// Return the next task to run
764    ///
765    /// The task is removed from `awake`, but **`state` is not set to `Asleep`**.
766    /// The caller must restore the invariant!
767    fn schedule(&mut self) -> Option<TaskId> {
768        use SchedulingPolicy as SP;
769        match self.scheduling {
770            SP::Stack => self.awake.pop_back(),
771            SP::Queue => self.awake.pop_front(),
772        }
773    }
774}
775
776impl ActualWaker {
777    /// Obtain a strong reference to the executor's data
778    fn upgrade_data(&self) -> Option<Arc<Shared>> {
779        self.data.upgrade()
780    }
781
782    /// Wake the task corresponding to this `ActualWaker`
783    ///
784    /// This is like `<Self as std::task::Wake>::wake()` but takes `&self`, not `Arc`
785    fn wake(&self) {
786        let Some(data) = self.upgrade_data() else {
787            // The executor is gone!  Don't try to wake.
788            return;
789        };
790        let mut data = data.lock();
791        let data = &mut *data;
792        trace!("MockExecutor {:?} wake", &self.id);
793        let Some(task) = data.tasks.get_mut(self.id) else {
794            return;
795        };
796        task.set_awake(self.id, &mut data.awake);
797    }
798
799    /// Create and return a `Waker` for task `id`
800    fn make_waker(shared: &Arc<Shared>, id: TaskId) -> Waker {
801        ActualWaker {
802            data: Arc::downgrade(shared),
803            id,
804        }
805        .new_waker()
806    }
807}
808
809//---------- "progress until stalled" functionality ----------
810
811impl MockExecutor {
812    /// Run tasks in the current executor until every other task is waiting
813    ///
814    /// # Panics
815    ///
816    /// Might malfunction or panic if more than one such call is running at once.
817    ///
818    /// (Ie, you must `.await` or drop the returned `Future`
819    /// before calling this method again.)
820    ///
821    /// Must be called and awaited within a future being run by `self`.
822    pub fn progress_until_stalled(&self) -> impl Future<Output = ()> {
823        let mut data = self.shared.lock();
824        assert!(
825            data.progressing_until_stalled.is_none(),
826            "progress_until_stalled called more than once"
827        );
828        trace!("MockExecutor progress_until_stalled...");
829        data.progressing_until_stalled = Some(ProgressingUntilStalled {
830            finished: Pending,
831            waker: None,
832        });
833        ProgressUntilStalledFuture {
834            shared: self.shared.clone(),
835        }
836    }
837}
838
839impl Future for ProgressUntilStalledFuture {
840    type Output = ();
841
842    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
843        let waker = cx.waker().clone();
844        let mut data = self.shared.lock();
845        let pus = data.progressing_until_stalled.as_mut();
846        trace!("MockExecutor progress_until_stalled polling... {:?}", &pus);
847        let pus = pus.expect("ProgressingUntilStalled missing");
848        pus.waker = Some(waker);
849        pus.finished
850    }
851}
852
853impl Drop for ProgressUntilStalledFuture {
854    fn drop(&mut self) {
855        self.shared.lock().progressing_until_stalled = None;
856    }
857}
858
859//---------- (sub)threads ----------
860
861impl MockExecutor {
862    /// Spawn a "Subthread", for processing in a sync context
863    ///
864    /// `call` will be run on a separate thread, called a "Subthread".
865    ///
866    /// But it will **not run simultaneously** with the executor,
867    /// nor with other Subthreads.
868    /// So Subthreads are somewhat like coroutines.
869    ///
870    /// `call` must be capable of making progress without waiting for any other Subthreads.
871    /// `call` may wait for async futures, using
872    /// [`subthread_block_on_future`](MockExecutor::subthread_block_on_future).
873    ///
874    /// Subthreads may be used for cpubound activity,
875    /// or synchronous IO (such as large volumes of disk activity),
876    /// provided that the synchronous code will reliably make progress,
877    /// without waiting (directly or indirectly) for any async task or Subthread -
878    /// except via `subthread_block_on_future`.
879    ///
880    /// # Subthreads vs raw `std::thread` threads
881    ///
882    /// Programs using `MockExecutor` may use `std::thread` threads directly.
883    /// However, this is not recommended.  There are severe limitations:
884    ///
885    ///  * Only a Subthread can re-enter the async context from sync code:
886    ///    this must be done with
887    ///    using [`subthread_block_on_future`](MockExecutor::subthread_block_on_future).
888    ///    (Re-entering the executor with
889    ///    [`block_on`](tor_rtcompat::ToplevelBlockOn::block_on)
890    ///    is not allowed.)
891    ///  * If async tasks want to suspend waiting for synchronous code,
892    ///    the synchronous code must run on a Subthread.
893    ///    This allows the `MockExecutor` to know when
894    ///    that synchronous code is still making progress.
895    ///    (This is needed for
896    ///    [`progress_until_stalled`](MockExecutor::progress_until_stalled)
897    ///    and the facilities which use it, such as
898    ///    [`MockRuntime::advance_until_stalled`](crate::MockRuntime::advance_until_stalled).)
899    ///  * Subthreads never run in parallel -
900    ///    they only run as scheduled deterministically by the `MockExecutor`.
901    ///    So using Subthreads eliminates a source of test nonndeterminism.
902    ///    (Execution order is still varied due to explicitly varying the scheduling policy.)
903    ///
904    /// # Panics, abuse, and malfunctions
905    ///
906    /// If `call` panics and unwinds, `spawn_subthread` yields `Err`.
907    /// The application code should to do something about it if this happens,
908    /// typically, logging errors, tearing things down, or failing a test case.
909    ///
910    /// If the executor doesn't run, the subthread will not run either, and will remain stuck.
911    /// (So, typically, if the thread supposed to run the executor panics,
912    /// for example because a future or the executor itself panics,
913    /// all the subthreads will become stuck - effectively, they'll be leaked.)
914    ///
915    /// `spawn_subthread` panics if OS thread spawning fails.
916    /// (Like `std::thread::spawn()` does.)
917    ///
918    /// `MockExecutor`s will malfunction or panic if
919    /// any executor invocation method (eg `block_on`) is called on a Subthread.
920    pub fn subthread_spawn<T: Send + 'static>(
921        &self,
922        desc: impl Display,
923        call: impl FnOnce() -> T + Send + 'static,
924    ) -> impl Future<Output = Result<T, Box<dyn Any + Send>>> + Unpin + Send + Sync + 'static {
925        let desc = desc.to_string();
926        let (output_tx, output_rx) = oneshot::channel();
927
928        // NB: we don't know which thread we're on!
929        // In principle we might be on another Subthread.
930        // So we can't context switch here.  That would be very confusing.
931        //
932        // Instead, we prepare the new Subthread as follows:
933        //   - There is a task in the executor
934        //   - The task is ready to be polled, whenever the executor decides to
935        //   - The thread starts running right away, but immediately waits until it is scheduled
936        // See `subthread_entrypoint`.
937
938        {
939            let mut data = self.shared.lock();
940            let id = data.insert_task(desc.clone(), TaskFutureInfo::Subthread);
941
942            let _: std::thread::JoinHandle<()> = std::thread::Builder::new()
943                .name(desc)
944                .spawn({
945                    let shared = self.shared.clone();
946                    move || shared.subthread_entrypoint(id, call, output_tx)
947                })
948                .expect("spawn failed");
949        }
950
951        output_rx.map(|r| {
952            r.unwrap_or_else(|_: Canceled| panic!("Subthread cancelled but should be impossible!"))
953        })
954    }
955
956    /// Call an async `Future` from a Subthread
957    ///
958    /// Blocks the Subthread, and arranges to run async tasks,
959    /// including `fut`, until `fut` completes.
960    ///
961    /// `fut` is polled on the executor thread, not on the Subthread.
962    /// (We may change that in the future, allowing passing a non-`Send` future.)
963    ///
964    /// # Panics, abuse, and malfunctions
965    ///
966    /// `subthread_block_on_future` will malfunction or panic
967    /// if called on a thread that isn't a Subthread from the same `MockExecutor`
968    /// (ie a thread made with [`spawn_subthread`](MockExecutor::subthread_spawn)).
969    ///
970    /// If `fut` itself panics, the executor will panic.
971    ///
972    /// If the executor isn't running, `subthread_block_on_future` will hang indefinitely.
973    /// See `spawn_subthread`.
974    #[allow(clippy::cognitive_complexity)] // Splitting this up would be worse
975    pub fn subthread_block_on_future<T: Send + 'static>(&self, fut: impl Future<Output = T>) -> T {
976        let id = match THREAD_DESCRIPTOR.get() {
977            ThreadDescriptor::Subthread(id) => id,
978            ThreadDescriptor::Executor => {
979                panic!("subthread_block_on_future called from MockExecutor thread (async task?)")
980            }
981            ThreadDescriptor::Foreign => panic!(
982    "subthread_block_on_future called on foreign thread (not spawned with spawn_subthread)"
983            ),
984        };
985        trace!("MockExecutor thread {id:?}, subthread_block_on_future...");
986        let mut fut = pin!(fut);
987
988        // We yield once before the first poll, and once after Ready, to shake up the
989        // execution order a bit, depending on the scheduling policy.
990        let yield_ = |set_awake| self.shared.subthread_yield(id, set_awake);
991        yield_(Some(SetAwake));
992
993        let ret = loop {
994            // Poll the provided future
995            trace!("MockExecutor thread {id:?}, s.t._block_on_future polling...");
996            let waker = ActualWaker::make_waker(&self.shared, id);
997            let mut cx = Context::from_waker(&waker);
998            let r: Poll<T> = fut.as_mut().poll(&mut cx);
999
1000            if let Ready(r) = r {
1001                trace!("MockExecutor thread {id:?}, s.t._block_on_future poll -> Ready");
1002                break r;
1003            }
1004
1005            // Pending.  Switch back to the exeuctor thread.
1006            // When the future becomes ready, the Waker will be woken, waking the task,
1007            // so that the executor will "poll" us again.
1008            trace!("MockExecutor thread {id:?}, s.t._block_on_future poll -> Pending");
1009
1010            yield_(None);
1011        };
1012
1013        yield_(Some(SetAwake));
1014
1015        trace!("MockExecutor thread {id:?}, subthread_block_on_future complete.");
1016
1017        ret
1018    }
1019}
1020
1021impl Shared {
    /// Main entrypoint function for a Subthread
    ///
    /// Entered on a new `std::thread` thread created by
    /// [`subthread_spawn`](MockExecutor::subthread_spawn).
    ///
    /// `id` is the executor task which stands for this Subthread.
    ///
    /// Waits until the executor first schedules this Subthread, and then runs `call`.
    /// When `call` completes, sends its returned value `T` to `output_tx`
    /// (or, if `call` panicked, the panic payload, as `Err`).
    /// Finally, removes task `id` and hands control back to the executor thread.
    ///
    /// # Panics
    ///
    /// Panics if task `id` is no longer present when `call` has finished,
    /// or if this Subthread is not the one recorded as running at that point.
    fn subthread_entrypoint<T: Send + 'static>(
        self: Arc<Self>,
        id: TaskId,
        call: impl FnOnce() -> T + Send + 'static,
        output_tx: oneshot::Sender<Result<T, Box<dyn Any + Send>>>,
    ) {
        // Record, for this OS thread, which Subthread it is.
        // `subthread_block_on_future` consults this to find its task id.
        THREAD_DESCRIPTOR.set(ThreadDescriptor::Subthread(id));
        trace!("MockExecutor thread {id:?}, entrypoint");

        // We start out Awake, but we wait for the executor to tell us to run.
        // This will be done the first time the task is "polled".
        {
            let data = self.lock();
            self.thread_context_switch_waitfor_instruction_to_run(
                data,
                ThreadDescriptor::Subthread(id),
            );
        }

        trace!("MockExecutor thread {id:?}, entering user code");

        // Run the user's actual thread function.
        // This will typically reenter us via subthread_block_on_future.
        //
        // A panic in `call` is caught here, and becomes the `Err` sent to `output_tx`.
        let ret = catch_unwind(AssertUnwindSafe(call));

        trace!("MockExecutor thread {id:?}, completed user code");

        // This makes the return value from subthread_spawn ready.
        // It will be polled by the executor in due course, presumably.

        output_tx.send(ret).unwrap_or_else(
            #[allow(clippy::unnecessary_lazy_evaluations)]
            |_| {}, // receiver dropped, maybe executor dropped or something?
        );

        {
            let mut data = self.lock();

            // Never poll this task again (so never schedule this thread)
            let _: Task = data.tasks.remove(id).expect("Subthread task vanished!");

            // Tell the executor it is scheduled now.
            // We carry on exiting, in parallel (holding the data lock).
            //
            // Note that we only *send* the instruction: we don't wait to be rescheduled.
            self.thread_context_switch_send_instruction_to_run(
                &mut data,
                ThreadDescriptor::Subthread(id),
                ThreadDescriptor::Executor,
            );
        }
    }
1078
1079    /// Yield back to the executor from a subthread
1080    ///
1081    /// Checks that things are in order
1082    /// (in particular, that this task is in the data structure as a subhtread)
1083    /// and switches to the executor thread.
1084    ///
1085    /// The caller must arrange that the task gets woken.
1086    ///
1087    /// With [`SetAwake`], sets our task awake, so that we'll be polled
1088    /// again as soon as we get to the top of the executor's queue.
1089    /// Otherwise, we'll be reentered after someone wakes a [`Waker`] for the task.
1090    fn subthread_yield(&self, us: TaskId, set_awake: Option<SetAwake>) {
1091        let mut data = self.lock();
1092        {
1093            let data = &mut *data;
1094            let task = data.tasks.get_mut(us).expect("Subthread task vanished!");
1095            match &task.fut {
1096                Some(TaskFutureInfo::Subthread) => {}
1097                other => panic!("subthread_block_on_future but TFI {other:?}"),
1098            };
1099            if let Some(SetAwake) = set_awake {
1100                task.set_awake(us, &mut data.awake);
1101            }
1102        }
1103        self.thread_context_switch(
1104            data,
1105            ThreadDescriptor::Subthread(us),
1106            ThreadDescriptor::Executor,
1107        );
1108    }
1109
    /// Switch from (sub)thread `us` to (sub)thread `them`
    ///
    /// Consumes the lock guard `data`:
    /// records `them` as the thread which should run,
    /// and then blocks the calling thread until `us` is the thread which should run.
    ///
    /// Returns when someone calls `thread_context_switch(.., us)`.
    ///
    /// # Panics
    ///
    /// Panics if `us` is not the thread currently recorded as the one to run,
    /// or if the data lock is poisoned.
    fn thread_context_switch(
        &self,
        mut data: MutexGuard<Data>,
        us: ThreadDescriptor,
        them: ThreadDescriptor,
    ) {
        trace!("MockExecutor thread {us:?}, switching to {them:?}");
        // Order matters: first tell `them` to go, then wait for our own turn.
        self.thread_context_switch_send_instruction_to_run(&mut data, us, them);
        self.thread_context_switch_waitfor_instruction_to_run(data, us);
    }
1123
    /// Instruct the (sub)thread `them` to run
    ///
    /// Update `thread_to_run`, which will wake up `them`'s
    /// call to `thread_context_switch_waitfor_instruction_to_run`.
    ///
    /// Must be called from (sub)thread `us`.
    /// Part of `thread_context_switch`, not normally called directly.
    ///
    /// Does not wait: the caller keeps the lock, and carries on running.
    ///
    /// # Panics
    ///
    /// Panics if `thread_to_run` is not currently `us`.
    fn thread_context_switch_send_instruction_to_run(
        &self,
        data: &mut MutexGuard<Data>,
        us: ThreadDescriptor,
        them: ThreadDescriptor,
    ) {
        // Only the thread which is currently supposed to be running may hand over.
        assert_eq!(data.thread_to_run, us);
        data.thread_to_run = them;
        // Wake every waiter; each one rechecks `thread_to_run` and only `them` proceeds.
        self.thread_condvar.notify_all();
    }
1141
1142    /// Await an instruction for this thread, `us`, to run
1143    ///
1144    /// Waits for `thread_to_run` to be `us`,
1145    /// waiting for `thread_condvar` as necessary.
1146    ///
1147    /// Part of `thread_context_switch`, not normally called directly.
1148    fn thread_context_switch_waitfor_instruction_to_run(
1149        &self,
1150        data: MutexGuard<Data>,
1151        us: ThreadDescriptor,
1152    ) {
1153        #[allow(let_underscore_lock)]
1154        let _: MutexGuard<_> = self
1155            .thread_condvar
1156            .wait_while(data, |data| {
1157                let live = data.thread_to_run;
1158                let resume = live == us;
1159                if resume {
1160                    trace!("MockExecutor thread {us:?}, resuming");
1161                } else {
1162                    trace!("MockExecutor thread {us:?}, waiting for {live:?}");
1163                }
1164                // We're in `.wait_while`, not `.wait_until`.  Confusing.
1165                !resume
1166            })
1167            .expect("data lock poisoned");
1168    }
1169}
1170
1171//---------- ancillary and convenience functions ----------
1172
/// Trait to let us assert at compile time that something is nicely `Sync` etc.
///
/// Because of the supertrait bounds, an `impl EnsureSyncSend for T`
/// only compiles if `T` is `Sync + Send + 'static`.
#[allow(dead_code)] // yes, we don't *use* anything from this trait
trait EnsureSyncSend: Sync + Send + 'static {}
impl EnsureSyncSend for ActualWaker {}
impl EnsureSyncSend for MockExecutor {}
1178
1179impl MockExecutor {
1180    /// Return the number of tasks running in this executor
1181    ///
1182    /// One possible use is for a test case to check that task(s)
1183    /// that ought to have exited, have indeed done so.
1184    ///
1185    /// In the usual case, the answer will be at least 1,
1186    /// because it counts the future passed to
1187    /// [`block_on`](MockExecutor::block_on)
1188    /// (perhaps via [`MockRuntime::test_with_various`](crate::MockRuntime::test_with_various)).
1189    pub fn n_tasks(&self) -> usize {
1190        self.shared.lock().tasks.len()
1191    }
1192}
1193
1194impl Shared {
1195    /// Lock and obtain the guard
1196    ///
1197    /// Convenience method which panics on poison
1198    fn lock(&self) -> MutexGuard<Data> {
1199        self.data.lock().expect("data lock poisoned")
1200    }
1201}
1202
1203impl Task {
1204    /// Set task `id` to `Awake` and arrange that it will be polled.
1205    fn set_awake(&mut self, id: TaskId, data_awake: &mut VecDeque<TaskId>) {
1206        match self.state {
1207            Awake => {}
1208            Asleep(_) => {
1209                self.state = Awake;
1210                data_awake.push_back(id);
1211            }
1212        }
1213    }
1214}
1215
1216//---------- ActualWaker as RawWaker ----------
1217
/// Using [`ActualWaker`] in a [`RawWaker`]
///
/// We need to make a
/// [`Waker`] (the safe, type-erased, waker, used by actual futures)
/// which contains an
/// [`ActualWaker`] (our actual waker implementation, also safe).
///
/// `std` offers `Waker::from<Arc<impl Wake>>`.
/// But we want a bespoke `Clone` implementation, so we don't want to use `Arc`.
///
/// So instead, we implement the `RawWaker` API in terms of `ActualWaker`.
/// We keep the `ActualWaker` in a `Box`, and actually `clone` it (and the `Box`).
///
/// SAFETY
///
///  * The data pointer is `Box::<ActualWaker>::into_raw()`
///  * We share these when we clone
///    (NOTE(review): `raw_clone` appears to make a fresh `Box` for each clone,
///    rather than sharing the pointer - confirm what this bullet means)
///  * No-one is allowed `&mut ActualWaker` unless there are no other clones
///  * So we may make references `&ActualWaker`
impl ActualWaker {
    /// Wrap up an [`ActualWaker`] as a type-erased [`Waker`] for passing to futures etc.
    fn new_waker(self) -> Waker {
        // SAFETY: `raw_new` pairs `RAW_WAKER_VTABLE` with a data pointer obtained from
        // `Box::<ActualWaker>::into_raw()`, which is what the functions in that vtable expect.
        unsafe { Waker::from_raw(self.raw_new()) }
    }

    /// Helper: wrap up an [`ActualWaker`] as a [`RawWaker`].
    ///
    /// Moves `self` into a new `Box`; the box is freed by `raw_drop`.
    fn raw_new(self) -> RawWaker {
        let self_: Box<ActualWaker> = self.into();
        let self_: *mut ActualWaker = Box::into_raw(self_);
        let self_: *const () = self_ as _;
        RawWaker::new(self_, &RAW_WAKER_VTABLE)
    }

    /// Implementation of [`RawWakerVTable`]'s `clone`
    ///
    /// Clones the `ActualWaker` (using our bespoke `Clone` impl) into a new `Box`.
    ///
    /// SAFETY: `self_` must be a data pointer made by `raw_new`, not yet freed.
    unsafe fn raw_clone(self_: *const ()) -> RawWaker {
        let self_: *const ActualWaker = self_ as _;
        // The pointer came from `Box::into_raw`, so it is not null.
        let self_: &ActualWaker = self_.as_ref().unwrap_unchecked();
        let copy: ActualWaker = self_.clone();
        copy.raw_new()
    }

    /// Implementation of [`RawWakerVTable`]'s `wake`
    ///
    /// Wakes the task, and then frees the `Box`.
    ///
    /// SAFETY: `self_` must be a data pointer made by `raw_new`, not yet freed;
    /// it must not be used again afterwards.
    unsafe fn raw_wake(self_: *const ()) {
        Self::raw_wake_by_ref(self_);
        Self::raw_drop(self_);
    }

    /// Implementation of [`RawWakerVTable`]'s `wake_by_ref`
    ///
    /// SAFETY: `self_` must be a data pointer made by `raw_new`, not yet freed.
    unsafe fn raw_wake_by_ref(self_: *const ()) {
        let self_: *const ActualWaker = self_ as _;
        // The pointer came from `Box::into_raw`, so it is not null.
        let self_: &ActualWaker = self_.as_ref().unwrap_unchecked();
        self_.wake();
    }

    /// Implementation of [`RawWakerVTable`]'s `drop`
    ///
    /// Reconstitutes the `Box` made by `raw_new`, and drops it.
    ///
    /// SAFETY: `self_` must be a data pointer made by `raw_new`, not yet freed;
    /// it must not be used again afterwards.
    unsafe fn raw_drop(self_: *const ()) {
        let self_: *mut ActualWaker = self_ as _;
        let self_: Box<ActualWaker> = Box::from_raw(self_);
        drop(self_);
    }
}
1279
/// vtable for `Box<ActualWaker>` as `RawWaker`
///
/// The entries are, in order: `clone`, `wake`, `wake_by_ref`, `drop`.
//
// This ought to be in the impl block above, but
//   "associated `static` items are not allowed"
static RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    ActualWaker::raw_clone,
    ActualWaker::raw_wake,
    ActualWaker::raw_wake_by_ref,
    ActualWaker::raw_drop,
);
1290
1291//---------- Sleep location tracking and dumping ----------
1292
/// We record "where a future went to sleep" as (just) a backtrace
///
/// This type alias allows us to mock `Backtrace` for miri.
/// (It also insulates from future choices about sleep location representation.)
#[cfg(not(miri))]
type SleepLocation = Backtrace;
1299
1300impl Data {
1301    /// Dump tasks and their sleep location backtraces
1302    fn dump_backtraces(&self, f: &mut fmt::Formatter) -> fmt::Result {
1303        for (id, task) in self.tasks.iter() {
1304            let prefix = |f: &mut fmt::Formatter| write!(f, "{id:?}={task:?}: ");
1305            match &task.state {
1306                Awake => {
1307                    prefix(f)?;
1308                    writeln!(f, "awake")?;
1309                }
1310                Asleep(locs) => {
1311                    let n = locs.len();
1312                    for (i, loc) in locs.iter().enumerate() {
1313                        prefix(f)?;
1314                        writeln!(f, "asleep, backtrace {i}/{n}:\n{loc}",)?;
1315                    }
1316                    if n == 0 {
1317                        prefix(f)?;
1318                        writeln!(f, "asleep, no backtraces, Waker never cloned, stuck!",)?;
1319                    }
1320                }
1321            }
1322        }
1323        writeln!(
1324            f,
1325            "\nNote: there might be spurious traces, see docs for MockExecutor::debug_dump\n"
1326        )?;
1327        Ok(())
1328    }
1329}
1330
1331/// Track sleep locations via `<Waker as Clone>`.
1332///
1333/// See [`MockExecutor::debug_dump`] for the explanation.
1334impl Clone for ActualWaker {
1335    fn clone(&self) -> Self {
1336        let id = self.id;
1337
1338        if let Some(data) = self.upgrade_data() {
1339            // If the executor is gone, there is nothing to adjust
1340            let mut data = data.lock();
1341            if let Some(task) = data.tasks.get_mut(self.id) {
1342                match &mut task.state {
1343                    Awake => trace!("MockExecutor cloned waker for awake task {id:?}"),
1344                    Asleep(locs) => locs.push(SleepLocation::force_capture()),
1345                }
1346            } else {
1347                trace!("MockExecutor cloned waker for dead task {id:?}");
1348            }
1349        }
1350
1351        ActualWaker {
1352            data: self.data.clone(),
1353            id,
1354        }
1355    }
1356}
1357
1358//---------- API for full debug dump ----------
1359
/// Debugging dump of a `MockExecutor`'s state
///
/// Returned by [`MockExecutor::as_debug_dump`]
///
/// Format this with [`Debug`] to obtain the dump.
//
// Existence implies backtraces have been resolved
//
// We use `Either` so that we can also use this internally when we have &mut Data.
// `Left` is that internal case (see `Data::debug_dump`);
// `Right` holds the lock guard obtained by `MockExecutor::as_debug_dump`.
pub struct DebugDump<'a>(Either<&'a Data, MutexGuard<'a, Data>>);
1368
1369impl MockExecutor {
1370    /// Dump the executor's state including backtraces of waiting tasks, to stderr
1371    ///
1372    /// This is considerably more extensive than simply
1373    /// `MockExecutor as Debug`.
1374    ///
1375    /// (This is a convenience method, which wraps
1376    /// [`MockExecutor::as_debug_dump()`].
1377    ///
1378    /// ### Backtrace salience (possible spurious traces)
1379    ///
1380    /// **Summary**
1381    ///
1382    /// The technique used to capture backtraces when futures sleep is not 100% exact.
1383    /// It will usually show all the actual sleeping sites,
1384    /// but it might also show other backtraces which were part of
1385    /// the implementation of some complex relevant future.
1386    ///
1387    /// **Details**
1388    ///
1389    /// When a future's implementation wants to sleep,
1390    /// it needs to record the [`Waker`] (from the [`Context`])
1391    /// so that the "other end" can call `.wake()` on it later,
1392    /// when the future should be woken.
1393    ///
1394    /// Since `Context.waker()` gives `&Waker`, borrowed from the `Context`,
1395    /// the future must clone the `Waker`,
1396    /// and it must do so in within the `poll()` call.
1397    ///
1398    /// A future which is waiting in a `select!` will typically
1399    /// show multiple traces, one for each branch.
1400    /// But,
1401    /// if a future sleeps on one thing, and then when polled again later,
1402    /// sleeps on something different, without waking up in between,
1403    /// both backtrace locations will be shown.
1404    /// And,
1405    /// a complicated future contraption *might* clone the `Waker` more times.
1406    /// So not every backtrace will necessarily be informative.
1407    ///
1408    /// ### Panics
1409    ///
1410    /// Panics on write errors.
1411    pub fn debug_dump(&self) {
1412        self.as_debug_dump().to_stderr();
1413    }
1414
1415    /// Dump the executor's state including backtraces of waiting tasks
1416    ///
1417    /// This is considerably more extensive than simply
1418    /// `MockExecutor as Debug`.
1419    ///
1420    /// Returns an object for formatting with [`Debug`].
1421    /// To simply print the dump to stderr (eg in a test),
1422    /// use [`.debug_dump()`](MockExecutor::debug_dump).
1423    ///
1424    /// **Backtrace salience (possible spurious traces)** -
1425    /// see [`.debug_dump()`](MockExecutor::debug_dump).
1426    pub fn as_debug_dump(&self) -> DebugDump {
1427        let data = self.shared.lock();
1428        DebugDump(Right(data))
1429    }
1430}
1431
1432impl Data {
1433    /// Convenience function: dump including backtraces, to stderr
1434    fn debug_dump(&mut self) {
1435        DebugDump(Left(self)).to_stderr();
1436    }
1437}
1438
1439impl DebugDump<'_> {
1440    /// Convenience function: dump tasks and backtraces to stderr
1441    #[allow(clippy::wrong_self_convention)] // "to_stderr" doesn't mean "convert to stderr"
1442    fn to_stderr(self) {
1443        write!(io::stderr().lock(), "{:?}", self)
1444            .unwrap_or_else(|e| error_report!(e, "failed to write debug dump to stderr"));
1445    }
1446}
1447
1448//---------- bespoke Debug impls ----------
1449
1450impl Debug for DebugDump<'_> {
1451    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1452        let self_: &Data = &self.0;
1453
1454        writeln!(f, "MockExecutor state:\n{self_:#?}")?;
1455        writeln!(f, "MockExecutor task dump:")?;
1456        self_.dump_backtraces(f)?;
1457
1458        Ok(())
1459    }
1460}
1461
1462// See `impl Debug for Data` for notes on the output
1463impl Debug for Task {
1464    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1465        let Task { desc, state, fut } = self;
1466        write!(f, "{:?}", desc)?;
1467        write!(f, "=")?;
1468        match fut {
1469            None => write!(f, "P")?,
1470            Some(TaskFutureInfo::Normal(_)) => write!(f, "f")?,
1471            Some(TaskFutureInfo::Main) => write!(f, "m")?,
1472            Some(TaskFutureInfo::Subthread) => write!(f, "T")?,
1473        }
1474        match state {
1475            Awake => write!(f, "W")?,
1476            Asleep(locs) => write!(f, "s{}", locs.len())?,
1477        };
1478        Ok(())
1479    }
1480}
1481
/// Helper: `Debug`s as a list of tasks, given the `Data` for lookups and a list of the ids
///
/// The second field is a function which returns an iterator over the ids to print.
///
/// `Task`s in `Data` are printed as `Ti(ID)="SPEC"=FLAGS`,
/// separated by spaces.
/// An id for which `Data` has no task is printed as `Ti(ID)-`.
///
/// `FLAGS` are:
///
///  * `T`: this task is for a Subthread (from subthread_spawn).
///  * `P`: this task is being polled (its `TaskFutureInfo` is absent)
///  * `f`: this is a normal task with a future and its future is present in `Data`
///  * `m`: this is the main task from `block_on`
///
///  * `W`: the task is awake
///  * `s<n>`: the task is asleep, and `<n>` is the number of recorded sleeping locations
//
// We do it this way because the naive dump from derive is very expansive
// and makes it impossible to see the wood for the trees.
// This very compact representation makes it easier to find a task of interest in the output.
//
// This is implemented in `impl Debug for Task`.
//
//
// rustc doesn't think automatically-derived Debug impls count for whether a thing is used.
// This has caused quite some fallout.  https://github.com/rust-lang/rust/pull/85200
// I think derive_more emits #[automatically_derived], so that even though we use this
// in our Debug impl, that construction is unused.
#[allow(dead_code)]
struct DebugTasks<'d, F>(&'d Data, F);
1509
1510// See `impl Debug for Data` for notes on the output
1511impl<F, I> Debug for DebugTasks<'_, F>
1512where
1513    F: Fn() -> I,
1514    I: Iterator<Item = TaskId>,
1515{
1516    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1517        let DebugTasks(data, ids) = self;
1518        for (id, delim) in izip!(ids(), chain!(iter::once(""), iter::repeat(" ")),) {
1519            write!(f, "{delim}{id:?}")?;
1520            match data.tasks.get(id) {
1521                None => write!(f, "-")?,
1522                Some(task) => write!(f, "={task:?}")?,
1523            }
1524        }
1525        Ok(())
1526    }
1527}
1528
/// Mock `Backtrace` for miri
///
/// See also the not-miri `type SleepLocation`, alias above.
#[cfg(miri)]
mod miri_sleep_location {
    /// Stand-in for a backtrace: carries no information
    ///
    /// Always displays as the fixed text `<SleepLocation>`.
    #[derive(Debug, derive_more::Display)]
    #[display("<SleepLocation>")]
    pub(super) struct SleepLocation {}

    impl SleepLocation {
        /// Mock of `Backtrace::force_capture`: captures nothing
        pub(super) fn force_capture() -> Self {
            SleepLocation {}
        }
    }
}
1544#[cfg(miri)]
1545use miri_sleep_location::SleepLocation;
1546
1547#[cfg(test)]
1548mod test {
1549    // @@ begin test lint list maintained by maint/add_warning @@
1550    #![allow(clippy::bool_assert_comparison)]
1551    #![allow(clippy::clone_on_copy)]
1552    #![allow(clippy::dbg_macro)]
1553    #![allow(clippy::mixed_attributes_style)]
1554    #![allow(clippy::print_stderr)]
1555    #![allow(clippy::print_stdout)]
1556    #![allow(clippy::single_char_pattern)]
1557    #![allow(clippy::unwrap_used)]
1558    #![allow(clippy::unchecked_duration_subtraction)]
1559    #![allow(clippy::useless_vec)]
1560    #![allow(clippy::needless_pass_by_value)]
1561    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1562    use super::*;
1563    use futures::channel::mpsc;
1564    use futures::{SinkExt as _, StreamExt as _};
1565    use strum::IntoEnumIterator;
1566    use tracing::info;
1567
1568    #[cfg(not(miri))] // trace! asks for the time, which miri doesn't support
1569    use tracing_test::traced_test;
1570
1571    fn various_mock_executors() -> impl Iterator<Item = MockExecutor> {
1572        // This duplicates the part of the logic in MockRuntime::test_with_various which
1573        // relates to MockExecutor, because we don't have a MockRuntime::builder.
1574        // The only parameter to MockExecutor is its scheduling policy, so this seems fine.
1575        SchedulingPolicy::iter().map(|scheduling| {
1576            eprintln!("===== MockExecutor::with_scheduling({scheduling:?}) =====");
1577            MockExecutor::with_scheduling(scheduling)
1578        })
1579    }
1580
1581    #[cfg_attr(not(miri), traced_test)]
1582    #[test]
1583    fn simple() {
1584        let runtime = MockExecutor::default();
1585        let val = runtime.block_on(async { 42 });
1586        assert_eq!(val, 42);
1587    }
1588
    /// Exercise `progress_until_stalled` with a chain of relay tasks
    ///
    /// There are `N` channels.  The main future keeps the last receiver (`rx_n`).
    /// Each other receiver gets a task which, on receiving `v`,
    /// sends `v + 1` down channel number `v + 1`.
    /// So a value sent into an earlier channel is relayed, growing as it goes,
    /// until it arrives at `rx_n` as `N - 1`.
    #[cfg_attr(not(miri), traced_test)]
    #[test]
    fn stall() {
        let runtime = MockExecutor::default();

        runtime.block_on({
            let runtime = runtime.clone();
            async move {
                const N: usize = 3;
                let (mut txs, mut rxs): (Vec<_>, Vec<_>) =
                    (0..N).map(|_| mpsc::channel::<usize>(5)).unzip();

                // The main future keeps the receiver of the last channel.
                let mut rx_n = rxs.pop().unwrap();

                // One relay task for each of the remaining receivers.
                for (i, mut rx) in rxs.into_iter().enumerate() {
                    runtime.spawn_identified(i, {
                        let mut txs = txs.clone();
                        async move {
                            loop {
                                eprintln!("task {i} rx...");
                                let v = rx.next().await.unwrap();
                                let nv = v + 1;
                                eprintln!("task {i} rx {v}, tx {nv}");
                                let v = nv;
                                txs[v].send(v).await.unwrap();
                            }
                        }
                    });
                }

                // Nothing has been sent yet, so there is nothing to receive...
                dbg!();
                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();

                // ... and there still isn't, after letting the relay tasks run.
                dbg!();
                runtime.progress_until_stalled().await;

                dbg!();
                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();

                // Inject a value at the start of the chain.
                dbg!();
                txs[0].send(0).await.unwrap();

                dbg!();
                runtime.progress_until_stalled().await;

                // It should have been relayed all the way to us, arriving as N - 1.
                dbg!();
                let r = rx_n.next().await;
                assert_eq!(r, Some(N - 1));

                // And that should have been the only message.
                dbg!();
                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();

                // Now have a separate task send 0 into every channel, including ours.
                runtime.spawn_identified("tx", {
                    let txs = txs.clone();
                    async {
                        eprintln!("sending task...");
                        for (i, mut tx) in txs.into_iter().enumerate() {
                            eprintln!("sending 0 to {i}...");
                            tx.send(0).await.unwrap();
                        }
                        eprintln!("sending task done");
                    }
                });

                runtime.debug_dump();

                // We expect one message per channel: either the 0 sent directly to us,
                // or a relayed value, which arrives as N - 1.
                for i in 0..txs.len() {
                    eprintln!("main {i} wait stall...");
                    runtime.progress_until_stalled().await;
                    eprintln!("main {i} rx wait...");
                    let r = rx_n.next().await;
                    eprintln!("main {i} rx = {r:?}");
                    assert!(r == Some(0) || r == Some(N - 1));
                }

                eprintln!("finishing...");
                runtime.progress_until_stalled().await;
                eprintln!("finished.");
            }
        });
    }
1670
1671    #[cfg_attr(not(miri), traced_test)]
1672    #[test]
1673    fn spawn_blocking() {
1674        let runtime = MockExecutor::default();
1675
1676        runtime.block_on({
1677            let runtime = runtime.clone();
1678            async move {
1679                let thr_1 = runtime.spawn_blocking(|| 42);
1680                let thr_2 = runtime.spawn_blocking(|| 99);
1681
1682                assert_eq!(thr_2.await, 99);
1683                assert_eq!(thr_1.await, 42);
1684            }
1685        });
1686    }
1687
1688    #[cfg_attr(not(miri), traced_test)]
1689    #[test]
1690    fn drop_reentrancy() {
1691        // Check that dropping a completed task future is done *outside* the data lock.
1692        // Involves a contrived future whose Drop impl reenters the executor.
1693        //
1694        // If `_fut_drop_late = fut` in execute_until_first_stall (the main loop)
1695        // is replaced with `drop(fut)` (dropping the future at the wrong moment),
1696        // we do indeed get deadlock, so this test case is working.
1697
1698        struct ReentersOnDrop {
1699            runtime: MockExecutor,
1700        }
1701        impl Future for ReentersOnDrop {
1702            type Output = ();
1703            fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<()> {
1704                Poll::Ready(())
1705            }
1706        }
1707        impl Drop for ReentersOnDrop {
1708            fn drop(&mut self) {
1709                self.runtime
1710                    .spawn_identified("dummy", futures::future::ready(()));
1711            }
1712        }
1713
1714        for runtime in various_mock_executors() {
1715            runtime.block_on(async {
1716                runtime.spawn_identified("trapper", {
1717                    let runtime = runtime.clone();
1718                    ReentersOnDrop { runtime }
1719                });
1720            });
1721        }
1722    }
1723
1724    #[cfg_attr(not(miri), traced_test)]
1725    #[test]
1726    fn subthread_oneshot() {
1727        for runtime in various_mock_executors() {
1728            runtime.block_on(async {
1729                let (tx, rx) = oneshot::channel();
1730                info!("spawning subthread");
1731                let thr = runtime.subthread_spawn("thr1", {
1732                    let runtime = runtime.clone();
1733                    move || {
1734                        info!("subthread_block_on_future...");
1735                        let i = runtime.subthread_block_on_future(rx).unwrap();
1736                        info!("subthread_block_on_future => {i}");
1737                        i + 1
1738                    }
1739                });
1740                info!("main task sending");
1741                tx.send(12).unwrap();
1742                info!("main task sent");
1743                let r = thr.await.unwrap();
1744                info!("main task thr => {r}");
1745                assert_eq!(r, 13);
1746            });
1747        }
1748    }
1749
1750    #[cfg_attr(not(miri), traced_test)]
1751    #[test]
1752    #[allow(clippy::cognitive_complexity)] // It's is not that complicated, really.
1753    fn subthread_pingpong() {
1754        for runtime in various_mock_executors() {
1755            runtime.block_on(async {
1756                let (mut i_tx, mut i_rx) = mpsc::channel(1);
1757                let (mut o_tx, mut o_rx) = mpsc::channel(1);
1758                info!("spawning subthread");
1759                let thr = runtime.subthread_spawn("thr", {
1760                    let runtime = runtime.clone();
1761                    move || {
1762                        while let Some(i) = {
1763                            info!("thread receiving ...");
1764                            runtime.subthread_block_on_future(i_rx.next())
1765                        } {
1766                            let o = i + 12;
1767                            info!("thread received {i}, sending {o}");
1768                            runtime.subthread_block_on_future(o_tx.send(o)).unwrap();
1769                            info!("thread sent {o}");
1770                        }
1771                        info!("thread exiting");
1772                        42
1773                    }
1774                });
1775                for i in 0..2 {
1776                    info!("main task sending {i}");
1777                    i_tx.send(i).await.unwrap();
1778                    info!("main task sent {i}");
1779                    let o = o_rx.next().await.unwrap();
1780                    info!("main task recv => {o}");
1781                    assert_eq!(o, i + 12);
1782                }
1783                info!("main task dropping sender");
1784                drop(i_tx);
1785                info!("main task awaiting thread");
1786                let r = thr.await.unwrap();
1787                info!("main task complete");
1788                assert_eq!(r, 42);
1789            });
1790        }
1791    }
1792}