tor_rtmock/time.rs
1//! Functionality for simulating the passage of time in unit tests.
2//!
3//! We do this by providing [`MockSleepProvider`], a "SleepProvider"
4//! instance that can simulate timeouts and retries without requiring
5//! the actual system clock to advance.
6//!
7//! ### Deprecated
8//!
9//! This mock time facility has some limitations.
10//! See [`MockSleepProvider`] for more information.
11//! Use [`MockRuntime`](crate::MockRuntime) for new tests.
12
13#![forbid(unsafe_code)] // if you remove this, enable (or write) miri tests (git grep miri)
14#![allow(clippy::missing_docs_in_private_items)]
15
16use std::{
17 cmp::{Eq, Ordering, PartialEq, PartialOrd},
18 collections::BinaryHeap,
19 fmt,
20 pin::Pin,
21 sync::{Arc, Mutex, Weak},
22 task::{Context, Poll, Waker},
23 time::{Duration, Instant, SystemTime},
24};
25
26use futures::Future;
27use tracing::trace;
28
29use std::collections::HashSet;
30use std::fmt::Formatter;
31use tor_rtcompat::{CoarseInstant, CoarseTimeProvider, SleepProvider};
32
33use crate::time_core::MockTimeCore;
34
35/// A dummy [`SleepProvider`] instance for testing.
36///
37/// The MockSleepProvider ignores the current time, and instead keeps
38/// its own view of the current `Instant` and `SystemTime`. You
39/// can advance them in-step by calling `advance()`, and you can simulate
40/// jumps in the system clock by calling `jump()`.
41///
42/// This is *not* for production use.
43///
44/// ### Deprecated
45///
46/// This mock time facility has some limitations, notably lack of support for tasks,
47/// and a confusing API for controlling the mock time.
48///
/// New test cases should probably use `MockRuntime`
/// which incorporates `SimpleMockTimeProvider`.
51///
52/// Comparison of `MockSleepProvider` with `SimpleMockTimeProvider`:
53///
54/// * `SimpleMockTimeProvider` does not support, or expect the use of,
55/// `block_advance` et al.
56/// Instead, the advancement of simulated time is typically done automatically
57/// in cooperation with the executor,
58/// using `MockRuntime`'s `advance_*` methods.
59///
60/// * Consequently, `SimpleMockTimeProvider` can be used in test cases that
61/// spawn tasks and perform sleeps in them.
62///
63/// * And, consequently, `SimpleMockTimeProvider` does not need non-test code to
64/// contain calls which are solely related to getting the time mocking to work right.
65///
66/// * `SimpleMockTimeProvider` gives correct sleeping locations
67/// with `MockExecutor`'s dump of sleeping tasks' stack traces.
68///
69/// * Conversely, to use `SimpleMockTimeProvider` in all but the most simple test cases,
70/// coordination with the executor is required.
71/// This coordination is provided by the integrated `MockRuntime`;
72/// `SimpleMockTimeProvider` is of limited usefulness by itself.
73///
74/// ### Examples
75///
76/// Suppose you've written a function that relies on making a
77/// connection to the network and possibly timing out:
78///
79/// ```rust
80/// use tor_rtcompat::{Runtime,SleepProviderExt};
81/// use std::{net::SocketAddr, io::Result, time::Duration, io::Error};
82/// use futures::io::AsyncWriteExt;
83///
84/// async fn say_hi(runtime: impl Runtime, addr: &SocketAddr) -> Result<()> {
85/// let delay = Duration::new(5,0);
86/// runtime.timeout(delay, async {
87/// let mut conn = runtime.connect(addr).await?;
88/// conn.write_all(b"Hello world!\r\n").await?;
89/// conn.close().await?;
90/// Ok::<_,Error>(())
91/// }).await??;
92/// Ok(())
93/// }
94/// ```
95///
96/// But how should you test this function?
97///
98/// You might try connecting to a well-known website to test the
99/// connection case, and to a well-known black hole to test the
100/// timeout case... but that's a bit undesirable. Your tests might be
101/// running in a container with no internet access; and even if they
102/// aren't, it isn't so great for your tests to rely on the actual
103/// state of the internet. Similarly, if you make your timeout too long,
104/// your tests might block for a long time; but if your timeout is too short,
105/// the tests might fail on a slow machine or on a slow network.
106///
107/// Or, you could solve both of these problems by using `tor-rtmock`
108/// to replace the internet _and_ the passage of time. (Here we're only
109/// replacing the internet.)
110///
111/// ```rust,no_run
112/// # async fn say_hi<R,A>(runtime: R, addr: A) -> Result<(), ()> { Ok(()) }
113/// # // TODO this test hangs for some reason? Fix it and remove no_run above
114/// use tor_rtmock::{MockSleepRuntime,MockNetRuntime,net::MockNetwork};
115/// use tor_rtcompat::{NetStreamProvider,NetStreamListener};
116/// use futures::io::AsyncReadExt;
117/// use std::net::SocketAddr;
118/// use futures::StreamExt as _;
119///
120/// tor_rtcompat::test_with_all_runtimes!(|rt| async move {
121///
122/// let addr1 = "198.51.100.7".parse().unwrap();
123/// let addr2 = "198.51.100.99".parse().unwrap();
124/// let sockaddr: SocketAddr = "198.51.100.99:101".parse().unwrap();
125///
126/// // Make a runtime that pretends that we are at the first address...
127/// let fake_internet = MockNetwork::new();
128/// let rt1 = fake_internet.builder().add_address(addr1).runtime(rt.clone());
129/// // ...and one that pretends we're listening at the second address.
130/// let rt2 = fake_internet.builder().add_address(addr2).runtime(rt);
131/// let listener = rt2.listen(&sockaddr).await.unwrap();
132/// let mut incoming_stream = listener.incoming();
133///
134/// // Now we can test our function!
135/// let (result1,output) = futures::join!(
136/// say_hi(rt1, &sockaddr),
137/// async {
138/// let (mut conn,addr) = incoming_stream.next().await.unwrap().unwrap();
139/// assert_eq!(addr.ip(), addr1);
140/// let mut output = Vec::new();
141/// conn.read_to_end(&mut output).await.unwrap();
142/// output
143/// });
144///
145/// assert!(result1.is_ok());
146/// assert_eq!(&output[..], b"Hello world!\r\n");
147/// });
148/// ```
149#[derive(Clone)]
150// When we're used by external crates, we're always cfg(not(test)), so we seem deprecated
151// from outside this crate. *Within* this crate, this cfg_attr means that if we use things
152// that are deprecated for other reasons, we will notice.
153#[cfg_attr(not(test), deprecated(since = "0.29.0"))]
154pub struct MockSleepProvider {
155 /// The shared backend for this MockSleepProvider and its futures.
156 state: Arc<Mutex<SleepSchedule>>,
157}
158
159impl fmt::Debug for MockSleepProvider {
160 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
161 f.debug_struct("MockSleepProvider").finish_non_exhaustive()
162 }
163}
164
165/// Shared backend for sleep provider and Sleeping futures.
166struct SleepSchedule {
167 /// What time do we pretend it is?
168 core: MockTimeCore,
169 /// Priority queue of events, in the order that we should wake them.
170 sleepers: BinaryHeap<SleepEntry>,
171 /// If the mock time system is being driven by a `WaitFor`, holds a `Waker` to wake up that
172 /// `WaitFor` in order for it to make more progress.
173 waitfor_waker: Option<Waker>,
174 /// Number of sleepers instantiated.
175 sleepers_made: usize,
176 /// Number of sleepers polled.
177 sleepers_polled: usize,
178 /// Whether an advance is needed.
179 should_advance: bool,
180 /// A set of reasons why advances shouldn't be allowed right now.
181 blocked_advance: HashSet<String>,
182 /// A time up to which advances are allowed, irrespective of them being blocked.
183 allowed_advance: Duration,
184}
185
/// A scheduled wake-up: which future to wake, and when.
struct SleepEntry {
    /// The simulated instant at which this entry becomes due.
    when: Instant,
    /// The `Waker` to invoke once that instant has been reached.
    waker: Waker,
}
193
194/// A future returned by [`MockSleepProvider::sleep()`].
195pub struct Sleeping {
196 /// The instant when we should become ready.
197 when: Instant,
198 /// True if we have pushed this into the queue.
199 inserted: bool,
200 /// The schedule to queue ourselves in if we're polled before we're ready.
201 provider: Weak<Mutex<SleepSchedule>>,
202}
203
204impl Default for MockSleepProvider {
205 fn default() -> Self {
206 let wallclock = humantime::parse_rfc3339("2023-07-05T11:25:56Z").expect("parse");
207 MockSleepProvider::new(wallclock)
208 }
209}
210
211impl MockSleepProvider {
212 /// Create a new MockSleepProvider, starting at a given wall-clock time.
213 pub fn new(wallclock: SystemTime) -> Self {
214 let instant = Instant::now();
215 let sleepers = BinaryHeap::new();
216 let core = MockTimeCore::new(instant, wallclock);
217 let state = SleepSchedule {
218 core,
219 sleepers,
220 waitfor_waker: None,
221 sleepers_made: 0,
222 sleepers_polled: 0,
223 should_advance: false,
224 blocked_advance: HashSet::new(),
225 allowed_advance: Duration::from_nanos(0),
226 };
227 MockSleepProvider {
228 state: Arc::new(Mutex::new(state)),
229 }
230 }
231
232 /// Advance the simulated timeline forward by `dur`.
233 ///
234 /// Calling this function will wake any pending futures as
235 /// appropriate, and yield to the scheduler so they get a chance
236 /// to run.
237 ///
238 /// # Limitations
239 ///
240 /// This function advances time in one big step. We might instead
241 /// want to advance in small steps and make sure that each step's
242 /// futures can get run before the ones scheduled to run after it.
243 pub async fn advance(&self, dur: Duration) {
244 self.advance_noyield(dur);
245 tor_rtcompat::task::yield_now().await;
246 }
247
248 /// Advance the simulated timeline forward by `dur`.
249 ///
250 /// Calling this function will wake any pending futures as
251 /// appropriate, but not yield to the scheduler. Mostly you
252 /// should call [`advance`](Self::advance) instead.
253 pub(crate) fn advance_noyield(&self, dur: Duration) {
254 // It's not so great to unwrap here in general, but since this is
255 // only testing code we don't really care.
256 let mut state = self.state.lock().expect("Poisoned lock for state");
257 state.core.advance(dur);
258 state.fire();
259 }
260
261 /// Simulate a discontinuity in the system clock, by jumping to
262 /// `new_wallclock`.
263 ///
264 /// # Panics
265 ///
266 /// Panics if we have already panicked while holding the lock on
267 /// the internal timer state, and the lock is poisoned.
268 pub fn jump_to(&self, new_wallclock: SystemTime) {
269 let mut state = self.state.lock().expect("Poisoned lock for state");
270 state.core.jump_wallclock(new_wallclock);
271 }
272
273 /// Return the amount of virtual time until the next timeout
274 /// should elapse.
275 ///
276 /// If there are no more timeouts, return None. If the next
277 /// timeout should elapse right now, return Some(0).
278 pub(crate) fn time_until_next_timeout(&self) -> Option<Duration> {
279 let state = self.state.lock().expect("Poisoned lock for state");
280 let now = state.core.instant();
281 state
282 .sleepers
283 .peek()
284 .map(|sleepent| sleepent.when.saturating_duration_since(now))
285 }
286
287 /// Return true if a `WaitFor` driving this sleep provider should advance time in order for
288 /// futures blocked on sleeping to make progress.
289 ///
290 /// NOTE: This function has side-effects; if it returns true, the caller is expected to do an
291 /// advance before calling it again.
292 #[allow(clippy::cognitive_complexity)]
293 pub(crate) fn should_advance(&mut self) -> bool {
294 let mut state = self.state.lock().expect("Poisoned lock for state");
295 if !state.blocked_advance.is_empty() && state.allowed_advance == Duration::from_nanos(0) {
296 // We've had advances blocked, and don't have any quota for doing allowances while
297 // blocked left.
298 trace!(
299 "should_advance = false: blocked by {:?}",
300 state.blocked_advance
301 );
302 return false;
303 }
304 if !state.should_advance {
305 // The advance flag wasn't set.
306 trace!("should_advance = false; bit not previously set");
307 return false;
308 }
309 // Clear the advance flag; we'll either return true and cause an advance to happen,
310 // or the reasons to return false below also imply that the advance flag will be set again
311 // later on.
312 state.should_advance = false;
313 if state.sleepers_polled < state.sleepers_made {
314 // Something did set the advance flag before, but it's not valid any more now because
315 // more unpolled sleepers were created.
316 trace!("should_advance = false; advancing no longer valid");
317 return false;
318 }
319 if !state.blocked_advance.is_empty() && state.allowed_advance > Duration::from_nanos(0) {
320 // If we're here, we would've returned earlier due to having advances blocked, but
321 // we have quota to advance up to a certain time while advances are blocked.
322 // Let's see when the next timeout is, and whether it falls within that quota.
323 let next_timeout = {
324 let now = state.core.instant();
325 state
326 .sleepers
327 .peek()
328 .map(|sleepent| sleepent.when.saturating_duration_since(now))
329 };
330 let next_timeout = match next_timeout {
331 Some(x) => x,
332 None => {
333 // There's no timeout set, so we really shouldn't be here anyway.
334 trace!("should_advance = false; allow_one set but no timeout yet");
335 return false;
336 }
337 };
338 if next_timeout <= state.allowed_advance {
339 // We can advance up to the next timeout, since it's in our quota.
340 // Subtract the amount we're going to advance by from said quota.
341 state.allowed_advance -= next_timeout;
342 trace!(
343 "WARNING: allowing advance due to allow_one; new allowed is {:?}",
344 state.allowed_advance
345 );
346 } else {
347 // The next timeout is too far in the future.
348 trace!(
349 "should_advance = false; allow_one set but only up to {:?}, next is {:?}",
350 state.allowed_advance, next_timeout
351 );
352 return false;
353 }
354 }
355 true
356 }
357
358 /// Register a `Waker` to be woken up when an advance in time is required to make progress.
359 ///
360 /// This is used by `WaitFor`.
361 pub(crate) fn register_waitfor_waker(&mut self, waker: Waker) {
362 let mut state = self.state.lock().expect("Poisoned lock for state");
363 state.waitfor_waker = Some(waker);
364 }
365
366 /// Remove a previously registered `Waker` registered with `register_waitfor_waker()`.
367 pub(crate) fn clear_waitfor_waker(&mut self) {
368 let mut state = self.state.lock().expect("Poisoned lock for state");
369 state.waitfor_waker = None;
370 }
371
372 /// Returns true if a `Waker` has been registered with `register_waitfor_waker()`.
373 ///
374 /// This is used to ensure that you don't have two concurrent `WaitFor`s running.
375 pub(crate) fn has_waitfor_waker(&self) -> bool {
376 let state = self.state.lock().expect("Poisoned lock for state");
377 state.waitfor_waker.is_some()
378 }
379}
380
381impl SleepSchedule {
382 /// Wake any pending events that are ready according to the
383 /// current simulated time.
384 fn fire(&mut self) {
385 use std::collections::binary_heap::PeekMut;
386
387 let now = self.core.instant();
388 while let Some(top) = self.sleepers.peek_mut() {
389 if now < top.when {
390 return;
391 }
392
393 PeekMut::pop(top).waker.wake();
394 }
395 }
396
397 /// Add a new SleepEntry to this schedule.
398 fn push(&mut self, ent: SleepEntry) {
399 self.sleepers.push(ent);
400 }
401
402 /// If all sleepers made have been polled, set the advance flag and wake up any `WaitFor` that
403 /// might be waiting.
404 fn maybe_advance(&mut self) {
405 if self.sleepers_polled >= self.sleepers_made {
406 if let Some(ref waker) = self.waitfor_waker {
407 trace!("setting advance flag");
408 self.should_advance = true;
409 waker.wake_by_ref();
410 } else {
411 trace!("would advance, but no waker");
412 }
413 }
414 }
415
416 /// Register a sleeper as having been polled, and advance if necessary.
417 fn increment_poll_count(&mut self) {
418 self.sleepers_polled += 1;
419 trace!(
420 "sleeper polled, {}/{}",
421 self.sleepers_polled, self.sleepers_made
422 );
423 self.maybe_advance();
424 }
425}
426
427impl SleepProvider for MockSleepProvider {
428 type SleepFuture = Sleeping;
429 fn sleep(&self, duration: Duration) -> Self::SleepFuture {
430 let mut provider = self.state.lock().expect("Poisoned lock for state");
431 let when = provider.core.instant() + duration;
432 // We're making a new sleeper, so register this in the state.
433 provider.sleepers_made += 1;
434 trace!(
435 "sleeper made for {:?}, {}/{}",
436 duration, provider.sleepers_polled, provider.sleepers_made
437 );
438
439 Sleeping {
440 when,
441 inserted: false,
442 provider: Arc::downgrade(&self.state),
443 }
444 }
445
446 fn block_advance<T: Into<String>>(&self, reason: T) {
447 let mut provider = self.state.lock().expect("Poisoned lock for state");
448 let reason = reason.into();
449 trace!("advancing blocked: {}", reason);
450 provider.blocked_advance.insert(reason);
451 }
452
453 fn release_advance<T: Into<String>>(&self, reason: T) {
454 let mut provider = self.state.lock().expect("Poisoned lock for state");
455 let reason = reason.into();
456 trace!("advancing released: {}", reason);
457 provider.blocked_advance.remove(&reason);
458 if provider.blocked_advance.is_empty() {
459 provider.maybe_advance();
460 }
461 }
462
463 fn allow_one_advance(&self, dur: Duration) {
464 let mut provider = self.state.lock().expect("Poisoned lock for state");
465 provider.allowed_advance = Duration::max(provider.allowed_advance, dur);
466 trace!(
467 "** allow_one_advance fired; may advance up to {:?} **",
468 provider.allowed_advance
469 );
470 provider.maybe_advance();
471 }
472
473 fn now(&self) -> Instant {
474 self.state
475 .lock()
476 .expect("Poisoned lock for state")
477 .core
478 .instant()
479 }
480
481 fn wallclock(&self) -> SystemTime {
482 self.state
483 .lock()
484 .expect("Poisoned lock for state")
485 .core
486 .wallclock()
487 }
488}
489
490impl CoarseTimeProvider for MockSleepProvider {
491 fn now_coarse(&self) -> CoarseInstant {
492 self.state
493 .lock()
494 .expect("poisoned")
495 .core
496 .coarse()
497 .now_coarse()
498 }
499}
500
501impl PartialEq for SleepEntry {
502 fn eq(&self, other: &Self) -> bool {
503 self.when == other.when
504 }
505}
506impl Eq for SleepEntry {}
507impl PartialOrd for SleepEntry {
508 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
509 Some(self.cmp(other))
510 }
511}
512impl Ord for SleepEntry {
513 fn cmp(&self, other: &Self) -> Ordering {
514 self.when.cmp(&other.when).reverse()
515 }
516}
517
518impl Drop for Sleeping {
519 fn drop(&mut self) {
520 if let Some(provider) = Weak::upgrade(&self.provider) {
521 let mut provider = provider.lock().expect("Poisoned lock for provider");
522 if !self.inserted {
523 // A sleeper being dropped will never be polled, so there's no point waiting;
524 // act as if it's been polled in order to avoid waiting forever.
525 trace!("sleeper dropped, incrementing count");
526 provider.increment_poll_count();
527 self.inserted = true;
528 }
529 }
530 }
531}
532
533impl Future for Sleeping {
534 type Output = ();
535 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
536 if let Some(provider) = Weak::upgrade(&self.provider) {
537 let mut provider = provider.lock().expect("Poisoned lock for provider");
538 let now = provider.core.instant();
539
540 if now >= self.when {
541 // The sleep time's elapsed.
542 if !self.inserted {
543 // If we never registered this sleeper as being polled, do so now.
544 provider.increment_poll_count();
545 self.inserted = true;
546 }
547 if !provider.should_advance {
548 // The first advance during a `WaitFor` gets triggered by all sleepers that
549 // have been created being polled.
550 // However, this only happens once.
551 // What we do to get around this is have sleepers that return Ready kick off
552 // another advance, in order to wake the next waiting sleeper.
553 provider.maybe_advance();
554 }
555 return Poll::Ready(());
556 }
557 // dbg!("sleep check with", self.when-now);
558
559 if !self.inserted {
560 let entry = SleepEntry {
561 when: self.when,
562 waker: cx.waker().clone(),
563 };
564
565 provider.push(entry);
566 self.inserted = true;
567 // Register this sleeper as having been polled.
568 provider.increment_poll_count();
569 }
570 // dbg!(provider.sleepers.len());
571 }
572 Poll::Pending
573 }
574}
575
#[cfg(all(test, not(miri)))] // miri cannot do CLOCK_REALTIME
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use tor_rtcompat::test_with_all_runtimes;

    /// Advancing moves both clocks in step; jumping moves only the wall clock.
    #[test]
    fn basics_of_time_travel() {
        let wall_start = SystemTime::now();
        let provider = MockSleepProvider::new(wall_start);
        let mono_start = provider.now();
        assert_eq!(provider.wallclock(), wall_start);

        let step = Duration::new(4 * 3600 + 13 * 60, 0);
        provider.advance_noyield(step);
        assert_eq!(provider.now(), mono_start + step);
        assert_eq!(provider.wallclock(), wall_start + step);

        provider.jump_to(wall_start + step * 3);
        assert_eq!(provider.now(), mono_start + step);
        assert_eq!(provider.wallclock(), wall_start + step * 3);
    }

    /// Sleepers due after 1, 3 and 5 simulated hours wake one at a time
    /// as the clock is advanced in two-hour steps.
    #[test]
    fn time_moves_on() {
        test_with_all_runtimes!(|_| async {
            use oneshot_fused_workaround as oneshot;
            use std::sync::atomic::{AtomicBool, Ordering};

            let provider = MockSleepProvider::new(SystemTime::now());
            let hour = Duration::new(3600, 0);

            let (done1_tx, done1_rx) = oneshot::channel();
            let (done2_tx, done2_rx) = oneshot::channel();
            let (done3_tx, done3_rx) = oneshot::channel();

            let woke1 = AtomicBool::new(false);
            let woke2 = AtomicBool::new(false);
            let woke3 = AtomicBool::new(false);

            let real_start = Instant::now();

            futures::join!(
                async {
                    provider.sleep(hour).await;
                    woke1.store(true, Ordering::SeqCst);
                    done1_tx.send(()).unwrap();
                },
                async {
                    provider.sleep(hour * 3).await;
                    woke2.store(true, Ordering::SeqCst);
                    done2_tx.send(()).unwrap();
                },
                async {
                    provider.sleep(hour * 5).await;
                    woke3.store(true, Ordering::SeqCst);
                    done3_tx.send(()).unwrap();
                },
                async {
                    // Snapshot of which sleepers have woken so far, in order.
                    let woken = || {
                        [
                            woke1.load(Ordering::SeqCst),
                            woke2.load(Ordering::SeqCst),
                            woke3.load(Ordering::SeqCst),
                        ]
                    };

                    provider.advance(hour * 2).await;
                    done1_rx.await.unwrap();
                    assert_eq!(woken(), [true, false, false]);

                    provider.advance(hour * 2).await;
                    done2_rx.await.unwrap();
                    assert_eq!(woken(), [true, true, false]);

                    provider.advance(hour * 2).await;
                    done3_rx.await.unwrap();
                    assert_eq!(woken(), [true, true, true]);
                    let real_end = Instant::now();

                    // Six simulated hours must not have taken an hour of real time.
                    assert!(real_end - real_start < hour);
                }
            );
            std::io::Result::Ok(())
        });
    }
}
673}