tor_rtmock/
runtime.rs

1//! Completely mock runtime
2
3#![forbid(unsafe_code)] // if you remove this, enable (or write) miri tests (git grep miri)
4
5use std::fmt::{Debug, Display};
6use std::ops::ControlFlow;
7
8use amplify::Getters;
9use futures::FutureExt as _;
10use itertools::chain;
11use strum::IntoEnumIterator as _;
12use void::{ResultVoidExt as _, Void};
13
14use crate::util::impl_runtime_prelude::*;
15
16use crate::net::MockNetProvider;
17use crate::simple_time::SimpleMockTimeProvider;
18use crate::task::{MockExecutor, SchedulingPolicy};
19
20/// Completely mock runtime, with simulated time
21///
22/// Suitable for test cases that wish to completely control
23/// the environment experienced by the code under test.
24///
25/// ### Useful properties
26///
27/// The execution order is deterministic.
28/// Time will advance only in a controlled fashion.
29/// Typically, the main task in a test will call
30/// [`advance_until_stalled`](MockRuntime::advance_until_stalled).
31///
32/// Reliable sequencing techniques which can be used in tests include:
33/// sleeping for carefully chosen durations;
34/// interlocking via intertask channels; or,
35/// sequenced control of requests to the code under test.
36///
37/// ### Restrictions
38///
39/// The test case must advance the mock time explicitly as desired,
40/// typically by calling one of the `MockRuntime::advance_*` methods.
41///
42/// Tests that use this runtime *must not* interact with the outside world;
43/// everything must go through this runtime (and its pieces).
44///
45/// There is no mocking of filesystem access;
46/// the `MockRuntime`'s time will disagree with `SystemTime`'s
47/// obtained from (for example) `std::fs::Metadata`.
48///
49/// #### Allowed
50///
51///  * Inter-future communication facilities from `futures`
52///    or other runtime-agnostic crates.
53///
54///  * Fast synchronous operations that will complete "immediately" or "quickly".
55///    E.g.: filesystem calls.
56///
57///  * `std::sync::Mutex` (assuming the use is deadlock-free in a single-threaded
58///    executor, as it should be in all of Arti).
59///
60///  * Slower operations that are run synchronously (without futures `await`)
61///    provided their completion doesn't depend on any of the futures we're running.
62///    (These kind of operations are often discouraged in async contexts,
63///    because they block the async runtime or its worker threads.
64///    But they are often OK in tests.)
65///
66///  * All facilities provided by this `MockExecutor` and its trait impls.
67///
68/// #### Not allowed
69///
70///  * Direct access to the real-world clock (`SystemTime::now`, `Instant::now`),
71///    including direct use of `coarsetime`.
72///    Instead, use [`SleepProvider`] and [`CoarseTimeProvider`] methods on the runtime.
73///    Exception: CPU use measurements.
74///
75///  * Anything that spawns threads and then communicates with those threads
76///    using async Rust facilities (futures).
77///
78///  * Async sockets, or async use of other kernel-based IPC or network mechanisms.
79///
80///  * Anything provided by a Rust runtime/executor project (eg anything from Tokio),
81///    unless it is definitively established that it's runtime-agnostic.
82#[derive(Debug, Default, Clone, Getters, Deftly)]
83#[derive_deftly(SomeMockRuntime)]
84#[getter(prefix = "mock_")]
85pub struct MockRuntime {
86    /// Tasks
87    #[deftly(mock(task, toplevel))]
88    task: MockExecutor,
89    /// Time provider
90    #[deftly(mock(sleep))]
91    sleep: SimpleMockTimeProvider,
92    /// Net provider
93    #[deftly(mock(net))]
94    net: MockNetProvider,
95}
96
97/// Builder for a manually-configured `MockRuntime`
98#[derive(Debug, Default, Clone)]
99pub struct MockRuntimeBuilder {
100    /// scheduling policy
101    scheduling: SchedulingPolicy,
102    /// sleep provider
103    sleep: Option<SimpleMockTimeProvider>,
104    /// starting wall clock time
105    starting_wallclock: Option<SystemTime>,
106}
107
108impl MockRuntime {
109    /// Create a new `MockRuntime` with default parameters
110    pub fn new() -> Self {
111        Self::default()
112    }
113
114    /// Return a builder, for creating a `MockRuntime` with some parameters manually configured
115    pub fn builder() -> MockRuntimeBuilder {
116        Default::default()
117    }
118
119    /// Run a test case with a variety of runtime parameters, to try to find bugs
120    ///
121    /// `test_case` is an async closure which receives a `MockRuntime`.
122    /// It will be run with a number of differently configured executors.
123    ///
124    /// Each run will be preceded by an [`eprintln!`] showing the runtime configuration.
125    ///
126    /// ### Variations
127    ///
128    /// The only variation currently implemented is this:
129    ///
130    /// Both FIFO and LIFO scheduling policies are tested,
131    /// in the hope that this will help discover ordering-dependent bugs.
132    pub fn test_with_various<TC, FUT>(mut test_case: TC)
133    where
134        TC: FnMut(MockRuntime) -> FUT,
135        FUT: Future<Output = ()>,
136    {
137        Self::try_test_with_various(|runtime| test_case(runtime).map(|()| Ok::<_, Void>(())))
138            .void_unwrap();
139    }
140
141    /// Run a faillible test case with a variety of runtime parameters, to try to find bugs
142    ///
143    /// `test_case` is an async closure which receives a `MockRuntime`.
144    /// It will be run with a number of differently configured executors.
145    ///
146    /// This function accepts a fallible closure,
147    /// and returns the first `Err` to the caller.
148    ///
149    /// See [`test_with_various()`](MockRuntime::test_with_various) for more details.
150    #[allow(clippy::print_stderr)]
151    pub fn try_test_with_various<TC, FUT, E>(mut test_case: TC) -> Result<(), E>
152    where
153        TC: FnMut(MockRuntime) -> FUT,
154        FUT: Future<Output = Result<(), E>>,
155    {
156        for scheduling in SchedulingPolicy::iter() {
157            let config = MockRuntime::builder().scheduling(scheduling);
158            eprintln!("running test with MockRuntime configuration {config:?}");
159            let runtime = config.build();
160            runtime.block_on(test_case(runtime.clone()))?;
161        }
162        Ok(())
163    }
164
165    /// Spawn a task and return something to identify it
166    ///
167    /// See [`MockExecutor::spawn_identified()`]
168    pub fn spawn_identified(
169        &self,
170        desc: impl Display,
171        fut: impl Future<Output = ()> + Send + 'static,
172    ) -> impl Debug + Clone + Send + 'static {
173        self.task.spawn_identified(desc, fut)
174    }
175
176    /// Spawn a task and return its output for further usage
177    ///
178    /// See [`MockExecutor::spawn_join()`]
179    pub fn spawn_join<T: Debug + Send + 'static>(
180        &self,
181        desc: impl Display,
182        fut: impl Future<Output = T> + Send + 'static,
183    ) -> impl Future<Output = T> {
184        self.task.spawn_join(desc, fut)
185    }
186
187    /// Run tasks and advance time, until every task except this one is waiting
188    ///
189    /// On return the other tasks won't be waiting on timeouts,
190    /// since time will be advanced as needed.
191    ///
192    /// Therefore the other tasks (if any) will be waiting for something
193    /// that won't happen by itself,
194    /// such as a provocation via their APIs from this task.
195    ///
196    /// # Panics
197    ///
198    /// See [`progress_until_stalled`](MockRuntime::progress_until_stalled)
199    pub async fn advance_until_stalled(&self) {
200        self.advance_inner(|| {
201            let Some(timeout) = self.time_until_next_timeout() else {
202                // Nothing is waiting on timeouts
203                return ControlFlow::Break(());
204            };
205            assert_ne!(timeout, Duration::ZERO);
206            ControlFlow::Continue(timeout)
207        })
208        .await;
209    }
210
211    /// Run tasks in the current executor until every task except this one is waiting
212    ///
213    /// Calls [`MockExecutor::progress_until_stalled()`].
214    ///
215    /// # Restriction - no automatic time advance
216    ///
217    /// The mocked time will *not* be automatically advanced.
218    ///
219    /// Usually
220    /// (and especially if the tasks under test are waiting for timeouts or periodic events)
221    /// you must use
222    /// [`advance_by()`](MockRuntime::advance_by)
223    /// or
224    /// [`advance_until()`](MockRuntime::advance_until)
225    /// to ensure the simulated time progresses as required.
226    ///
227    /// # Panics
228    ///
229    /// Might malfunction or panic if more than one such call is running at once.
230    ///
231    /// (Ie, you must `.await` or drop the returned `Future`
232    /// before calling this method again.)
233    ///
234    /// Must be called and awaited within a future being run by `self`.
235    pub async fn progress_until_stalled(&self) {
236        self.task.progress_until_stalled().await;
237    }
238
239    /// Run tasks and advance time up to at most `limit`
240    ///
241    /// Will return when all other tasks are either:
242    ///  * Waiting on a timeout that will fire strictly after `limit`,
243    ///    (return value is the time until the earliest such)
244    ///  * Waiting for something else that won't happen by itself.
245    ///    (return value is `None`)
246    ///
247    /// Like [`advance_until_stalled`](MockRuntime::advance_until_stalled)
248    /// but stops when the mock time reaches `limit`.
249    ///
250    /// # Panics
251    ///
252    /// Panics if the time somehow advances beyond `limit`.
253    /// (This function won't do that, but maybe it was beyond `limit` on entry,
254    /// or another task advanced the clock.)
255    ///
256    /// And, see [`progress_until_stalled`](MockRuntime::progress_until_stalled)
257    pub async fn advance_until(&self, limit: Instant) -> Option<Duration> {
258        self.advance_inner(|| {
259            let timeout = self.time_until_next_timeout();
260
261            let limit = limit
262                .checked_duration_since(self.now())
263                .expect("MockRuntime::advance_until: time advanced beyond `limit`!");
264
265            if limit == Duration::ZERO {
266                // Time has reached `limit`
267                return ControlFlow::Break(timeout);
268            }
269
270            let advance = chain!(timeout, [limit]).min().expect("empty!");
271            assert_ne!(advance, Duration::ZERO);
272
273            ControlFlow::Continue(advance)
274        })
275        .await
276    }
277
278    /// Advance time, firing events and other tasks - internal implementation
279    ///
280    /// Common code for `advance_*`.
281    ///
282    /// `body` will called after `progress_until_stalled`.
283    /// It should examine the simulated time, and the next timeout,
284    /// and decide what to do - returning
285    /// `Break` to break the loop, or
286    /// `Continue` giving the `Duration` by which to advance time and go round again.
287    #[allow(clippy::print_stderr)]
288    async fn advance_inner<B>(&self, mut body: impl FnMut() -> ControlFlow<B, Duration>) -> B {
289        /// Warn when we loop more than this many times per call
290        const WARN_AT: u32 = 1000;
291        let mut counter = Some(WARN_AT);
292
293        loop {
294            self.task.progress_until_stalled().await;
295
296            match body() {
297                ControlFlow::Break(v) => break v,
298                ControlFlow::Continue(advance) => {
299                    counter = match counter.map(|v| v.checked_sub(1)) {
300                        None => None,
301                        Some(Some(v)) => Some(v),
302                        Some(None) => {
303                            eprintln!(
304 "warning: MockRuntime advance_* looped >{WARN_AT} (next sleep: {}ms)\n{:?}",
305                                advance.as_millis(),
306                                self.mock_task().as_debug_dump(),
307                            );
308                            None
309                        }
310                    };
311
312                    self.sleep.advance(advance);
313                }
314            }
315        }
316    }
317
318    /// Advances time by `dur`, firing time events and other tasks in order
319    ///
320    /// Prefer this to [`SimpleMockTimeProvider::advance()`];
321    /// it works more faithfully.
322    ///
323    /// Specifically, it advances time in successive stages,
324    /// so that timeouts occur sequentially, in the right order.
325    ///
326    /// # Panics
327    ///
328    /// Can panic if the mock time is advanced by other tasks.
329    ///
330    /// And, see [`progress_until_stalled`](MockRuntime::progress_until_stalled)
331    pub async fn advance_by(&self, dur: Duration) -> Option<Duration> {
332        let limit = self
333            .now()
334            .checked_add(dur)
335            .expect("MockRuntime::advance: time overflow");
336
337        self.advance_until(limit).await
338    }
339
340    /// See [`SimpleMockTimeProvider::jump_wallclock()`]
341    pub fn jump_wallclock(&self, new_wallclock: SystemTime) {
342        self.sleep.jump_wallclock(new_wallclock);
343    }
344
345    /// Return the amount of virtual time until the next timeout
346    /// should elapse.
347    ///
348    /// If there are no more timeouts, return None.
349    ///
350    /// If the next
351    /// timeout should elapse right now, return Some(0).
352    /// However, if other tasks are proceeding,
353    /// typically in that situation those other tasks will wake,
354    /// so a `Some(0)` return won't be visible.
355    /// In test cases, detect immediate timeouts by detecting
356    /// what your task does after the timeout occurs.
357    ///
358    /// Likewise whether this function returns `None` or `Some(...)`
359    /// can depend on whether tasks have actually yet polled various futures.
360    /// The answer should be correct after
361    /// [`progress_until_stalled`](Self::progress_until_stalled).
362    pub fn time_until_next_timeout(&self) -> Option<Duration> {
363        self.sleep.time_until_next_timeout()
364    }
365}
366
367impl MockRuntimeBuilder {
368    /// Set the scheduling policy
369    pub fn scheduling(mut self, scheduling: SchedulingPolicy) -> Self {
370        self.scheduling = scheduling;
371        self
372    }
373
374    /// Provide a non-`Default` [`SimpleMockTimeProvider`]
375    pub fn sleep_provider(mut self, sleep: SimpleMockTimeProvider) -> Self {
376        self.sleep = Some(sleep);
377        self
378    }
379
380    /// Set the starting wall clock time
381    pub fn starting_wallclock(mut self, starting_wallclock: SystemTime) -> Self {
382        self.starting_wallclock = Some(starting_wallclock);
383        self
384    }
385
386    /// Build the runtime
387    pub fn build(self) -> MockRuntime {
388        let MockRuntimeBuilder {
389            scheduling,
390            sleep,
391            starting_wallclock,
392        } = self;
393
394        let sleep = sleep.unwrap_or_default();
395        if let Some(starting_wallclock) = starting_wallclock {
396            sleep.jump_wallclock(starting_wallclock);
397        };
398
399        let task = MockExecutor::with_scheduling(scheduling);
400
401        MockRuntime {
402            sleep,
403            task,
404            ..Default::default()
405        }
406    }
407}
408
409#[cfg(all(test, not(miri)))] // miri cannot do CLOCK_REALTIME
410mod test {
411    // @@ begin test lint list maintained by maint/add_warning @@
412    #![allow(clippy::bool_assert_comparison)]
413    #![allow(clippy::clone_on_copy)]
414    #![allow(clippy::dbg_macro)]
415    #![allow(clippy::mixed_attributes_style)]
416    #![allow(clippy::print_stderr)]
417    #![allow(clippy::print_stdout)]
418    #![allow(clippy::single_char_pattern)]
419    #![allow(clippy::unwrap_used)]
420    #![allow(clippy::unchecked_duration_subtraction)]
421    #![allow(clippy::useless_vec)]
422    #![allow(clippy::needless_pass_by_value)]
423    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
424    use super::*;
425    use futures::channel::mpsc;
426    use futures::{SinkExt as _, StreamExt as _};
427    use std::sync::atomic::AtomicBool;
428    use std::sync::atomic::Ordering::SeqCst;
429    use std::sync::Arc;
430    use tracing::trace;
431    use tracing_test::traced_test;
432
433    //---------- helper alias ----------
434
435    fn ms(i: u64) -> Duration {
436        Duration::from_millis(i)
437    }
438
439    //---------- set up some test tasks ----------
440
441    struct TestTasks {
442        runtime: MockRuntime,
443        start: Instant,
444        tx: mpsc::Sender<()>,
445        signals: Vec<Arc<AtomicBool>>,
446    }
447    impl TestTasks {
448        fn spawn(runtime: &MockRuntime) -> TestTasks {
449            let start = runtime.now();
450            let mut signals = vec![];
451
452            let mut new_signal = || {
453                let signal = Arc::new(AtomicBool::new(false));
454                signals.push(signal.clone());
455                signal
456            };
457
458            let (tx, mut rx) = mpsc::channel(0);
459            runtime.spawn_identified("rx", {
460                let signal = new_signal();
461                async move {
462                    trace!("task rx starting...");
463                    let _: Option<()> = rx.next().await;
464                    signal.store(true, SeqCst);
465                    trace!("task rx finished.");
466                }
467            });
468
469            for i in 1..=3 {
470                let signal = new_signal();
471                runtime.spawn_identified(i, {
472                    let runtime = runtime.clone();
473                    async move {
474                        trace!("task {i} starting...");
475                        runtime.sleep(ms(i * 1000)).await;
476                        signal.store(true, SeqCst);
477                        trace!("task {i} finished.");
478                    }
479                });
480            }
481            let runtime = runtime.clone();
482
483            TestTasks {
484                runtime,
485                start,
486                tx,
487                signals,
488            }
489        }
490
491        fn signals_list(&self) -> String {
492            self.signals
493                .iter()
494                .map(|s| if s.load(SeqCst) { 't' } else { 'f' })
495                .collect()
496        }
497    }
498
499    //---------- test advance_until_stalled ----------
500
501    impl TestTasks {
502        async fn advance_until_stalled(&self, exp_offset_from_start: Duration, exp_signals: &str) {
503            self.runtime.advance_until_stalled().await;
504            assert_eq!(self.runtime.now() - self.start, exp_offset_from_start);
505            assert_eq!(self.signals_list(), exp_signals);
506        }
507    }
508
509    #[traced_test]
510    #[test]
511    fn advance_until_stalled() {
512        MockRuntime::test_with_various(|runtime| async move {
513            let mut tt = TestTasks::spawn(&runtime);
514
515            tt.advance_until_stalled(ms(3000), "fttt").await;
516            tt.tx.send(()).await.unwrap();
517            tt.advance_until_stalled(ms(3000), "tttt").await;
518        });
519    }
520
521    //---------- test advance_until ----------
522
523    impl TestTasks {
524        async fn advance_until(
525            &self,
526            offset_from_start: Duration,
527            exp_signals: &str,
528            exp_got: Option<Duration>,
529        ) {
530            let limit = self.start + offset_from_start;
531            eprintln!("===> advance_until {}ms", offset_from_start.as_millis());
532            let got = self.runtime.advance_until(limit).await;
533            assert_eq!(self.runtime.now() - self.start, offset_from_start);
534            assert_eq!(self.signals_list(), exp_signals);
535            assert_eq!(got, exp_got);
536        }
537    }
538
539    #[traced_test]
540    #[test]
541    fn advance_until() {
542        MockRuntime::test_with_various(|runtime| async move {
543            let mut tt = TestTasks::spawn(&runtime);
544
545            tt.advance_until(ms(1100), "ftff", Some(ms(900))).await;
546            tt.advance_until(ms(2000), "fttf", Some(ms(1000))).await;
547            tt.tx.send(()).await.unwrap();
548            tt.advance_until(ms(2000), "tttf", Some(ms(1000))).await;
549            tt.advance_until(ms(3300), "tttt", None).await;
550        });
551    }
552
553    //---------- test advance_by ----------
554
555    impl TestTasks {
556        async fn advance_by(
557            &self,
558            advance: Duration,
559            exp_offset_from_start: Duration,
560            exp_signals: &str,
561            exp_got: Option<Duration>,
562        ) {
563            eprintln!("===> advance {}ms", advance.as_millis());
564            let got = self.runtime.advance_by(advance).await;
565            assert_eq!(self.runtime.now() - self.start, exp_offset_from_start);
566            assert_eq!(self.signals_list(), exp_signals);
567            assert_eq!(got, exp_got);
568        }
569    }
570
571    #[traced_test]
572    #[test]
573    fn advance_by() {
574        MockRuntime::test_with_various(|runtime| async move {
575            let mut tt = TestTasks::spawn(&runtime);
576
577            tt.advance_by(ms(1100), ms(1100), "ftff", Some(ms(900)))
578                .await;
579            tt.advance_by(ms(900), ms(2000), "fttf", Some(ms(1000)))
580                .await;
581            tt.tx.send(()).await.unwrap();
582            tt.advance_by(ms(1300), ms(3300), "tttt", None).await;
583        });
584    }
585}