tor_rtmock/
simple_time.rs

1//! Simple provider of simulated time
2//!
3//! See [`SimpleMockTimeProvider`]
4
5use std::cmp::Reverse;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::{Arc, Mutex, MutexGuard};
9use std::task::{Context, Poll, Waker};
10use std::time::{Duration, Instant, SystemTime};
11
12use derive_more::AsMut;
13use priority_queue::priority_queue::PriorityQueue;
14use slotmap_careful::DenseSlotMap;
15
16use tor_rtcompat::CoarseInstant;
17use tor_rtcompat::CoarseTimeProvider;
18use tor_rtcompat::SleepProvider;
19
20use crate::time_core::MockTimeCore;
21
22/// Simple provider of simulated time
23///
24/// Maintains a mocked view of the current [`Instant`] and [`SystemTime`].
25///
26/// The simulated time advances only when explicitly instructed,
27/// by calling [`.advance()`](Provider::advance).
28///
29/// The wallclock time can be warped with
30/// [`.jump_wallclock()`](Provider::jump_wallclock),
31/// allowing simulation of wall clock non-monotonicity.
32///
33/// # Panics and aborts
34///
35/// Panics on time under/overflow.
36///
37/// May cause an abort if the [`SimpleMockTimeProvider`] implementation contains bugs.
38#[derive(Clone, Debug)]
39pub struct SimpleMockTimeProvider {
40    /// The actual state
41    state: Arc<Mutex<State>>,
42}
43
44/// Convenience abbreviation
45pub(crate) use SimpleMockTimeProvider as Provider;
46
47/// Identifier of a [`SleepFuture`]
48type Id = slotmap_careful::DefaultKey;
49
50/// Future for `sleep`
51///
52/// Iff this struct exists, there is an entry for `id` in `prov.futures`.
53/// (It might contain `None`.)
54pub struct SleepFuture {
55    /// Reference to our state
56    prov: Provider,
57
58    /// Which `SleepFuture` are we
59    id: Id,
60}
61
62/// Mutable state for a [`Provider`]
63///
64/// Each sleep ([`Id`], [`SleepFuture`]) is in one of the following states:
65///
66/// | state       | [`SleepFuture`]  | `futures`         | `unready`          |
67/// |-------------|------------------|------------------|--------------------|
68/// | UNPOLLLED   | exists           | present, `None`  | present, `> now`   |
69/// | WAITING     | exists           | present, `Some`  | present, `> now`   |
70/// | READY       | exists           | present, `None`  | absent             |
71/// | DROPPED     | dropped          | absent           | absent             |
72#[derive(Debug, AsMut)]
73struct State {
74    /// Current time (coarse)
75    core: MockTimeCore,
76
77    /// Futures; record of every existing [`SleepFuture`], including any `Waker`
78    ///
79    /// Entry exists iff `SleepFuture` exists.
80    ///
81    /// Contains `None` if we haven't polled the future;
82    /// `Some` if we have.
83    ///
84    /// We could use a `Vec` or `TiVec`
85    /// but using a slotmap is more robust against bugs here.
86    futures: DenseSlotMap<Id, Option<Waker>>,
87
88    /// Priority queue
89    ///
90    /// Subset of `futures`.
91    ///
92    /// An entry is present iff the `Instant` is *strictly* after `State.now`,
93    /// in which case that's when the future should be woken.
94    ///
95    /// `PriorityQueue` is a max-heap but we want earliest times, hence `Reverse`
96    unready: PriorityQueue<Id, Reverse<Instant>>,
97}
98
99/// `Default` makes a `Provider` which starts at whatever the current real time is
100impl Default for Provider {
101    fn default() -> Self {
102        Self::from_real()
103    }
104}
105
106impl Provider {
107    /// Return a new mock time provider starting at a specified point in time
108    pub fn new(now: Instant, wallclock: SystemTime) -> Self {
109        let state = State {
110            core: MockTimeCore::new(now, wallclock),
111            futures: Default::default(),
112            unready: Default::default(),
113        };
114        Provider {
115            state: Arc::new(Mutex::new(state)),
116        }
117    }
118
119    /// Return a new mock time provider starting at the current actual (non-mock) time
120    ///
121    /// Like any [`SimpleMockTimeProvider`], the time is frozen and only changes
122    /// due to calls to `advance`.
123    pub fn from_real() -> Self {
124        Provider::from_wallclock(SystemTime::now())
125    }
126    /// Return a new mock time provider starting at a specified wallclock time
127    ///
128    /// The monotonic time ([`Instant`]) starts at the current actual (non-mock) time.
129    /// (Absolute values of the real monotonic time are not readily
130    /// observable or distinguishable from Rust,
131    /// nor can a fixed `Instant` be constructed,
132    /// so this is usually sufficient for a reproducible test.)
133    pub fn from_wallclock(wallclock: SystemTime) -> Self {
134        Provider::new(Instant::now(), wallclock)
135    }
136
137    /// Advance the simulated time by `d`
138    ///
139    /// This advances both the `Instant` (monotonic time)
140    /// and `SystemTime` (wallclock time)
141    /// by the same amount.
142    ///
143    /// Will wake sleeping [`SleepFuture`]s, as appropriate.
144    ///
145    /// Note that the tasks which were waiting on those now-expired `SleepFuture`s
146    /// will only actually execute when they are next polled.
147    /// `advance` does not yield to the executor or poll any futures.
148    /// The executor will (presumably) poll those woken tasks, when it regains control.
149    /// But the order in which the tasks run will depend on its scheduling policy,
150    /// and might be different to the order implied by the futures' timeout values.
151    ///
152    /// To simulate normal time advancement, wakeups, and task activations,
153    /// use [`MockExecutor::advance_*()`](crate::MockRuntime).
154    pub fn advance(&self, d: Duration) {
155        let mut state = self.lock();
156        state.core.advance(d);
157        state.wake_any();
158    }
159
160    /// Warp the wallclock time
161    ///
162    /// This has no effect on any sleeping futures.
163    /// It only affects the return value from [`.wallclock()`](Provider::wallclock).
164    pub fn jump_wallclock(&self, new_wallclock: SystemTime) {
165        self.lock().core.jump_wallclock(new_wallclock);
166        // Really we ought to wake people up, here.
167        // But absolutely every Rust API is wrong: none offer a way to sleep until a SystemTime.
168        // (There might be some less-portable non-Rust APIs for that.)
169    }
170
171    /// When will the next timeout occur?
172    ///
173    /// Returns the duration until the next [`SleepFuture`] should wake up.
174    ///
175    /// Advancing time by at least this amount will wake up that future,
176    /// and any others with the same wakeup time.
177    ///
178    /// Will never return `Some(ZERO)`:
179    /// any future that is supposed to wake up now (or earlier) has indeed already been woken,
180    /// so it is no longer sleeping and isn't included in the calculation.
181    pub fn time_until_next_timeout(&self) -> Option<Duration> {
182        let state = self.lock();
183        let Reverse(until) = state.unready.peek()?.1;
184        // The invariant (see `State`) guarantees that entries in `unready` are always `> now`,
185        // so we don't whether duration_since would panic or saturate.
186        let d = until.duration_since(state.core.instant());
187        Some(d)
188    }
189
190    /// Convenience function to lock the state
191    fn lock(&self) -> MutexGuard<'_, State> {
192        self.state.lock().expect("simple time state poisoned")
193    }
194}
195
196impl SleepProvider for Provider {
197    type SleepFuture = SleepFuture;
198
199    fn sleep(&self, d: Duration) -> SleepFuture {
200        let mut state = self.lock();
201        let until = state.core.instant() + d;
202
203        let id = state.futures.insert(None);
204        state.unready.push(id, Reverse(until));
205
206        let fut = SleepFuture {
207            id,
208            prov: self.clone(),
209        };
210
211        // This sleep is now UNPOLLLED, except that its time might be `<= now`:
212
213        // Possibly, `until` isn't *strictly* after than `state.now`, since d might be 0.
214        // If so, .wake_any() will restore the invariant by immediately waking.
215        state.wake_any();
216
217        // This sleep is now UNPOLLED or READY, according to whether duration was 0.
218
219        fut
220    }
221
222    fn now(&self) -> Instant {
223        self.lock().core.instant()
224    }
225    fn wallclock(&self) -> SystemTime {
226        self.lock().core.wallclock()
227    }
228}
229
230impl CoarseTimeProvider for Provider {
231    fn now_coarse(&self) -> CoarseInstant {
232        self.lock().core.coarse().now_coarse()
233    }
234}
235
236impl Future for SleepFuture {
237    type Output = ();
238
239    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
240        let mut state = self.prov.lock();
241        if let Some((_, Reverse(scheduled))) = state.unready.get(&self.id) {
242            // Presence of this entry implies scheduled > now: we are UNPOLLED or WAITING
243            assert!(*scheduled > state.core.instant());
244            let waker = Some(cx.waker().clone());
245            // Make this be WAITING.  (If we're re-polled, we simply drop any previous waker.)
246            *state
247                .futures
248                .get_mut(self.id)
249                .expect("polling futures entry") = waker;
250            Poll::Pending
251        } else {
252            // Absence implies scheduled (no longer stored) <= now: we are READY
253            Poll::Ready(())
254        }
255    }
256}
257
258impl State {
259    /// Restore the invariant for `unready` after `now` has been increased
260    ///
261    /// Ie, ensures that any sleeps which are
262    /// WAITING/UNPOLLED except that they are `<= now`,
263    /// are moved to state READY.
264    fn wake_any(&mut self) {
265        loop {
266            match self.unready.peek() {
267                // Keep picking off entries with scheduled <= now
268                Some((_, Reverse(scheduled))) if *scheduled <= self.core.instant() => {
269                    let (id, _) = self.unready.pop().expect("vanished");
270                    // We can .take() the waker since this can only ever run once
271                    // per sleep future (since it happens when we pop it from unready).
272                    let futures_entry = self.futures.get_mut(id).expect("stale unready entry");
273                    if let Some(waker) = futures_entry.take() {
274                        waker.wake();
275                    }
276                }
277                _ => break,
278            }
279        }
280    }
281}
282
283impl Drop for SleepFuture {
284    fn drop(&mut self) {
285        let mut state = self.prov.lock();
286        let _: Option<Waker> = state.futures.remove(self.id).expect("entry vanished");
287        let _: Option<(Id, Reverse<Instant>)> = state.unready.remove(&self.id);
288        // Now it is DROPPED.
289    }
290}
291
292#[cfg(test)]
293mod test {
294    // @@ begin test lint list maintained by maint/add_warning @@
295    #![allow(clippy::bool_assert_comparison)]
296    #![allow(clippy::clone_on_copy)]
297    #![allow(clippy::dbg_macro)]
298    #![allow(clippy::mixed_attributes_style)]
299    #![allow(clippy::print_stderr)]
300    #![allow(clippy::print_stdout)]
301    #![allow(clippy::single_char_pattern)]
302    #![allow(clippy::unwrap_used)]
303    #![allow(clippy::unchecked_duration_subtraction)]
304    #![allow(clippy::useless_vec)]
305    #![allow(clippy::needless_pass_by_value)]
306    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
307    use super::*;
308    use crate::task::MockExecutor;
309    use futures::poll;
310    use humantime::parse_rfc3339;
311    use tor_rtcompat::ToplevelBlockOn as _;
312    use Poll::*;
313
314    fn ms(ms: u64) -> Duration {
315        Duration::from_millis(ms)
316    }
317
318    fn run_test<FUT>(f: impl FnOnce(Provider, MockExecutor) -> FUT)
319    where
320        FUT: Future<Output = ()>,
321    {
322        let sp = Provider::new(
323            Instant::now(), // it would have been nice to make this fixed for the test
324            parse_rfc3339("2000-01-01T00:00:00Z").unwrap(),
325        );
326        let exec = MockExecutor::new();
327        exec.block_on(f(sp, exec.clone()));
328    }
329
330    #[test]
331    fn simple() {
332        run_test(|sp, _exec| async move {
333            let n1 = sp.now();
334            let w1 = sp.wallclock();
335            let mut f1 = sp.sleep(ms(500));
336            let mut f2 = sp.sleep(ms(1500));
337            assert_eq!(poll!(&mut f1), Pending);
338            sp.advance(ms(200));
339            assert_eq!(n1 + ms(200), sp.now());
340            assert_eq!(w1 + ms(200), sp.wallclock());
341            assert_eq!(poll!(&mut f1), Pending);
342            assert_eq!(poll!(&mut f2), Pending);
343            drop(f2);
344            sp.jump_wallclock(w1 + ms(10_000));
345            sp.advance(ms(300));
346            assert_eq!(n1 + ms(500), sp.now());
347            assert_eq!(w1 + ms(10_300), sp.wallclock());
348            assert_eq!(poll!(&mut f1), Ready(()));
349            let mut f0 = sp.sleep(ms(0));
350            assert_eq!(poll!(&mut f0), Ready(()));
351        });
352    }
353
354    #[test]
355    fn task() {
356        run_test(|sp, exec| async move {
357            let st = Arc::new(Mutex::new(0_i8));
358
359            exec.spawn_identified("test task", {
360                let st = st.clone();
361                let sp = sp.clone();
362                async move {
363                    *st.lock().unwrap() = 1;
364                    sp.sleep(ms(500)).await;
365                    *st.lock().unwrap() = 2;
366                    sp.sleep(ms(300)).await;
367                    *st.lock().unwrap() = 3;
368                }
369            });
370
371            let st = move || *st.lock().unwrap();
372
373            assert_eq!(st(), 0);
374            exec.progress_until_stalled().await;
375            assert_eq!(st(), 1);
376            assert_eq!(sp.time_until_next_timeout(), Some(ms(500)));
377
378            sp.advance(ms(500));
379
380            assert_eq!(st(), 1);
381            assert_eq!(sp.time_until_next_timeout(), None);
382            exec.progress_until_stalled().await;
383            assert_eq!(st(), 2);
384            assert_eq!(sp.time_until_next_timeout(), Some(ms(300)));
385
386            sp.advance(ms(500));
387            assert_eq!(st(), 2);
388            assert_eq!(sp.time_until_next_timeout(), None);
389            exec.progress_until_stalled().await;
390            assert_eq!(sp.time_until_next_timeout(), None);
391            assert_eq!(st(), 3);
392        });
393    }
394}