tor_rtmock/
time.rs

1//! Functionality for simulating the passage of time in unit tests.
2//!
3//! We do this by providing [`MockSleepProvider`], a "SleepProvider"
4//! instance that can simulate timeouts and retries without requiring
5//! the actual system clock to advance.
6//!
7//! ### Deprecated
8//!
9//! This mock time facility has some limitations.
10//! See [`MockSleepProvider`] for more information.
11//! Use [`MockRuntime`](crate::MockRuntime) for new tests.
12
13#![forbid(unsafe_code)] // if you remove this, enable (or write) miri tests (git grep miri)
14#![allow(clippy::missing_docs_in_private_items)]
15
16use std::{
17    cmp::{Eq, Ordering, PartialEq, PartialOrd},
18    collections::BinaryHeap,
19    fmt,
20    pin::Pin,
21    sync::{Arc, Mutex, Weak},
22    task::{Context, Poll, Waker},
23    time::{Duration, Instant, SystemTime},
24};
25
26use futures::Future;
27use tracing::trace;
28
29use std::collections::HashSet;
30use std::fmt::Formatter;
31use tor_rtcompat::{CoarseInstant, CoarseTimeProvider, SleepProvider};
32
33use crate::time_core::MockTimeCore;
34
35/// A dummy [`SleepProvider`] instance for testing.
36///
37/// The MockSleepProvider ignores the current time, and instead keeps
38/// its own view of the current `Instant` and `SystemTime`.  You
39/// can advance them in-step by calling `advance()`, and you can simulate
/// jumps in the system clock by calling `jump_to()`.
41///
42/// This is *not* for production use.
43///
44/// ### Deprecated
45///
46/// This mock time facility has some limitations, notably lack of support for tasks,
47/// and a confusing API for controlling the mock time.
48///
49/// New test cases should probably use `MockRuntime`
/// which incorporates `SimpleMockTimeProvider`.
51///
52/// Comparison of `MockSleepProvider` with `SimpleMockTimeProvider`:
53///
54///  * `SimpleMockTimeProvider` does not support, or expect the use of,
55///    `block_advance` et al.
56///    Instead, the advancement of simulated time is typically done automatically
57///    in cooperation with the executor,
58///    using `MockRuntime`'s `advance_*` methods.
59///
60///  * Consequently, `SimpleMockTimeProvider` can be used in test cases that
61///    spawn tasks and perform sleeps in them.
62///
63///  * And, consequently, `SimpleMockTimeProvider` does not need non-test code to
64///    contain calls which are solely related to getting the time mocking to work right.
65///
66///  * `SimpleMockTimeProvider` gives correct sleeping locations
67///    with `MockExecutor`'s dump of sleeping tasks' stack traces.
68///
69///  * Conversely, to use `SimpleMockTimeProvider` in all but the most simple test cases,
70///    coordination with the executor is required.
71///    This coordination is provided by the integrated `MockRuntime`;
72///    `SimpleMockTimeProvider` is of limited usefulness by itself.
73///
74/// ### Examples
75///
76/// Suppose you've written a function that relies on making a
77/// connection to the network and possibly timing out:
78///
79/// ```rust
80/// use tor_rtcompat::{Runtime,SleepProviderExt};
81/// use std::{net::SocketAddr, io::Result, time::Duration, io::Error};
82/// use futures::io::AsyncWriteExt;
83///
84/// async fn say_hi(runtime: impl Runtime, addr: &SocketAddr) -> Result<()> {
85///    let delay = Duration::new(5,0);
86///    runtime.timeout(delay, async {
87///       let mut conn = runtime.connect(addr).await?;
88///       conn.write_all(b"Hello world!\r\n").await?;
89///       conn.close().await?;
90///       Ok::<_,Error>(())
91///    }).await??;
92///    Ok(())
93/// }
94/// ```
95///
96/// But how should you test this function?
97///
98/// You might try connecting to a well-known website to test the
99/// connection case, and to a well-known black hole to test the
100/// timeout case... but that's a bit undesirable.  Your tests might be
101/// running in a container with no internet access; and even if they
102/// aren't, it isn't so great for your tests to rely on the actual
103/// state of the internet.  Similarly, if you make your timeout too long,
104/// your tests might block for a long time; but if your timeout is too short,
105/// the tests might fail on a slow machine or on a slow network.
106///
107/// Or, you could solve both of these problems by using `tor-rtmock`
108/// to replace the internet _and_ the passage of time.  (Here we're only
109/// replacing the internet.)
110///
111/// ```rust,no_run
112/// # async fn say_hi<R,A>(runtime: R, addr: A) -> Result<(), ()> { Ok(()) }
113/// # // TODO this test hangs for some reason?  Fix it and remove no_run above
114/// use tor_rtmock::{MockSleepRuntime,MockNetRuntime,net::MockNetwork};
115/// use tor_rtcompat::{NetStreamProvider,NetStreamListener};
116/// use futures::io::AsyncReadExt;
117/// use std::net::SocketAddr;
118/// use futures::StreamExt as _;
119///
120/// tor_rtcompat::test_with_all_runtimes!(|rt| async move {
121///
122///    let addr1 = "198.51.100.7".parse().unwrap();
123///    let addr2 = "198.51.100.99".parse().unwrap();
124///    let sockaddr: SocketAddr = "198.51.100.99:101".parse().unwrap();
125///
126///    // Make a runtime that pretends that we are at the first address...
127///    let fake_internet = MockNetwork::new();
128///    let rt1 = fake_internet.builder().add_address(addr1).runtime(rt.clone());
129///    // ...and one that pretends we're listening at the second address.
130///    let rt2 = fake_internet.builder().add_address(addr2).runtime(rt);
131///    let listener = rt2.listen(&sockaddr).await.unwrap();
132///    let mut incoming_stream = listener.incoming();
133///
134///    // Now we can test our function!
135///    let (result1,output) = futures::join!(
136///           say_hi(rt1, &sockaddr),
137///           async {
138///               let (mut conn,addr) = incoming_stream.next().await.unwrap().unwrap();
139///               assert_eq!(addr.ip(), addr1);
140///               let mut output = Vec::new();
141///               conn.read_to_end(&mut output).await.unwrap();
142///               output
143///           });
144///
145///    assert!(result1.is_ok());
146///    assert_eq!(&output[..], b"Hello world!\r\n");
147/// });
148/// ```
#[derive(Clone)]
// When we're used by external crates, we're always cfg(not(test)), so we seem deprecated
// from outside this crate.  *Within* this crate, this cfg_attr means that if we use things
// that are deprecated for other reasons, we will notice.
#[cfg_attr(not(test), deprecated(since = "0.29.0"))]
pub struct MockSleepProvider {
    /// The shared backend for this MockSleepProvider and its futures.
    ///
    /// The derived `Clone` clones this `Arc`, so every clone of a provider
    /// observes and controls the same schedule.  `Sleeping` futures hold only
    /// a `Weak` reference to it.
    state: Arc<Mutex<SleepSchedule>>,
}
158
159impl fmt::Debug for MockSleepProvider {
160    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
161        f.debug_struct("MockSleepProvider").finish_non_exhaustive()
162    }
163}
164
/// Shared backend for sleep provider and Sleeping futures.
///
/// One instance lives behind the provider's `Arc<Mutex<_>>`; every method here
/// is called with that mutex held.
struct SleepSchedule {
    /// What time do we pretend it is?
    core: MockTimeCore,
    /// Priority queue of events, in the order that we should wake them.
    ///
    /// `SleepEntry` orders itself by `when` in reverse, so the top of this
    /// (max-)heap is the entry with the earliest deadline.
    sleepers: BinaryHeap<SleepEntry>,
    /// If the mock time system is being driven by a `WaitFor`, holds a `Waker` to wake up that
    /// `WaitFor` in order for it to make more progress.
    waitfor_waker: Option<Waker>,
    /// Number of sleepers instantiated.
    ///
    /// Incremented once per call to `MockSleepProvider::sleep()`.
    sleepers_made: usize,
    /// Number of sleepers polled.
    ///
    /// Incremented by `increment_poll_count()`: once per `Sleeping`, either on
    /// its first poll or, if it was never polled, when it is dropped.
    sleepers_polled: usize,
    /// Whether an advance is needed.
    ///
    /// Set by `maybe_advance()` (only when a `WaitFor` waker is registered) and
    /// cleared again by `MockSleepProvider::should_advance()`.
    should_advance: bool,
    /// A set of reasons why advances shouldn't be allowed right now.
    ///
    /// Reasons are added by `block_advance()` and removed by `release_advance()`.
    blocked_advance: HashSet<String>,
    /// A time up to which advances are allowed, irrespective of them being blocked.
    ///
    /// Raised by `allow_one_advance()` and consumed by
    /// `MockSleepProvider::should_advance()` as blocked advances are permitted.
    allowed_advance: Duration,
}
185
/// An entry telling us when to wake which future up.
///
/// Entries compare by `when` only, and in reverse, so that the entry with the
/// earliest deadline is the greatest one and ends up on top of a `BinaryHeap`.
struct SleepEntry {
    /// The time at which this entry should wake
    when: Instant,
    /// The Waker to call when the instant has passed.
    waker: Waker,
}
193
/// A future returned by [`MockSleepProvider::sleep()`].
///
/// It becomes ready once the provider's simulated instant reaches `when`.
pub struct Sleeping {
    /// The instant when we should become ready.
    when: Instant,
    /// True if we have pushed this into the queue.
    ///
    /// More precisely: true once this sleeper has been counted as polled.
    /// That happens when it is queued on its first pending poll, but also when
    /// its first poll finds it already ready, or when it is dropped unpolled;
    /// in those two cases nothing is actually pushed into the queue.
    inserted: bool,
    /// The schedule to queue ourselves in if we're polled before we're ready.
    ///
    /// Held weakly, so a `Sleeping` does not keep the provider's state alive.
    provider: Weak<Mutex<SleepSchedule>>,
}
203
204impl Default for MockSleepProvider {
205    fn default() -> Self {
206        let wallclock = humantime::parse_rfc3339("2023-07-05T11:25:56Z").expect("parse");
207        MockSleepProvider::new(wallclock)
208    }
209}
210
211impl MockSleepProvider {
212    /// Create a new MockSleepProvider, starting at a given wall-clock time.
213    pub fn new(wallclock: SystemTime) -> Self {
214        let instant = Instant::now();
215        let sleepers = BinaryHeap::new();
216        let core = MockTimeCore::new(instant, wallclock);
217        let state = SleepSchedule {
218            core,
219            sleepers,
220            waitfor_waker: None,
221            sleepers_made: 0,
222            sleepers_polled: 0,
223            should_advance: false,
224            blocked_advance: HashSet::new(),
225            allowed_advance: Duration::from_nanos(0),
226        };
227        MockSleepProvider {
228            state: Arc::new(Mutex::new(state)),
229        }
230    }
231
232    /// Advance the simulated timeline forward by `dur`.
233    ///
234    /// Calling this function will wake any pending futures as
235    /// appropriate, and yield to the scheduler so they get a chance
236    /// to run.
237    ///
238    /// # Limitations
239    ///
240    /// This function advances time in one big step.  We might instead
241    /// want to advance in small steps and make sure that each step's
242    /// futures can get run before the ones scheduled to run after it.
243    pub async fn advance(&self, dur: Duration) {
244        self.advance_noyield(dur);
245        tor_rtcompat::task::yield_now().await;
246    }
247
248    /// Advance the simulated timeline forward by `dur`.
249    ///
250    /// Calling this function will wake any pending futures as
251    /// appropriate, but not yield to the scheduler.  Mostly you
252    /// should call [`advance`](Self::advance) instead.
253    pub(crate) fn advance_noyield(&self, dur: Duration) {
254        // It's not so great to unwrap here in general, but since this is
255        // only testing code we don't really care.
256        let mut state = self.state.lock().expect("Poisoned lock for state");
257        state.core.advance(dur);
258        state.fire();
259    }
260
261    /// Simulate a discontinuity in the system clock, by jumping to
262    /// `new_wallclock`.
263    ///
264    /// # Panics
265    ///
266    /// Panics if we have already panicked while holding the lock on
267    /// the internal timer state, and the lock is poisoned.
268    pub fn jump_to(&self, new_wallclock: SystemTime) {
269        let mut state = self.state.lock().expect("Poisoned lock for state");
270        state.core.jump_wallclock(new_wallclock);
271    }
272
273    /// Return the amount of virtual time until the next timeout
274    /// should elapse.
275    ///
276    /// If there are no more timeouts, return None.  If the next
277    /// timeout should elapse right now, return Some(0).
278    pub(crate) fn time_until_next_timeout(&self) -> Option<Duration> {
279        let state = self.state.lock().expect("Poisoned lock for state");
280        let now = state.core.instant();
281        state
282            .sleepers
283            .peek()
284            .map(|sleepent| sleepent.when.saturating_duration_since(now))
285    }
286
287    /// Return true if a `WaitFor` driving this sleep provider should advance time in order for
288    /// futures blocked on sleeping to make progress.
289    ///
290    /// NOTE: This function has side-effects; if it returns true, the caller is expected to do an
291    /// advance before calling it again.
292    #[allow(clippy::cognitive_complexity)]
293    pub(crate) fn should_advance(&mut self) -> bool {
294        let mut state = self.state.lock().expect("Poisoned lock for state");
295        if !state.blocked_advance.is_empty() && state.allowed_advance == Duration::from_nanos(0) {
296            // We've had advances blocked, and don't have any quota for doing allowances while
297            // blocked left.
298            trace!(
299                "should_advance = false: blocked by {:?}",
300                state.blocked_advance
301            );
302            return false;
303        }
304        if !state.should_advance {
305            // The advance flag wasn't set.
306            trace!("should_advance = false; bit not previously set");
307            return false;
308        }
309        // Clear the advance flag; we'll either return true and cause an advance to happen,
310        // or the reasons to return false below also imply that the advance flag will be set again
311        // later on.
312        state.should_advance = false;
313        if state.sleepers_polled < state.sleepers_made {
314            // Something did set the advance flag before, but it's not valid any more now because
315            // more unpolled sleepers were created.
316            trace!("should_advance = false; advancing no longer valid");
317            return false;
318        }
319        if !state.blocked_advance.is_empty() && state.allowed_advance > Duration::from_nanos(0) {
320            // If we're here, we would've returned earlier due to having advances blocked, but
321            // we have quota to advance up to a certain time while advances are blocked.
322            // Let's see when the next timeout is, and whether it falls within that quota.
323            let next_timeout = {
324                let now = state.core.instant();
325                state
326                    .sleepers
327                    .peek()
328                    .map(|sleepent| sleepent.when.saturating_duration_since(now))
329            };
330            let next_timeout = match next_timeout {
331                Some(x) => x,
332                None => {
333                    // There's no timeout set, so we really shouldn't be here anyway.
334                    trace!("should_advance = false; allow_one set but no timeout yet");
335                    return false;
336                }
337            };
338            if next_timeout <= state.allowed_advance {
339                // We can advance up to the next timeout, since it's in our quota.
340                // Subtract the amount we're going to advance by from said quota.
341                state.allowed_advance -= next_timeout;
342                trace!(
343                    "WARNING: allowing advance due to allow_one; new allowed is {:?}",
344                    state.allowed_advance
345                );
346            } else {
347                // The next timeout is too far in the future.
348                trace!(
349                    "should_advance = false; allow_one set but only up to {:?}, next is {:?}",
350                    state.allowed_advance,
351                    next_timeout
352                );
353                return false;
354            }
355        }
356        true
357    }
358
359    /// Register a `Waker` to be woken up when an advance in time is required to make progress.
360    ///
361    /// This is used by `WaitFor`.
362    pub(crate) fn register_waitfor_waker(&mut self, waker: Waker) {
363        let mut state = self.state.lock().expect("Poisoned lock for state");
364        state.waitfor_waker = Some(waker);
365    }
366
367    /// Remove a previously registered `Waker` registered with `register_waitfor_waker()`.
368    pub(crate) fn clear_waitfor_waker(&mut self) {
369        let mut state = self.state.lock().expect("Poisoned lock for state");
370        state.waitfor_waker = None;
371    }
372
373    /// Returns true if a `Waker` has been registered with `register_waitfor_waker()`.
374    ///
375    /// This is used to ensure that you don't have two concurrent `WaitFor`s running.
376    pub(crate) fn has_waitfor_waker(&self) -> bool {
377        let state = self.state.lock().expect("Poisoned lock for state");
378        state.waitfor_waker.is_some()
379    }
380}
381
382impl SleepSchedule {
383    /// Wake any pending events that are ready according to the
384    /// current simulated time.
385    fn fire(&mut self) {
386        use std::collections::binary_heap::PeekMut;
387
388        let now = self.core.instant();
389        while let Some(top) = self.sleepers.peek_mut() {
390            if now < top.when {
391                return;
392            }
393
394            PeekMut::pop(top).waker.wake();
395        }
396    }
397
398    /// Add a new SleepEntry to this schedule.
399    fn push(&mut self, ent: SleepEntry) {
400        self.sleepers.push(ent);
401    }
402
403    /// If all sleepers made have been polled, set the advance flag and wake up any `WaitFor` that
404    /// might be waiting.
405    fn maybe_advance(&mut self) {
406        if self.sleepers_polled >= self.sleepers_made {
407            if let Some(ref waker) = self.waitfor_waker {
408                trace!("setting advance flag");
409                self.should_advance = true;
410                waker.wake_by_ref();
411            } else {
412                trace!("would advance, but no waker");
413            }
414        }
415    }
416
417    /// Register a sleeper as having been polled, and advance if necessary.
418    fn increment_poll_count(&mut self) {
419        self.sleepers_polled += 1;
420        trace!(
421            "sleeper polled, {}/{}",
422            self.sleepers_polled,
423            self.sleepers_made
424        );
425        self.maybe_advance();
426    }
427}
428
429impl SleepProvider for MockSleepProvider {
430    type SleepFuture = Sleeping;
431    fn sleep(&self, duration: Duration) -> Self::SleepFuture {
432        let mut provider = self.state.lock().expect("Poisoned lock for state");
433        let when = provider.core.instant() + duration;
434        // We're making a new sleeper, so register this in the state.
435        provider.sleepers_made += 1;
436        trace!(
437            "sleeper made for {:?}, {}/{}",
438            duration,
439            provider.sleepers_polled,
440            provider.sleepers_made
441        );
442
443        Sleeping {
444            when,
445            inserted: false,
446            provider: Arc::downgrade(&self.state),
447        }
448    }
449
450    fn block_advance<T: Into<String>>(&self, reason: T) {
451        let mut provider = self.state.lock().expect("Poisoned lock for state");
452        let reason = reason.into();
453        trace!("advancing blocked: {}", reason);
454        provider.blocked_advance.insert(reason);
455    }
456
457    fn release_advance<T: Into<String>>(&self, reason: T) {
458        let mut provider = self.state.lock().expect("Poisoned lock for state");
459        let reason = reason.into();
460        trace!("advancing released: {}", reason);
461        provider.blocked_advance.remove(&reason);
462        if provider.blocked_advance.is_empty() {
463            provider.maybe_advance();
464        }
465    }
466
467    fn allow_one_advance(&self, dur: Duration) {
468        let mut provider = self.state.lock().expect("Poisoned lock for state");
469        provider.allowed_advance = Duration::max(provider.allowed_advance, dur);
470        trace!(
471            "** allow_one_advance fired; may advance up to {:?} **",
472            provider.allowed_advance
473        );
474        provider.maybe_advance();
475    }
476
477    fn now(&self) -> Instant {
478        self.state
479            .lock()
480            .expect("Poisoned lock for state")
481            .core
482            .instant()
483    }
484
485    fn wallclock(&self) -> SystemTime {
486        self.state
487            .lock()
488            .expect("Poisoned lock for state")
489            .core
490            .wallclock()
491    }
492}
493
494impl CoarseTimeProvider for MockSleepProvider {
495    fn now_coarse(&self) -> CoarseInstant {
496        self.state
497            .lock()
498            .expect("poisoned")
499            .core
500            .coarse()
501            .now_coarse()
502    }
503}
504
505impl PartialEq for SleepEntry {
506    fn eq(&self, other: &Self) -> bool {
507        self.when == other.when
508    }
509}
// Equality is decided by `when` alone (see `PartialEq` above); `Instant`
// equality is a full equivalence relation, so the marker trait is justified.
impl Eq for SleepEntry {}
511impl PartialOrd for SleepEntry {
512    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
513        Some(self.cmp(other))
514    }
515}
516impl Ord for SleepEntry {
517    fn cmp(&self, other: &Self) -> Ordering {
518        self.when.cmp(&other.when).reverse()
519    }
520}
521
522impl Drop for Sleeping {
523    fn drop(&mut self) {
524        if let Some(provider) = Weak::upgrade(&self.provider) {
525            let mut provider = provider.lock().expect("Poisoned lock for provider");
526            if !self.inserted {
527                // A sleeper being dropped will never be polled, so there's no point waiting;
528                // act as if it's been polled in order to avoid waiting forever.
529                trace!("sleeper dropped, incrementing count");
530                provider.increment_poll_count();
531                self.inserted = true;
532            }
533        }
534    }
535}
536
537impl Future for Sleeping {
538    type Output = ();
539    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
540        if let Some(provider) = Weak::upgrade(&self.provider) {
541            let mut provider = provider.lock().expect("Poisoned lock for provider");
542            let now = provider.core.instant();
543
544            if now >= self.when {
545                // The sleep time's elapsed.
546                if !self.inserted {
547                    // If we never registered this sleeper as being polled, do so now.
548                    provider.increment_poll_count();
549                    self.inserted = true;
550                }
551                if !provider.should_advance {
552                    // The first advance during a `WaitFor` gets triggered by all sleepers that
553                    // have been created being polled.
554                    // However, this only happens once.
555                    // What we do to get around this is have sleepers that return Ready kick off
556                    // another advance, in order to wake the next waiting sleeper.
557                    provider.maybe_advance();
558                }
559                return Poll::Ready(());
560            }
561            // dbg!("sleep check with", self.when-now);
562
563            if !self.inserted {
564                let entry = SleepEntry {
565                    when: self.when,
566                    waker: cx.waker().clone(),
567                };
568
569                provider.push(entry);
570                self.inserted = true;
571                // Register this sleeper as having been polled.
572                provider.increment_poll_count();
573            }
574            // dbg!(provider.sleepers.len());
575        }
576        Poll::Pending
577    }
578}
579
#[cfg(all(test, not(miri)))] // miri cannot do CLOCK_REALTIME
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use tor_rtcompat::test_with_all_runtimes;

    // Checks the two clocks directly, without any sleepers or executor:
    // `advance_noyield` moves both clocks, `jump_to` moves only the wall clock.
    #[test]
    fn basics_of_time_travel() {
        let w1 = SystemTime::now();
        let sp = MockSleepProvider::new(w1);
        let i1 = sp.now();
        // The wall clock starts exactly where we told it to.
        assert_eq!(sp.wallclock(), w1);

        let interval = Duration::new(4 * 3600 + 13 * 60, 0);
        sp.advance_noyield(interval);
        // Advancing moves the monotonic and the wall clock in step.
        assert_eq!(sp.now(), i1 + interval);
        assert_eq!(sp.wallclock(), w1 + interval);

        sp.jump_to(w1 + interval * 3);
        // A jump changes the wall clock only; the monotonic clock stays put.
        assert_eq!(sp.now(), i1 + interval);
        assert_eq!(sp.wallclock(), w1 + interval * 3);
    }

    // Three sleepers (1h, 3h, 5h) run alongside a driver that advances the
    // mock clock in three 2h steps; after each step exactly one more sleeper
    // must have completed.
    #[test]
    fn time_moves_on() {
        test_with_all_runtimes!(|_| async {
            use oneshot_fused_workaround as oneshot;
            use std::sync::atomic::AtomicBool;
            use std::sync::atomic::Ordering;

            let sp = MockSleepProvider::new(SystemTime::now());
            let one_hour = Duration::new(3600, 0);

            // One channel per sleeper, so the driver can wait for that
            // sleeper to finish.
            let (s1, r1) = oneshot::channel();
            let (s2, r2) = oneshot::channel();
            let (s3, r3) = oneshot::channel();

            // One flag per sleeper, so the driver can check which sleepers
            // have (and have not) finished.
            let b1 = AtomicBool::new(false);
            let b2 = AtomicBool::new(false);
            let b3 = AtomicBool::new(false);

            let real_start = Instant::now();

            futures::join!(
                // Sleeper 1: due after 1 simulated hour.
                async {
                    sp.sleep(one_hour).await;
                    b1.store(true, Ordering::SeqCst);
                    s1.send(()).unwrap();
                },
                // Sleeper 2: due after 3 simulated hours.
                async {
                    sp.sleep(one_hour * 3).await;
                    b2.store(true, Ordering::SeqCst);
                    s2.send(()).unwrap();
                },
                // Sleeper 3: due after 5 simulated hours.
                async {
                    sp.sleep(one_hour * 5).await;
                    b3.store(true, Ordering::SeqCst);
                    s3.send(()).unwrap();
                },
                // Driver: advances simulated time and checks who woke up.
                async {
                    // Simulated time is now 2h: only sleeper 1 is due.
                    sp.advance(one_hour * 2).await;
                    r1.await.unwrap();
                    assert!(b1.load(Ordering::SeqCst));
                    assert!(!b2.load(Ordering::SeqCst));
                    assert!(!b3.load(Ordering::SeqCst));

                    // Simulated time is now 4h: sleeper 2 is due as well.
                    sp.advance(one_hour * 2).await;
                    r2.await.unwrap();
                    assert!(b1.load(Ordering::SeqCst));
                    assert!(b2.load(Ordering::SeqCst));
                    assert!(!b3.load(Ordering::SeqCst));

                    // Simulated time is now 6h: every sleeper is due.
                    sp.advance(one_hour * 2).await;
                    r3.await.unwrap();
                    assert!(b1.load(Ordering::SeqCst));
                    assert!(b2.load(Ordering::SeqCst));
                    assert!(b3.load(Ordering::SeqCst));
                    let real_end = Instant::now();

                    // Six simulated hours must not have taken an hour of
                    // real time.
                    assert!(real_end - real_start < one_hour);
                }
            );
            std::io::Result::Ok(())
        });
    }
}
675        });
676    }
677}