1
//! Executor for running tests with mocked environment
2
//!
3
//! See [`MockExecutor`]
4

            
5
use std::any::Any;
6
use std::cell::Cell;
7
use std::collections::VecDeque;
8
use std::fmt::{self, Debug, Display};
9
use std::future::Future;
10
use std::io::{self, Write as _};
11
use std::iter;
12
use std::mem;
13
use std::panic::{catch_unwind, panic_any, AssertUnwindSafe};
14
use std::pin::{pin, Pin};
15
use std::sync::{Arc, Mutex, MutexGuard, Weak};
16
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
17

            
18
use futures::future::Map;
19
use futures::pin_mut;
20
use futures::task::{FutureObj, Spawn, SpawnError};
21
use futures::FutureExt as _;
22

            
23
use assert_matches::assert_matches;
24
use educe::Educe;
25
use itertools::Either::{self, *};
26
use itertools::{chain, izip};
27
use slotmap_careful::DenseSlotMap;
28
use std::backtrace::Backtrace;
29
use strum::EnumIter;
30

            
31
// NB: when using traced_test, the trace! and error! output here is generally suppressed
32
// in tests of other crates.  To see it, you can write something like this
33
// (in the dev-dependencies of the crate whose tests you're running):
34
//    tracing-test = { version = "0.2.4", features = ["no-env-filter"] }
35
use tracing::{error, trace};
36

            
37
use oneshot_fused_workaround::{self as oneshot, Canceled, Receiver};
38
use tor_error::error_report;
39
use tor_rtcompat::{Blocking, ToplevelBlockOn};
40

            
41
use Poll::*;
42
use TaskState::*;
43

            
44
/// Type-erased future, one for each of our (normal) tasks
45
type TaskFuture = FutureObj<'static, ()>;
46

            
47
/// Future for the argument to `block_on`, which is handled specially
48
type MainFuture<'m> = Pin<&'m mut dyn Future<Output = ()>>;
49

            
50
//---------- principal data structures ----------
51

            
52
/// Executor for running tests with mocked environment
53
///
54
/// For test cases which don't actually wait for anything in the real world.
55
///
56
/// This is the executor.
57
/// It implements [`Spawn`] and [`ToplevelBlockOn`]
58
///
59
/// It will usually be used as part of a `MockRuntime`.
60
///
61
/// To run futures, call [`ToplevelBlockOn::block_on`]
62
///
63
/// # Restricted environment
64
///
65
/// Tests run with this executor must not attempt to block
66
/// on anything "outside":
67
/// every future that anything awaits must (eventually) be woken directly
68
/// *by some other task* in the same test case.
69
///
70
/// (By directly we mean that the [`Waker::wake`] call is made
71
/// by that waking future, before that future itself awaits anything.)
72
///
73
/// # Panics
74
///
75
/// The executor will panic
76
/// if the toplevel future (passed to `block_on`)
77
/// doesn't complete (without externally blocking),
78
/// but instead waits for something.
79
///
80
/// The executor will malfunction or panic if reentered.
81
/// (Eg, if `block_on` is reentered.)
82
#[derive(Clone, Default, Educe)]
83
#[educe(Debug)]
84
pub struct MockExecutor {
85
    /// Mutable state
86
    #[educe(Debug(ignore))]
87
    shared: Arc<Shared>,
88
}
89

            
90
/// Shared state and ancillary information
91
///
92
/// This is always within an `Arc`.
93
#[derive(Default)]
94
struct Shared {
95
    /// Shared state
96
    data: Mutex<Data>,
97
    /// Condition variable for thread scheduling
98
    ///
99
    /// Signaled when [`Data.thread_to_run`](struct.Data.html#structfield.thread_to_run)
100
    /// is modified.
101
    thread_condvar: std::sync::Condvar,
102
}
103

            
104
/// Task id, module to hide `Ti` alias
105
mod task_id {
106
    slotmap_careful::new_key_type! {
107
        /// Task ID, usually called `TaskId`
108
        ///
109
        /// Short name in special `task_id` module so that [`Debug`] is nice
110
        pub(super) struct Ti;
111
    }
112
}
113
use task_id::Ti as TaskId;
114

            
115
/// Executor's state
116
///
117
/// ### Task state machine
118
///
119
/// A task is created in `tasks`, `Awake`, so also in `awake`.
120
///
121
/// When we poll it, we take it out of `awake` and set it to `Asleep`,
122
/// and then call `poll()`.
123
/// Any time after that, it can be made `Awake` again (and put back onto `awake`)
124
/// by the waker ([`ActualWaker`], wrapped in [`Waker`]).
125
///
126
/// The task's future is of course also present here in this data structure.
127
/// However, during poll we must release the lock,
128
/// so we cannot borrow the future from `Data`.
129
/// Instead, we move it out.  So `Task.fut` is an `Option`.
130
///
131
/// ### "Main" task - the argument to `block_on`
132
///
133
/// The signature of `BlockOn::block_on` accepts a non-`'static` future
134
/// (and a non-`Send`/`Sync` one).
135
///
136
/// So we cannot store that future in `Data` because `Data` is `'static`.
137
/// Instead, this main task future is passed as an argument down the call stack.
138
/// In the data structure we simply store a placeholder, `TaskFutureInfo::Main`.
139
56656
#[derive(Educe, derive_more::Debug)]
140
#[educe(Default)]
141
struct Data {
142
    /// Tasks
143
    ///
144
    /// Includes tasks spawned with `spawn`,
145
    /// and also the future passed to `block_on`.
146
2
    #[debug("{:?}", DebugTasks(self, || tasks.keys()))]
147
    tasks: DenseSlotMap<TaskId, Task>,
148

            
149
    /// `awake` lists precisely: tasks that are `Awake`, plus maybe stale `TaskId`s
150
    ///
151
    /// Tasks are pushed onto the *back* when woken,
152
    /// so back is the most recently woken.
153
2
    #[debug("{:?}", DebugTasks(self, || awake.iter().cloned()))]
154
    awake: VecDeque<TaskId>,
155

            
156
    /// If a future from `progress_until_stalled` exists
157
    progressing_until_stalled: Option<ProgressingUntilStalled>,
158

            
159
    /// Scheduling policy
160
    scheduling: SchedulingPolicy,
161

            
162
    /// (Sub)thread we want to run now
163
    ///
164
    /// At any one time only one thread is meant to be running.
165
    /// Other threads are blocked in condvar wait, waiting for this to change.
166
    ///
167
    /// **Modified only** within
168
    /// [`thread_context_switch_send_instruction_to_run`](Shared::thread_context_switch_send_instruction_to_run),
169
    /// which takes responsibility for preserving the following **invariants**:
170
    ///
171
    ///  1. no-one but the named thread is allowed to modify this field.
172
    ///  2. after modifying this field, signal `thread_condvar`
173
    #[educe(Default(expression = "ThreadDescriptor::Executor"))]
174
    thread_to_run: ThreadDescriptor,
175
}
176

            
177
/// How we should schedule?
178
592
#[derive(Debug, Clone, Default, EnumIter)]
179
#[non_exhaustive]
180
pub enum SchedulingPolicy {
181
    /// Task *most* recently woken is run
182
    ///
183
    /// This is the default.
184
    ///
185
    /// It will expose starvation bugs if a task never sleeps.
186
    /// (Which is a good thing in tests.)
187
    #[default]
188
    Stack,
189
    /// Task *least* recently woken is run.
190
    Queue,
191
}
192

            
193
/// Record of a single task
194
///
195
/// Tracks a spawned task, or the main task (the argument to `block_on`).
196
///
197
/// Stored in [`Data`]`.tasks`.
198
struct Task {
199
    /// For debugging output
200
    desc: String,
201
    /// Has this been woken via a waker?  (And is it in `Data.awake`?)
202
    ///
203
    /// **Set to `Awake` only by [`Task::set_awake`]**,
204
    /// preserving the invariant that
205
    /// every `Awake` task is in [`Data.awake`](struct.Data.html#structfield.awake).
206
    state: TaskState,
207
    /// The actual future (or a placeholder for it)
208
    ///
209
    /// May be `None` briefly in the executor main loop, because we've
210
    /// temporarily moved it out so we can poll it,
211
    /// or if this is a Subthread task which is currently running sync code
212
    /// (in which case we're blocked in the executor waiting to be
213
    /// woken up by [`thread_context_switch`](Shared::thread_context_switch).
214
    ///
215
    /// Note that the `None` can be observed outside the main loop, because
216
    /// the main loop unlocks while it polls, so other (non-main-loop) code
217
    /// might see it.
218
    fut: Option<TaskFutureInfo>,
219
}
220

            
221
/// A future as stored in our record of a [`Task`]
222
#[derive(Educe)]
223
#[educe(Debug)]
224
enum TaskFutureInfo {
225
    /// The [`Future`].  All is normal.
226
    Normal(#[educe(Debug(ignore))] TaskFuture),
227
    /// The future isn't here because this task is the main future for `block_on`
228
    Main,
229
    /// This task is actually a [`Subthread`](MockExecutor::subthread_spawn)
230
    ///
231
    /// Instead of polling it, we'll switch to it with
232
    /// [`thread_context_switch`](Shared::thread_context_switch).
233
    Subthread,
234
}
235

            
236
/// State of a task - do we think it needs to be polled?
237
///
238
/// Stored in [`Task`]`.state`.
239
#[derive(Debug)]
240
enum TaskState {
241
    /// Awake - needs to be polled
242
    ///
243
    /// Established by [`waker.wake()`](Waker::wake)
244
    Awake,
245
    /// Asleep - does *not* need to be polled
246
    ///
247
    /// Established each time just before we call the future's [`poll`](Future::poll)
248
    Asleep(Vec<SleepLocation>),
249
}
250

            
251
/// Actual implementor of `Wake` for use in a `Waker`
252
///
253
/// Futures (eg, channels from [`futures`]) will use this to wake a task
254
/// when it should be polled.
255
///
256
/// This type must not be `Cloned` with the `Data` lock held.
257
/// Consequently, a `Waker` mustn't either.
258
struct ActualWaker {
259
    /// Executor state
260
    ///
261
    /// The Waker mustn't to hold a strong reference to the executor,
262
    /// since typically a task holds a future that holds a Waker,
263
    /// and the executor holds the task - so that would be a cycle.
264
    data: Weak<Shared>,
265

            
266
    /// Which task this is
267
    id: TaskId,
268
}
269

            
270
/// State used for an in-progress call to
271
/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
272
///
273
/// If present in [`Data`], an (async) call to `progress_until_stalled`
274
/// is in progress.
275
///
276
/// The future from `progress_until_stalled`, [`ProgressUntilStalledFuture`]
277
/// is a normal-ish future.
278
/// It can be polled in the normal way.
279
/// When it is polled, it looks here, in `finished`, to see if it's `Ready`.
280
///
281
/// The future is made ready, and woken (via `waker`),
282
/// by bespoke code in the task executor loop.
283
///
284
/// When `ProgressUntilStalledFuture` (maybe completes and) is dropped,
285
/// its `Drop` impl is used to remove this from `Data.progressing_until_stalled`.
286
#[derive(Debug)]
287
struct ProgressingUntilStalled {
288
    /// Have we, in fact, stalled?
289
    ///
290
    /// Made `Ready` by special code in the executor loop
291
    finished: Poll<()>,
292

            
293
    /// Waker
294
    ///
295
    /// Signalled by special code in the executor loop
296
    waker: Option<Waker>,
297
}
298

            
299
/// Future from
300
/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
301
///
302
/// See [`ProgressingUntilStalled`] for an overview of this aspect of the contraption.
303
///
304
/// Existence of this struct implies `Data.progressing_until_stalled` is `Some`.
305
/// There can only be one at a time.
306
#[derive(Educe)]
307
#[educe(Debug)]
308
struct ProgressUntilStalledFuture {
309
    /// Executor's state; this future's state is in `.progressing_until_stalled`
310
    #[educe(Debug(ignore))]
311
    shared: Arc<Shared>,
312
}
313

            
314
/// Identifies a thread we know about - the executor thread, or a Subthread
315
///
316
/// Not related to `std::thread::ThreadId`.
317
///
318
/// See [`spawn_subthread`](MockExecutor::subthread_spawn) for definition of a Subthread.
319
///
320
/// This being a thread-local and not scoped by which `MockExecutor` we're talking about
321
/// means that we can't cope if there are multiple `MockExecutor`s involved in the same thread.
322
/// That's OK (and documented).
323
#[derive(Copy, Clone, Eq, PartialEq, derive_more::Debug)]
324
enum ThreadDescriptor {
325
    /// Foreign - neither the (running) executor, nor a Subthread
326
    #[debug("FOREIGN")]
327
    Foreign,
328
    /// The executor.
329
    #[debug("Exe")]
330
    Executor,
331
    /// This task, which is a Subthread.
332
    #[debug("{_0:?}")]
333
    Subthread(TaskId),
334
}
335

            
336
/// Marker indicating that this task is a Subthread, not an async task.
337
///
338
/// See [`spawn_subthread`](MockExecutor::subthread_spawn) for definition of a Subthread.
339
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
340
struct IsSubthread;
341

            
342
/// [`Shared::subthread_yield`] should set our task awake before switching to the executor
343
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
344
struct SetAwake;
345

            
346
thread_local! {
347
    /// Identifies this thread.
348
    pub static THREAD_DESCRIPTOR: Cell<ThreadDescriptor> = const {
349
        Cell::new(ThreadDescriptor::Foreign)
350
    };
351
}
352

            
353
//---------- creation ----------
354

            
355
impl MockExecutor {
356
    /// Make a `MockExecutor` with default parameters
357
4
    pub fn new() -> Self {
358
4
        Self::default()
359
4
    }
360

            
361
    /// Make a `MockExecutor` with a specific `SchedulingPolicy`
362
7113
    pub fn with_scheduling(scheduling: SchedulingPolicy) -> Self {
363
7113
        Data {
364
7113
            scheduling,
365
7113
            ..Default::default()
366
7113
        }
367
7113
        .into()
368
7113
    }
369
}
370

            
371
impl From<Data> for MockExecutor {
372
7113
    fn from(data: Data) -> MockExecutor {
373
7113
        let shared = Shared {
374
7113
            data: Mutex::new(data),
375
7113
            thread_condvar: std::sync::Condvar::new(),
376
7113
        };
377
7113
        MockExecutor {
378
7113
            shared: Arc::new(shared),
379
7113
        }
380
7113
    }
381
}
382

            
383
//---------- spawning ----------
384

            
385
impl MockExecutor {
386
    /// Spawn a task and return something to identify it
387
    ///
388
    /// `desc` should `Display` as some kind of short string (ideally without spaces)
389
    /// and will be used in the `Debug` impl and trace log messages from `MockExecutor`.
390
    ///
391
    /// The returned value is an opaque task identifier which is very cheap to clone
392
    /// and which can be used by the caller in debug logging,
393
    /// if it's desired to correlate with the debug output from `MockExecutor`.
394
    /// Most callers will want to ignore it.
395
    ///
396
    /// This method is infallible.  (The `MockExecutor` cannot be shut down.)
397
186
    pub fn spawn_identified(
398
186
        &self,
399
186
        desc: impl Display,
400
186
        fut: impl Future<Output = ()> + Send + 'static,
401
186
    ) -> impl Debug + Clone + Send + 'static {
402
186
        self.spawn_internal(desc.to_string(), FutureObj::from(Box::new(fut)))
403
186
    }
404

            
405
    /// Spawn a task and return its output for further usage
406
    ///
407
    /// `desc` should `Display` as some kind of short string (ideally without spaces)
408
    /// and will be used in the `Debug` impl and trace log messages from `MockExecutor`.
409
16
    pub fn spawn_join<T: Debug + Send + 'static>(
410
16
        &self,
411
16
        desc: impl Display,
412
16
        fut: impl Future<Output = T> + Send + 'static,
413
16
    ) -> impl Future<Output = T> {
414
16
        let (tx, rx) = oneshot::channel();
415
16
        self.spawn_identified(desc, async move {
416
16
            let res = fut.await;
417
16
            tx.send(res)
418
16
                .expect("Failed to send future's output, did future panic?");
419
16
        });
420
16
        rx.map(|m| m.expect("Failed to receive future's output"))
421
16
    }
422

            
423
    /// Spawn a task and return its `TaskId`
424
    ///
425
    /// Convenience method for use by `spawn_identified` and `spawn_obj`.
426
    /// The future passed to `block_on` is not handled here.
427
67898
    fn spawn_internal(&self, desc: String, fut: TaskFuture) -> TaskId {
428
67898
        let mut data = self.shared.lock();
429
67898
        data.insert_task(desc, TaskFutureInfo::Normal(fut))
430
67898
    }
431
}
432

            
433
impl Data {
434
    /// Insert a task given its `TaskFutureInfo` and return its `TaskId`.
435
75590
    fn insert_task(&mut self, desc: String, fut: TaskFutureInfo) -> TaskId {
436
75590
        let state = Awake;
437
75590
        let id = self.tasks.insert(Task {
438
75590
            state,
439
75590
            desc,
440
75590
            fut: Some(fut),
441
75590
        });
442
75590
        self.awake.push_back(id);
443
75590
        trace!("MockExecutor spawned {:?}={:?}", id, self.tasks[id]);
444
75590
        id
445
75590
    }
446
}
447

            
448
impl Spawn for MockExecutor {
449
64821
    fn spawn_obj(&self, future: TaskFuture) -> Result<(), SpawnError> {
450
64821
        self.spawn_internal("spawn_obj".into(), future);
451
64821
        Ok(())
452
64821
    }
453
}
454

            
455
impl MockExecutor {
456
    /// Implementation of `spawn_blocking` and `blocking_io`
457
4
    fn spawn_thread_inner<F, T>(&self, f: F) -> <Self as Blocking>::ThreadHandle<T>
458
4
    where
459
4
        F: FnOnce() -> T + Send + 'static,
460
4
        T: Send + 'static,
461
4
    {
462
4
        // For the mock executor, everything runs on the same thread.
463
4
        // If we need something more complex in the future, we can change this.
464
4
        let (tx, rx) = oneshot::channel();
465
4
        self.spawn_identified("Blocking".to_string(), async move {
466
4
            match tx.send(f()) {
467
4
                Ok(()) => (),
468
                Err(_) => panic!("Failed to send future's output, did future panic?"),
469
            }
470
4
        });
471
4
        rx.map(Box::new(|m| m.expect("Failed to receive future's output")))
472
4
    }
473
}
474

            
475
impl Blocking for MockExecutor {
476
    type ThreadHandle<T: Send + 'static> =
477
        Map<Receiver<T>, Box<dyn FnOnce(Result<T, Canceled>) -> T>>;
478

            
479
4
    fn spawn_blocking<F, T>(&self, f: F) -> Self::ThreadHandle<T>
480
4
    where
481
4
        F: FnOnce() -> T + Send + 'static,
482
4
        T: Send + 'static,
483
4
    {
484
        assert_matches!(
485
4
            THREAD_DESCRIPTOR.get(),
486
            ThreadDescriptor::Executor | ThreadDescriptor::Subthread(_),
487
 "MockExecutor::spawn_blocking_io only allowed from future or subthread, being run by this executor"
488
        );
489
4
        self.spawn_thread_inner(f)
490
4
    }
491

            
492
    fn reenter_block_on<F>(&self, future: F) -> F::Output
493
    where
494
        F: Future,
495
        F::Output: Send + 'static,
496
    {
497
        self.subthread_block_on_future(future)
498
    }
499

            
500
    fn blocking_io<F, T>(&self, f: F) -> impl Future<Output = T>
501
    where
502
        F: FnOnce() -> T + Send + 'static,
503
        T: Send + 'static,
504
    {
505
        assert_eq!(
506
            THREAD_DESCRIPTOR.get(),
507
            ThreadDescriptor::Executor,
508
            "MockExecutor::blocking_io only allowed from future being polled by this executor"
509
        );
510
        self.spawn_thread_inner(f)
511
    }
512
}
513

            
514
//---------- block_on ----------
515

            
516
impl ToplevelBlockOn for MockExecutor {
517
334
    fn block_on<F>(&self, input_fut: F) -> F::Output
518
334
    where
519
334
        F: Future,
520
334
    {
521
334
        let mut value: Option<F::Output> = None;
522
334

            
523
334
        // Box this just so that we can conveniently control precisely when it's dropped.
524
334
        // (We could do this with Option and Pin::set but that seems clumsier.)
525
334
        let mut input_fut = Box::pin(input_fut);
526
334

            
527
334
        let run_store_fut = {
528
334
            let value = &mut value;
529
334
            let input_fut = &mut input_fut;
530
334
            async {
531
334
                trace!("MockExecutor block_on future...");
532
334
                let t = input_fut.await;
533
334
                trace!("MockExecutor block_on future returned...");
534
334
                *value = Some(t);
535
334
                trace!("MockExecutor block_on future exiting.");
536
334
            }
537
        };
538

            
539
        {
540
334
            pin_mut!(run_store_fut);
541
334

            
542
334
            let main_id = self
543
334
                .shared
544
334
                .lock()
545
334
                .insert_task("main".into(), TaskFutureInfo::Main);
546
334
            trace!("MockExecutor {main_id:?} is task for block_on");
547
334
            self.execute_to_completion(run_store_fut);
548
334
        }
549
334

            
550
334
        #[allow(clippy::let_and_return)] // clarity
551
334
        let value = value.take().unwrap_or_else(|| {
552
            // eprintln can be captured by libtest, but the debug_dump goes to io::stderr.
553
            // use the latter, so that the debug dump is prefixed by this message.
554
            let _: io::Result<()> = writeln!(io::stderr(), "all futures blocked, crashing...");
555
            // write to tracing too, so the tracing log is clear about when we crashed
556
            error!("all futures blocked, crashing...");
557

            
558
            // Sequencing here is subtle.
559
            //
560
            // We should do the dump before dropping the input future, because the input
561
            // future is likely to own things that, when dropped, wake up other tasks,
562
            // rendering the dump inaccurate.
563
            //
564
            // But also, dropping the input future may well drop a ProgressUntilStalledFuture
565
            // which then reenters us.  More generally, we mustn't call user code
566
            // with the lock held.
567
            //
568
            // And, we mustn't panic with the data lock held.
569
            //
570
            // If value was Some, then this closure is dropped without being called,
571
            // which drops the future after it has yielded the value, which is correct.
572
            {
573
                let mut data = self.shared.lock();
574
                data.debug_dump();
575
            }
576
            drop(input_fut);
577

            
578
            panic!(
579
                r"
580
all futures blocked. waiting for the real world? or deadlocked (waiting for each other) ?
581
"
582
            );
583
334
        });
584
334

            
585
334
        value
586
334
    }
587
}
588

            
589
//---------- execution - core implementation ----------
590

            
591
impl MockExecutor {
592
    /// Keep polling tasks until nothing more can be done
593
    ///
594
    /// Ie, stop when `awake` is empty and `progressing_until_stalled` is `None`.
595
7684
    fn execute_to_completion(&self, mut main_fut: MainFuture) {
596
7684
        trace!("MockExecutor execute_to_completion...");
597
        loop {
598
1055891
            self.execute_until_first_stall(main_fut.as_mut());
599

            
600
            // Handle `progressing_until_stalled`
601
1048207
            let pus_waker = {
602
1055891
                let mut data = self.shared.lock();
603
1055891
                let pus = &mut data.progressing_until_stalled;
604
1055891
                trace!("MockExecutor execute_to_completion PUS={:?}", &pus);
605
1055891
                let Some(pus) = pus else {
606
                    // No progressing_until_stalled, we're actually done.
607
7684
                    break;
608
                };
609
1048207
                assert_eq!(
610
                    pus.finished, Pending,
611
                    "ProgressingUntilStalled finished twice?!"
612
                );
613
1048207
                pus.finished = Ready(());
614
1048207

            
615
1048207
                // Release the lock temporarily so that ActualWaker::clone doesn't deadlock
616
1048207
                let waker = pus
617
1048207
                    .waker
618
1048207
                    .take()
619
1048207
                    .expect("ProgressUntilStalledFuture not ever polled!");
620
1048207
                drop(data);
621
1048207
                let waker_copy = waker.clone();
622
1048207
                let mut data = self.shared.lock();
623
1048207

            
624
1048207
                let pus = &mut data.progressing_until_stalled;
625
1048207
                if let Some(double) = mem::replace(
626
1048207
                    &mut pus
627
1048207
                        .as_mut()
628
1048207
                        .expect("progressing_until_stalled updated under our feet!")
629
1048207
                        .waker,
630
1048207
                    Some(waker),
631
1048207
                ) {
632
                    panic!("double progressing_until_stalled.waker! {double:?}");
633
1048207
                }
634
1048207

            
635
1048207
                waker_copy
636
1048207
            };
637
1048207
            pus_waker.wake();
638
        }
639
7684
        trace!("MockExecutor execute_to_completion done");
640
7684
    }
641

            
642
    /// Keep polling tasks until `awake` is empty
643
    ///
644
    /// (Ignores `progressing_until_stalled` - so if one is active,
645
    /// will return when all other tasks have blocked.)
646
    ///
647
    /// # Panics
648
    ///
649
    /// Might malfunction or panic if called reentrantly
650
1055891
    fn execute_until_first_stall(&self, main_fut: MainFuture) {
651
1055891
        trace!("MockExecutor execute_until_first_stall ...");
652

            
653
1055891
        assert_eq!(
654
1055891
            THREAD_DESCRIPTOR.get(),
655
            ThreadDescriptor::Foreign,
656
            "MockExecutor executor re-entered"
657
        );
658
1055891
        THREAD_DESCRIPTOR.set(ThreadDescriptor::Executor);
659
1055891

            
660
1076662
        let r = catch_unwind(AssertUnwindSafe(|| self.executor_main_loop(main_fut)));
661
1055891

            
662
1055891
        THREAD_DESCRIPTOR.set(ThreadDescriptor::Foreign);
663
1055891

            
664
1055891
        match r {
665
1055891
            Ok(()) => trace!("MockExecutor execute_until_first_stall done."),
666
            Err(e) => {
667
                trace!("MockExecutor executor, or async task, panicked!");
668
                panic_any(e)
669
            }
670
        }
671
1055891
    }
672

            
673
    /// Keep polling tasks until `awake` is empty (inner, executor main loop)
674
    ///
675
    /// This is only called from [`MockExecutor::execute_until_first_stall`],
676
    /// so it could also be called `execute_until_first_stall_inner`.
677
    #[allow(clippy::cognitive_complexity)]
678
1055891
    fn executor_main_loop(&self, mut main_fut: MainFuture) {
679
        'outer: loop {
680
            // Take a `Awake` task off `awake` and make it `Asleep`
681
1386746
            let (id, mut fut) = 'inner: loop {
682
2443710
                let mut data = self.shared.lock();
683
2443710
                let Some(id) = data.schedule() else {
684
1055891
                    break 'outer;
685
                };
686
1387819
                let Some(task) = data.tasks.get_mut(id) else {
687
1073
                    trace!("MockExecutor {id:?} vanished");
688
1073
                    continue;
689
                };
690
1386746
                task.state = Asleep(vec![]);
691
1386746
                let fut = task.fut.take().expect("future missing from task!");
692
1386746
                break 'inner (id, fut);
693
1386746
            };
694
1386746

            
695
1386746
            // Poll the selected task
696
1386746
            trace!("MockExecutor {id:?} polling...");
697
1386746
            let waker = ActualWaker::make_waker(&self.shared, id);
698
1386746
            let mut cx = Context::from_waker(&waker);
699
1386746
            let r: Either<Poll<()>, IsSubthread> = match &mut fut {
700
85453
                TaskFutureInfo::Normal(fut) => Left(fut.poll_unpin(&mut cx)),
701
1301233
                TaskFutureInfo::Main => Left(main_fut.as_mut().poll(&mut cx)),
702
60
                TaskFutureInfo::Subthread => Right(IsSubthread),
703
            };
704

            
705
            // Deal with the returned `Poll`
706
            let _fut_drop_late;
707
            {
708
1386746
                let mut data = self.shared.lock();
709
1386746
                let task = data
710
1386746
                    .tasks
711
1386746
                    .get_mut(id)
712
1386746
                    .expect("task vanished while we were polling it");
713

            
714
1386686
                match r {
715
                    Left(Pending) => {
716
1348746
                        trace!("MockExecutor {id:?} -> Pending");
717
1348746
                        if task.fut.is_some() {
718
                            panic!("task reinserted while we polled it?!");
719
1348746
                        }
720
1348746
                        // The task might have been woken *by its own poll method*.
721
1348746
                        // That's why we set it to `Asleep` *earlier* rather than here.
722
1348746
                        // All we need to do is put the future back.
723
1348746
                        task.fut = Some(fut);
724
                    }
725
                    Left(Ready(())) => {
726
37940
                        trace!("MockExecutor {id:?} -> Ready");
727
                        // Oh, it finished!
728
                        // It might be in `awake`, but that's allowed to contain stale tasks,
729
                        // so we *don't* need to scan that list and remove it.
730
37940
                        data.tasks.remove(id);
731
37940
                        // It is important that we don't drop `fut` until we have released
732
37940
                        // the data lock, since it is an external type and might try to reenter
733
37940
                        // us (eg by calling spawn).  If we do that here, we risk deadlock.
734
37940
                        // So, move `fut` to a variable with scope outside the block with `data`.
735
37940
                        _fut_drop_late = fut;
736
                    }
737
                    Right(IsSubthread) => {
738
60
                        trace!("MockExecutor {id:?} -> Ready, waking Subthread");
739
                        // Task is a subthread, which has called thread_context_switch
740
                        // to switch to us.  We "poll" it by switching back.
741

            
742
                        // Put back `TFI::Subthread`, which was moved out temporarily, above.
743
60
                        task.fut = Some(fut);
744
60

            
745
60
                        self.shared.thread_context_switch(
746
60
                            data,
747
60
                            ThreadDescriptor::Executor,
748
60
                            ThreadDescriptor::Subthread(id),
749
60
                        );
750

            
751
                        // Now, if the Subthread still exists, that's because it's switched
752
                        // back to us, and is waiting in subthread_block_on_future again.
753
                        // Or it might have ended, in which case it's not in `tasks` any more.
754
                        // In any case we can go back to scheduling futures.
755
                    }
756
                }
757
            }
758
        }
759
1055891
    }
760
}
761

            
762
impl Data {
763
    /// Return the next task to run
764
    ///
765
    /// The task is removed from `awake`, but **`state` is not set to `Asleep`**.
766
    /// The caller must restore the invariant!
767
2443710
    fn schedule(&mut self) -> Option<TaskId> {
768
        use SchedulingPolicy as SP;
769
2443710
        match self.scheduling {
770
1229449
            SP::Stack => self.awake.pop_back(),
771
1214261
            SP::Queue => self.awake.pop_front(),
772
        }
773
2443710
    }
774
}
775

            
776
impl ActualWaker {
777
    /// Obtain a strong reference to the executor's data
778
6076113
    fn upgrade_data(&self) -> Option<Arc<Shared>> {
779
6076113
        self.data.upgrade()
780
6076113
    }
781

            
782
    /// Wake the task corresponding to this `ActualWaker`
783
    ///
784
    /// This is like `<Self as std::task::Wake>::wake()` but takes `&self`, not `Arc`
785
1830753
    fn wake(&self) {
786
1830753
        let Some(data) = self.upgrade_data() else {
787
            // The executor is gone!  Don't try to wake.
788
4
            return;
789
        };
790
1830749
        let mut data = data.lock();
791
1830749
        let data = &mut *data;
792
1830749
        trace!("MockExecutor {:?} wake", &self.id);
793
1830749
        let Some(task) = data.tasks.get_mut(self.id) else {
794
2754
            return;
795
        };
796
1827995
        task.set_awake(self.id, &mut data.awake);
797
1830753
    }
798

            
799
    /// Create and return a `Waker` for task `id`
800
1386774
    fn make_waker(shared: &Arc<Shared>, id: TaskId) -> Waker {
801
1386774
        ActualWaker {
802
1386774
            data: Arc::downgrade(shared),
803
1386774
            id,
804
1386774
        }
805
1386774
        .new_waker()
806
1386774
    }
807
}
808

            
809
//---------- "progress until stalled" functionality ----------
810

            
811
impl MockExecutor {
812
    /// Run tasks in the current executor until every other task is waiting
813
    ///
814
    /// # Panics
815
    ///
816
    /// Might malfunction or panic if more than one such call is running at once.
817
    ///
818
    /// (Ie, you must `.await` or drop the returned `Future`
819
    /// before calling this method again.)
820
    ///
821
    /// Must be called and awaited within a future being run by `self`.
822
1048207
    pub fn progress_until_stalled(&self) -> impl Future<Output = ()> {
823
1048207
        let mut data = self.shared.lock();
824
1048207
        assert!(
825
1048207
            data.progressing_until_stalled.is_none(),
826
            "progress_until_stalled called more than once"
827
        );
828
1048207
        trace!("MockExecutor progress_until_stalled...");
829
1048207
        data.progressing_until_stalled = Some(ProgressingUntilStalled {
830
1048207
            finished: Pending,
831
1048207
            waker: None,
832
1048207
        });
833
1048207
        ProgressUntilStalledFuture {
834
1048207
            shared: self.shared.clone(),
835
1048207
        }
836
1048207
    }
837
}
838

            
839
impl Future for ProgressUntilStalledFuture {
840
    type Output = ();
841

            
842
2096414
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
843
2096414
        let waker = cx.waker().clone();
844
2096414
        let mut data = self.shared.lock();
845
2096414
        let pus = data.progressing_until_stalled.as_mut();
846
2096414
        trace!("MockExecutor progress_until_stalled polling... {:?}", &pus);
847
2096414
        let pus = pus.expect("ProgressingUntilStalled missing");
848
2096414
        pus.waker = Some(waker);
849
2096414
        pus.finished
850
2096414
    }
851
}
852

            
853
impl Drop for ProgressUntilStalledFuture {
854
1048207
    fn drop(&mut self) {
855
1048207
        self.shared.lock().progressing_until_stalled = None;
856
1048207
    }
857
}
858

            
859
//---------- (sub)threads ----------
860

            
861
impl MockExecutor {
862
    /// Spawn a "Subthread", for processing in a sync context
863
    ///
864
    /// `call` will be run on a separate thread, called a "Subthread".
865
    ///
866
    /// But it will **not run simultaneously** with the executor,
867
    /// nor with other Subthreads.
868
    /// So Subthreads are somewhat like coroutines.
869
    ///
870
    /// `call` must be capable of making progress without waiting for any other Subthreads.
871
    /// `call` may wait for async futures, using
872
    /// [`subthread_block_on_future`](MockExecutor::subthread_block_on_future).
873
    ///
874
    /// Subthreads may be used for cpubound activity,
875
    /// or synchronous IO (such as large volumes of disk activity),
876
    /// provided that the synchronous code will reliably make progress,
877
    /// without waiting (directly or indirectly) for any async task or Subthread -
878
    /// except via `subthread_block_on_future`.
879
    ///
880
    /// # Subthreads vs raw `std::thread` threads
881
    ///
882
    /// Programs using `MockExecutor` may use `std::thread` threads directly.
883
    /// However, this is not recommended.  There are severe limitations:
884
    ///
885
    ///  * Only a Subthread can re-enter the async context from sync code:
886
    ///    this must be done with
887
    ///    using [`subthread_block_on_future`](MockExecutor::subthread_block_on_future).
888
    ///    (Re-entering the executor with
889
    ///    [`block_on`](tor_rtcompat::ToplevelBlockOn::block_on)
890
    ///    is not allowed.)
891
    ///  * If async tasks want to suspend waiting for synchronous code,
892
    ///    the synchronous code must run on a Subthread.
893
    ///    This allows the `MockExecutor` to know when
894
    ///    that synchronous code is still making progress.
895
    ///    (This is needed for
896
    ///    [`progress_until_stalled`](MockExecutor::progress_until_stalled)
897
    ///    and the facilities which use it, such as
898
    ///    [`MockRuntime::advance_until_stalled`](crate::MockRuntime::advance_until_stalled).)
899
    ///  * Subthreads never run in parallel -
900
    ///    they only run as scheduled deterministically by the `MockExecutor`.
901
    ///    So using Subthreads eliminates a source of test nonndeterminism.
902
    ///    (Execution order is still varied due to explicitly varying the scheduling policy.)
903
    ///
904
    /// # Panics, abuse, and malfunctions
905
    ///
906
    /// If `call` panics and unwinds, `spawn_subthread` yields `Err`.
907
    /// The application code should to do something about it if this happens,
908
    /// typically, logging errors, tearing things down, or failing a test case.
909
    ///
910
    /// If the executor doesn't run, the subthread will not run either, and will remain stuck.
911
    /// (So, typically, if the thread supposed to run the executor panics,
912
    /// for example because a future or the executor itself panics,
913
    /// all the subthreads will become stuck - effectively, they'll be leaked.)
914
    ///
915
    /// `spawn_subthread` panics if OS thread spawning fails.
916
    /// (Like `std::thread::spawn()` does.)
917
    ///
918
    /// `MockExecutor`s will malfunction or panic if
919
    /// any executor invocation method (eg `block_on`) is called on a Subthread.
920
8
    pub fn subthread_spawn<T: Send + 'static>(
921
8
        &self,
922
8
        desc: impl Display,
923
8
        call: impl FnOnce() -> T + Send + 'static,
924
8
    ) -> impl Future<Output = Result<T, Box<dyn Any + Send>>> + Unpin + Send + Sync + 'static {
925
8
        let desc = desc.to_string();
926
8
        let (output_tx, output_rx) = oneshot::channel();
927
8

            
928
8
        // NB: we don't know which thread we're on!
929
8
        // In principle we might be on another Subthread.
930
8
        // So we can't context switch here.  That would be very confusing.
931
8
        //
932
8
        // Instead, we prepare the new Subthread as follows:
933
8
        //   - There is a task in the executor
934
8
        //   - The task is ready to be polled, whenever the executor decides to
935
8
        //   - The thread starts running right away, but immediately waits until it is scheduled
936
8
        // See `subthread_entrypoint`.
937
8

            
938
8
        {
939
8
            let mut data = self.shared.lock();
940
8
            let id = data.insert_task(desc.clone(), TaskFutureInfo::Subthread);
941
8

            
942
8
            let _: std::thread::JoinHandle<()> = std::thread::Builder::new()
943
8
                .name(desc)
944
8
                .spawn({
945
8
                    let shared = self.shared.clone();
946
8
                    move || shared.subthread_entrypoint(id, call, output_tx)
947
8
                })
948
8
                .expect("spawn failed");
949
8
        }
950
8

            
951
8
        output_rx.map(|r| {
952
8
            r.unwrap_or_else(|_: Canceled| panic!("Subthread cancelled but should be impossible!"))
953
8
        })
954
8
    }
955

            
956
    /// Call an async `Future` from a Subthread
957
    ///
958
    /// Blocks the Subthread, and arranges to run async tasks,
959
    /// including `fut`, until `fut` completes.
960
    ///
961
    /// `fut` is polled on the executor thread, not on the Subthread.
962
    /// (We may change that in the future, allowing passing a non-`Send` future.)
963
    ///
964
    /// # Panics, abuse, and malfunctions
965
    ///
966
    /// `subthread_block_on_future` will malfunction or panic
967
    /// if called on a thread that isn't a Subthread from the same `MockExecutor`
968
    /// (ie a thread made with [`spawn_subthread`](MockExecutor::subthread_spawn)).
969
    ///
970
    /// If `fut` itself panics, the executor will panic.
971
    ///
972
    /// If the executor isn't running, `subthread_block_on_future` will hang indefinitely.
973
    /// See `spawn_subthread`.
974
    #[allow(clippy::cognitive_complexity)] // Splitting this up would be worse
975
24
    pub fn subthread_block_on_future<T: Send + 'static>(&self, fut: impl Future<Output = T>) -> T {
976
24
        let id = match THREAD_DESCRIPTOR.get() {
977
24
            ThreadDescriptor::Subthread(id) => id,
978
            ThreadDescriptor::Executor => {
979
                panic!("subthread_block_on_future called from MockExecutor thread (async task?)")
980
            }
981
            ThreadDescriptor::Foreign => panic!(
982
    "subthread_block_on_future called on foreign thread (not spawned with spawn_subthread)"
983
            ),
984
        };
985
24
        trace!("MockExecutor thread {id:?}, subthread_block_on_future...");
986
24
        let mut fut = pin!(fut);
987
24

            
988
24
        // We yield once before the first poll, and once after Ready, to shake up the
989
24
        // execution order a bit, depending on the scheduling policy.
990
52
        let yield_ = |set_awake| self.shared.subthread_yield(id, set_awake);
991
24
        yield_(Some(SetAwake));
992

            
993
24
        let ret = loop {
994
            // Poll the provided future
995
28
            trace!("MockExecutor thread {id:?}, s.t._block_on_future polling...");
996
28
            let waker = ActualWaker::make_waker(&self.shared, id);
997
28
            let mut cx = Context::from_waker(&waker);
998
28
            let r: Poll<T> = fut.as_mut().poll(&mut cx);
999

            
28
            if let Ready(r) = r {
24
                trace!("MockExecutor thread {id:?}, s.t._block_on_future poll -> Ready");
24
                break r;
4
            }
4

            
4
            // Pending.  Switch back to the exeuctor thread.
4
            // When the future becomes ready, the Waker will be woken, waking the task,
4
            // so that the executor will "poll" us again.
4
            trace!("MockExecutor thread {id:?}, s.t._block_on_future poll -> Pending");
4
            yield_(None);
        };
24
        yield_(Some(SetAwake));
24

            
24
        trace!("MockExecutor thread {id:?}, subthread_block_on_future complete.");
24
        ret
24
    }
}
impl Shared {
    /// Main entrypoint function for a Subthread
    ///
    /// Entered on a new `std::thread` thread created by
    /// [`subthread_spawn`](MockExecutor::subthread_spawn).
    ///
    /// When `call` completes, sends its returned value `T` to `output_tx`.
8
    fn subthread_entrypoint<T: Send + 'static>(
8
        self: Arc<Self>,
8
        id: TaskId,
8
        call: impl FnOnce() -> T + Send + 'static,
8
        output_tx: oneshot::Sender<Result<T, Box<dyn Any + Send>>>,
8
    ) {
8
        THREAD_DESCRIPTOR.set(ThreadDescriptor::Subthread(id));
8
        trace!("MockExecutor thread {id:?}, entrypoint");
        // We start out Awake, but we wait for the executor to tell us to run.
        // This will be done the first time the task is "polled".
8
        {
8
            let data = self.lock();
8
            self.thread_context_switch_waitfor_instruction_to_run(
8
                data,
8
                ThreadDescriptor::Subthread(id),
8
            );
8
        }
8

            
8
        trace!("MockExecutor thread {id:?}, entering user code");
        // Run the user's actual thread function.
        // This will typically reenter us via subthread_block_on_future.
8
        let ret = catch_unwind(AssertUnwindSafe(call));
8

            
8
        trace!("MockExecutor thread {id:?}, completed user code");
        // This makes the return value from subthread_spawn ready.
        // It will be polled by the executor in due course, presumably.
8
        output_tx.send(ret).unwrap_or_else(
8
            #[allow(clippy::unnecessary_lazy_evaluations)]
8
            |_| {}, // receiver dropped, maybe executor dropped or something?
8
        );
8

            
8
        {
8
            let mut data = self.lock();
8

            
8
            // Never poll this task again (so never schedule this thread)
8
            let _: Task = data.tasks.remove(id).expect("Subthread task vanished!");
8

            
8
            // Tell the executor it is scheduled now.
8
            // We carry on exiting, in parallel (holding the data lock).
8
            self.thread_context_switch_send_instruction_to_run(
8
                &mut data,
8
                ThreadDescriptor::Subthread(id),
8
                ThreadDescriptor::Executor,
8
            );
8
        }
8
    }
    /// Yield back to the executor from a subthread
    ///
    /// Checks that things are in order
    /// (in particular, that this task is in the data structure as a subhtread)
    /// and switches to the executor thread.
    ///
    /// The caller must arrange that the task gets woken.
    ///
    /// With [`SetAwake`], sets our task awake, so that we'll be polled
    /// again as soon as we get to the top of the executor's queue.
    /// Otherwise, we'll be reentered after someone wakes a [`Waker`] for the task.
52
    fn subthread_yield(&self, us: TaskId, set_awake: Option<SetAwake>) {
52
        let mut data = self.lock();
52
        {
52
            let data = &mut *data;
52
            let task = data.tasks.get_mut(us).expect("Subthread task vanished!");
52
            match &task.fut {
52
                Some(TaskFutureInfo::Subthread) => {}
                other => panic!("subthread_block_on_future but TFI {other:?}"),
            };
52
            if let Some(SetAwake) = set_awake {
48
                task.set_awake(us, &mut data.awake);
48
            }
        }
52
        self.thread_context_switch(
52
            data,
52
            ThreadDescriptor::Subthread(us),
52
            ThreadDescriptor::Executor,
52
        );
52
    }
    /// Switch from (sub)thread `us` to (sub)thread `them`
    ///
    /// Returns when someone calls `thread_context_switch(.., us)`.
112
    fn thread_context_switch(
112
        &self,
112
        mut data: MutexGuard<Data>,
112
        us: ThreadDescriptor,
112
        them: ThreadDescriptor,
112
    ) {
112
        trace!("MockExecutor thread {us:?}, switching to {them:?}");
112
        self.thread_context_switch_send_instruction_to_run(&mut data, us, them);
112
        self.thread_context_switch_waitfor_instruction_to_run(data, us);
112
    }
    /// Instruct the (sub)thread `them` to run
    ///
    /// Update `thread_to_run`, which will wake up `them`'s
    /// call to `thread_context_switch_waitfor_instruction_to_run`.
    ///
    /// Must be called from (sub)thread `us`.
    /// Part of `thread_context_switch`, not normally called directly.
120
    fn thread_context_switch_send_instruction_to_run(
120
        &self,
120
        data: &mut MutexGuard<Data>,
120
        us: ThreadDescriptor,
120
        them: ThreadDescriptor,
120
    ) {
120
        assert_eq!(data.thread_to_run, us);
120
        data.thread_to_run = them;
120
        self.thread_condvar.notify_all();
120
    }
    /// Await an instruction for this thread, `us`, to run
    ///
    /// Waits for `thread_to_run` to be `us`,
    /// waiting for `thread_condvar` as necessary.
    ///
    /// Part of `thread_context_switch`, not normally called directly.
120
    fn thread_context_switch_waitfor_instruction_to_run(
120
        &self,
120
        data: MutexGuard<Data>,
120
        us: ThreadDescriptor,
120
    ) {
120
        #[allow(let_underscore_lock)]
120
        let _: MutexGuard<_> = self
120
            .thread_condvar
298
            .wait_while(data, |data| {
238
                let live = data.thread_to_run;
238
                let resume = live == us;
238
                if resume {
120
                    trace!("MockExecutor thread {us:?}, resuming");
                } else {
118
                    trace!("MockExecutor thread {us:?}, waiting for {live:?}");
                }
                // We're in `.wait_while`, not `.wait_until`.  Confusing.
238
                !resume
298
            })
120
            .expect("data lock poisoned");
120
    }
}
//---------- ancillary and convenience functions ----------
/// Trait to let us assert at compile time that something is nicely `Sync` etc.
#[allow(dead_code)] // yes, we don't *use* anything from this trait
trait EnsureSyncSend: Sync + Send + 'static {}
impl EnsureSyncSend for ActualWaker {}
impl EnsureSyncSend for MockExecutor {}
impl MockExecutor {
    /// Return the number of tasks running in this executor
    ///
    /// One possible use is for a test case to check that task(s)
    /// that ought to have exited, have indeed done so.
    ///
    /// In the usual case, the answer will be at least 1,
    /// because it counts the future passed to
    /// [`block_on`](MockExecutor::block_on)
    /// (perhaps via [`MockRuntime::test_with_various`](crate::MockRuntime::test_with_various)).
204
    pub fn n_tasks(&self) -> usize {
204
        self.shared.lock().tasks.len()
204
    }
}
impl Shared {
    /// Lock and obtain the guard
    ///
    /// Convenience method which panics on poison
16279355
    fn lock(&self) -> MutexGuard<Data> {
16279355
        self.data.lock().expect("data lock poisoned")
16279355
    }
}
impl Task {
    /// Set task `id` to `Awake` and arrange that it will be polled.
1828043
    fn set_awake(&mut self, id: TaskId, data_awake: &mut VecDeque<TaskId>) {
1828043
        match self.state {
484704
            Awake => {}
1343339
            Asleep(_) => {
1343339
                self.state = Awake;
1343339
                data_awake.push_back(id);
1343339
            }
        }
1828043
    }
}
//---------- ActualWaker as RawWaker ----------
/// Using [`ActualWaker`] in a [`RawWaker`]
///
/// We need to make a
/// [`Waker`] (the safe, type-erased, waker, used by actual futures)
/// which contains an
/// [`ActualWaker`] (our actual waker implementation, also safe).
///
/// `std` offers `Waker::from<Arc<impl Wake>>`.
/// But we want a bespoke `Clone` implementation, so we don't want to use `Arc`.
///
/// So instead, we implement the `RawWaker` API in terms of `ActualWaker`.
/// We keep the `ActualWaker` in a `Box`, and actually `clone` it (and the `Box`).
///
/// SAFETY
///
///  * The data pointer is `Box::<ActualWaker>::into_raw()`
///  * We share these when we clone
///  * No-one is allowed `&mut ActualWaker` unless there are no other clones
///  * So we may make references `&ActualWaker`
impl ActualWaker {
    /// Wrap up an [`ActualWaker`] as a type-erased [`Waker`] for passing to futures etc.
1386774
    fn new_waker(self) -> Waker {
1386774
        unsafe { Waker::from_raw(self.raw_new()) }
1386774
    }
    /// Helper: wrap up an [`ActualWaker`] as a [`RawWaker`].
5632134
    fn raw_new(self) -> RawWaker {
5632134
        let self_: Box<ActualWaker> = self.into();
5632134
        let self_: *mut ActualWaker = Box::into_raw(self_);
5632134
        let self_: *const () = self_ as _;
5632134
        RawWaker::new(self_, &RAW_WAKER_VTABLE)
5632134
    }
    /// Implementation of [`RawWakerVTable`]'s `clone`
4245360
    unsafe fn raw_clone(self_: *const ()) -> RawWaker {
4245360
        let self_: *const ActualWaker = self_ as _;
4245360
        let self_: &ActualWaker = self_.as_ref().unwrap_unchecked();
4245360
        let copy: ActualWaker = self_.clone();
4245360
        copy.raw_new()
4245360
    }
    /// Implementation of [`RawWakerVTable`]'s `wake`
1342785
    unsafe fn raw_wake(self_: *const ()) {
1342785
        Self::raw_wake_by_ref(self_);
1342785
        Self::raw_drop(self_);
1342785
    }
    /// Implementation of [`RawWakerVTable`]'s `wake_ref_by`
1830753
    unsafe fn raw_wake_by_ref(self_: *const ()) {
1830753
        let self_: *const ActualWaker = self_ as _;
1830753
        let self_: &ActualWaker = self_.as_ref().unwrap_unchecked();
1830753
        self_.wake();
1830753
    }
    /// Implementation of [`RawWakerVTable`]'s `drop`
5599902
    unsafe fn raw_drop(self_: *const ()) {
5599902
        let self_: *mut ActualWaker = self_ as _;
5599902
        let self_: Box<ActualWaker> = Box::from_raw(self_);
5599902
        drop(self_);
5599902
    }
}
/// vtable for `Box<ActualWaker>` as `RawWaker`
//
// This ought to be in the impl block above, but
//   "associated `static` items are not allowed"
static RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
    ActualWaker::raw_clone,
    ActualWaker::raw_wake,
    ActualWaker::raw_wake_by_ref,
    ActualWaker::raw_drop,
);
//---------- Sleep location tracking and dumping ----------
/// We record "where a future went to sleep" as (just) a backtrace
///
/// This type alias allows us to mock `Backtrace` for miri.
/// (It also insulates from future choices about sleep location representation.0
#[cfg(not(miri))]
type SleepLocation = Backtrace;
impl Data {
    /// Dump tasks and their sleep location backtraces
2
    fn dump_backtraces(&self, f: &mut fmt::Formatter) -> fmt::Result {
8
        for (id, task) in self.tasks.iter() {
12
            let prefix = |f: &mut fmt::Formatter| write!(f, "{id:?}={task:?}: ");
8
            match &task.state {
                Awake => {
2
                    prefix(f)?;
2
                    writeln!(f, "awake")?;
                }
6
                Asleep(locs) => {
6
                    let n = locs.len();
6
                    for (i, loc) in locs.iter().enumerate() {
6
                        prefix(f)?;
6
                        writeln!(f, "asleep, backtrace {i}/{n}:\n{loc}",)?;
                    }
6
                    if n == 0 {
                        prefix(f)?;
                        writeln!(f, "asleep, no backtraces, Waker never cloned, stuck!",)?;
6
                    }
                }
            }
        }
2
        writeln!(
2
            f,
2
            "\nNote: there might be spurious traces, see docs for MockExecutor::debug_dump\n"
2
        )?;
2
        Ok(())
2
    }
}
/// Track sleep locations via `<Waker as Clone>`.
///
/// See [`MockExecutor::debug_dump`] for the explanation.
impl Clone for ActualWaker {
4245360
    fn clone(&self) -> Self {
4245360
        let id = self.id;
4245360
        if let Some(data) = self.upgrade_data() {
            // If the executor is gone, there is nothing to adjust
4245360
            let mut data = data.lock();
4245360
            if let Some(task) = data.tasks.get_mut(self.id) {
4245360
                match &mut task.state {
244902
                    Awake => trace!("MockExecutor cloned waker for awake task {id:?}"),
4000458
                    Asleep(locs) => locs.push(SleepLocation::force_capture()),
                }
            } else {
                trace!("MockExecutor cloned waker for dead task {id:?}");
            }
        }
4245360
        ActualWaker {
4245360
            data: self.data.clone(),
4245360
            id,
4245360
        }
4245360
    }
}
//---------- API for full debug dump ----------
/// Debugging dump of a `MockExecutor`'s state
///
/// Returned by [`MockExecutor::as_debug_dump`]
//
// Existence implies backtraces have been resolved
//
// We use `Either` so that we can also use this internally when we have &mut Data.
pub struct DebugDump<'a>(Either<&'a Data, MutexGuard<'a, Data>>);
impl MockExecutor {
    /// Dump the executor's state including backtraces of waiting tasks, to stderr
    ///
    /// This is considerably more extensive than simply
    /// `MockExecutor as Debug`.
    ///
    /// (This is a convenience method, which wraps
    /// [`MockExecutor::as_debug_dump()`].
    ///
    /// ### Backtrace salience (possible spurious traces)
    ///
    /// **Summary**
    ///
    /// The technique used to capture backtraces when futures sleep is not 100% exact.
    /// It will usually show all the actual sleeping sites,
    /// but it might also show other backtraces which were part of
    /// the implementation of some complex relevant future.
    ///
    /// **Details**
    ///
    /// When a future's implementation wants to sleep,
    /// it needs to record the [`Waker`] (from the [`Context`])
    /// so that the "other end" can call `.wake()` on it later,
    /// when the future should be woken.
    ///
    /// Since `Context.waker()` gives `&Waker`, borrowed from the `Context`,
    /// the future must clone the `Waker`,
    /// and it must do so in within the `poll()` call.
    ///
    /// A future which is waiting in a `select!` will typically
    /// show multiple traces, one for each branch.
    /// But,
    /// if a future sleeps on one thing, and then when polled again later,
    /// sleeps on something different, without waking up in between,
    /// both backtrace locations will be shown.
    /// And,
    /// a complicated future contraption *might* clone the `Waker` more times.
    /// So not every backtrace will necessarily be informative.
    ///
    /// ### Panics
    ///
    /// Panics on write errors.
2
    pub fn debug_dump(&self) {
2
        self.as_debug_dump().to_stderr();
2
    }
    /// Dump the executor's state including backtraces of waiting tasks
    ///
    /// This is considerably more extensive than simply
    /// `MockExecutor as Debug`.
    ///
    /// Returns an object for formatting with [`Debug`].
    /// To simply print the dump to stderr (eg in a test),
    /// use [`.debug_dump()`](MockExecutor::debug_dump).
    ///
    /// **Backtrace salience (possible spurious traces)** -
    /// see [`.debug_dump()`](MockExecutor::debug_dump).
2
    pub fn as_debug_dump(&self) -> DebugDump {
2
        let data = self.shared.lock();
2
        DebugDump(Right(data))
2
    }
}
impl Data {
    /// Convenience function: dump including backtraces, to stderr
    fn debug_dump(&mut self) {
        DebugDump(Left(self)).to_stderr();
    }
}
impl DebugDump<'_> {
    /// Convenience function: dump tasks and backtraces to stderr
    #[allow(clippy::wrong_self_convention)] // "to_stderr" doesn't mean "convert to stderr"
2
    fn to_stderr(self) {
2
        write!(io::stderr().lock(), "{:?}", self)
2
            .unwrap_or_else(|e| error_report!(e, "failed to write debug dump to stderr"));
2
    }
}
//---------- bespoke Debug impls ----------
impl Debug for DebugDump<'_> {
2
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2
        let self_: &Data = &self.0;
2

            
2
        writeln!(f, "MockExecutor state:\n{self_:#?}")?;
2
        writeln!(f, "MockExecutor task dump:")?;
2
        self_.dump_backtraces(f)?;
2
        Ok(())
2
    }
}
// See `impl Debug for Data` for notes on the output
impl Debug for Task {
122
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
122
        let Task { desc, state, fut } = self;
122
        write!(f, "{:?}", desc)?;
122
        write!(f, "=")?;
118
        match fut {
4
            None => write!(f, "P")?,
80
            Some(TaskFutureInfo::Normal(_)) => write!(f, "f")?,
30
            Some(TaskFutureInfo::Main) => write!(f, "m")?,
8
            Some(TaskFutureInfo::Subthread) => write!(f, "T")?,
        }
122
        match state {
110
            Awake => write!(f, "W")?,
12
            Asleep(locs) => write!(f, "s{}", locs.len())?,
        };
122
        Ok(())
122
    }
}
/// Helper: `Debug`s as a list of tasks, given the `Data` for lookups and a list of the ids
///
/// `Task`s in `Data` are printed as `Ti(ID)"SPEC"=FLAGS"`.
///
/// `FLAGS` are:
///
///  * `T`: this task is for a Subthread (from subthread_spawn).
///  * `P`: this task is being polled (its `TaskFutureInfo` is absent)
///  * `f`: this is a normal task with a future and its future is present in `Data`
///  * `m`: this is the main task from `block_on`
///
///  * `W`: the task is awake
///  * `s<n>`: the task is asleep, and `<n>` is the number of recorded sleeping locations
//
// We do it this way because the naive dump from derive is very expansive
// and makes it impossible to see the wood for the trees.
// This very compact representation it easier to find a task of interest in the output.
//
// This is implemented in `impl Debug for Task`.
//
//
// rustc doesn't think automatically-derived Debug impls count for whether a thing is used.
// This has caused quite some fallout.  https://github.com/rust-lang/rust/pull/85200
// I think derive_more emits #[automatically_derived], so that even though we use this
// in our Debug impl, that construction is unused.
#[allow(dead_code)]
struct DebugTasks<'d, F>(&'d Data, F);
// See `impl Debug for Data` for notes on the output
impl<F, I> Debug for DebugTasks<'_, F>
where
    F: Fn() -> I,
    I: Iterator<Item = TaskId>,
{
4
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4
        let DebugTasks(data, ids) = self;
10
        for (id, delim) in izip!(ids(), chain!(iter::once(""), iter::repeat(" ")),) {
10
            write!(f, "{delim}{id:?}")?;
10
            match data.tasks.get(id) {
                None => write!(f, "-")?,
10
                Some(task) => write!(f, "={task:?}")?,
            }
        }
4
        Ok(())
4
    }
}
/// Mock `Backtrace` for miri
///
/// See also the not-miri `type SleepLocation`, alias above.
#[cfg(miri)]
mod miri_sleep_location {
    #[derive(Debug, derive_more::Display)]
    #[display("<SleepLocation>")]
    pub(super) struct SleepLocation {}
    impl SleepLocation {
        pub(super) fn force_capture() -> Self {
            SleepLocation {}
        }
    }
}
#[cfg(miri)]
use miri_sleep_location::SleepLocation;
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use futures::channel::mpsc;
    use futures::{SinkExt as _, StreamExt as _};
    use strum::IntoEnumIterator;
    use tracing::info;
    #[cfg(not(miri))] // trace! asks for the time, which miri doesn't support
    use tracing_test::traced_test;
    fn various_mock_executors() -> impl Iterator<Item = MockExecutor> {
        // This duplicates the part of the logic in MockRuntime::test_with_various which
        // relates to MockExecutor, because we don't have a MockRuntime::builder.
        // The only parameter to MockExecutor is its scheduling policy, so this seems fine.
        SchedulingPolicy::iter().map(|scheduling| {
            eprintln!("===== MockExecutor::with_scheduling({scheduling:?}) =====");
            MockExecutor::with_scheduling(scheduling)
        })
    }
    #[cfg_attr(not(miri), traced_test)]
    #[test]
    fn simple() {
        let runtime = MockExecutor::default();
        let val = runtime.block_on(async { 42 });
        assert_eq!(val, 42);
    }
    #[cfg_attr(not(miri), traced_test)]
    #[test]
    fn stall() {
        let runtime = MockExecutor::default();
        runtime.block_on({
            let runtime = runtime.clone();
            async move {
                const N: usize = 3;
                let (mut txs, mut rxs): (Vec<_>, Vec<_>) =
                    (0..N).map(|_| mpsc::channel::<usize>(5)).unzip();
                let mut rx_n = rxs.pop().unwrap();
                for (i, mut rx) in rxs.into_iter().enumerate() {
                    runtime.spawn_identified(i, {
                        let mut txs = txs.clone();
                        async move {
                            loop {
                                eprintln!("task {i} rx...");
                                let v = rx.next().await.unwrap();
                                let nv = v + 1;
                                eprintln!("task {i} rx {v}, tx {nv}");
                                let v = nv;
                                txs[v].send(v).await.unwrap();
                            }
                        }
                    });
                }
                dbg!();
                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();
                dbg!();
                runtime.progress_until_stalled().await;
                dbg!();
                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();
                dbg!();
                txs[0].send(0).await.unwrap();
                dbg!();
                runtime.progress_until_stalled().await;
                dbg!();
                let r = rx_n.next().await;
                assert_eq!(r, Some(N - 1));
                dbg!();
                let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();
                runtime.spawn_identified("tx", {
                    let txs = txs.clone();
                    async {
                        eprintln!("sending task...");
                        for (i, mut tx) in txs.into_iter().enumerate() {
                            eprintln!("sending 0 to {i}...");
                            tx.send(0).await.unwrap();
                        }
                        eprintln!("sending task done");
                    }
                });
                runtime.debug_dump();
                for i in 0..txs.len() {
                    eprintln!("main {i} wait stall...");
                    runtime.progress_until_stalled().await;
                    eprintln!("main {i} rx wait...");
                    let r = rx_n.next().await;
                    eprintln!("main {i} rx = {r:?}");
                    assert!(r == Some(0) || r == Some(N - 1));
                }
                eprintln!("finishing...");
                runtime.progress_until_stalled().await;
                eprintln!("finished.");
            }
        });
    }
    #[cfg_attr(not(miri), traced_test)]
    #[test]
    fn spawn_blocking() {
        let runtime = MockExecutor::default();
        runtime.block_on({
            let runtime = runtime.clone();
            async move {
                let thr_1 = runtime.spawn_blocking(|| 42);
                let thr_2 = runtime.spawn_blocking(|| 99);
                assert_eq!(thr_2.await, 99);
                assert_eq!(thr_1.await, 42);
            }
        });
    }
    #[cfg_attr(not(miri), traced_test)]
    #[test]
    fn drop_reentrancy() {
        // Check that dropping a completed task future is done *outside* the data lock.
        // Involves a contrived future whose Drop impl reenters the executor.
        //
        // If `_fut_drop_late = fut` in execute_until_first_stall (the main loop)
        // is replaced with `drop(fut)` (dropping the future at the wrong moment),
        // we do indeed get deadlock, so this test case is working.
        struct ReentersOnDrop {
            runtime: MockExecutor,
        }
        impl Future for ReentersOnDrop {
            type Output = ();
            fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<()> {
                Poll::Ready(())
            }
        }
        impl Drop for ReentersOnDrop {
            fn drop(&mut self) {
                self.runtime
                    .spawn_identified("dummy", futures::future::ready(()));
            }
        }
        for runtime in various_mock_executors() {
            runtime.block_on(async {
                runtime.spawn_identified("trapper", {
                    let runtime = runtime.clone();
                    ReentersOnDrop { runtime }
                });
            });
        }
    }
    #[cfg_attr(not(miri), traced_test)]
    #[test]
    fn subthread_oneshot() {
        for runtime in various_mock_executors() {
            runtime.block_on(async {
                let (tx, rx) = oneshot::channel();
                info!("spawning subthread");
                let thr = runtime.subthread_spawn("thr1", {
                    let runtime = runtime.clone();
                    move || {
                        info!("subthread_block_on_future...");
                        let i = runtime.subthread_block_on_future(rx).unwrap();
                        info!("subthread_block_on_future => {i}");
                        i + 1
                    }
                });
                info!("main task sending");
                tx.send(12).unwrap();
                info!("main task sent");
                let r = thr.await.unwrap();
                info!("main task thr => {r}");
                assert_eq!(r, 13);
            });
        }
    }
    #[cfg_attr(not(miri), traced_test)]
    #[test]
    #[allow(clippy::cognitive_complexity)] // It's is not that complicated, really.
    fn subthread_pingpong() {
        for runtime in various_mock_executors() {
            runtime.block_on(async {
                let (mut i_tx, mut i_rx) = mpsc::channel(1);
                let (mut o_tx, mut o_rx) = mpsc::channel(1);
                info!("spawning subthread");
                let thr = runtime.subthread_spawn("thr", {
                    let runtime = runtime.clone();
                    move || {
                        while let Some(i) = {
                            info!("thread receiving ...");
                            runtime.subthread_block_on_future(i_rx.next())
                        } {
                            let o = i + 12;
                            info!("thread received {i}, sending {o}");
                            runtime.subthread_block_on_future(o_tx.send(o)).unwrap();
                            info!("thread sent {o}");
                        }
                        info!("thread exiting");
                        42
                    }
                });
                for i in 0..2 {
                    info!("main task sending {i}");
                    i_tx.send(i).await.unwrap();
                    info!("main task sent {i}");
                    let o = o_rx.next().await.unwrap();
                    info!("main task recv => {o}");
                    assert_eq!(o, i + 12);
                }
                info!("main task dropping sender");
                drop(i_tx);
                info!("main task awaiting thread");
                let r = thr.await.unwrap();
                info!("main task complete");
                assert_eq!(r, 42);
            });
        }
    }
}