tor_rtmock/task.rs
1//! Executor for running tests with mocked environment
2//!
3//! See [`MockExecutor`]
4
5use std::any::Any;
6use std::cell::Cell;
7use std::collections::VecDeque;
8use std::fmt::{self, Debug, Display};
9use std::future::Future;
10use std::io::{self, Write as _};
11use std::iter;
12use std::panic::{catch_unwind, panic_any, AssertUnwindSafe};
13use std::pin::{pin, Pin};
14use std::sync::{Arc, Mutex, MutexGuard, Weak};
15use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
16
17use futures::pin_mut;
18use futures::task::{FutureObj, Spawn, SpawnError};
19use futures::FutureExt as _;
20
21use assert_matches::assert_matches;
22use educe::Educe;
23use itertools::Either::{self, *};
24use itertools::{chain, izip};
25use slotmap_careful::DenseSlotMap;
26use std::backtrace::Backtrace;
27use strum::EnumIter;
28
29// NB: when using traced_test, the trace! and error! output here is generally suppressed
30// in tests of other crates. To see it, you can write something like this
31// (in the dev-dependencies of the crate whose tests you're running):
32// tracing-test = { version = "0.2.4", features = ["no-env-filter"] }
33use tracing::{error, trace};
34
35use oneshot_fused_workaround::{self as oneshot, Canceled};
36use tor_error::error_report;
37use tor_rtcompat::{Blocking, ToplevelBlockOn};
38
39use Poll::*;
40use TaskState::*;
41
42/// Type-erased future, one for each of our (normal) tasks
43type TaskFuture = FutureObj<'static, ()>;
44
45/// Future for the argument to `block_on`, which is handled specially
46type MainFuture<'m> = Pin<&'m mut dyn Future<Output = ()>>;
47
48//---------- principal data structures ----------
49
50/// Executor for running tests with mocked environment
51///
52/// For test cases which don't actually wait for anything in the real world.
53///
54/// This is the executor.
55/// It implements [`Spawn`] and [`ToplevelBlockOn`]
56///
57/// It will usually be used as part of a `MockRuntime`.
58///
59/// To run futures, call [`ToplevelBlockOn::block_on`]
60///
61/// # Restricted environment
62///
63/// Tests run with this executor must not attempt to block
64/// on anything "outside":
65/// every future that anything awaits must (eventually) be woken directly
66/// *by some other task* in the same test case.
67///
68/// (By directly we mean that the [`Waker::wake`] call is made
69/// by that waking future, before that future itself awaits anything.)
70///
71/// # Panics
72///
73/// The executor will panic
74/// if the toplevel future (passed to `block_on`)
75/// doesn't complete (without externally blocking),
76/// but instead waits for something.
77///
78/// The executor will malfunction or panic if reentered.
79/// (Eg, if `block_on` is reentered.)
80#[derive(Clone, Default, Educe)]
81#[educe(Debug)]
82pub struct MockExecutor {
83 /// Mutable state
84 #[educe(Debug(ignore))]
85 shared: Arc<Shared>,
86}
87
88/// Shared state and ancillary information
89///
90/// This is always within an `Arc`.
91#[derive(Default)]
92struct Shared {
93 /// Shared state
94 data: Mutex<Data>,
95 /// Condition variable for thread scheduling
96 ///
97 /// Signaled when [`Data.thread_to_run`](struct.Data.html#structfield.thread_to_run)
98 /// is modified.
99 thread_condvar: std::sync::Condvar,
100}
101
102/// Task id, module to hide `Ti` alias
103mod task_id {
104 slotmap_careful::new_key_type! {
105 /// Task ID, usually called `TaskId`
106 ///
107 /// Short name in special `task_id` module so that [`Debug`] is nice
108 pub(super) struct Ti;
109 }
110}
111use task_id::Ti as TaskId;
112
113/// Executor's state
114///
115/// ### Task state machine
116///
117/// A task is created in `tasks`, `Awake`, so also in `awake`.
118///
119/// When we poll it, we take it out of `awake` and set it to `Asleep`,
120/// and then call `poll()`.
121/// Any time after that, it can be made `Awake` again (and put back onto `awake`)
122/// by the waker ([`ActualWaker`], wrapped in [`Waker`]).
123///
124/// The task's future is of course also present here in this data structure.
125/// However, during poll we must release the lock,
126/// so we cannot borrow the future from `Data`.
127/// Instead, we move it out. So `Task.fut` is an `Option`.
128///
129/// ### "Main" task - the argument to `block_on`
130///
131/// The signature of `BlockOn::block_on` accepts a non-`'static` future
132/// (and a non-`Send`/`Sync` one).
133///
134/// So we cannot store that future in `Data` because `Data` is `'static`.
135/// Instead, this main task future is passed as an argument down the call stack.
136/// In the data structure we simply store a placeholder, `TaskFutureInfo::Main`.
137#[derive(Educe, derive_more::Debug)]
138#[educe(Default)]
139struct Data {
140 /// Tasks
141 ///
142 /// Includes tasks spawned with `spawn`,
143 /// and also the future passed to `block_on`.
144 #[debug("{:?}", DebugTasks(self, || tasks.keys()))]
145 tasks: DenseSlotMap<TaskId, Task>,
146
147 /// `awake` lists precisely: tasks that are `Awake`, plus maybe stale `TaskId`s
148 ///
149 /// Tasks are pushed onto the *back* when woken,
150 /// so back is the most recently woken.
151 #[debug("{:?}", DebugTasks(self, || awake.iter().cloned()))]
152 awake: VecDeque<TaskId>,
153
154 /// If a future from `progress_until_stalled` exists
155 progressing_until_stalled: Option<ProgressingUntilStalled>,
156
157 /// Scheduling policy
158 scheduling: SchedulingPolicy,
159
160 /// (Sub)thread we want to run now
161 ///
162 /// At any one time only one thread is meant to be running.
163 /// Other threads are blocked in condvar wait, waiting for this to change.
164 ///
165 /// **Modified only** within
166 /// [`thread_context_switch_send_instruction_to_run`](Shared::thread_context_switch_send_instruction_to_run),
167 /// which takes responsibility for preserving the following **invariants**:
168 ///
169 /// 1. no-one but the named thread is allowed to modify this field.
170 /// 2. after modifying this field, signal `thread_condvar`
171 #[educe(Default(expression = "ThreadDescriptor::Executor"))]
172 thread_to_run: ThreadDescriptor,
173}
174
175/// How we should schedule?
176#[derive(Debug, Clone, Default, EnumIter)]
177#[non_exhaustive]
178pub enum SchedulingPolicy {
179 /// Task *most* recently woken is run
180 ///
181 /// This is the default.
182 ///
183 /// It will expose starvation bugs if a task never sleeps.
184 /// (Which is a good thing in tests.)
185 #[default]
186 Stack,
187 /// Task *least* recently woken is run.
188 Queue,
189}
190
191/// Record of a single task
192///
193/// Tracks a spawned task, or the main task (the argument to `block_on`).
194///
195/// Stored in [`Data`]`.tasks`.
196struct Task {
197 /// For debugging output
198 desc: String,
199 /// Has this been woken via a waker? (And is it in `Data.awake`?)
200 ///
201 /// **Set to `Awake` only by [`Task::set_awake`]**,
202 /// preserving the invariant that
203 /// every `Awake` task is in [`Data.awake`](struct.Data.html#structfield.awake).
204 state: TaskState,
205 /// The actual future (or a placeholder for it)
206 ///
207 /// May be `None` briefly in the executor main loop, because we've
208 /// temporarily moved it out so we can poll it,
209 /// or if this is a Subthread task which is currently running sync code
210 /// (in which case we're blocked in the executor waiting to be
211 /// woken up by [`thread_context_switch`](Shared::thread_context_switch).
212 ///
213 /// Note that the `None` can be observed outside the main loop, because
214 /// the main loop unlocks while it polls, so other (non-main-loop) code
215 /// might see it.
216 fut: Option<TaskFutureInfo>,
217}
218
219/// A future as stored in our record of a [`Task`]
220#[derive(Educe)]
221#[educe(Debug)]
222enum TaskFutureInfo {
223 /// The [`Future`]. All is normal.
224 Normal(#[educe(Debug(ignore))] TaskFuture),
225 /// The future isn't here because this task is the main future for `block_on`
226 Main,
227 /// This task is actually a [`Subthread`](MockExecutor::subthread_spawn)
228 ///
229 /// Instead of polling it, we'll switch to it with
230 /// [`thread_context_switch`](Shared::thread_context_switch).
231 Subthread,
232}
233
234/// State of a task - do we think it needs to be polled?
235///
236/// Stored in [`Task`]`.state`.
237#[derive(Debug)]
238enum TaskState {
239 /// Awake - needs to be polled
240 ///
241 /// Established by [`waker.wake()`](Waker::wake)
242 Awake,
243 /// Asleep - does *not* need to be polled
244 ///
245 /// Established each time just before we call the future's [`poll`](Future::poll)
246 Asleep(Vec<SleepLocation>),
247}
248
249/// Actual implementor of `Wake` for use in a `Waker`
250///
251/// Futures (eg, channels from [`futures`]) will use this to wake a task
252/// when it should be polled.
253///
254/// This type must not be `Cloned` with the `Data` lock held.
255/// Consequently, a `Waker` mustn't either.
256struct ActualWaker {
257 /// Executor state
258 ///
259 /// The Waker mustn't to hold a strong reference to the executor,
260 /// since typically a task holds a future that holds a Waker,
261 /// and the executor holds the task - so that would be a cycle.
262 data: Weak<Shared>,
263
264 /// Which task this is
265 id: TaskId,
266}
267
268/// State used for an in-progress call to
269/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
270///
271/// If present in [`Data`], an (async) call to `progress_until_stalled`
272/// is in progress.
273///
274/// The future from `progress_until_stalled`, [`ProgressUntilStalledFuture`]
275/// is a normal-ish future.
276/// It can be polled in the normal way.
277/// When it is polled, it looks here, in `finished`, to see if it's `Ready`.
278///
279/// The future is made ready, and woken (via `waker`),
280/// by bespoke code in the task executor loop.
281///
282/// When `ProgressUntilStalledFuture` (maybe completes and) is dropped,
283/// its `Drop` impl is used to remove this from `Data.progressing_until_stalled`.
284#[derive(Debug)]
285struct ProgressingUntilStalled {
286 /// Have we, in fact, stalled?
287 ///
288 /// Made `Ready` by special code in the executor loop
289 finished: Poll<()>,
290
291 /// Waker
292 ///
293 /// Signalled by special code in the executor loop
294 waker: Option<Waker>,
295}
296
297/// Future from
298/// [`progress_until_stalled`][`MockExecutor::progress_until_stalled`]
299///
300/// See [`ProgressingUntilStalled`] for an overview of this aspect of the contraption.
301///
302/// Existence of this struct implies `Data.progressing_until_stalled` is `Some`.
303/// There can only be one at a time.
304#[derive(Educe)]
305#[educe(Debug)]
306struct ProgressUntilStalledFuture {
307 /// Executor's state; this future's state is in `.progressing_until_stalled`
308 #[educe(Debug(ignore))]
309 shared: Arc<Shared>,
310}
311
312/// Identifies a thread we know about - the executor thread, or a Subthread
313///
314/// Not related to `std::thread::ThreadId`.
315///
316/// See [`spawn_subthread`](MockExecutor::subthread_spawn) for definition of a Subthread.
317///
318/// This being a thread-local and not scoped by which `MockExecutor` we're talking about
319/// means that we can't cope if there are multiple `MockExecutor`s involved in the same thread.
320/// That's OK (and documented).
321#[derive(Copy, Clone, Eq, PartialEq, derive_more::Debug)]
322enum ThreadDescriptor {
323 /// Foreign - neither the (running) executor, nor a Subthread
324 #[debug("FOREIGN")]
325 Foreign,
326 /// The executor.
327 #[debug("Exe")]
328 Executor,
329 /// This task, which is a Subthread.
330 #[debug("{_0:?}")]
331 Subthread(TaskId),
332}
333
334/// Marker indicating that this task is a Subthread, not an async task.
335///
336/// See [`spawn_subthread`](MockExecutor::subthread_spawn) for definition of a Subthread.
337#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
338struct IsSubthread;
339
340/// [`Shared::subthread_yield`] should set our task awake before switching to the executor
341#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
342struct SetAwake;
343
344thread_local! {
345 /// Identifies this thread.
346 pub static THREAD_DESCRIPTOR: Cell<ThreadDescriptor> = const {
347 Cell::new(ThreadDescriptor::Foreign)
348 };
349}
350
351//---------- creation ----------
352
353impl MockExecutor {
354 /// Make a `MockExecutor` with default parameters
355 pub fn new() -> Self {
356 Self::default()
357 }
358
359 /// Make a `MockExecutor` with a specific `SchedulingPolicy`
360 pub fn with_scheduling(scheduling: SchedulingPolicy) -> Self {
361 Data {
362 scheduling,
363 ..Default::default()
364 }
365 .into()
366 }
367}
368
369impl From<Data> for MockExecutor {
370 fn from(data: Data) -> MockExecutor {
371 let shared = Shared {
372 data: Mutex::new(data),
373 thread_condvar: std::sync::Condvar::new(),
374 };
375 MockExecutor {
376 shared: Arc::new(shared),
377 }
378 }
379}
380
381//---------- spawning ----------
382
383impl MockExecutor {
384 /// Spawn a task and return something to identify it
385 ///
386 /// `desc` should `Display` as some kind of short string (ideally without spaces)
387 /// and will be used in the `Debug` impl and trace log messages from `MockExecutor`.
388 ///
389 /// The returned value is an opaque task identifier which is very cheap to clone
390 /// and which can be used by the caller in debug logging,
391 /// if it's desired to correlate with the debug output from `MockExecutor`.
392 /// Most callers will want to ignore it.
393 ///
394 /// This method is infallible. (The `MockExecutor` cannot be shut down.)
395 pub fn spawn_identified(
396 &self,
397 desc: impl Display,
398 fut: impl Future<Output = ()> + Send + 'static,
399 ) -> impl Debug + Clone + Send + 'static {
400 self.spawn_internal(desc.to_string(), FutureObj::from(Box::new(fut)))
401 }
402
403 /// Spawn a task and return its output for further usage
404 ///
405 /// `desc` should `Display` as some kind of short string (ideally without spaces)
406 /// and will be used in the `Debug` impl and trace log messages from `MockExecutor`.
407 pub fn spawn_join<T: Debug + Send + 'static>(
408 &self,
409 desc: impl Display,
410 fut: impl Future<Output = T> + Send + 'static,
411 ) -> impl Future<Output = T> {
412 let (tx, rx) = oneshot::channel();
413 self.spawn_identified(desc, async move {
414 let res = fut.await;
415 tx.send(res)
416 .expect("Failed to send future's output, did future panic?");
417 });
418 rx.map(|m| m.expect("Failed to receive future's output"))
419 }
420
421 /// Spawn a task and return its `TaskId`
422 ///
423 /// Convenience method for use by `spawn_identified` and `spawn_obj`.
424 /// The future passed to `block_on` is not handled here.
425 fn spawn_internal(&self, desc: String, fut: TaskFuture) -> TaskId {
426 let mut data = self.shared.lock();
427 data.insert_task(desc, TaskFutureInfo::Normal(fut))
428 }
429}
430
431impl Data {
432 /// Insert a task given its `TaskFutureInfo` and return its `TaskId`.
433 fn insert_task(&mut self, desc: String, fut: TaskFutureInfo) -> TaskId {
434 let state = Awake;
435 let id = self.tasks.insert(Task {
436 state,
437 desc,
438 fut: Some(fut),
439 });
440 self.awake.push_back(id);
441 trace!("MockExecutor spawned {:?}={:?}", id, self.tasks[id]);
442 id
443 }
444}
445
446impl Spawn for MockExecutor {
447 fn spawn_obj(&self, future: TaskFuture) -> Result<(), SpawnError> {
448 self.spawn_internal("spawn_obj".into(), future);
449 Ok(())
450 }
451}
452
453impl Blocking for MockExecutor {
454 type ThreadHandle<T: Send + 'static> = Pin<Box<dyn Future<Output = T>>>;
455
456 fn spawn_blocking<F, T>(&self, f: F) -> Self::ThreadHandle<T>
457 where
458 F: FnOnce() -> T + Send + 'static,
459 T: Send + 'static,
460 {
461 assert_matches!(
462 THREAD_DESCRIPTOR.get(),
463 ThreadDescriptor::Executor | ThreadDescriptor::Subthread(_),
464 "MockExecutor::spawn_blocking_io only allowed from future or subthread, being run by this executor"
465 );
466 Box::pin(
467 self.subthread_spawn("spawn_blocking", f)
468 .map(|x| x.expect("Error in spawn_blocking subthread.")),
469 )
470 }
471
472 fn reenter_block_on<F>(&self, future: F) -> F::Output
473 where
474 F: Future,
475 F::Output: Send + 'static,
476 {
477 self.subthread_block_on_future(future)
478 }
479}
480
481//---------- block_on ----------
482
483impl ToplevelBlockOn for MockExecutor {
484 fn block_on<F>(&self, input_fut: F) -> F::Output
485 where
486 F: Future,
487 {
488 let mut value: Option<F::Output> = None;
489
490 // Box this just so that we can conveniently control precisely when it's dropped.
491 // (We could do this with Option and Pin::set but that seems clumsier.)
492 let mut input_fut = Box::pin(input_fut);
493
494 let run_store_fut = {
495 let value = &mut value;
496 let input_fut = &mut input_fut;
497 async {
498 trace!("MockExecutor block_on future...");
499 let t = input_fut.await;
500 trace!("MockExecutor block_on future returned...");
501 *value = Some(t);
502 trace!("MockExecutor block_on future exiting.");
503 }
504 };
505
506 {
507 pin_mut!(run_store_fut);
508
509 let main_id = self
510 .shared
511 .lock()
512 .insert_task("main".into(), TaskFutureInfo::Main);
513 trace!("MockExecutor {main_id:?} is task for block_on");
514 self.execute_to_completion(run_store_fut);
515 }
516
517 #[allow(clippy::let_and_return)] // clarity
518 let value = value.take().unwrap_or_else(|| {
519 // eprintln can be captured by libtest, but the debug_dump goes to io::stderr.
520 // use the latter, so that the debug dump is prefixed by this message.
521 let _: io::Result<()> = writeln!(io::stderr(), "all futures blocked, crashing...");
522 // write to tracing too, so the tracing log is clear about when we crashed
523 error!("all futures blocked, crashing...");
524
525 // Sequencing here is subtle.
526 //
527 // We should do the dump before dropping the input future, because the input
528 // future is likely to own things that, when dropped, wake up other tasks,
529 // rendering the dump inaccurate.
530 //
531 // But also, dropping the input future may well drop a ProgressUntilStalledFuture
532 // which then reenters us. More generally, we mustn't call user code
533 // with the lock held.
534 //
535 // And, we mustn't panic with the data lock held.
536 //
537 // If value was Some, then this closure is dropped without being called,
538 // which drops the future after it has yielded the value, which is correct.
539 {
540 let mut data = self.shared.lock();
541 data.debug_dump();
542 }
543 drop(input_fut);
544
545 panic!(
546 r"
547all futures blocked. waiting for the real world? or deadlocked (waiting for each other) ?
548"
549 );
550 });
551
552 value
553 }
554}
555
556//---------- execution - core implementation ----------
557
558impl MockExecutor {
559 /// Keep polling tasks until nothing more can be done
560 ///
561 /// Ie, stop when `awake` is empty and `progressing_until_stalled` is `None`.
562 fn execute_to_completion(&self, mut main_fut: MainFuture) {
563 trace!("MockExecutor execute_to_completion...");
564 loop {
565 self.execute_until_first_stall(main_fut.as_mut());
566
567 // Handle `progressing_until_stalled`
568 let pus_waker = {
569 let mut data = self.shared.lock();
570 let pus = &mut data.progressing_until_stalled;
571 trace!("MockExecutor execute_to_completion PUS={:?}", &pus);
572 let Some(pus) = pus else {
573 // No progressing_until_stalled, we're actually done.
574 break;
575 };
576 assert_eq!(
577 pus.finished, Pending,
578 "ProgressingUntilStalled finished twice?!"
579 );
580 pus.finished = Ready(());
581
582 // Release the lock temporarily so that ActualWaker::clone doesn't deadlock
583 let waker = pus
584 .waker
585 .take()
586 .expect("ProgressUntilStalledFuture not ever polled!");
587 drop(data);
588 let waker_copy = waker.clone();
589 let mut data = self.shared.lock();
590
591 let pus = &mut data.progressing_until_stalled;
592 if let Some(double) = pus
593 .as_mut()
594 .expect("progressing_until_stalled updated under our feet!")
595 .waker
596 .replace(waker)
597 {
598 panic!("double progressing_until_stalled.waker! {double:?}");
599 }
600
601 waker_copy
602 };
603 pus_waker.wake();
604 }
605 trace!("MockExecutor execute_to_completion done");
606 }
607
608 /// Keep polling tasks until `awake` is empty
609 ///
610 /// (Ignores `progressing_until_stalled` - so if one is active,
611 /// will return when all other tasks have blocked.)
612 ///
613 /// # Panics
614 ///
615 /// Might malfunction or panic if called reentrantly
616 fn execute_until_first_stall(&self, main_fut: MainFuture) {
617 trace!("MockExecutor execute_until_first_stall ...");
618
619 assert_eq!(
620 THREAD_DESCRIPTOR.get(),
621 ThreadDescriptor::Foreign,
622 "MockExecutor executor re-entered"
623 );
624 THREAD_DESCRIPTOR.set(ThreadDescriptor::Executor);
625
626 let r = catch_unwind(AssertUnwindSafe(|| self.executor_main_loop(main_fut)));
627
628 THREAD_DESCRIPTOR.set(ThreadDescriptor::Foreign);
629
630 match r {
631 Ok(()) => trace!("MockExecutor execute_until_first_stall done."),
632 Err(e) => {
633 trace!("MockExecutor executor, or async task, panicked!");
634 panic_any(e)
635 }
636 }
637 }
638
639 /// Keep polling tasks until `awake` is empty (inner, executor main loop)
640 ///
641 /// This is only called from [`MockExecutor::execute_until_first_stall`],
642 /// so it could also be called `execute_until_first_stall_inner`.
643 #[allow(clippy::cognitive_complexity)]
644 fn executor_main_loop(&self, mut main_fut: MainFuture) {
645 'outer: loop {
646 // Take a `Awake` task off `awake` and make it `Asleep`
647 let (id, mut fut) = 'inner: loop {
648 let mut data = self.shared.lock();
649 let Some(id) = data.schedule() else {
650 break 'outer;
651 };
652 let Some(task) = data.tasks.get_mut(id) else {
653 trace!("MockExecutor {id:?} vanished");
654 continue;
655 };
656 task.state = Asleep(vec![]);
657 let fut = task.fut.take().expect("future missing from task!");
658 break 'inner (id, fut);
659 };
660
661 // Poll the selected task
662 trace!("MockExecutor {id:?} polling...");
663 let waker = ActualWaker::make_waker(&self.shared, id);
664 let mut cx = Context::from_waker(&waker);
665 let r: Either<Poll<()>, IsSubthread> = match &mut fut {
666 TaskFutureInfo::Normal(fut) => Left(fut.poll_unpin(&mut cx)),
667 TaskFutureInfo::Main => Left(main_fut.as_mut().poll(&mut cx)),
668 TaskFutureInfo::Subthread => Right(IsSubthread),
669 };
670
671 // Deal with the returned `Poll`
672 let _fut_drop_late;
673 {
674 let mut data = self.shared.lock();
675 let task = data
676 .tasks
677 .get_mut(id)
678 .expect("task vanished while we were polling it");
679
680 match r {
681 Left(Pending) => {
682 trace!("MockExecutor {id:?} -> Pending");
683 if task.fut.is_some() {
684 panic!("task reinserted while we polled it?!");
685 }
686 // The task might have been woken *by its own poll method*.
687 // That's why we set it to `Asleep` *earlier* rather than here.
688 // All we need to do is put the future back.
689 task.fut = Some(fut);
690 }
691 Left(Ready(())) => {
692 trace!("MockExecutor {id:?} -> Ready");
693 // Oh, it finished!
694 // It might be in `awake`, but that's allowed to contain stale tasks,
695 // so we *don't* need to scan that list and remove it.
696 data.tasks.remove(id);
697 // It is important that we don't drop `fut` until we have released
698 // the data lock, since it is an external type and might try to reenter
699 // us (eg by calling spawn). If we do that here, we risk deadlock.
700 // So, move `fut` to a variable with scope outside the block with `data`.
701 _fut_drop_late = fut;
702 }
703 Right(IsSubthread) => {
704 trace!("MockExecutor {id:?} -> Ready, waking Subthread");
705 // Task is a subthread, which has called thread_context_switch
706 // to switch to us. We "poll" it by switching back.
707
708 // Put back `TFI::Subthread`, which was moved out temporarily, above.
709 task.fut = Some(fut);
710
711 self.shared.thread_context_switch(
712 data,
713 ThreadDescriptor::Executor,
714 ThreadDescriptor::Subthread(id),
715 );
716
717 // Now, if the Subthread still exists, that's because it's switched
718 // back to us, and is waiting in subthread_block_on_future again.
719 // Or it might have ended, in which case it's not in `tasks` any more.
720 // In any case we can go back to scheduling futures.
721 }
722 }
723 }
724 }
725 }
726}
727
728impl Data {
729 /// Return the next task to run
730 ///
731 /// The task is removed from `awake`, but **`state` is not set to `Asleep`**.
732 /// The caller must restore the invariant!
733 fn schedule(&mut self) -> Option<TaskId> {
734 use SchedulingPolicy as SP;
735 match self.scheduling {
736 SP::Stack => self.awake.pop_back(),
737 SP::Queue => self.awake.pop_front(),
738 }
739 }
740}
741
742impl ActualWaker {
743 /// Obtain a strong reference to the executor's data
744 fn upgrade_data(&self) -> Option<Arc<Shared>> {
745 self.data.upgrade()
746 }
747
748 /// Wake the task corresponding to this `ActualWaker`
749 ///
750 /// This is like `<Self as std::task::Wake>::wake()` but takes `&self`, not `Arc`
751 fn wake(&self) {
752 let Some(data) = self.upgrade_data() else {
753 // The executor is gone! Don't try to wake.
754 return;
755 };
756 let mut data = data.lock();
757 let data = &mut *data;
758 trace!("MockExecutor {:?} wake", &self.id);
759 let Some(task) = data.tasks.get_mut(self.id) else {
760 return;
761 };
762 task.set_awake(self.id, &mut data.awake);
763 }
764
765 /// Create and return a `Waker` for task `id`
766 fn make_waker(shared: &Arc<Shared>, id: TaskId) -> Waker {
767 ActualWaker {
768 data: Arc::downgrade(shared),
769 id,
770 }
771 .new_waker()
772 }
773}
774
775//---------- "progress until stalled" functionality ----------
776
777impl MockExecutor {
778 /// Run tasks in the current executor until every other task is waiting
779 ///
780 /// # Panics
781 ///
782 /// Might malfunction or panic if more than one such call is running at once.
783 ///
784 /// (Ie, you must `.await` or drop the returned `Future`
785 /// before calling this method again.)
786 ///
787 /// Must be called and awaited within a future being run by `self`.
788 pub fn progress_until_stalled(&self) -> impl Future<Output = ()> {
789 let mut data = self.shared.lock();
790 assert!(
791 data.progressing_until_stalled.is_none(),
792 "progress_until_stalled called more than once"
793 );
794 trace!("MockExecutor progress_until_stalled...");
795 data.progressing_until_stalled = Some(ProgressingUntilStalled {
796 finished: Pending,
797 waker: None,
798 });
799 ProgressUntilStalledFuture {
800 shared: self.shared.clone(),
801 }
802 }
803}
804
805impl Future for ProgressUntilStalledFuture {
806 type Output = ();
807
808 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
809 let waker = cx.waker().clone();
810 let mut data = self.shared.lock();
811 let pus = data.progressing_until_stalled.as_mut();
812 trace!("MockExecutor progress_until_stalled polling... {:?}", &pus);
813 let pus = pus.expect("ProgressingUntilStalled missing");
814 pus.waker = Some(waker);
815 pus.finished
816 }
817}
818
819impl Drop for ProgressUntilStalledFuture {
820 fn drop(&mut self) {
821 self.shared.lock().progressing_until_stalled = None;
822 }
823}
824
825//---------- (sub)threads ----------
826
827impl MockExecutor {
828 /// Spawn a "Subthread", for processing in a sync context
829 ///
830 /// `call` will be run on a separate thread, called a "Subthread".
831 ///
832 /// But it will **not run simultaneously** with the executor,
833 /// nor with other Subthreads.
834 /// So Subthreads are somewhat like coroutines.
835 ///
836 /// `call` must be capable of making progress without waiting for any other Subthreads.
837 /// `call` may wait for async futures, using
838 /// [`subthread_block_on_future`](MockExecutor::subthread_block_on_future).
839 ///
840 /// Subthreads may be used for cpubound activity,
841 /// or synchronous IO (such as large volumes of disk activity),
842 /// provided that the synchronous code will reliably make progress,
843 /// without waiting (directly or indirectly) for any async task or Subthread -
844 /// except via `subthread_block_on_future`.
845 ///
846 /// # Subthreads vs raw `std::thread` threads
847 ///
848 /// Programs using `MockExecutor` may use `std::thread` threads directly.
849 /// However, this is not recommended. There are severe limitations:
850 ///
851 /// * Only a Subthread can re-enter the async context from sync code:
852 /// this must be done with
853 /// using [`subthread_block_on_future`](MockExecutor::subthread_block_on_future).
854 /// (Re-entering the executor with
855 /// [`block_on`](tor_rtcompat::ToplevelBlockOn::block_on)
856 /// is not allowed.)
857 /// * If async tasks want to suspend waiting for synchronous code,
858 /// the synchronous code must run on a Subthread.
859 /// This allows the `MockExecutor` to know when
860 /// that synchronous code is still making progress.
861 /// (This is needed for
862 /// [`progress_until_stalled`](MockExecutor::progress_until_stalled)
863 /// and the facilities which use it, such as
864 /// [`MockRuntime::advance_until_stalled`](crate::MockRuntime::advance_until_stalled).)
865 /// * Subthreads never run in parallel -
866 /// they only run as scheduled deterministically by the `MockExecutor`.
867 /// So using Subthreads eliminates a source of test nonndeterminism.
868 /// (Execution order is still varied due to explicitly varying the scheduling policy.)
869 ///
870 /// # Panics, abuse, and malfunctions
871 ///
872 /// If `call` panics and unwinds, `spawn_subthread` yields `Err`.
873 /// The application code should to do something about it if this happens,
874 /// typically, logging errors, tearing things down, or failing a test case.
875 ///
876 /// If the executor doesn't run, the subthread will not run either, and will remain stuck.
877 /// (So, typically, if the thread supposed to run the executor panics,
878 /// for example because a future or the executor itself panics,
879 /// all the subthreads will become stuck - effectively, they'll be leaked.)
880 ///
881 /// `spawn_subthread` panics if OS thread spawning fails.
882 /// (Like `std::thread::spawn()` does.)
883 ///
884 /// `MockExecutor`s will malfunction or panic if
885 /// any executor invocation method (eg `block_on`) is called on a Subthread.
886 pub fn subthread_spawn<T: Send + 'static>(
887 &self,
888 desc: impl Display,
889 call: impl FnOnce() -> T + Send + 'static,
890 ) -> impl Future<Output = Result<T, Box<dyn Any + Send>>> + Unpin + Send + Sync + 'static {
891 let desc = desc.to_string();
892 let (output_tx, output_rx) = oneshot::channel();
893
894 // NB: we don't know which thread we're on!
895 // In principle we might be on another Subthread.
896 // So we can't context switch here. That would be very confusing.
897 //
898 // Instead, we prepare the new Subthread as follows:
899 // - There is a task in the executor
900 // - The task is ready to be polled, whenever the executor decides to
901 // - The thread starts running right away, but immediately waits until it is scheduled
902 // See `subthread_entrypoint`.
903
904 {
905 let mut data = self.shared.lock();
906 let id = data.insert_task(desc.clone(), TaskFutureInfo::Subthread);
907
908 let _: std::thread::JoinHandle<()> = std::thread::Builder::new()
909 .name(desc)
910 .spawn({
911 let shared = self.shared.clone();
912 move || shared.subthread_entrypoint(id, call, output_tx)
913 })
914 .expect("spawn failed");
915 }
916
917 output_rx.map(|r| {
918 r.unwrap_or_else(|_: Canceled| panic!("Subthread cancelled but should be impossible!"))
919 })
920 }
921
922 /// Call an async `Future` from a Subthread
923 ///
924 /// Blocks the Subthread, and arranges to run async tasks,
925 /// including `fut`, until `fut` completes.
926 ///
927 /// `fut` is polled on the executor thread, not on the Subthread.
928 /// (We may change that in the future, allowing passing a non-`Send` future.)
929 ///
930 /// # Panics, abuse, and malfunctions
931 ///
932 /// `subthread_block_on_future` will malfunction or panic
933 /// if called on a thread that isn't a Subthread from the same `MockExecutor`
934 /// (ie a thread made with [`spawn_subthread`](MockExecutor::subthread_spawn)).
935 ///
936 /// If `fut` itself panics, the executor will panic.
937 ///
938 /// If the executor isn't running, `subthread_block_on_future` will hang indefinitely.
939 /// See `spawn_subthread`.
940 #[allow(clippy::cognitive_complexity)] // Splitting this up would be worse
941 pub fn subthread_block_on_future<T: Send + 'static>(&self, fut: impl Future<Output = T>) -> T {
942 let id = match THREAD_DESCRIPTOR.get() {
943 ThreadDescriptor::Subthread(id) => id,
944 ThreadDescriptor::Executor => {
945 panic!("subthread_block_on_future called from MockExecutor thread (async task?)")
946 }
947 ThreadDescriptor::Foreign => panic!(
948 "subthread_block_on_future called on foreign thread (not spawned with spawn_subthread)"
949 ),
950 };
951 trace!("MockExecutor thread {id:?}, subthread_block_on_future...");
952 let mut fut = pin!(fut);
953
954 // We yield once before the first poll, and once after Ready, to shake up the
955 // execution order a bit, depending on the scheduling policy.
956 let yield_ = |set_awake| self.shared.subthread_yield(id, set_awake);
957 yield_(Some(SetAwake));
958
959 let ret = loop {
960 // Poll the provided future
961 trace!("MockExecutor thread {id:?}, s.t._block_on_future polling...");
962 let waker = ActualWaker::make_waker(&self.shared, id);
963 let mut cx = Context::from_waker(&waker);
964 let r: Poll<T> = fut.as_mut().poll(&mut cx);
965
966 if let Ready(r) = r {
967 trace!("MockExecutor thread {id:?}, s.t._block_on_future poll -> Ready");
968 break r;
969 }
970
971 // Pending. Switch back to the exeuctor thread.
972 // When the future becomes ready, the Waker will be woken, waking the task,
973 // so that the executor will "poll" us again.
974 trace!("MockExecutor thread {id:?}, s.t._block_on_future poll -> Pending");
975
976 yield_(None);
977 };
978
979 yield_(Some(SetAwake));
980
981 trace!("MockExecutor thread {id:?}, subthread_block_on_future complete.");
982
983 ret
984 }
985}
986
987impl Shared {
988 /// Main entrypoint function for a Subthread
989 ///
990 /// Entered on a new `std::thread` thread created by
991 /// [`subthread_spawn`](MockExecutor::subthread_spawn).
992 ///
993 /// When `call` completes, sends its returned value `T` to `output_tx`.
994 fn subthread_entrypoint<T: Send + 'static>(
995 self: Arc<Self>,
996 id: TaskId,
997 call: impl FnOnce() -> T + Send + 'static,
998 output_tx: oneshot::Sender<Result<T, Box<dyn Any + Send>>>,
999 ) {
1000 THREAD_DESCRIPTOR.set(ThreadDescriptor::Subthread(id));
1001 trace!("MockExecutor thread {id:?}, entrypoint");
1002
1003 // We start out Awake, but we wait for the executor to tell us to run.
1004 // This will be done the first time the task is "polled".
1005 {
1006 let data = self.lock();
1007 self.thread_context_switch_waitfor_instruction_to_run(
1008 data,
1009 ThreadDescriptor::Subthread(id),
1010 );
1011 }
1012
1013 trace!("MockExecutor thread {id:?}, entering user code");
1014
1015 // Run the user's actual thread function.
1016 // This will typically reenter us via subthread_block_on_future.
1017 let ret = catch_unwind(AssertUnwindSafe(call));
1018
1019 trace!("MockExecutor thread {id:?}, completed user code");
1020
1021 // This makes the return value from subthread_spawn ready.
1022 // It will be polled by the executor in due course, presumably.
1023
1024 output_tx.send(ret).unwrap_or_else(
1025 #[allow(clippy::unnecessary_lazy_evaluations)]
1026 |_| {}, // receiver dropped, maybe executor dropped or something?
1027 );
1028
1029 {
1030 let mut data = self.lock();
1031
1032 // Never poll this task again (so never schedule this thread)
1033 let _: Task = data.tasks.remove(id).expect("Subthread task vanished!");
1034
1035 // Tell the executor it is scheduled now.
1036 // We carry on exiting, in parallel (holding the data lock).
1037 self.thread_context_switch_send_instruction_to_run(
1038 &mut data,
1039 ThreadDescriptor::Subthread(id),
1040 ThreadDescriptor::Executor,
1041 );
1042 }
1043 }
1044
1045 /// Yield back to the executor from a subthread
1046 ///
1047 /// Checks that things are in order
1048 /// (in particular, that this task is in the data structure as a subhtread)
1049 /// and switches to the executor thread.
1050 ///
1051 /// The caller must arrange that the task gets woken.
1052 ///
1053 /// With [`SetAwake`], sets our task awake, so that we'll be polled
1054 /// again as soon as we get to the top of the executor's queue.
1055 /// Otherwise, we'll be reentered after someone wakes a [`Waker`] for the task.
1056 fn subthread_yield(&self, us: TaskId, set_awake: Option<SetAwake>) {
1057 let mut data = self.lock();
1058 {
1059 let data = &mut *data;
1060 let task = data.tasks.get_mut(us).expect("Subthread task vanished!");
1061 match &task.fut {
1062 Some(TaskFutureInfo::Subthread) => {}
1063 other => panic!("subthread_block_on_future but TFI {other:?}"),
1064 };
1065 if let Some(SetAwake) = set_awake {
1066 task.set_awake(us, &mut data.awake);
1067 }
1068 }
1069 self.thread_context_switch(
1070 data,
1071 ThreadDescriptor::Subthread(us),
1072 ThreadDescriptor::Executor,
1073 );
1074 }
1075
1076 /// Switch from (sub)thread `us` to (sub)thread `them`
1077 ///
1078 /// Returns when someone calls `thread_context_switch(.., us)`.
1079 fn thread_context_switch(
1080 &self,
1081 mut data: MutexGuard<Data>,
1082 us: ThreadDescriptor,
1083 them: ThreadDescriptor,
1084 ) {
1085 trace!("MockExecutor thread {us:?}, switching to {them:?}");
1086 self.thread_context_switch_send_instruction_to_run(&mut data, us, them);
1087 self.thread_context_switch_waitfor_instruction_to_run(data, us);
1088 }
1089
1090 /// Instruct the (sub)thread `them` to run
1091 ///
1092 /// Update `thread_to_run`, which will wake up `them`'s
1093 /// call to `thread_context_switch_waitfor_instruction_to_run`.
1094 ///
1095 /// Must be called from (sub)thread `us`.
1096 /// Part of `thread_context_switch`, not normally called directly.
1097 fn thread_context_switch_send_instruction_to_run(
1098 &self,
1099 data: &mut MutexGuard<Data>,
1100 us: ThreadDescriptor,
1101 them: ThreadDescriptor,
1102 ) {
1103 assert_eq!(data.thread_to_run, us);
1104 data.thread_to_run = them;
1105 self.thread_condvar.notify_all();
1106 }
1107
1108 /// Await an instruction for this thread, `us`, to run
1109 ///
1110 /// Waits for `thread_to_run` to be `us`,
1111 /// waiting for `thread_condvar` as necessary.
1112 ///
1113 /// Part of `thread_context_switch`, not normally called directly.
1114 fn thread_context_switch_waitfor_instruction_to_run(
1115 &self,
1116 data: MutexGuard<Data>,
1117 us: ThreadDescriptor,
1118 ) {
1119 #[allow(let_underscore_lock)]
1120 let _: MutexGuard<_> = self
1121 .thread_condvar
1122 .wait_while(data, |data| {
1123 let live = data.thread_to_run;
1124 let resume = live == us;
1125 if resume {
1126 trace!("MockExecutor thread {us:?}, resuming");
1127 } else {
1128 trace!("MockExecutor thread {us:?}, waiting for {live:?}");
1129 }
1130 // We're in `.wait_while`, not `.wait_until`. Confusing.
1131 !resume
1132 })
1133 .expect("data lock poisoned");
1134 }
1135}
1136
1137//---------- ancillary and convenience functions ----------
1138
1139/// Trait to let us assert at compile time that something is nicely `Sync` etc.
1140#[allow(dead_code)] // yes, we don't *use* anything from this trait
1141trait EnsureSyncSend: Sync + Send + 'static {}
1142impl EnsureSyncSend for ActualWaker {}
1143impl EnsureSyncSend for MockExecutor {}
1144
1145impl MockExecutor {
1146 /// Return the number of tasks running in this executor
1147 ///
1148 /// One possible use is for a test case to check that task(s)
1149 /// that ought to have exited, have indeed done so.
1150 ///
1151 /// In the usual case, the answer will be at least 1,
1152 /// because it counts the future passed to
1153 /// [`block_on`](MockExecutor::block_on)
1154 /// (perhaps via [`MockRuntime::test_with_various`](crate::MockRuntime::test_with_various)).
1155 pub fn n_tasks(&self) -> usize {
1156 self.shared.lock().tasks.len()
1157 }
1158}
1159
1160impl Shared {
1161 /// Lock and obtain the guard
1162 ///
1163 /// Convenience method which panics on poison
1164 fn lock(&self) -> MutexGuard<Data> {
1165 self.data.lock().expect("data lock poisoned")
1166 }
1167}
1168
1169impl Task {
1170 /// Set task `id` to `Awake` and arrange that it will be polled.
1171 fn set_awake(&mut self, id: TaskId, data_awake: &mut VecDeque<TaskId>) {
1172 match self.state {
1173 Awake => {}
1174 Asleep(_) => {
1175 self.state = Awake;
1176 data_awake.push_back(id);
1177 }
1178 }
1179 }
1180}
1181
1182//---------- ActualWaker as RawWaker ----------
1183
1184/// Using [`ActualWaker`] in a [`RawWaker`]
1185///
1186/// We need to make a
1187/// [`Waker`] (the safe, type-erased, waker, used by actual futures)
1188/// which contains an
1189/// [`ActualWaker`] (our actual waker implementation, also safe).
1190///
1191/// `std` offers `Waker::from<Arc<impl Wake>>`.
1192/// But we want a bespoke `Clone` implementation, so we don't want to use `Arc`.
1193///
1194/// So instead, we implement the `RawWaker` API in terms of `ActualWaker`.
1195/// We keep the `ActualWaker` in a `Box`, and actually `clone` it (and the `Box`).
1196///
1197/// SAFETY
1198///
1199/// * The data pointer is `Box::<ActualWaker>::into_raw()`
1200/// * We share these when we clone
1201/// * No-one is allowed `&mut ActualWaker` unless there are no other clones
1202/// * So we may make references `&ActualWaker`
1203impl ActualWaker {
1204 /// Wrap up an [`ActualWaker`] as a type-erased [`Waker`] for passing to futures etc.
1205 fn new_waker(self) -> Waker {
1206 unsafe { Waker::from_raw(self.raw_new()) }
1207 }
1208
1209 /// Helper: wrap up an [`ActualWaker`] as a [`RawWaker`].
1210 fn raw_new(self) -> RawWaker {
1211 let self_: Box<ActualWaker> = self.into();
1212 let self_: *mut ActualWaker = Box::into_raw(self_);
1213 let self_: *const () = self_ as _;
1214 RawWaker::new(self_, &RAW_WAKER_VTABLE)
1215 }
1216
1217 /// Implementation of [`RawWakerVTable`]'s `clone`
1218 unsafe fn raw_clone(self_: *const ()) -> RawWaker {
1219 let self_: *const ActualWaker = self_ as _;
1220 let self_: &ActualWaker = self_.as_ref().unwrap_unchecked();
1221 let copy: ActualWaker = self_.clone();
1222 copy.raw_new()
1223 }
1224
1225 /// Implementation of [`RawWakerVTable`]'s `wake`
1226 unsafe fn raw_wake(self_: *const ()) {
1227 Self::raw_wake_by_ref(self_);
1228 Self::raw_drop(self_);
1229 }
1230
1231 /// Implementation of [`RawWakerVTable`]'s `wake_ref_by`
1232 unsafe fn raw_wake_by_ref(self_: *const ()) {
1233 let self_: *const ActualWaker = self_ as _;
1234 let self_: &ActualWaker = self_.as_ref().unwrap_unchecked();
1235 self_.wake();
1236 }
1237
1238 /// Implementation of [`RawWakerVTable`]'s `drop`
1239 unsafe fn raw_drop(self_: *const ()) {
1240 let self_: *mut ActualWaker = self_ as _;
1241 let self_: Box<ActualWaker> = Box::from_raw(self_);
1242 drop(self_);
1243 }
1244}
1245
1246/// vtable for `Box<ActualWaker>` as `RawWaker`
1247//
1248// This ought to be in the impl block above, but
1249// "associated `static` items are not allowed"
1250static RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
1251 ActualWaker::raw_clone,
1252 ActualWaker::raw_wake,
1253 ActualWaker::raw_wake_by_ref,
1254 ActualWaker::raw_drop,
1255);
1256
1257//---------- Sleep location tracking and dumping ----------
1258
1259/// We record "where a future went to sleep" as (just) a backtrace
1260///
1261/// This type alias allows us to mock `Backtrace` for miri.
1262/// (It also insulates from future choices about sleep location representation.0
1263#[cfg(not(miri))]
1264type SleepLocation = Backtrace;
1265
1266impl Data {
1267 /// Dump tasks and their sleep location backtraces
1268 fn dump_backtraces(&self, f: &mut fmt::Formatter) -> fmt::Result {
1269 for (id, task) in self.tasks.iter() {
1270 let prefix = |f: &mut fmt::Formatter| write!(f, "{id:?}={task:?}: ");
1271 match &task.state {
1272 Awake => {
1273 prefix(f)?;
1274 writeln!(f, "awake")?;
1275 }
1276 Asleep(locs) => {
1277 let n = locs.len();
1278 for (i, loc) in locs.iter().enumerate() {
1279 prefix(f)?;
1280 writeln!(f, "asleep, backtrace {i}/{n}:\n{loc}",)?;
1281 }
1282 if n == 0 {
1283 prefix(f)?;
1284 writeln!(f, "asleep, no backtraces, Waker never cloned, stuck!",)?;
1285 }
1286 }
1287 }
1288 }
1289 writeln!(
1290 f,
1291 "\nNote: there might be spurious traces, see docs for MockExecutor::debug_dump\n"
1292 )?;
1293 Ok(())
1294 }
1295}
1296
1297/// Track sleep locations via `<Waker as Clone>`.
1298///
1299/// See [`MockExecutor::debug_dump`] for the explanation.
1300impl Clone for ActualWaker {
1301 fn clone(&self) -> Self {
1302 let id = self.id;
1303
1304 if let Some(data) = self.upgrade_data() {
1305 // If the executor is gone, there is nothing to adjust
1306 let mut data = data.lock();
1307 if let Some(task) = data.tasks.get_mut(self.id) {
1308 match &mut task.state {
1309 Awake => trace!("MockExecutor cloned waker for awake task {id:?}"),
1310 Asleep(locs) => locs.push(SleepLocation::force_capture()),
1311 }
1312 } else {
1313 trace!("MockExecutor cloned waker for dead task {id:?}");
1314 }
1315 }
1316
1317 ActualWaker {
1318 data: self.data.clone(),
1319 id,
1320 }
1321 }
1322}
1323
1324//---------- API for full debug dump ----------
1325
1326/// Debugging dump of a `MockExecutor`'s state
1327///
1328/// Returned by [`MockExecutor::as_debug_dump`]
1329//
1330// Existence implies backtraces have been resolved
1331//
1332// We use `Either` so that we can also use this internally when we have &mut Data.
1333pub struct DebugDump<'a>(Either<&'a Data, MutexGuard<'a, Data>>);
1334
1335impl MockExecutor {
1336 /// Dump the executor's state including backtraces of waiting tasks, to stderr
1337 ///
1338 /// This is considerably more extensive than simply
1339 /// `MockExecutor as Debug`.
1340 ///
1341 /// (This is a convenience method, which wraps
1342 /// [`MockExecutor::as_debug_dump()`].
1343 ///
1344 /// ### Backtrace salience (possible spurious traces)
1345 ///
1346 /// **Summary**
1347 ///
1348 /// The technique used to capture backtraces when futures sleep is not 100% exact.
1349 /// It will usually show all the actual sleeping sites,
1350 /// but it might also show other backtraces which were part of
1351 /// the implementation of some complex relevant future.
1352 ///
1353 /// **Details**
1354 ///
1355 /// When a future's implementation wants to sleep,
1356 /// it needs to record the [`Waker`] (from the [`Context`])
1357 /// so that the "other end" can call `.wake()` on it later,
1358 /// when the future should be woken.
1359 ///
1360 /// Since `Context.waker()` gives `&Waker`, borrowed from the `Context`,
1361 /// the future must clone the `Waker`,
1362 /// and it must do so in within the `poll()` call.
1363 ///
1364 /// A future which is waiting in a `select!` will typically
1365 /// show multiple traces, one for each branch.
1366 /// But,
1367 /// if a future sleeps on one thing, and then when polled again later,
1368 /// sleeps on something different, without waking up in between,
1369 /// both backtrace locations will be shown.
1370 /// And,
1371 /// a complicated future contraption *might* clone the `Waker` more times.
1372 /// So not every backtrace will necessarily be informative.
1373 ///
1374 /// ### Panics
1375 ///
1376 /// Panics on write errors.
1377 pub fn debug_dump(&self) {
1378 self.as_debug_dump().to_stderr();
1379 }
1380
1381 /// Dump the executor's state including backtraces of waiting tasks
1382 ///
1383 /// This is considerably more extensive than simply
1384 /// `MockExecutor as Debug`.
1385 ///
1386 /// Returns an object for formatting with [`Debug`].
1387 /// To simply print the dump to stderr (eg in a test),
1388 /// use [`.debug_dump()`](MockExecutor::debug_dump).
1389 ///
1390 /// **Backtrace salience (possible spurious traces)** -
1391 /// see [`.debug_dump()`](MockExecutor::debug_dump).
1392 pub fn as_debug_dump(&self) -> DebugDump {
1393 let data = self.shared.lock();
1394 DebugDump(Right(data))
1395 }
1396}
1397
1398impl Data {
1399 /// Convenience function: dump including backtraces, to stderr
1400 fn debug_dump(&mut self) {
1401 DebugDump(Left(self)).to_stderr();
1402 }
1403}
1404
1405impl DebugDump<'_> {
1406 /// Convenience function: dump tasks and backtraces to stderr
1407 #[allow(clippy::wrong_self_convention)] // "to_stderr" doesn't mean "convert to stderr"
1408 fn to_stderr(self) {
1409 write!(io::stderr().lock(), "{:?}", self)
1410 .unwrap_or_else(|e| error_report!(e, "failed to write debug dump to stderr"));
1411 }
1412}
1413
1414//---------- bespoke Debug impls ----------
1415
1416impl Debug for DebugDump<'_> {
1417 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1418 let self_: &Data = &self.0;
1419
1420 writeln!(f, "MockExecutor state:\n{self_:#?}")?;
1421 writeln!(f, "MockExecutor task dump:")?;
1422 self_.dump_backtraces(f)?;
1423
1424 Ok(())
1425 }
1426}
1427
1428// See `impl Debug for Data` for notes on the output
1429impl Debug for Task {
1430 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1431 let Task { desc, state, fut } = self;
1432 write!(f, "{:?}", desc)?;
1433 write!(f, "=")?;
1434 match fut {
1435 None => write!(f, "P")?,
1436 Some(TaskFutureInfo::Normal(_)) => write!(f, "f")?,
1437 Some(TaskFutureInfo::Main) => write!(f, "m")?,
1438 Some(TaskFutureInfo::Subthread) => write!(f, "T")?,
1439 }
1440 match state {
1441 Awake => write!(f, "W")?,
1442 Asleep(locs) => write!(f, "s{}", locs.len())?,
1443 };
1444 Ok(())
1445 }
1446}
1447
1448/// Helper: `Debug`s as a list of tasks, given the `Data` for lookups and a list of the ids
1449///
1450/// `Task`s in `Data` are printed as `Ti(ID)"SPEC"=FLAGS"`.
1451///
1452/// `FLAGS` are:
1453///
1454/// * `T`: this task is for a Subthread (from subthread_spawn).
1455/// * `P`: this task is being polled (its `TaskFutureInfo` is absent)
1456/// * `f`: this is a normal task with a future and its future is present in `Data`
1457/// * `m`: this is the main task from `block_on`
1458///
1459/// * `W`: the task is awake
1460/// * `s<n>`: the task is asleep, and `<n>` is the number of recorded sleeping locations
1461//
1462// We do it this way because the naive dump from derive is very expansive
1463// and makes it impossible to see the wood for the trees.
1464// This very compact representation it easier to find a task of interest in the output.
1465//
1466// This is implemented in `impl Debug for Task`.
1467//
1468//
1469// rustc doesn't think automatically-derived Debug impls count for whether a thing is used.
1470// This has caused quite some fallout. https://github.com/rust-lang/rust/pull/85200
1471// I think derive_more emits #[automatically_derived], so that even though we use this
1472// in our Debug impl, that construction is unused.
1473#[allow(dead_code)]
1474struct DebugTasks<'d, F>(&'d Data, F);
1475
1476// See `impl Debug for Data` for notes on the output
1477impl<F, I> Debug for DebugTasks<'_, F>
1478where
1479 F: Fn() -> I,
1480 I: Iterator<Item = TaskId>,
1481{
1482 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1483 let DebugTasks(data, ids) = self;
1484 for (id, delim) in izip!(ids(), chain!(iter::once(""), iter::repeat(" ")),) {
1485 write!(f, "{delim}{id:?}")?;
1486 match data.tasks.get(id) {
1487 None => write!(f, "-")?,
1488 Some(task) => write!(f, "={task:?}")?,
1489 }
1490 }
1491 Ok(())
1492 }
1493}
1494
1495/// Mock `Backtrace` for miri
1496///
1497/// See also the not-miri `type SleepLocation`, alias above.
1498#[cfg(miri)]
1499mod miri_sleep_location {
1500 #[derive(Debug, derive_more::Display)]
1501 #[display("<SleepLocation>")]
1502 pub(super) struct SleepLocation {}
1503
1504 impl SleepLocation {
1505 pub(super) fn force_capture() -> Self {
1506 SleepLocation {}
1507 }
1508 }
1509}
1510#[cfg(miri)]
1511use miri_sleep_location::SleepLocation;
1512
1513#[cfg(test)]
1514mod test {
1515 // @@ begin test lint list maintained by maint/add_warning @@
1516 #![allow(clippy::bool_assert_comparison)]
1517 #![allow(clippy::clone_on_copy)]
1518 #![allow(clippy::dbg_macro)]
1519 #![allow(clippy::mixed_attributes_style)]
1520 #![allow(clippy::print_stderr)]
1521 #![allow(clippy::print_stdout)]
1522 #![allow(clippy::single_char_pattern)]
1523 #![allow(clippy::unwrap_used)]
1524 #![allow(clippy::unchecked_duration_subtraction)]
1525 #![allow(clippy::useless_vec)]
1526 #![allow(clippy::needless_pass_by_value)]
1527 //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
1528 use super::*;
1529 use futures::channel::mpsc;
1530 use futures::{SinkExt as _, StreamExt as _};
1531 use strum::IntoEnumIterator;
1532 use tracing::info;
1533
1534 #[cfg(not(miri))] // trace! asks for the time, which miri doesn't support
1535 use tracing_test::traced_test;
1536
1537 fn various_mock_executors() -> impl Iterator<Item = MockExecutor> {
1538 // This duplicates the part of the logic in MockRuntime::test_with_various which
1539 // relates to MockExecutor, because we don't have a MockRuntime::builder.
1540 // The only parameter to MockExecutor is its scheduling policy, so this seems fine.
1541 SchedulingPolicy::iter().map(|scheduling| {
1542 eprintln!("===== MockExecutor::with_scheduling({scheduling:?}) =====");
1543 MockExecutor::with_scheduling(scheduling)
1544 })
1545 }
1546
1547 #[cfg_attr(not(miri), traced_test)]
1548 #[test]
1549 fn simple() {
1550 let runtime = MockExecutor::default();
1551 let val = runtime.block_on(async { 42 });
1552 assert_eq!(val, 42);
1553 }
1554
1555 #[cfg_attr(not(miri), traced_test)]
1556 #[test]
1557 fn stall() {
1558 let runtime = MockExecutor::default();
1559
1560 runtime.block_on({
1561 let runtime = runtime.clone();
1562 async move {
1563 const N: usize = 3;
1564 let (mut txs, mut rxs): (Vec<_>, Vec<_>) =
1565 (0..N).map(|_| mpsc::channel::<usize>(5)).unzip();
1566
1567 let mut rx_n = rxs.pop().unwrap();
1568
1569 for (i, mut rx) in rxs.into_iter().enumerate() {
1570 runtime.spawn_identified(i, {
1571 let mut txs = txs.clone();
1572 async move {
1573 loop {
1574 eprintln!("task {i} rx...");
1575 let v = rx.next().await.unwrap();
1576 let nv = v + 1;
1577 eprintln!("task {i} rx {v}, tx {nv}");
1578 let v = nv;
1579 txs[v].send(v).await.unwrap();
1580 }
1581 }
1582 });
1583 }
1584
1585 dbg!();
1586 let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();
1587
1588 dbg!();
1589 runtime.progress_until_stalled().await;
1590
1591 dbg!();
1592 let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();
1593
1594 dbg!();
1595 txs[0].send(0).await.unwrap();
1596
1597 dbg!();
1598 runtime.progress_until_stalled().await;
1599
1600 dbg!();
1601 let r = rx_n.next().await;
1602 assert_eq!(r, Some(N - 1));
1603
1604 dbg!();
1605 let _: mpsc::TryRecvError = rx_n.try_next().unwrap_err();
1606
1607 runtime.spawn_identified("tx", {
1608 let txs = txs.clone();
1609 async {
1610 eprintln!("sending task...");
1611 for (i, mut tx) in txs.into_iter().enumerate() {
1612 eprintln!("sending 0 to {i}...");
1613 tx.send(0).await.unwrap();
1614 }
1615 eprintln!("sending task done");
1616 }
1617 });
1618
1619 runtime.debug_dump();
1620
1621 for i in 0..txs.len() {
1622 eprintln!("main {i} wait stall...");
1623 runtime.progress_until_stalled().await;
1624 eprintln!("main {i} rx wait...");
1625 let r = rx_n.next().await;
1626 eprintln!("main {i} rx = {r:?}");
1627 assert!(r == Some(0) || r == Some(N - 1));
1628 }
1629
1630 eprintln!("finishing...");
1631 runtime.progress_until_stalled().await;
1632 eprintln!("finished.");
1633 }
1634 });
1635 }
1636
1637 #[cfg_attr(not(miri), traced_test)]
1638 #[test]
1639 fn spawn_blocking() {
1640 let runtime = MockExecutor::default();
1641
1642 runtime.block_on({
1643 let runtime = runtime.clone();
1644 async move {
1645 let thr_1 = runtime.spawn_blocking(|| 42);
1646 let thr_2 = runtime.spawn_blocking(|| 99);
1647
1648 assert_eq!(thr_2.await, 99);
1649 assert_eq!(thr_1.await, 42);
1650 }
1651 });
1652 }
1653
1654 #[cfg_attr(not(miri), traced_test)]
1655 #[test]
1656 fn drop_reentrancy() {
1657 // Check that dropping a completed task future is done *outside* the data lock.
1658 // Involves a contrived future whose Drop impl reenters the executor.
1659 //
1660 // If `_fut_drop_late = fut` in execute_until_first_stall (the main loop)
1661 // is replaced with `drop(fut)` (dropping the future at the wrong moment),
1662 // we do indeed get deadlock, so this test case is working.
1663
1664 struct ReentersOnDrop {
1665 runtime: MockExecutor,
1666 }
1667 impl Future for ReentersOnDrop {
1668 type Output = ();
1669 fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<()> {
1670 Poll::Ready(())
1671 }
1672 }
1673 impl Drop for ReentersOnDrop {
1674 fn drop(&mut self) {
1675 self.runtime
1676 .spawn_identified("dummy", futures::future::ready(()));
1677 }
1678 }
1679
1680 for runtime in various_mock_executors() {
1681 runtime.block_on(async {
1682 runtime.spawn_identified("trapper", {
1683 let runtime = runtime.clone();
1684 ReentersOnDrop { runtime }
1685 });
1686 });
1687 }
1688 }
1689
1690 #[cfg_attr(not(miri), traced_test)]
1691 #[test]
1692 fn subthread_oneshot() {
1693 for runtime in various_mock_executors() {
1694 runtime.block_on(async {
1695 let (tx, rx) = oneshot::channel();
1696 info!("spawning subthread");
1697 let thr = runtime.subthread_spawn("thr1", {
1698 let runtime = runtime.clone();
1699 move || {
1700 info!("subthread_block_on_future...");
1701 let i = runtime.subthread_block_on_future(rx).unwrap();
1702 info!("subthread_block_on_future => {i}");
1703 i + 1
1704 }
1705 });
1706 info!("main task sending");
1707 tx.send(12).unwrap();
1708 info!("main task sent");
1709 let r = thr.await.unwrap();
1710 info!("main task thr => {r}");
1711 assert_eq!(r, 13);
1712 });
1713 }
1714 }
1715
1716 #[cfg_attr(not(miri), traced_test)]
1717 #[test]
1718 #[allow(clippy::cognitive_complexity)] // It's is not that complicated, really.
1719 fn subthread_pingpong() {
1720 for runtime in various_mock_executors() {
1721 runtime.block_on(async {
1722 let (mut i_tx, mut i_rx) = mpsc::channel(1);
1723 let (mut o_tx, mut o_rx) = mpsc::channel(1);
1724 info!("spawning subthread");
1725 let thr = runtime.subthread_spawn("thr", {
1726 let runtime = runtime.clone();
1727 move || {
1728 while let Some(i) = {
1729 info!("thread receiving ...");
1730 runtime.subthread_block_on_future(i_rx.next())
1731 } {
1732 let o = i + 12;
1733 info!("thread received {i}, sending {o}");
1734 runtime.subthread_block_on_future(o_tx.send(o)).unwrap();
1735 info!("thread sent {o}");
1736 }
1737 info!("thread exiting");
1738 42
1739 }
1740 });
1741 for i in 0..2 {
1742 info!("main task sending {i}");
1743 i_tx.send(i).await.unwrap();
1744 info!("main task sent {i}");
1745 let o = o_rx.next().await.unwrap();
1746 info!("main task recv => {o}");
1747 assert_eq!(o, i + 12);
1748 }
1749 info!("main task dropping sender");
1750 drop(i_tx);
1751 info!("main task awaiting thread");
1752 let r = thr.await.unwrap();
1753 info!("main task complete");
1754 assert_eq!(r, 42);
1755 });
1756 }
1757 }
1758}