1
//! Simple provider of simulated time
2
//!
3
//! See [`SimpleMockTimeProvider`]
4

            
5
use std::cmp::Reverse;
6
use std::future::Future;
7
use std::pin::Pin;
8
use std::sync::{Arc, Mutex, MutexGuard};
9
use std::task::{Context, Poll, Waker};
10
use std::time::{Duration, Instant, SystemTime};
11

            
12
use derive_more::AsMut;
13
use priority_queue::priority_queue::PriorityQueue;
14
use slotmap_careful::DenseSlotMap;
15

            
16
use tor_rtcompat::CoarseInstant;
17
use tor_rtcompat::CoarseTimeProvider;
18
use tor_rtcompat::SleepProvider;
19

            
20
use crate::time_core::MockTimeCore;
21

            
22
/// Simple provider of simulated time
23
///
24
/// Maintains a mocked view of the current [`Instant`] and [`SystemTime`].
25
///
26
/// The simulated time advances only when explicitly instructed,
27
/// by calling [`.advance()`](Provider::advance).
28
///
29
/// The wallclock time can be warped with
30
/// [`.jump_wallclock()`](Provider::jump_wallclock),
31
/// allowing simulation of wall clock non-monotonicity.
32
///
33
/// # Panics and aborts
34
///
35
/// Panics on time under/overflow.
36
///
37
/// May cause an abort if the [`SimpleMockTimeProvider`] implementation contains bugs.
38
#[derive(Clone, Debug)]
39
pub struct SimpleMockTimeProvider {
40
    /// The actual state
41
    state: Arc<Mutex<State>>,
42
}
43

            
44
/// Convenience abbreviation
45
pub(crate) use SimpleMockTimeProvider as Provider;
46

            
47
/// Identifier of a [`SleepFuture`]
48
type Id = slotmap_careful::DefaultKey;
49

            
50
/// Future for `sleep`
51
///
52
/// Iff this struct exists, there is an entry for `id` in `prov.futures`.
53
/// (It might contain `None`.)
54
pub struct SleepFuture {
55
    /// Reference to our state
56
    prov: Provider,
57

            
58
    /// Which `SleepFuture` are we
59
    id: Id,
60
}
61

            
62
/// Mutable state for a [`Provider`]
63
///
64
/// Each sleep ([`Id`], [`SleepFuture`]) is in one of the following states:
65
///
66
/// | state       | [`SleepFuture`]  | `futures`         | `unready`          |
67
/// |-------------|------------------|------------------|--------------------|
68
/// | UNPOLLLED   | exists           | present, `None`  | present, `> now`   |
69
/// | WAITING     | exists           | present, `Some`  | present, `> now`   |
70
/// | READY       | exists           | present, `None`  | absent             |
71
/// | DROPPED     | dropped          | absent           | absent             |
72
#[derive(Debug, AsMut)]
73
struct State {
74
    /// Current time (coarse)
75
    core: MockTimeCore,
76

            
77
    /// Futures; record of every existing [`SleepFuture`], including any `Waker`
78
    ///
79
    /// Entry exists iff `SleepFuture` exists.
80
    ///
81
    /// Contains `None` if we haven't polled the future;
82
    /// `Some` if we have.
83
    ///
84
    /// We could use a `Vec` or `TiVec`
85
    /// but using a slotmap is more robust against bugs here.
86
    futures: DenseSlotMap<Id, Option<Waker>>,
87

            
88
    /// Priority queue
89
    ///
90
    /// Subset of `futures`.
91
    ///
92
    /// An entry is present iff the `Instant` is *strictly* after `State.now`,
93
    /// in which case that's when the future should be woken.
94
    ///
95
    /// `PriorityQueue` is a max-heap but we want earliest times, hence `Reverse`
96
    unready: PriorityQueue<Id, Reverse<Instant>>,
97
}
98

            
99
/// `Default` makes a `Provider` which starts at whatever the current real time is
100
impl Default for Provider {
101
50762
    fn default() -> Self {
102
50762
        Self::from_real()
103
50762
    }
104
}
105

            
106
impl Provider {
107
    /// Return a new mock time provider starting at a specified point in time
108
50904
    pub fn new(now: Instant, wallclock: SystemTime) -> Self {
109
50904
        let state = State {
110
50904
            core: MockTimeCore::new(now, wallclock),
111
50904
            futures: Default::default(),
112
50904
            unready: Default::default(),
113
50904
        };
114
50904
        Provider {
115
50904
            state: Arc::new(Mutex::new(state)),
116
50904
        }
117
50904
    }
118

            
119
    /// Return a new mock time provider starting at the current actual (non-mock) time
120
    ///
121
    /// Like any [`SimpleMockTimeProvider`], the time is frozen and only changes
122
    /// due to calls to `advance`.
123
50762
    pub fn from_real() -> Self {
124
50762
        Provider::from_wallclock(SystemTime::now())
125
50762
    }
126
    /// Return a new mock time provider starting at a specified wallclock time
127
    ///
128
    /// The monotonic time ([`Instant`]) starts at the current actual (non-mock) time.
129
    /// (Absolute values of the real monotonic time are not readily
130
    /// observable or distinguishable from Rust,
131
    /// nor can a fixed `Instant` be constructed,
132
    /// so this is usually sufficient for a reproducible test.)
133
50762
    pub fn from_wallclock(wallclock: SystemTime) -> Self {
134
50762
        Provider::new(Instant::now(), wallclock)
135
50762
    }
136

            
137
    /// Advance the simulated time by `d`
138
    ///
139
    /// This advances both the `Instant` (monotonic time)
140
    /// and `SystemTime` (wallclock time)
141
    /// by the same amount.
142
    ///
143
    /// Will wake sleeping [`SleepFuture`]s, as appropriate.
144
    ///
145
    /// Note that the tasks which were waiting on those now-expired `SleepFuture`s
146
    /// will only actually execute when they are next polled.
147
    /// `advance` does not yield to the executor or poll any futures.
148
    /// The executor will (presumably) poll those woken tasks, when it regains control.
149
    /// But the order in which the tasks run will depend on its scheduling policy,
150
    /// and might be different to the order implied by the futures' timeout values.
151
    ///
152
    /// To simulate normal time advancement, wakeups, and task activations,
153
    /// use [`MockExecutor::advance_*()`](crate::MockRuntime).
154
10548
    pub fn advance(&self, d: Duration) {
155
10548
        let mut state = self.lock();
156
10548
        state.core.advance(d);
157
10548
        state.wake_any();
158
10548
    }
159

            
160
    /// Warp the wallclock time
161
    ///
162
    /// This has no effect on any sleeping futures.
163
    /// It only affects the return value from [`.wallclock()`](Provider::wallclock).
164
1152
    pub fn jump_wallclock(&self, new_wallclock: SystemTime) {
165
1152
        self.lock().core.jump_wallclock(new_wallclock);
166
1152
        // Really we ought to wake people up, here.
167
1152
        // But absolutely every Rust API is wrong: none offer a way to sleep until a SystemTime.
168
1152
        // (There might be some less-portable non-Rust APIs for that.)
169
1152
    }
170

            
171
    /// When will the next timeout occur?
172
    ///
173
    /// Returns the duration until the next [`SleepFuture`] should wake up.
174
    ///
175
    /// Advancing time by at least this amount will wake up that future,
176
    /// and any others with the same wakeup time.
177
    ///
178
    /// Will never return `Some(ZERO)`:
179
    /// any future that is supposed to wake up now (or earlier) has indeed already been woken,
180
    /// so it is no longer sleeping and isn't included in the calculation.
181
936106
    pub fn time_until_next_timeout(&self) -> Option<Duration> {
182
936106
        let state = self.lock();
183
936106
        let Reverse(until) = state.unready.peek()?.1;
184
        // The invariant (see `State`) guarantees that entries in `unready` are always `> now`,
185
        // so we don't whether duration_since would panic or saturate.
186
8854
        let d = until.duration_since(state.core.instant());
187
8854
        Some(d)
188
936106
    }
189

            
190
    /// Convenience function to lock the state
191
1339542
    fn lock(&self) -> MutexGuard<'_, State> {
192
1339542
        self.state.lock().expect("simple time state poisoned")
193
1339542
    }
194
}
195

            
196
impl SleepProvider for Provider {
197
    type SleepFuture = SleepFuture;
198

            
199
27186
    fn sleep(&self, d: Duration) -> SleepFuture {
200
27186
        let mut state = self.lock();
201
27186
        let until = state.core.instant() + d;
202
27186

            
203
27186
        let id = state.futures.insert(None);
204
27186
        state.unready.push(id, Reverse(until));
205
27186

            
206
27186
        let fut = SleepFuture {
207
27186
            id,
208
27186
            prov: self.clone(),
209
27186
        };
210
27186

            
211
27186
        // This sleep is now UNPOLLLED, except that its time might be `<= now`:
212
27186

            
213
27186
        // Possibly, `until` isn't *strictly* after than `state.now`, since d might be 0.
214
27186
        // If so, .wake_any() will restore the invariant by immediately waking.
215
27186
        state.wake_any();
216
27186

            
217
27186
        // This sleep is now UNPOLLED or READY, according to whether duration was 0.
218
27186

            
219
27186
        fut
220
27186
    }
221

            
222
75344
    fn now(&self) -> Instant {
223
75344
        self.lock().core.instant()
224
75344
    }
225
78942
    fn wallclock(&self) -> SystemTime {
226
78942
        self.lock().core.wallclock()
227
78942
    }
228
}
229

            
230
impl CoarseTimeProvider for Provider {
231
151800
    fn now_coarse(&self) -> CoarseInstant {
232
151800
        self.lock().core.coarse().now_coarse()
233
151800
    }
234
}
235

            
236
impl Future for SleepFuture {
237
    type Output = ();
238

            
239
34958
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
240
34958
        let mut state = self.prov.lock();
241
34958
        if let Some((_, Reverse(scheduled))) = state.unready.get(&self.id) {
242
            // Presence of this entry implies scheduled > now: we are UNPOLLED or WAITING
243
24932
            assert!(*scheduled > state.core.instant());
244
24932
            let waker = Some(cx.waker().clone());
245
24932
            // Make this be WAITING.  (If we're re-polled, we simply drop any previous waker.)
246
24932
            *state
247
24932
                .futures
248
24932
                .get_mut(self.id)
249
24932
                .expect("polling futures entry") = waker;
250
24932
            Poll::Pending
251
        } else {
252
            // Absence implies scheduled (no longer stored) <= now: we are READY
253
10026
            Poll::Ready(())
254
        }
255
34958
    }
256
}
257

            
258
impl State {
259
    /// Restore the invariant for `unready` after `now` has been increased
260
    ///
261
    /// Ie, ensures that any sleeps which are
262
    /// WAITING/UNPOLLED except that they are `<= now`,
263
    /// are moved to state READY.
264
37734
    fn wake_any(&mut self) {
265
        loop {
266
47852
            match self.unready.peek() {
267
                // Keep picking off entries with scheduled <= now
268
43822
                Some((_, Reverse(scheduled))) if *scheduled <= self.core.instant() => {
269
10118
                    let (id, _) = self.unready.pop().expect("vanished");
270
10118
                    // We can .take() the waker since this can only ever run once
271
10118
                    // per sleep future (since it happens when we pop it from unready).
272
10118
                    let futures_entry = self.futures.get_mut(id).expect("stale unready entry");
273
10118
                    if let Some(waker) = futures_entry.take() {
274
10116
                        waker.wake();
275
10116
                    }
276
                }
277
37734
                _ => break,
278
37734
            }
279
37734
        }
280
37734
    }
281
}
282

            
283
impl Drop for SleepFuture {
284
23506
    fn drop(&mut self) {
285
23506
        let mut state = self.prov.lock();
286
23506
        let _: Option<Waker> = state.futures.remove(self.id).expect("entry vanished");
287
23506
        let _: Option<(Id, Reverse<Instant>)> = state.unready.remove(&self.id);
288
23506
        // Now it is DROPPED.
289
23506
    }
290
}
291

            
292
#[cfg(test)]
293
mod test {
294
    // @@ begin test lint list maintained by maint/add_warning @@
295
    #![allow(clippy::bool_assert_comparison)]
296
    #![allow(clippy::clone_on_copy)]
297
    #![allow(clippy::dbg_macro)]
298
    #![allow(clippy::mixed_attributes_style)]
299
    #![allow(clippy::print_stderr)]
300
    #![allow(clippy::print_stdout)]
301
    #![allow(clippy::single_char_pattern)]
302
    #![allow(clippy::unwrap_used)]
303
    #![allow(clippy::unchecked_duration_subtraction)]
304
    #![allow(clippy::useless_vec)]
305
    #![allow(clippy::needless_pass_by_value)]
306
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
307
    use super::*;
308
    use crate::task::MockExecutor;
309
    use futures::poll;
310
    use humantime::parse_rfc3339;
311
    use tor_rtcompat::ToplevelBlockOn as _;
312
    use Poll::*;
313

            
314
    fn ms(ms: u64) -> Duration {
315
        Duration::from_millis(ms)
316
    }
317

            
318
    fn run_test<FUT>(f: impl FnOnce(Provider, MockExecutor) -> FUT)
319
    where
320
        FUT: Future<Output = ()>,
321
    {
322
        let sp = Provider::new(
323
            Instant::now(), // it would have been nice to make this fixed for the test
324
            parse_rfc3339("2000-01-01T00:00:00Z").unwrap(),
325
        );
326
        let exec = MockExecutor::new();
327
        exec.block_on(f(sp, exec.clone()));
328
    }
329

            
330
    #[test]
331
    fn simple() {
332
        run_test(|sp, _exec| async move {
333
            let n1 = sp.now();
334
            let w1 = sp.wallclock();
335
            let mut f1 = sp.sleep(ms(500));
336
            let mut f2 = sp.sleep(ms(1500));
337
            assert_eq!(poll!(&mut f1), Pending);
338
            sp.advance(ms(200));
339
            assert_eq!(n1 + ms(200), sp.now());
340
            assert_eq!(w1 + ms(200), sp.wallclock());
341
            assert_eq!(poll!(&mut f1), Pending);
342
            assert_eq!(poll!(&mut f2), Pending);
343
            drop(f2);
344
            sp.jump_wallclock(w1 + ms(10_000));
345
            sp.advance(ms(300));
346
            assert_eq!(n1 + ms(500), sp.now());
347
            assert_eq!(w1 + ms(10_300), sp.wallclock());
348
            assert_eq!(poll!(&mut f1), Ready(()));
349
            let mut f0 = sp.sleep(ms(0));
350
            assert_eq!(poll!(&mut f0), Ready(()));
351
        });
352
    }
353

            
354
    #[test]
355
    fn task() {
356
        run_test(|sp, exec| async move {
357
            let st = Arc::new(Mutex::new(0_i8));
358

            
359
            exec.spawn_identified("test task", {
360
                let st = st.clone();
361
                let sp = sp.clone();
362
                async move {
363
                    *st.lock().unwrap() = 1;
364
                    sp.sleep(ms(500)).await;
365
                    *st.lock().unwrap() = 2;
366
                    sp.sleep(ms(300)).await;
367
                    *st.lock().unwrap() = 3;
368
                }
369
            });
370

            
371
            let st = move || *st.lock().unwrap();
372

            
373
            assert_eq!(st(), 0);
374
            exec.progress_until_stalled().await;
375
            assert_eq!(st(), 1);
376
            assert_eq!(sp.time_until_next_timeout(), Some(ms(500)));
377

            
378
            sp.advance(ms(500));
379

            
380
            assert_eq!(st(), 1);
381
            assert_eq!(sp.time_until_next_timeout(), None);
382
            exec.progress_until_stalled().await;
383
            assert_eq!(st(), 2);
384
            assert_eq!(sp.time_until_next_timeout(), Some(ms(300)));
385

            
386
            sp.advance(ms(500));
387
            assert_eq!(st(), 2);
388
            assert_eq!(sp.time_until_next_timeout(), None);
389
            exec.progress_until_stalled().await;
390
            assert_eq!(sp.time_until_next_timeout(), None);
391
            assert_eq!(st(), 3);
392
        });
393
    }
394
}