tor_rtmock/simple_time.rs
1//! Simple provider of simulated time
2//!
3//! See [`SimpleMockTimeProvider`]
4
5use std::cmp::Reverse;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::{Arc, Mutex, MutexGuard};
9use std::task::{Context, Poll, Waker};
10use std::time::{Duration, Instant, SystemTime};
11
12use derive_more::AsMut;
13use priority_queue::priority_queue::PriorityQueue;
14use slotmap_careful::DenseSlotMap;
15
16use tor_rtcompat::CoarseInstant;
17use tor_rtcompat::CoarseTimeProvider;
18use tor_rtcompat::SleepProvider;
19
20use crate::time_core::MockTimeCore;
21
22/// Simple provider of simulated time
23///
24/// Maintains a mocked view of the current [`Instant`] and [`SystemTime`].
25///
26/// The simulated time advances only when explicitly instructed,
27/// by calling [`.advance()`](Provider::advance).
28///
29/// The wallclock time can be warped with
30/// [`.jump_wallclock()`](Provider::jump_wallclock),
31/// allowing simulation of wall clock non-monotonicity.
32///
33/// # Panics and aborts
34///
35/// Panics on time under/overflow.
36///
37/// May cause an abort if the [`SimpleMockTimeProvider`] implementation contains bugs.
38#[derive(Clone, Debug)]
39pub struct SimpleMockTimeProvider {
40 /// The actual state
41 state: Arc<Mutex<State>>,
42}
43
44/// Convenience abbreviation
45pub(crate) use SimpleMockTimeProvider as Provider;
46
47/// Identifier of a [`SleepFuture`]
48type Id = slotmap_careful::DefaultKey;
49
50/// Future for `sleep`
51///
52/// Iff this struct exists, there is an entry for `id` in `prov.futures`.
53/// (It might contain `None`.)
54pub struct SleepFuture {
55 /// Reference to our state
56 prov: Provider,
57
58 /// Which `SleepFuture` are we
59 id: Id,
60}
61
62/// Mutable state for a [`Provider`]
63///
64/// Each sleep ([`Id`], [`SleepFuture`]) is in one of the following states:
65///
66/// | state | [`SleepFuture`] | `futures` | `unready` |
67/// |-------------|------------------|------------------|--------------------|
68/// | UNPOLLLED | exists | present, `None` | present, `> now` |
69/// | WAITING | exists | present, `Some` | present, `> now` |
70/// | READY | exists | present, `None` | absent |
71/// | DROPPED | dropped | absent | absent |
72#[derive(Debug, AsMut)]
73struct State {
74 /// Current time (coarse)
75 core: MockTimeCore,
76
77 /// Futures; record of every existing [`SleepFuture`], including any `Waker`
78 ///
79 /// Entry exists iff `SleepFuture` exists.
80 ///
81 /// Contains `None` if we haven't polled the future;
82 /// `Some` if we have.
83 ///
84 /// We could use a `Vec` or `TiVec`
85 /// but using a slotmap is more robust against bugs here.
86 futures: DenseSlotMap<Id, Option<Waker>>,
87
88 /// Priority queue
89 ///
90 /// Subset of `futures`.
91 ///
92 /// An entry is present iff the `Instant` is *strictly* after `State.now`,
93 /// in which case that's when the future should be woken.
94 ///
95 /// `PriorityQueue` is a max-heap but we want earliest times, hence `Reverse`
96 unready: PriorityQueue<Id, Reverse<Instant>>,
97}
98
99/// `Default` makes a `Provider` which starts at whatever the current real time is
100impl Default for Provider {
101 fn default() -> Self {
102 Self::from_real()
103 }
104}
105
106impl Provider {
107 /// Return a new mock time provider starting at a specified point in time
108 pub fn new(now: Instant, wallclock: SystemTime) -> Self {
109 let state = State {
110 core: MockTimeCore::new(now, wallclock),
111 futures: Default::default(),
112 unready: Default::default(),
113 };
114 Provider {
115 state: Arc::new(Mutex::new(state)),
116 }
117 }
118
119 /// Return a new mock time provider starting at the current actual (non-mock) time
120 ///
121 /// Like any [`SimpleMockTimeProvider`], the time is frozen and only changes
122 /// due to calls to `advance`.
123 pub fn from_real() -> Self {
124 Provider::from_wallclock(SystemTime::now())
125 }
126 /// Return a new mock time provider starting at a specified wallclock time
127 ///
128 /// The monotonic time ([`Instant`]) starts at the current actual (non-mock) time.
129 /// (Absolute values of the real monotonic time are not readily
130 /// observable or distinguishable from Rust,
131 /// nor can a fixed `Instant` be constructed,
132 /// so this is usually sufficient for a reproducible test.)
133 pub fn from_wallclock(wallclock: SystemTime) -> Self {
134 Provider::new(Instant::now(), wallclock)
135 }
136
137 /// Advance the simulated time by `d`
138 ///
139 /// This advances both the `Instant` (monotonic time)
140 /// and `SystemTime` (wallclock time)
141 /// by the same amount.
142 ///
143 /// Will wake sleeping [`SleepFuture`]s, as appropriate.
144 ///
145 /// Note that the tasks which were waiting on those now-expired `SleepFuture`s
146 /// will only actually execute when they are next polled.
147 /// `advance` does not yield to the executor or poll any futures.
148 /// The executor will (presumably) poll those woken tasks, when it regains control.
149 /// But the order in which the tasks run will depend on its scheduling policy,
150 /// and might be different to the order implied by the futures' timeout values.
151 ///
152 /// To simulate normal time advancement, wakeups, and task activations,
153 /// use [`MockExecutor::advance_*()`](crate::MockRuntime).
154 pub fn advance(&self, d: Duration) {
155 let mut state = self.lock();
156 state.core.advance(d);
157 state.wake_any();
158 }
159
160 /// Warp the wallclock time
161 ///
162 /// This has no effect on any sleeping futures.
163 /// It only affects the return value from [`.wallclock()`](Provider::wallclock).
164 pub fn jump_wallclock(&self, new_wallclock: SystemTime) {
165 self.lock().core.jump_wallclock(new_wallclock);
166 // Really we ought to wake people up, here.
167 // But absolutely every Rust API is wrong: none offer a way to sleep until a SystemTime.
168 // (There might be some less-portable non-Rust APIs for that.)
169 }
170
171 /// When will the next timeout occur?
172 ///
173 /// Returns the duration until the next [`SleepFuture`] should wake up.
174 ///
175 /// Advancing time by at least this amount will wake up that future,
176 /// and any others with the same wakeup time.
177 ///
178 /// Will never return `Some(ZERO)`:
179 /// any future that is supposed to wake up now (or earlier) has indeed already been woken,
180 /// so it is no longer sleeping and isn't included in the calculation.
181 pub fn time_until_next_timeout(&self) -> Option<Duration> {
182 let state = self.lock();
183 let Reverse(until) = state.unready.peek()?.1;
184 // The invariant (see `State`) guarantees that entries in `unready` are always `> now`,
185 // so we don't whether duration_since would panic or saturate.
186 let d = until.duration_since(state.core.instant());
187 Some(d)
188 }
189
190 /// Convenience function to lock the state
191 fn lock(&self) -> MutexGuard<'_, State> {
192 self.state.lock().expect("simple time state poisoned")
193 }
194}
195
196impl SleepProvider for Provider {
197 type SleepFuture = SleepFuture;
198
199 fn sleep(&self, d: Duration) -> SleepFuture {
200 let mut state = self.lock();
201 let until = state.core.instant() + d;
202
203 let id = state.futures.insert(None);
204 state.unready.push(id, Reverse(until));
205
206 let fut = SleepFuture {
207 id,
208 prov: self.clone(),
209 };
210
211 // This sleep is now UNPOLLLED, except that its time might be `<= now`:
212
213 // Possibly, `until` isn't *strictly* after than `state.now`, since d might be 0.
214 // If so, .wake_any() will restore the invariant by immediately waking.
215 state.wake_any();
216
217 // This sleep is now UNPOLLED or READY, according to whether duration was 0.
218
219 fut
220 }
221
222 fn now(&self) -> Instant {
223 self.lock().core.instant()
224 }
225 fn wallclock(&self) -> SystemTime {
226 self.lock().core.wallclock()
227 }
228}
229
230impl CoarseTimeProvider for Provider {
231 fn now_coarse(&self) -> CoarseInstant {
232 self.lock().core.coarse().now_coarse()
233 }
234}
235
236impl Future for SleepFuture {
237 type Output = ();
238
239 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
240 let mut state = self.prov.lock();
241 if let Some((_, Reverse(scheduled))) = state.unready.get(&self.id) {
242 // Presence of this entry implies scheduled > now: we are UNPOLLED or WAITING
243 assert!(*scheduled > state.core.instant());
244 let waker = Some(cx.waker().clone());
245 // Make this be WAITING. (If we're re-polled, we simply drop any previous waker.)
246 *state
247 .futures
248 .get_mut(self.id)
249 .expect("polling futures entry") = waker;
250 Poll::Pending
251 } else {
252 // Absence implies scheduled (no longer stored) <= now: we are READY
253 Poll::Ready(())
254 }
255 }
256}
257
258impl State {
259 /// Restore the invariant for `unready` after `now` has been increased
260 ///
261 /// Ie, ensures that any sleeps which are
262 /// WAITING/UNPOLLED except that they are `<= now`,
263 /// are moved to state READY.
264 fn wake_any(&mut self) {
265 loop {
266 match self.unready.peek() {
267 // Keep picking off entries with scheduled <= now
268 Some((_, Reverse(scheduled))) if *scheduled <= self.core.instant() => {
269 let (id, _) = self.unready.pop().expect("vanished");
270 // We can .take() the waker since this can only ever run once
271 // per sleep future (since it happens when we pop it from unready).
272 let futures_entry = self.futures.get_mut(id).expect("stale unready entry");
273 if let Some(waker) = futures_entry.take() {
274 waker.wake();
275 }
276 }
277 _ => break,
278 }
279 }
280 }
281}
282
283impl Drop for SleepFuture {
284 fn drop(&mut self) {
285 let mut state = self.prov.lock();
286 let _: Option<Waker> = state.futures.remove(self.id).expect("entry vanished");
287 let _: Option<(Id, Reverse<Instant>)> = state.unready.remove(&self.id);
288 // Now it is DROPPED.
289 }
290}
291
292#[cfg(test)]
293mod test {
294 // @@ begin test lint list maintained by maint/add_warning @@
295 #![allow(clippy::bool_assert_comparison)]
296 #![allow(clippy::clone_on_copy)]
297 #![allow(clippy::dbg_macro)]
298 #![allow(clippy::mixed_attributes_style)]
299 #![allow(clippy::print_stderr)]
300 #![allow(clippy::print_stdout)]
301 #![allow(clippy::single_char_pattern)]
302 #![allow(clippy::unwrap_used)]
303 #![allow(clippy::unchecked_duration_subtraction)]
304 #![allow(clippy::useless_vec)]
305 #![allow(clippy::needless_pass_by_value)]
306 //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
307 use super::*;
308 use crate::task::MockExecutor;
309 use futures::poll;
310 use humantime::parse_rfc3339;
311 use tor_rtcompat::ToplevelBlockOn as _;
312 use Poll::*;
313
314 fn ms(ms: u64) -> Duration {
315 Duration::from_millis(ms)
316 }
317
318 fn run_test<FUT>(f: impl FnOnce(Provider, MockExecutor) -> FUT)
319 where
320 FUT: Future<Output = ()>,
321 {
322 let sp = Provider::new(
323 Instant::now(), // it would have been nice to make this fixed for the test
324 parse_rfc3339("2000-01-01T00:00:00Z").unwrap(),
325 );
326 let exec = MockExecutor::new();
327 exec.block_on(f(sp, exec.clone()));
328 }
329
330 #[test]
331 fn simple() {
332 run_test(|sp, _exec| async move {
333 let n1 = sp.now();
334 let w1 = sp.wallclock();
335 let mut f1 = sp.sleep(ms(500));
336 let mut f2 = sp.sleep(ms(1500));
337 assert_eq!(poll!(&mut f1), Pending);
338 sp.advance(ms(200));
339 assert_eq!(n1 + ms(200), sp.now());
340 assert_eq!(w1 + ms(200), sp.wallclock());
341 assert_eq!(poll!(&mut f1), Pending);
342 assert_eq!(poll!(&mut f2), Pending);
343 drop(f2);
344 sp.jump_wallclock(w1 + ms(10_000));
345 sp.advance(ms(300));
346 assert_eq!(n1 + ms(500), sp.now());
347 assert_eq!(w1 + ms(10_300), sp.wallclock());
348 assert_eq!(poll!(&mut f1), Ready(()));
349 let mut f0 = sp.sleep(ms(0));
350 assert_eq!(poll!(&mut f0), Ready(()));
351 });
352 }
353
354 #[test]
355 fn task() {
356 run_test(|sp, exec| async move {
357 let st = Arc::new(Mutex::new(0_i8));
358
359 exec.spawn_identified("test task", {
360 let st = st.clone();
361 let sp = sp.clone();
362 async move {
363 *st.lock().unwrap() = 1;
364 sp.sleep(ms(500)).await;
365 *st.lock().unwrap() = 2;
366 sp.sleep(ms(300)).await;
367 *st.lock().unwrap() = 3;
368 }
369 });
370
371 let st = move || *st.lock().unwrap();
372
373 assert_eq!(st(), 0);
374 exec.progress_until_stalled().await;
375 assert_eq!(st(), 1);
376 assert_eq!(sp.time_until_next_timeout(), Some(ms(500)));
377
378 sp.advance(ms(500));
379
380 assert_eq!(st(), 1);
381 assert_eq!(sp.time_until_next_timeout(), None);
382 exec.progress_until_stalled().await;
383 assert_eq!(st(), 2);
384 assert_eq!(sp.time_until_next_timeout(), Some(ms(300)));
385
386 sp.advance(ms(500));
387 assert_eq!(st(), 2);
388 assert_eq!(sp.time_until_next_timeout(), None);
389 exec.progress_until_stalled().await;
390 assert_eq!(sp.time_until_next_timeout(), None);
391 assert_eq!(st(), 3);
392 });
393 }
394}