1
//! Completely mock runtime
2

            
3
#![forbid(unsafe_code)] // if you remove this, enable (or write) miri tests (git grep miri)
4

            
5
use std::fmt::{Debug, Display};
6
use std::ops::ControlFlow;
7

            
8
use amplify::Getters;
9
use futures::FutureExt as _;
10
use itertools::chain;
11
use strum::IntoEnumIterator as _;
12
use void::{ResultVoidExt as _, Void};
13

            
14
use crate::util::impl_runtime_prelude::*;
15

            
16
use crate::net::MockNetProvider;
17
use crate::simple_time::SimpleMockTimeProvider;
18
use crate::task::{MockExecutor, SchedulingPolicy};
19

            
20
/// Completely mock runtime, with simulated time
21
///
22
/// Suitable for test cases that wish to completely control
23
/// the environment experienced by the code under test.
24
///
25
/// ### Useful properties
26
///
27
/// The execution order is deterministic.
28
/// Time will advance only in a controlled fashion.
29
/// Typically, the main task in a test will call
30
/// [`advance_until_stalled`](MockRuntime::advance_until_stalled).
31
///
32
/// Reliable sequencing techniques which can be used in tests include:
33
/// sleeping for carefully chosen durations;
34
/// interlocking via intertask channels; or,
35
/// sequenced control of requests to the code under test.
36
///
37
/// ### Restrictions
38
///
39
/// The test case must advance the mock time explicitly as desired,
40
/// typically by calling one of the `MockRuntime::advance_*` methods.
41
///
42
/// Tests that use this runtime *must not* interact with the outside world;
43
/// everything must go through this runtime (and its pieces).
44
///
45
/// There is no mocking of filesystem access;
46
/// the `MockRuntime`'s time will disagree with `SystemTime`'s
47
/// obtained from (for example) `std::fs::Metadata`.
48
///
49
/// #### Allowed
50
///
51
///  * Inter-future communication facilities from `futures`
52
///    or other runtime-agnostic crates.
53
///
54
///  * Fast synchronous operations that will complete "immediately" or "quickly".
55
///    E.g.: filesystem calls.
56
///
57
///  * `std::sync::Mutex` (assuming the use is deadlock-free in a single-threaded
58
///    executor, as it should be in all of Arti).
59
///
60
///  * Slower operations that are run synchronously (without futures `await`)
61
///    provided their completion doesn't depend on any of the futures we're running.
62
///    (These kind of operations are often discouraged in async contexts,
63
///    because they block the async runtime or its worker threads.
64
///    But they are often OK in tests.)
65
///
66
///  * All facilities provided by this `MockExecutor` and its trait impls.
67
///
68
/// #### Not allowed
69
///
70
///  * Direct access to the real-world clock (`SystemTime::now`, `Instant::now`),
71
///    including direct use of `coarsetime`.
72
///    Instead, use [`SleepProvider`] and [`CoarseTimeProvider`] methods on the runtime.
73
///    Exception: CPU use measurements.
74
///
75
///  * Anything that spawns threads and then communicates with those threads
76
///    using async Rust facilities (futures).
77
///
78
///  * Async sockets, or async use of other kernel-based IPC or network mechanisms.
79
///
80
///  * Anything provided by a Rust runtime/executor project (eg anything from Tokio),
81
///    unless it is definitively established that it's runtime-agnostic.
82
#[derive(Debug, Default, Clone, Getters, Deftly)]
83
#[derive_deftly(SomeMockRuntime)]
84
#[getter(prefix = "mock_")]
85
pub struct MockRuntime {
86
    /// Tasks
87
    #[deftly(mock(task, toplevel))]
88
    task: MockExecutor,
89
    /// Time provider
90
    #[deftly(mock(sleep))]
91
    sleep: SimpleMockTimeProvider,
92
    /// Net provider
93
    #[deftly(mock(net))]
94
    net: MockNetProvider,
95
}
96

            
97
/// Builder for a manually-configured `MockRuntime`
98
#[derive(Debug, Default, Clone)]
99
pub struct MockRuntimeBuilder {
100
    /// scheduling policy
101
    scheduling: SchedulingPolicy,
102
    /// sleep provider
103
    sleep: Option<SimpleMockTimeProvider>,
104
    /// starting wall clock time
105
    starting_wallclock: Option<SystemTime>,
106
}
107

            
108
impl MockRuntime {
109
    /// Create a new `MockRuntime` with default parameters
110
9720
    pub fn new() -> Self {
111
9720
        Self::default()
112
9720
    }
113

            
114
    /// Return a builder, for creating a `MockRuntime` with some parameters manually configured
115
6267
    pub fn builder() -> MockRuntimeBuilder {
116
6267
        Default::default()
117
6267
    }
118

            
119
    /// Run a test case with a variety of runtime parameters, to try to find bugs
120
    ///
121
    /// `test_case` is an async closure which receives a `MockRuntime`.
122
    /// It will be run with a number of differently configured executors.
123
    ///
124
    /// Each run will be preceded by an [`eprintln!`] showing the runtime configuration.
125
    ///
126
    /// ### Variations
127
    ///
128
    /// The only variation currently implemented is this:
129
    ///
130
    /// Both FIFO and LIFO scheduling policies are tested,
131
    /// in the hope that this will help discover ordering-dependent bugs.
132
134
    pub fn test_with_various<TC, FUT>(mut test_case: TC)
133
134
    where
134
134
        TC: FnMut(MockRuntime) -> FUT,
135
134
        FUT: Future<Output = ()>,
136
134
    {
137
268
        Self::try_test_with_various(|runtime| test_case(runtime).map(|()| Ok::<_, Void>(())))
138
134
            .void_unwrap();
139
134
    }
140

            
141
    /// Run a faillible test case with a variety of runtime parameters, to try to find bugs
142
    ///
143
    /// `test_case` is an async closure which receives a `MockRuntime`.
144
    /// It will be run with a number of differently configured executors.
145
    ///
146
    /// This function accepts a fallible closure,
147
    /// and returns the first `Err` to the caller.
148
    ///
149
    /// See [`test_with_various()`](MockRuntime::test_with_various) for more details.
150
    #[allow(clippy::print_stderr)]
151
142
    pub fn try_test_with_various<TC, FUT, E>(mut test_case: TC) -> Result<(), E>
152
142
    where
153
142
        TC: FnMut(MockRuntime) -> FUT,
154
142
        FUT: Future<Output = Result<(), E>>,
155
142
    {
156
426
        for scheduling in SchedulingPolicy::iter() {
157
284
            let config = MockRuntime::builder().scheduling(scheduling);
158
284
            eprintln!("running test with MockRuntime configuration {config:?}");
159
284
            let runtime = config.build();
160
284
            runtime.block_on(test_case(runtime.clone()))?;
161
        }
162
142
        Ok(())
163
142
    }
164

            
165
    /// Spawn a task and return something to identify it
166
    ///
167
    /// See [`MockExecutor::spawn_identified()`]
168
132
    pub fn spawn_identified(
169
132
        &self,
170
132
        desc: impl Display,
171
132
        fut: impl Future<Output = ()> + Send + 'static,
172
132
    ) -> impl Debug + Clone + Send + 'static {
173
132
        self.task.spawn_identified(desc, fut)
174
132
    }
175

            
176
    /// Spawn a task and return its output for further usage
177
    ///
178
    /// See [`MockExecutor::spawn_join()`]
179
16
    pub fn spawn_join<T: Debug + Send + 'static>(
180
16
        &self,
181
16
        desc: impl Display,
182
16
        fut: impl Future<Output = T> + Send + 'static,
183
16
    ) -> impl Future<Output = T> {
184
16
        self.task.spawn_join(desc, fut)
185
16
    }
186

            
187
    /// Run tasks and advance time, until every task except this one is waiting
188
    ///
189
    /// On return the other tasks won't be waiting on timeouts,
190
    /// since time will be advanced as needed.
191
    ///
192
    /// Therefore the other tasks (if any) will be waiting for something
193
    /// that won't happen by itself,
194
    /// such as a provocation via their APIs from this task.
195
    ///
196
    /// # Panics
197
    ///
198
    /// See [`progress_until_stalled`](MockRuntime::progress_until_stalled)
199
925532
    pub async fn advance_until_stalled(&self) {
200
40512
        self.advance_inner(|| {
201
40512
            let Some(timeout) = self.time_until_next_timeout() else {
202
                // Nothing is waiting on timeouts
203
40248
                return ControlFlow::Break(());
204
            };
205
264
            assert_ne!(timeout, Duration::ZERO);
206
264
            ControlFlow::Continue(timeout)
207
40512
        })
208
40248
        .await;
209
40248
    }
210

            
211
    /// Run tasks in the current executor until every task except this one is waiting
212
    ///
213
    /// Calls [`MockExecutor::progress_until_stalled()`].
214
    ///
215
    /// # Restriction - no automatic time advance
216
    ///
217
    /// The mocked time will *not* be automatically advanced.
218
    ///
219
    /// Usually
220
    /// (and especially if the tasks under test are waiting for timeouts or periodic events)
221
    /// you must use
222
    /// [`advance_by()`](MockRuntime::advance_by)
223
    /// or
224
    /// [`advance_until()`](MockRuntime::advance_until)
225
    /// to ensure the simulated time progresses as required.
226
    ///
227
    /// # Panics
228
    ///
229
    /// Might malfunction or panic if more than one such call is running at once.
230
    ///
231
    /// (Ie, you must `.await` or drop the returned `Future`
232
    /// before calling this method again.)
233
    ///
234
    /// Must be called and awaited within a future being run by `self`.
235
7130
    pub async fn progress_until_stalled(&self) {
236
310
        self.task.progress_until_stalled().await;
237
310
    }
238

            
239
    /// Run tasks and advance time up to at most `limit`
240
    ///
241
    /// Will return when all other tasks are either:
242
    ///  * Waiting on a timeout that will fire strictly after `limit`,
243
    ///    (return value is the time until the earliest such)
244
    ///  * Waiting for something else that won't happen by itself.
245
    ///    (return value is `None`)
246
    ///
247
    /// Like [`advance_until_stalled`](MockRuntime::advance_until_stalled)
248
    /// but stops when the mock time reaches `limit`.
249
    ///
250
    /// # Panics
251
    ///
252
    /// Panics if the time somehow advances beyond `limit`.
253
    /// (This function won't do that, but maybe it was beyond `limit` on entry,
254
    /// or another task advanced the clock.)
255
    ///
256
    /// And, see [`progress_until_stalled`](MockRuntime::progress_until_stalled)
257
1882
    pub async fn advance_until(&self, limit: Instant) -> Option<Duration> {
258
262
        self.advance_inner(|| {
259
262
            let timeout = self.time_until_next_timeout();
260
262

            
261
262
            let limit = limit
262
262
                .checked_duration_since(self.now())
263
262
                .expect("MockRuntime::advance_until: time advanced beyond `limit`!");
264
262

            
265
262
            if limit == Duration::ZERO {
266
                // Time has reached `limit`
267
108
                return ControlFlow::Break(timeout);
268
154
            }
269
154

            
270
154
            let advance = chain!(timeout, [limit]).min().expect("empty!");
271
154
            assert_ne!(advance, Duration::ZERO);
272

            
273
154
            ControlFlow::Continue(advance)
274
262
        })
275
108
        .await
276
108
    }
277

            
278
    /// Advance time, firing events and other tasks - internal implementation
279
    ///
280
    /// Common code for `advance_*`.
281
    ///
282
    /// `body` will called after `progress_until_stalled`.
283
    /// It should examine the simulated time, and the next timeout,
284
    /// and decide what to do - returning
285
    /// `Break` to break the loop, or
286
    /// `Continue` giving the `Duration` by which to advance time and go round again.
287
    #[allow(clippy::print_stderr)]
288
40356
    async fn advance_inner<B>(&self, mut body: impl FnMut() -> ControlFlow<B, Duration>) -> B {
289
        /// Warn when we loop more than this many times per call
290
        const WARN_AT: u32 = 1000;
291
40356
        let mut counter = Some(WARN_AT);
292

            
293
        loop {
294
40774
            self.task.progress_until_stalled().await;
295

            
296
40774
            match body() {
297
40356
                ControlFlow::Break(v) => break v,
298
418
                ControlFlow::Continue(advance) => {
299
418
                    counter = match counter.map(|v| v.checked_sub(1)) {
300
                        None => None,
301
418
                        Some(Some(v)) => Some(v),
302
                        Some(None) => {
303
                            eprintln!(
304
 "warning: MockRuntime advance_* looped >{WARN_AT} (next sleep: {}ms)\n{:?}",
305
                                advance.as_millis(),
306
                                self.mock_task().as_debug_dump(),
307
                            );
308
                            None
309
                        }
310
                    };
311

            
312
418
                    self.sleep.advance(advance);
313
                }
314
            }
315
        }
316
40356
    }
317

            
318
    /// Advances time by `dur`, firing time events and other tasks in order
319
    ///
320
    /// Prefer this to [`SimpleMockTimeProvider::advance()`];
321
    /// it works more faithfully.
322
    ///
323
    /// Specifically, it advances time in successive stages,
324
    /// so that timeouts occur sequentially, in the right order.
325
    ///
326
    /// # Panics
327
    ///
328
    /// Can panic if the mock time is advanced by other tasks.
329
    ///
330
    /// And, see [`progress_until_stalled`](MockRuntime::progress_until_stalled)
331
1858
    pub async fn advance_by(&self, dur: Duration) -> Option<Duration> {
332
92
        let limit = self
333
92
            .now()
334
92
            .checked_add(dur)
335
92
            .expect("MockRuntime::advance: time overflow");
336
92

            
337
92
        self.advance_until(limit).await
338
92
    }
339

            
340
    /// See [`SimpleMockTimeProvider::jump_wallclock()`]
341
315
    pub fn jump_wallclock(&self, new_wallclock: SystemTime) {
342
315
        self.sleep.jump_wallclock(new_wallclock);
343
315
    }
344

            
345
    /// Return the amount of virtual time until the next timeout
346
    /// should elapse.
347
    ///
348
    /// If there are no more timeouts, return None.
349
    ///
350
    /// If the next
351
    /// timeout should elapse right now, return Some(0).
352
    /// However, if other tasks are proceeding,
353
    /// typically in that situation those other tasks will wake,
354
    /// so a `Some(0)` return won't be visible.
355
    /// In test cases, detect immediate timeouts by detecting
356
    /// what your task does after the timeout occurs.
357
    ///
358
    /// Likewise whether this function returns `None` or `Some(...)`
359
    /// can depend on whether tasks have actually yet polled various futures.
360
    /// The answer should be correct after
361
    /// [`progress_until_stalled`](Self::progress_until_stalled).
362
915748
    pub fn time_until_next_timeout(&self) -> Option<Duration> {
363
915748
        self.sleep.time_until_next_timeout()
364
915748
    }
365
}
366

            
367
impl MockRuntimeBuilder {
368
    /// Set the scheduling policy
369
6132
    pub fn scheduling(mut self, scheduling: SchedulingPolicy) -> Self {
370
6132
        self.scheduling = scheduling;
371
6132
        self
372
6132
    }
373

            
374
    /// Provide a non-`Default` [`SimpleMockTimeProvider`]
375
135
    pub fn sleep_provider(mut self, sleep: SimpleMockTimeProvider) -> Self {
376
135
        self.sleep = Some(sleep);
377
135
        self
378
135
    }
379

            
380
    /// Set the starting wall clock time
381
    pub fn starting_wallclock(mut self, starting_wallclock: SystemTime) -> Self {
382
        self.starting_wallclock = Some(starting_wallclock);
383
        self
384
    }
385

            
386
    /// Build the runtime
387
6267
    pub fn build(self) -> MockRuntime {
388
6267
        let MockRuntimeBuilder {
389
6267
            scheduling,
390
6267
            sleep,
391
6267
            starting_wallclock,
392
6267
        } = self;
393
6267

            
394
6267
        let sleep = sleep.unwrap_or_default();
395
6267
        if let Some(starting_wallclock) = starting_wallclock {
396
            sleep.jump_wallclock(starting_wallclock);
397
6267
        };
398

            
399
6267
        let task = MockExecutor::with_scheduling(scheduling);
400
6267

            
401
6267
        MockRuntime {
402
6267
            sleep,
403
6267
            task,
404
6267
            ..Default::default()
405
6267
        }
406
6267
    }
407
}
408

            
409
#[cfg(all(test, not(miri)))] // miri cannot do CLOCK_REALTIME
410
mod test {
411
    // @@ begin test lint list maintained by maint/add_warning @@
412
    #![allow(clippy::bool_assert_comparison)]
413
    #![allow(clippy::clone_on_copy)]
414
    #![allow(clippy::dbg_macro)]
415
    #![allow(clippy::mixed_attributes_style)]
416
    #![allow(clippy::print_stderr)]
417
    #![allow(clippy::print_stdout)]
418
    #![allow(clippy::single_char_pattern)]
419
    #![allow(clippy::unwrap_used)]
420
    #![allow(clippy::unchecked_duration_subtraction)]
421
    #![allow(clippy::useless_vec)]
422
    #![allow(clippy::needless_pass_by_value)]
423
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
424
    use super::*;
425
    use futures::channel::mpsc;
426
    use futures::{SinkExt as _, StreamExt as _};
427
    use std::sync::atomic::AtomicBool;
428
    use std::sync::atomic::Ordering::SeqCst;
429
    use std::sync::Arc;
430
    use tracing::trace;
431
    use tracing_test::traced_test;
432

            
433
    //---------- helper alias ----------
434

            
435
    /// Shorthand: a `Duration` of `i` milliseconds
    fn ms(i: u64) -> Duration {
        Duration::from_millis(i)
    }
438

            
439
    //---------- set up some test tasks ----------
440

            
441
    struct TestTasks {
442
        runtime: MockRuntime,
443
        start: Instant,
444
        tx: mpsc::Sender<()>,
445
        signals: Vec<Arc<AtomicBool>>,
446
    }
447
    impl TestTasks {
448
        fn spawn(runtime: &MockRuntime) -> TestTasks {
449
            let start = runtime.now();
450
            let mut signals = vec![];
451

            
452
            let mut new_signal = || {
453
                let signal = Arc::new(AtomicBool::new(false));
454
                signals.push(signal.clone());
455
                signal
456
            };
457

            
458
            let (tx, mut rx) = mpsc::channel(0);
459
            runtime.spawn_identified("rx", {
460
                let signal = new_signal();
461
                async move {
462
                    trace!("task rx starting...");
463
                    let _: Option<()> = rx.next().await;
464
                    signal.store(true, SeqCst);
465
                    trace!("task rx finished.");
466
                }
467
            });
468

            
469
            for i in 1..=3 {
470
                let signal = new_signal();
471
                runtime.spawn_identified(i, {
472
                    let runtime = runtime.clone();
473
                    async move {
474
                        trace!("task {i} starting...");
475
                        runtime.sleep(ms(i * 1000)).await;
476
                        signal.store(true, SeqCst);
477
                        trace!("task {i} finished.");
478
                    }
479
                });
480
            }
481
            let runtime = runtime.clone();
482

            
483
            TestTasks {
484
                runtime,
485
                start,
486
                tx,
487
                signals,
488
            }
489
        }
490

            
491
        fn signals_list(&self) -> String {
492
            self.signals
493
                .iter()
494
                .map(|s| if s.load(SeqCst) { 't' } else { 'f' })
495
                .collect()
496
        }
497
    }
498

            
499
    //---------- test advance_until_stalled ----------
500

            
501
    impl TestTasks {
502
        async fn advance_until_stalled(&self, exp_offset_from_start: Duration, exp_signals: &str) {
503
            self.runtime.advance_until_stalled().await;
504
            assert_eq!(self.runtime.now() - self.start, exp_offset_from_start);
505
            assert_eq!(self.signals_list(), exp_signals);
506
        }
507
    }
508

            
509
    #[traced_test]
510
    #[test]
511
    fn advance_until_stalled() {
512
        MockRuntime::test_with_various(|runtime| async move {
513
            let mut tt = TestTasks::spawn(&runtime);
514

            
515
            tt.advance_until_stalled(ms(3000), "fttt").await;
516
            tt.tx.send(()).await.unwrap();
517
            tt.advance_until_stalled(ms(3000), "tttt").await;
518
        });
519
    }
520

            
521
    //---------- test advance_until ----------
522

            
523
    impl TestTasks {
524
        async fn advance_until(
525
            &self,
526
            offset_from_start: Duration,
527
            exp_signals: &str,
528
            exp_got: Option<Duration>,
529
        ) {
530
            let limit = self.start + offset_from_start;
531
            eprintln!("===> advance_until {}ms", offset_from_start.as_millis());
532
            let got = self.runtime.advance_until(limit).await;
533
            assert_eq!(self.runtime.now() - self.start, offset_from_start);
534
            assert_eq!(self.signals_list(), exp_signals);
535
            assert_eq!(got, exp_got);
536
        }
537
    }
538

            
539
    #[traced_test]
540
    #[test]
541
    fn advance_until() {
542
        MockRuntime::test_with_various(|runtime| async move {
543
            let mut tt = TestTasks::spawn(&runtime);
544

            
545
            tt.advance_until(ms(1100), "ftff", Some(ms(900))).await;
546
            tt.advance_until(ms(2000), "fttf", Some(ms(1000))).await;
547
            tt.tx.send(()).await.unwrap();
548
            tt.advance_until(ms(2000), "tttf", Some(ms(1000))).await;
549
            tt.advance_until(ms(3300), "tttt", None).await;
550
        });
551
    }
552

            
553
    //---------- test advance_by ----------
554

            
555
    impl TestTasks {
556
        async fn advance_by(
557
            &self,
558
            advance: Duration,
559
            exp_offset_from_start: Duration,
560
            exp_signals: &str,
561
            exp_got: Option<Duration>,
562
        ) {
563
            eprintln!("===> advance {}ms", advance.as_millis());
564
            let got = self.runtime.advance_by(advance).await;
565
            assert_eq!(self.runtime.now() - self.start, exp_offset_from_start);
566
            assert_eq!(self.signals_list(), exp_signals);
567
            assert_eq!(got, exp_got);
568
        }
569
    }
570

            
571
    #[traced_test]
572
    #[test]
573
    fn advance_by() {
574
        MockRuntime::test_with_various(|runtime| async move {
575
            let mut tt = TestTasks::spawn(&runtime);
576

            
577
            tt.advance_by(ms(1100), ms(1100), "ftff", Some(ms(900)))
578
                .await;
579
            tt.advance_by(ms(900), ms(2000), "fttf", Some(ms(1000)))
580
                .await;
581
            tt.tx.send(()).await.unwrap();
582
            tt.advance_by(ms(1300), ms(3300), "tttt", None).await;
583
        });
584
    }
585
}