tor_rtmock/runtime.rs
1//! Completely mock runtime
2
3#![forbid(unsafe_code)] // if you remove this, enable (or write) miri tests (git grep miri)
4
5use std::fmt::{Debug, Display};
6use std::ops::ControlFlow;
7
8use amplify::Getters;
9use futures::FutureExt as _;
10use itertools::chain;
11use strum::IntoEnumIterator as _;
12use void::{ResultVoidExt as _, Void};
13
14use crate::util::impl_runtime_prelude::*;
15
16use crate::net::MockNetProvider;
17use crate::simple_time::SimpleMockTimeProvider;
18use crate::task::{MockExecutor, SchedulingPolicy};
19
20/// Completely mock runtime, with simulated time
21///
22/// Suitable for test cases that wish to completely control
23/// the environment experienced by the code under test.
24///
25/// ### Useful properties
26///
27/// The execution order is deterministic.
28/// Time will advance only in a controlled fashion.
29/// Typically, the main task in a test will call
30/// [`advance_until_stalled`](MockRuntime::advance_until_stalled).
31///
32/// Reliable sequencing techniques which can be used in tests include:
33/// sleeping for carefully chosen durations;
34/// interlocking via intertask channels; or,
35/// sequenced control of requests to the code under test.
36///
37/// ### Restrictions
38///
39/// The test case must advance the mock time explicitly as desired,
40/// typically by calling one of the `MockRuntime::advance_*` methods.
41///
42/// Tests that use this runtime *must not* interact with the outside world;
43/// everything must go through this runtime (and its pieces).
44///
45/// There is no mocking of filesystem access;
46/// the `MockRuntime`'s time will disagree with `SystemTime`'s
47/// obtained from (for example) `std::fs::Metadata`.
48///
49/// #### Allowed
50///
51/// * Inter-future communication facilities from `futures`
52/// or other runtime-agnostic crates.
53///
54/// * Fast synchronous operations that will complete "immediately" or "quickly".
55/// E.g.: filesystem calls.
56///
57/// * `std::sync::Mutex` (assuming the use is deadlock-free in a single-threaded
58/// executor, as it should be in all of Arti).
59///
60/// * Slower operations that are run synchronously (without futures `await`)
61/// provided their completion doesn't depend on any of the futures we're running.
62/// (These kind of operations are often discouraged in async contexts,
63/// because they block the async runtime or its worker threads.
64/// But they are often OK in tests.)
65///
66/// * All facilities provided by this `MockExecutor` and its trait impls.
67///
68/// #### Not allowed
69///
70/// * Direct access to the real-world clock (`SystemTime::now`, `Instant::now`),
71/// including direct use of `coarsetime`.
72/// Instead, use [`SleepProvider`] and [`CoarseTimeProvider`] methods on the runtime.
73/// Exception: CPU use measurements.
74///
75/// * Anything that spawns threads and then communicates with those threads
76/// using async Rust facilities (futures).
77///
78/// * Async sockets, or async use of other kernel-based IPC or network mechanisms.
79///
80/// * Anything provided by a Rust runtime/executor project (eg anything from Tokio),
81/// unless it is definitively established that it's runtime-agnostic.
82#[derive(Debug, Default, Clone, Getters, Deftly)]
83#[derive_deftly(SomeMockRuntime)]
84#[getter(prefix = "mock_")]
85pub struct MockRuntime {
86 /// Tasks
87 #[deftly(mock(task, toplevel))]
88 task: MockExecutor,
89 /// Time provider
90 #[deftly(mock(sleep))]
91 sleep: SimpleMockTimeProvider,
92 /// Net provider
93 #[deftly(mock(net))]
94 net: MockNetProvider,
95}
96
97/// Builder for a manually-configured `MockRuntime`
98#[derive(Debug, Default, Clone)]
99pub struct MockRuntimeBuilder {
100 /// scheduling policy
101 scheduling: SchedulingPolicy,
102 /// sleep provider
103 sleep: Option<SimpleMockTimeProvider>,
104 /// starting wall clock time
105 starting_wallclock: Option<SystemTime>,
106}
107
108impl MockRuntime {
109 /// Create a new `MockRuntime` with default parameters
110 pub fn new() -> Self {
111 Self::default()
112 }
113
114 /// Return a builder, for creating a `MockRuntime` with some parameters manually configured
115 pub fn builder() -> MockRuntimeBuilder {
116 Default::default()
117 }
118
119 /// Run a test case with a variety of runtime parameters, to try to find bugs
120 ///
121 /// `test_case` is an async closure which receives a `MockRuntime`.
122 /// It will be run with a number of differently configured executors.
123 ///
124 /// Each run will be preceded by an [`eprintln!`] showing the runtime configuration.
125 ///
126 /// ### Variations
127 ///
128 /// The only variation currently implemented is this:
129 ///
130 /// Both FIFO and LIFO scheduling policies are tested,
131 /// in the hope that this will help discover ordering-dependent bugs.
132 pub fn test_with_various<TC, FUT>(mut test_case: TC)
133 where
134 TC: FnMut(MockRuntime) -> FUT,
135 FUT: Future<Output = ()>,
136 {
137 Self::try_test_with_various(|runtime| test_case(runtime).map(|()| Ok::<_, Void>(())))
138 .void_unwrap();
139 }
140
141 /// Run a faillible test case with a variety of runtime parameters, to try to find bugs
142 ///
143 /// `test_case` is an async closure which receives a `MockRuntime`.
144 /// It will be run with a number of differently configured executors.
145 ///
146 /// This function accepts a fallible closure,
147 /// and returns the first `Err` to the caller.
148 ///
149 /// See [`test_with_various()`](MockRuntime::test_with_various) for more details.
150 #[allow(clippy::print_stderr)]
151 pub fn try_test_with_various<TC, FUT, E>(mut test_case: TC) -> Result<(), E>
152 where
153 TC: FnMut(MockRuntime) -> FUT,
154 FUT: Future<Output = Result<(), E>>,
155 {
156 for scheduling in SchedulingPolicy::iter() {
157 let config = MockRuntime::builder().scheduling(scheduling);
158 eprintln!("running test with MockRuntime configuration {config:?}");
159 let runtime = config.build();
160 runtime.block_on(test_case(runtime.clone()))?;
161 }
162 Ok(())
163 }
164
165 /// Spawn a task and return something to identify it
166 ///
167 /// See [`MockExecutor::spawn_identified()`]
168 pub fn spawn_identified(
169 &self,
170 desc: impl Display,
171 fut: impl Future<Output = ()> + Send + 'static,
172 ) -> impl Debug + Clone + Send + 'static {
173 self.task.spawn_identified(desc, fut)
174 }
175
176 /// Spawn a task and return its output for further usage
177 ///
178 /// See [`MockExecutor::spawn_join()`]
179 pub fn spawn_join<T: Debug + Send + 'static>(
180 &self,
181 desc: impl Display,
182 fut: impl Future<Output = T> + Send + 'static,
183 ) -> impl Future<Output = T> {
184 self.task.spawn_join(desc, fut)
185 }
186
187 /// Run tasks and advance time, until every task except this one is waiting
188 ///
189 /// On return the other tasks won't be waiting on timeouts,
190 /// since time will be advanced as needed.
191 ///
192 /// Therefore the other tasks (if any) will be waiting for something
193 /// that won't happen by itself,
194 /// such as a provocation via their APIs from this task.
195 ///
196 /// # Panics
197 ///
198 /// See [`progress_until_stalled`](MockRuntime::progress_until_stalled)
199 pub async fn advance_until_stalled(&self) {
200 self.advance_inner(|| {
201 let Some(timeout) = self.time_until_next_timeout() else {
202 // Nothing is waiting on timeouts
203 return ControlFlow::Break(());
204 };
205 assert_ne!(timeout, Duration::ZERO);
206 ControlFlow::Continue(timeout)
207 })
208 .await;
209 }
210
211 /// Run tasks in the current executor until every task except this one is waiting
212 ///
213 /// Calls [`MockExecutor::progress_until_stalled()`].
214 ///
215 /// # Restriction - no automatic time advance
216 ///
217 /// The mocked time will *not* be automatically advanced.
218 ///
219 /// Usually
220 /// (and especially if the tasks under test are waiting for timeouts or periodic events)
221 /// you must use
222 /// [`advance_by()`](MockRuntime::advance_by)
223 /// or
224 /// [`advance_until()`](MockRuntime::advance_until)
225 /// to ensure the simulated time progresses as required.
226 ///
227 /// # Panics
228 ///
229 /// Might malfunction or panic if more than one such call is running at once.
230 ///
231 /// (Ie, you must `.await` or drop the returned `Future`
232 /// before calling this method again.)
233 ///
234 /// Must be called and awaited within a future being run by `self`.
235 pub async fn progress_until_stalled(&self) {
236 self.task.progress_until_stalled().await;
237 }
238
239 /// Run tasks and advance time up to at most `limit`
240 ///
241 /// Will return when all other tasks are either:
242 /// * Waiting on a timeout that will fire strictly after `limit`,
243 /// (return value is the time until the earliest such)
244 /// * Waiting for something else that won't happen by itself.
245 /// (return value is `None`)
246 ///
247 /// Like [`advance_until_stalled`](MockRuntime::advance_until_stalled)
248 /// but stops when the mock time reaches `limit`.
249 ///
250 /// # Panics
251 ///
252 /// Panics if the time somehow advances beyond `limit`.
253 /// (This function won't do that, but maybe it was beyond `limit` on entry,
254 /// or another task advanced the clock.)
255 ///
256 /// And, see [`progress_until_stalled`](MockRuntime::progress_until_stalled)
257 pub async fn advance_until(&self, limit: Instant) -> Option<Duration> {
258 self.advance_inner(|| {
259 let timeout = self.time_until_next_timeout();
260
261 let limit = limit
262 .checked_duration_since(self.now())
263 .expect("MockRuntime::advance_until: time advanced beyond `limit`!");
264
265 if limit == Duration::ZERO {
266 // Time has reached `limit`
267 return ControlFlow::Break(timeout);
268 }
269
270 let advance = chain!(timeout, [limit]).min().expect("empty!");
271 assert_ne!(advance, Duration::ZERO);
272
273 ControlFlow::Continue(advance)
274 })
275 .await
276 }
277
278 /// Advance time, firing events and other tasks - internal implementation
279 ///
280 /// Common code for `advance_*`.
281 ///
282 /// `body` will called after `progress_until_stalled`.
283 /// It should examine the simulated time, and the next timeout,
284 /// and decide what to do - returning
285 /// `Break` to break the loop, or
286 /// `Continue` giving the `Duration` by which to advance time and go round again.
287 #[allow(clippy::print_stderr)]
288 async fn advance_inner<B>(&self, mut body: impl FnMut() -> ControlFlow<B, Duration>) -> B {
289 /// Warn when we loop more than this many times per call
290 const WARN_AT: u32 = 1000;
291 let mut counter = Some(WARN_AT);
292
293 loop {
294 self.task.progress_until_stalled().await;
295
296 match body() {
297 ControlFlow::Break(v) => break v,
298 ControlFlow::Continue(advance) => {
299 counter = match counter.map(|v| v.checked_sub(1)) {
300 None => None,
301 Some(Some(v)) => Some(v),
302 Some(None) => {
303 eprintln!(
304 "warning: MockRuntime advance_* looped >{WARN_AT} (next sleep: {}ms)\n{:?}",
305 advance.as_millis(),
306 self.mock_task().as_debug_dump(),
307 );
308 None
309 }
310 };
311
312 self.sleep.advance(advance);
313 }
314 }
315 }
316 }
317
318 /// Advances time by `dur`, firing time events and other tasks in order
319 ///
320 /// Prefer this to [`SimpleMockTimeProvider::advance()`];
321 /// it works more faithfully.
322 ///
323 /// Specifically, it advances time in successive stages,
324 /// so that timeouts occur sequentially, in the right order.
325 ///
326 /// # Panics
327 ///
328 /// Can panic if the mock time is advanced by other tasks.
329 ///
330 /// And, see [`progress_until_stalled`](MockRuntime::progress_until_stalled)
331 pub async fn advance_by(&self, dur: Duration) -> Option<Duration> {
332 let limit = self
333 .now()
334 .checked_add(dur)
335 .expect("MockRuntime::advance: time overflow");
336
337 self.advance_until(limit).await
338 }
339
340 /// See [`SimpleMockTimeProvider::jump_wallclock()`]
341 pub fn jump_wallclock(&self, new_wallclock: SystemTime) {
342 self.sleep.jump_wallclock(new_wallclock);
343 }
344
345 /// Return the amount of virtual time until the next timeout
346 /// should elapse.
347 ///
348 /// If there are no more timeouts, return None.
349 ///
350 /// If the next
351 /// timeout should elapse right now, return Some(0).
352 /// However, if other tasks are proceeding,
353 /// typically in that situation those other tasks will wake,
354 /// so a `Some(0)` return won't be visible.
355 /// In test cases, detect immediate timeouts by detecting
356 /// what your task does after the timeout occurs.
357 ///
358 /// Likewise whether this function returns `None` or `Some(...)`
359 /// can depend on whether tasks have actually yet polled various futures.
360 /// The answer should be correct after
361 /// [`progress_until_stalled`](Self::progress_until_stalled).
362 pub fn time_until_next_timeout(&self) -> Option<Duration> {
363 self.sleep.time_until_next_timeout()
364 }
365}
366
367impl MockRuntimeBuilder {
368 /// Set the scheduling policy
369 pub fn scheduling(mut self, scheduling: SchedulingPolicy) -> Self {
370 self.scheduling = scheduling;
371 self
372 }
373
374 /// Provide a non-`Default` [`SimpleMockTimeProvider`]
375 pub fn sleep_provider(mut self, sleep: SimpleMockTimeProvider) -> Self {
376 self.sleep = Some(sleep);
377 self
378 }
379
380 /// Set the starting wall clock time
381 pub fn starting_wallclock(mut self, starting_wallclock: SystemTime) -> Self {
382 self.starting_wallclock = Some(starting_wallclock);
383 self
384 }
385
386 /// Build the runtime
387 pub fn build(self) -> MockRuntime {
388 let MockRuntimeBuilder {
389 scheduling,
390 sleep,
391 starting_wallclock,
392 } = self;
393
394 let sleep = sleep.unwrap_or_default();
395 if let Some(starting_wallclock) = starting_wallclock {
396 sleep.jump_wallclock(starting_wallclock);
397 };
398
399 let task = MockExecutor::with_scheduling(scheduling);
400
401 MockRuntime {
402 sleep,
403 task,
404 ..Default::default()
405 }
406 }
407}
408
409#[cfg(all(test, not(miri)))] // miri cannot do CLOCK_REALTIME
410mod test {
411 // @@ begin test lint list maintained by maint/add_warning @@
412 #![allow(clippy::bool_assert_comparison)]
413 #![allow(clippy::clone_on_copy)]
414 #![allow(clippy::dbg_macro)]
415 #![allow(clippy::mixed_attributes_style)]
416 #![allow(clippy::print_stderr)]
417 #![allow(clippy::print_stdout)]
418 #![allow(clippy::single_char_pattern)]
419 #![allow(clippy::unwrap_used)]
420 #![allow(clippy::unchecked_duration_subtraction)]
421 #![allow(clippy::useless_vec)]
422 #![allow(clippy::needless_pass_by_value)]
423 //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
424 use super::*;
425 use futures::channel::mpsc;
426 use futures::{SinkExt as _, StreamExt as _};
427 use std::sync::atomic::AtomicBool;
428 use std::sync::atomic::Ordering::SeqCst;
429 use std::sync::Arc;
430 use tracing::trace;
431 use tracing_test::traced_test;
432
433 //---------- helper alias ----------
434
435 fn ms(i: u64) -> Duration {
436 Duration::from_millis(i)
437 }
438
439 //---------- set up some test tasks ----------
440
441 struct TestTasks {
442 runtime: MockRuntime,
443 start: Instant,
444 tx: mpsc::Sender<()>,
445 signals: Vec<Arc<AtomicBool>>,
446 }
447 impl TestTasks {
448 fn spawn(runtime: &MockRuntime) -> TestTasks {
449 let start = runtime.now();
450 let mut signals = vec![];
451
452 let mut new_signal = || {
453 let signal = Arc::new(AtomicBool::new(false));
454 signals.push(signal.clone());
455 signal
456 };
457
458 let (tx, mut rx) = mpsc::channel(0);
459 runtime.spawn_identified("rx", {
460 let signal = new_signal();
461 async move {
462 trace!("task rx starting...");
463 let _: Option<()> = rx.next().await;
464 signal.store(true, SeqCst);
465 trace!("task rx finished.");
466 }
467 });
468
469 for i in 1..=3 {
470 let signal = new_signal();
471 runtime.spawn_identified(i, {
472 let runtime = runtime.clone();
473 async move {
474 trace!("task {i} starting...");
475 runtime.sleep(ms(i * 1000)).await;
476 signal.store(true, SeqCst);
477 trace!("task {i} finished.");
478 }
479 });
480 }
481 let runtime = runtime.clone();
482
483 TestTasks {
484 runtime,
485 start,
486 tx,
487 signals,
488 }
489 }
490
491 fn signals_list(&self) -> String {
492 self.signals
493 .iter()
494 .map(|s| if s.load(SeqCst) { 't' } else { 'f' })
495 .collect()
496 }
497 }
498
499 //---------- test advance_until_stalled ----------
500
501 impl TestTasks {
502 async fn advance_until_stalled(&self, exp_offset_from_start: Duration, exp_signals: &str) {
503 self.runtime.advance_until_stalled().await;
504 assert_eq!(self.runtime.now() - self.start, exp_offset_from_start);
505 assert_eq!(self.signals_list(), exp_signals);
506 }
507 }
508
509 #[traced_test]
510 #[test]
511 fn advance_until_stalled() {
512 MockRuntime::test_with_various(|runtime| async move {
513 let mut tt = TestTasks::spawn(&runtime);
514
515 tt.advance_until_stalled(ms(3000), "fttt").await;
516 tt.tx.send(()).await.unwrap();
517 tt.advance_until_stalled(ms(3000), "tttt").await;
518 });
519 }
520
521 //---------- test advance_until ----------
522
523 impl TestTasks {
524 async fn advance_until(
525 &self,
526 offset_from_start: Duration,
527 exp_signals: &str,
528 exp_got: Option<Duration>,
529 ) {
530 let limit = self.start + offset_from_start;
531 eprintln!("===> advance_until {}ms", offset_from_start.as_millis());
532 let got = self.runtime.advance_until(limit).await;
533 assert_eq!(self.runtime.now() - self.start, offset_from_start);
534 assert_eq!(self.signals_list(), exp_signals);
535 assert_eq!(got, exp_got);
536 }
537 }
538
539 #[traced_test]
540 #[test]
541 fn advance_until() {
542 MockRuntime::test_with_various(|runtime| async move {
543 let mut tt = TestTasks::spawn(&runtime);
544
545 tt.advance_until(ms(1100), "ftff", Some(ms(900))).await;
546 tt.advance_until(ms(2000), "fttf", Some(ms(1000))).await;
547 tt.tx.send(()).await.unwrap();
548 tt.advance_until(ms(2000), "tttf", Some(ms(1000))).await;
549 tt.advance_until(ms(3300), "tttt", None).await;
550 });
551 }
552
553 //---------- test advance_by ----------
554
555 impl TestTasks {
556 async fn advance_by(
557 &self,
558 advance: Duration,
559 exp_offset_from_start: Duration,
560 exp_signals: &str,
561 exp_got: Option<Duration>,
562 ) {
563 eprintln!("===> advance {}ms", advance.as_millis());
564 let got = self.runtime.advance_by(advance).await;
565 assert_eq!(self.runtime.now() - self.start, exp_offset_from_start);
566 assert_eq!(self.signals_list(), exp_signals);
567 assert_eq!(got, exp_got);
568 }
569 }
570
571 #[traced_test]
572 #[test]
573 fn advance_by() {
574 MockRuntime::test_with_various(|runtime| async move {
575 let mut tt = TestTasks::spawn(&runtime);
576
577 tt.advance_by(ms(1100), ms(1100), "ftff", Some(ms(900)))
578 .await;
579 tt.advance_by(ms(900), ms(2000), "fttf", Some(ms(1000)))
580 .await;
581 tt.tx.send(()).await.unwrap();
582 tt.advance_by(ms(1300), ms(3300), "tttt", None).await;
583 });
584 }
585}