1
//! Functionality for simulating the passage of time in unit tests.
2
//!
3
//! We do this by providing [`MockSleepProvider`], a "SleepProvider"
4
//! instance that can simulate timeouts and retries without requiring
5
//! the actual system clock to advance.
6
//!
7
//! ### Deprecated
8
//!
9
//! This mock time facility has some limitations.
10
//! See [`MockSleepProvider`] for more information.
11
//! Use [`MockRuntime`](crate::MockRuntime) for new tests.
12

            
13
#![forbid(unsafe_code)] // if you remove this, enable (or write) miri tests (git grep miri)
14
#![allow(clippy::missing_docs_in_private_items)]
15

            
16
use std::{
17
    cmp::{Eq, Ordering, PartialEq, PartialOrd},
18
    collections::BinaryHeap,
19
    fmt,
20
    pin::Pin,
21
    sync::{Arc, Mutex, Weak},
22
    task::{Context, Poll, Waker},
23
    time::{Duration, Instant, SystemTime},
24
};
25

            
26
use futures::Future;
27
use tracing::trace;
28

            
29
use std::collections::HashSet;
30
use std::fmt::Formatter;
31
use tor_rtcompat::{CoarseInstant, CoarseTimeProvider, SleepProvider};
32

            
33
use crate::time_core::MockTimeCore;
34

            
35
/// A dummy [`SleepProvider`] instance for testing.
36
///
37
/// The MockSleepProvider ignores the current time, and instead keeps
38
/// its own view of the current `Instant` and `SystemTime`.  You
39
/// can advance them in-step by calling `advance()`, and you can simulate
40
/// jumps in the system clock by calling `jump_to()`.
41
///
42
/// This is *not* for production use.
43
///
44
/// ### Deprecated
45
///
46
/// This mock time facility has some limitations, notably lack of support for tasks,
47
/// and a confusing API for controlling the mock time.
48
///
49
/// New test cases should probably use `MockRuntime`
50
/// which incorporates `SimpleMockTimeProvider`.
51
///
52
/// Comparison of `MockSleepProvider` with `SimpleMockTimeProvider`:
53
///
54
///  * `SimpleMockTimeProvider` does not support, or expect the use of,
55
///    `block_advance` et al.
56
///    Instead, the advancement of simulated time is typically done automatically
57
///    in cooperation with the executor,
58
///    using `MockRuntime`'s `advance_*` methods.
59
///
60
///  * Consequently, `SimpleMockTimeProvider` can be used in test cases that
61
///    spawn tasks and perform sleeps in them.
62
///
63
///  * And, consequently, `SimpleMockTimeProvider` does not need non-test code to
64
///    contain calls which are solely related to getting the time mocking to work right.
65
///
66
///  * `SimpleMockTimeProvider` gives correct sleeping locations
67
///    with `MockExecutor`'s dump of sleeping tasks' stack traces.
68
///
69
///  * Conversely, to use `SimpleMockTimeProvider` in all but the most simple test cases,
70
///    coordination with the executor is required.
71
///    This coordination is provided by the integrated `MockRuntime`;
72
///    `SimpleMockTimeProvider` is of limited usefulness by itself.
73
///
74
/// ### Examples
75
///
76
/// Suppose you've written a function that relies on making a
77
/// connection to the network and possibly timing out:
78
///
79
/// ```rust
80
/// use tor_rtcompat::{Runtime,SleepProviderExt};
81
/// use std::{net::SocketAddr, io::Result, time::Duration, io::Error};
82
/// use futures::io::AsyncWriteExt;
83
///
84
/// async fn say_hi(runtime: impl Runtime, addr: &SocketAddr) -> Result<()> {
85
///    let delay = Duration::new(5,0);
86
///    runtime.timeout(delay, async {
87
///       let mut conn = runtime.connect(addr).await?;
88
///       conn.write_all(b"Hello world!\r\n").await?;
89
///       conn.close().await?;
90
///       Ok::<_,Error>(())
91
///    }).await??;
92
///    Ok(())
93
/// }
94
/// ```
95
///
96
/// But how should you test this function?
97
///
98
/// You might try connecting to a well-known website to test the
99
/// connection case, and to a well-known black hole to test the
100
/// timeout case... but that's a bit undesirable.  Your tests might be
101
/// running in a container with no internet access; and even if they
102
/// aren't, it isn't so great for your tests to rely on the actual
103
/// state of the internet.  Similarly, if you make your timeout too long,
104
/// your tests might block for a long time; but if your timeout is too short,
105
/// the tests might fail on a slow machine or on a slow network.
106
///
107
/// Or, you could solve both of these problems by using `tor-rtmock`
108
/// to replace the internet _and_ the passage of time.  (Here we're only
109
/// replacing the internet.)
110
///
111
/// ```rust,no_run
112
/// # async fn say_hi<R,A>(runtime: R, addr: A) -> Result<(), ()> { Ok(()) }
113
/// # // TODO this test hangs for some reason?  Fix it and remove no_run above
114
/// use tor_rtmock::{MockSleepRuntime,MockNetRuntime,net::MockNetwork};
115
/// use tor_rtcompat::{NetStreamProvider,NetStreamListener};
116
/// use futures::io::AsyncReadExt;
117
/// use std::net::SocketAddr;
118
/// use futures::StreamExt as _;
119
///
120
/// tor_rtcompat::test_with_all_runtimes!(|rt| async move {
121
///
122
///    let addr1 = "198.51.100.7".parse().unwrap();
123
///    let addr2 = "198.51.100.99".parse().unwrap();
124
///    let sockaddr: SocketAddr = "198.51.100.99:101".parse().unwrap();
125
///
126
///    // Make a runtime that pretends that we are at the first address...
127
///    let fake_internet = MockNetwork::new();
128
///    let rt1 = fake_internet.builder().add_address(addr1).runtime(rt.clone());
129
///    // ...and one that pretends we're listening at the second address.
130
///    let rt2 = fake_internet.builder().add_address(addr2).runtime(rt);
131
///    let listener = rt2.listen(&sockaddr).await.unwrap();
132
///    let mut incoming_stream = listener.incoming();
133
///
134
///    // Now we can test our function!
135
///    let (result1,output) = futures::join!(
136
///           say_hi(rt1, &sockaddr),
137
///           async {
138
///               let (mut conn,addr) = incoming_stream.next().await.unwrap().unwrap();
139
///               assert_eq!(addr.ip(), addr1);
140
///               let mut output = Vec::new();
141
///               conn.read_to_end(&mut output).await.unwrap();
142
///               output
143
///           });
144
///
145
///    assert!(result1.is_ok());
146
///    assert_eq!(&output[..], b"Hello world!\r\n");
147
/// });
148
/// ```
149
#[derive(Clone)]
150
// When we're used by external crates, we're always cfg(not(test)), so we seem deprecated
151
// from outside this crate.  *Within* this crate, this cfg_attr means that if we use things
152
// that are deprecated for other reasons, we will notice.
153
#[cfg_attr(not(test), deprecated(since = "0.29.0"))]
154
pub struct MockSleepProvider {
155
    /// The shared backend for this MockSleepProvider and its futures.
156
    state: Arc<Mutex<SleepSchedule>>,
157
}
158

            
159
impl fmt::Debug for MockSleepProvider {
160
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
161
        f.debug_struct("MockSleepProvider").finish_non_exhaustive()
162
    }
163
}
164

            
165
/// Shared backend for sleep provider and Sleeping futures.
166
struct SleepSchedule {
167
    /// What time do we pretend it is?
168
    core: MockTimeCore,
169
    /// Priority queue of events, in the order that we should wake them.
170
    sleepers: BinaryHeap<SleepEntry>,
171
    /// If the mock time system is being driven by a `WaitFor`, holds a `Waker` to wake up that
172
    /// `WaitFor` in order for it to make more progress.
173
    waitfor_waker: Option<Waker>,
174
    /// Number of sleepers instantiated.
175
    sleepers_made: usize,
176
    /// Number of sleepers polled.
177
    sleepers_polled: usize,
178
    /// Whether an advance is needed.
179
    should_advance: bool,
180
    /// A set of reasons why advances shouldn't be allowed right now.
181
    blocked_advance: HashSet<String>,
182
    /// A time up to which advances are allowed, irrespective of them being blocked.
183
    allowed_advance: Duration,
184
}
185

            
186
/// One queued wakeup: which `Waker` to call, and when.
///
/// Entries are compared by `when` only, in reverse, so that the
/// `BinaryHeap` in `SleepSchedule` yields the earliest deadline first.
struct SleepEntry {
    /// Simulated instant at which this entry becomes due.
    when: Instant,
    /// Waker to invoke once `when` has been reached.
    waker: Waker,
}
193

            
194
/// A future returned by [`MockSleepProvider::sleep()`].
195
pub struct Sleeping {
196
    /// The instant when we should become ready.
197
    when: Instant,
198
    /// True if we have pushed this into the queue.
199
    inserted: bool,
200
    /// The schedule to queue ourselves in if we're polled before we're ready.
201
    provider: Weak<Mutex<SleepSchedule>>,
202
}
203

            
204
impl Default for MockSleepProvider {
205
    fn default() -> Self {
206
        let wallclock = humantime::parse_rfc3339("2023-07-05T11:25:56Z").expect("parse");
207
        MockSleepProvider::new(wallclock)
208
    }
209
}
210

            
211
impl MockSleepProvider {
212
    /// Create a new MockSleepProvider, starting at a given wall-clock time.
213
3874
    pub fn new(wallclock: SystemTime) -> Self {
214
3874
        let instant = Instant::now();
215
3874
        let sleepers = BinaryHeap::new();
216
3874
        let core = MockTimeCore::new(instant, wallclock);
217
3874
        let state = SleepSchedule {
218
3874
            core,
219
3874
            sleepers,
220
3874
            waitfor_waker: None,
221
3874
            sleepers_made: 0,
222
3874
            sleepers_polled: 0,
223
3874
            should_advance: false,
224
3874
            blocked_advance: HashSet::new(),
225
3874
            allowed_advance: Duration::from_nanos(0),
226
3874
        };
227
3874
        MockSleepProvider {
228
3874
            state: Arc::new(Mutex::new(state)),
229
3874
        }
230
3874
    }
231

            
232
    /// Advance the simulated timeline forward by `dur`.
233
    ///
234
    /// Calling this function will wake any pending futures as
235
    /// appropriate, and yield to the scheduler so they get a chance
236
    /// to run.
237
    ///
238
    /// # Limitations
239
    ///
240
    /// This function advances time in one big step.  We might instead
241
    /// want to advance in small steps and make sure that each step's
242
    /// futures can get run before the ones scheduled to run after it.
243
119839
    pub async fn advance(&self, dur: Duration) {
244
5122
        self.advance_noyield(dur);
245
5122
        tor_rtcompat::task::yield_now().await;
246
5122
    }
247

            
248
    /// Advance the simulated timeline forward by `dur`.
249
    ///
250
    /// Calling this function will wake any pending futures as
251
    /// appropriate, but not yield to the scheduler.  Mostly you
252
    /// should call [`advance`](Self::advance) instead.
253
318300
    pub(crate) fn advance_noyield(&self, dur: Duration) {
254
318300
        // It's not so great to unwrap here in general, but since this is
255
318300
        // only testing code we don't really care.
256
318300
        let mut state = self.state.lock().expect("Poisoned lock for state");
257
318300
        state.core.advance(dur);
258
318300
        state.fire();
259
318300
    }
260

            
261
    /// Simulate a discontinuity in the system clock, by jumping to
262
    /// `new_wallclock`.
263
    ///
264
    /// # Panics
265
    ///
266
    /// Panics if we have already panicked while holding the lock on
267
    /// the internal timer state, and the lock is poisoned.
268
416
    pub fn jump_to(&self, new_wallclock: SystemTime) {
269
416
        let mut state = self.state.lock().expect("Poisoned lock for state");
270
416
        state.core.jump_wallclock(new_wallclock);
271
416
    }
272

            
273
    /// Return the amount of virtual time until the next timeout
274
    /// should elapse.
275
    ///
276
    /// If there are no more timeouts, return None.  If the next
277
    /// timeout should elapse right now, return Some(0).
278
201020
    pub(crate) fn time_until_next_timeout(&self) -> Option<Duration> {
279
201020
        let state = self.state.lock().expect("Poisoned lock for state");
280
201020
        let now = state.core.instant();
281
201020
        state
282
201020
            .sleepers
283
201020
            .peek()
284
205390
            .map(|sleepent| sleepent.when.saturating_duration_since(now))
285
201020
    }
286

            
287
    /// Return true if a `WaitFor` driving this sleep provider should advance time in order for
288
    /// futures blocked on sleeping to make progress.
289
    ///
290
    /// NOTE: This function has side-effects; if it returns true, the caller is expected to do an
291
    /// advance before calling it again.
292
    #[allow(clippy::cognitive_complexity)]
293
205574
    pub(crate) fn should_advance(&mut self) -> bool {
294
205574
        let mut state = self.state.lock().expect("Poisoned lock for state");
295
205574
        if !state.blocked_advance.is_empty() && state.allowed_advance == Duration::from_nanos(0) {
296
            // We've had advances blocked, and don't have any quota for doing allowances while
297
            // blocked left.
298
4278
            trace!(
299
                "should_advance = false: blocked by {:?}",
300
                state.blocked_advance
301
            );
302
4278
            return false;
303
201296
        }
304
201296
        if !state.should_advance {
305
            // The advance flag wasn't set.
306
184
            trace!("should_advance = false; bit not previously set");
307
184
            return false;
308
201112
        }
309
201112
        // Clear the advance flag; we'll either return true and cause an advance to happen,
310
201112
        // or the reasons to return false below also imply that the advance flag will be set again
311
201112
        // later on.
312
201112
        state.should_advance = false;
313
201112
        if state.sleepers_polled < state.sleepers_made {
314
            // Something did set the advance flag before, but it's not valid any more now because
315
            // more unpolled sleepers were created.
316
            trace!("should_advance = false; advancing no longer valid");
317
            return false;
318
201112
        }
319
201112
        if !state.blocked_advance.is_empty() && state.allowed_advance > Duration::from_nanos(0) {
320
            // If we're here, we would've returned earlier due to having advances blocked, but
321
            // we have quota to advance up to a certain time while advances are blocked.
322
            // Let's see when the next timeout is, and whether it falls within that quota.
323
6210
            let next_timeout = {
324
6210
                let now = state.core.instant();
325
6210
                state
326
6210
                    .sleepers
327
6210
                    .peek()
328
6345
                    .map(|sleepent| sleepent.when.saturating_duration_since(now))
329
            };
330
6210
            let next_timeout = match next_timeout {
331
6210
                Some(x) => x,
332
                None => {
333
                    // There's no timeout set, so we really shouldn't be here anyway.
334
                    trace!("should_advance = false; allow_one set but no timeout yet");
335
                    return false;
336
                }
337
            };
338
6210
            if next_timeout <= state.allowed_advance {
339
                // We can advance up to the next timeout, since it's in our quota.
340
                // Subtract the amount we're going to advance by from said quota.
341
6118
                state.allowed_advance -= next_timeout;
342
6118
                trace!(
343
                    "WARNING: allowing advance due to allow_one; new allowed is {:?}",
344
                    state.allowed_advance
345
                );
346
            } else {
347
                // The next timeout is too far in the future.
348
92
                trace!(
349
                    "should_advance = false; allow_one set but only up to {:?}, next is {:?}",
350
                    state.allowed_advance,
351
                    next_timeout
352
                );
353
92
                return false;
354
            }
355
194902
        }
356
201020
        true
357
205574
    }
358

            
359
    /// Register a `Waker` to be woken up when an advance in time is required to make progress.
360
    ///
361
    /// This is used by `WaitFor`.
362
210542
    pub(crate) fn register_waitfor_waker(&mut self, waker: Waker) {
363
210542
        let mut state = self.state.lock().expect("Poisoned lock for state");
364
210542
        state.waitfor_waker = Some(waker);
365
210542
    }
366

            
367
    /// Remove a previously registered `Waker` registered with `register_waitfor_waker()`.
368
4968
    pub(crate) fn clear_waitfor_waker(&mut self) {
369
4968
        let mut state = self.state.lock().expect("Poisoned lock for state");
370
4968
        state.waitfor_waker = None;
371
4968
    }
372

            
373
    /// Returns true if a `Waker` has been registered with `register_waitfor_waker()`.
374
    ///
375
    /// This is used to ensure that you don't have two concurrent `WaitFor`s running.
376
4968
    pub(crate) fn has_waitfor_waker(&self) -> bool {
377
4968
        let state = self.state.lock().expect("Poisoned lock for state");
378
4968
        state.waitfor_waker.is_some()
379
4968
    }
380
}
381

            
382
impl SleepSchedule {
383
    /// Wake any pending events that are ready according to the
384
    /// current simulated time.
385
318300
    fn fire(&mut self) {
386
        use std::collections::binary_heap::PeekMut;
387

            
388
318300
        let now = self.core.instant();
389
637426
        while let Some(top) = self.sleepers.peek_mut() {
390
635254
            if now < top.when {
391
316128
                return;
392
319126
            }
393
319126

            
394
319126
            PeekMut::pop(top).waker.wake();
395
        }
396
318300
    }
397

            
398
    /// Add a new SleepEntry to this schedule.
399
326670
    fn push(&mut self, ent: SleepEntry) {
400
326670
        self.sleepers.push(ent);
401
326670
    }
402

            
403
    /// If all sleepers made have been polled, set the advance flag and wake up any `WaitFor` that
404
    /// might be waiting.
405
547126
    fn maybe_advance(&mut self) {
406
547126
        if self.sleepers_polled >= self.sleepers_made {
407
541008
            if let Some(ref waker) = self.waitfor_waker {
408
421590
                trace!("setting advance flag");
409
421590
                self.should_advance = true;
410
421590
                waker.wake_by_ref();
411
            } else {
412
119418
                trace!("would advance, but no waker");
413
            }
414
6118
        }
415
547126
    }
416

            
417
    /// Register a sleeper as having been polled, and advance if necessary.
418
338630
    fn increment_poll_count(&mut self) {
419
338630
        self.sleepers_polled += 1;
420
338630
        trace!(
421
            "sleeper polled, {}/{}",
422
            self.sleepers_polled,
423
            self.sleepers_made
424
        );
425
338630
        self.maybe_advance();
426
338630
    }
427
}
428

            
429
impl SleepProvider for MockSleepProvider {
430
    type SleepFuture = Sleeping;
431
338814
    fn sleep(&self, duration: Duration) -> Self::SleepFuture {
432
338814
        let mut provider = self.state.lock().expect("Poisoned lock for state");
433
338814
        let when = provider.core.instant() + duration;
434
338814
        // We're making a new sleeper, so register this in the state.
435
338814
        provider.sleepers_made += 1;
436
338814
        trace!(
437
            "sleeper made for {:?}, {}/{}",
438
            duration,
439
            provider.sleepers_polled,
440
            provider.sleepers_made
441
        );
442

            
443
338814
        Sleeping {
444
338814
            when,
445
338814
            inserted: false,
446
338814
            provider: Arc::downgrade(&self.state),
447
338814
        }
448
338814
    }
449

            
450
152
    fn block_advance<T: Into<String>>(&self, reason: T) {
451
152
        let mut provider = self.state.lock().expect("Poisoned lock for state");
452
152
        let reason = reason.into();
453
152
        trace!("advancing blocked: {}", reason);
454
152
        provider.blocked_advance.insert(reason);
455
152
    }
456

            
457
112
    fn release_advance<T: Into<String>>(&self, reason: T) {
458
112
        let mut provider = self.state.lock().expect("Poisoned lock for state");
459
112
        let reason = reason.into();
460
112
        trace!("advancing released: {}", reason);
461
112
        provider.blocked_advance.remove(&reason);
462
112
        if provider.blocked_advance.is_empty() {
463
82
            provider.maybe_advance();
464
82
        }
465
112
    }
466

            
467
5612
    fn allow_one_advance(&self, dur: Duration) {
468
5612
        let mut provider = self.state.lock().expect("Poisoned lock for state");
469
5612
        provider.allowed_advance = Duration::max(provider.allowed_advance, dur);
470
5612
        trace!(
471
            "** allow_one_advance fired; may advance up to {:?} **",
472
            provider.allowed_advance
473
        );
474
5612
        provider.maybe_advance();
475
5612
    }
476

            
477
46190
    fn now(&self) -> Instant {
478
46190
        self.state
479
46190
            .lock()
480
46190
            .expect("Poisoned lock for state")
481
46190
            .core
482
46190
            .instant()
483
46190
    }
484

            
485
180464
    fn wallclock(&self) -> SystemTime {
486
180464
        self.state
487
180464
            .lock()
488
180464
            .expect("Poisoned lock for state")
489
180464
            .core
490
180464
            .wallclock()
491
180464
    }
492
}
493

            
494
impl CoarseTimeProvider for MockSleepProvider {
495
46
    fn now_coarse(&self) -> CoarseInstant {
496
46
        self.state
497
46
            .lock()
498
46
            .expect("poisoned")
499
46
            .core
500
46
            .coarse()
501
46
            .now_coarse()
502
46
    }
503
}
504

            
505
impl PartialEq for SleepEntry {
506
    fn eq(&self, other: &Self) -> bool {
507
        self.when == other.when
508
    }
509
}
510
// Equality is defined purely on `when` (an `Instant`, which is itself `Eq`).
impl Eq for SleepEntry {}
511
impl PartialOrd for SleepEntry {
512
2436046
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
513
2436046
        Some(self.cmp(other))
514
2436046
    }
515
}
516
impl Ord for SleepEntry {
517
2436046
    fn cmp(&self, other: &Self) -> Ordering {
518
2436046
        self.when.cmp(&other.when).reverse()
519
2436046
    }
520
}
521

            
522
impl Drop for Sleeping {
523
336284
    fn drop(&mut self) {
524
336284
        if let Some(provider) = Weak::upgrade(&self.provider) {
525
336100
            let mut provider = provider.lock().expect("Poisoned lock for provider");
526
336100
            if !self.inserted {
527
                // A sleeper being dropped will never be polled, so there's no point waiting;
528
                // act as if it's been polled in order to avoid waiting forever.
529
9522
                trace!("sleeper dropped, incrementing count");
530
9522
                provider.increment_poll_count();
531
9522
                self.inserted = true;
532
326578
            }
533
184
        }
534
336284
    }
535
}
536

            
537
impl Future for Sleeping {
538
    type Output = ();
539
552762
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
540
552762
        if let Some(provider) = Weak::upgrade(&self.provider) {
541
552762
            let mut provider = provider.lock().expect("Poisoned lock for provider");
542
552762
            let now = provider.core.instant();
543
552762

            
544
552762
            if now >= self.when {
545
                // The sleep time's elapsed.
546
206012
                if !self.inserted {
547
2438
                    // If we never registered this sleeper as being polled, do so now.
548
2438
                    provider.increment_poll_count();
549
2438
                    self.inserted = true;
550
203574
                }
551
206012
                if !provider.should_advance {
552
200998
                    // The first advance during a `WaitFor` gets triggered by all sleepers that
553
200998
                    // have been created being polled.
554
200998
                    // However, this only happens once.
555
200998
                    // What we do to get around this is have sleepers that return Ready kick off
556
200998
                    // another advance, in order to wake the next waiting sleeper.
557
200998
                    provider.maybe_advance();
558
200998
                }
559
206012
                return Poll::Ready(());
560
346750
            }
561
346750
            // dbg!("sleep check with", self.when-now);
562
346750

            
563
346750
            if !self.inserted {
564
326670
                let entry = SleepEntry {
565
326670
                    when: self.when,
566
326670
                    waker: cx.waker().clone(),
567
326670
                };
568
326670

            
569
326670
                provider.push(entry);
570
326670
                self.inserted = true;
571
326670
                // Register this sleeper as having been polled.
572
326670
                provider.increment_poll_count();
573
326670
            }
574
            // dbg!(provider.sleepers.len());
575
        }
576
346750
        Poll::Pending
577
552762
    }
578
}
579

            
580
#[cfg(all(test, not(miri)))] // miri cannot do CLOCK_REALTIME
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
    use super::*;
    use tor_rtcompat::test_with_all_runtimes;

    /// `advance_noyield` moves both clocks together; `jump_to` moves only the wall clock.
    #[test]
    fn basics_of_time_travel() {
        let start_wall = SystemTime::now();
        let provider = MockSleepProvider::new(start_wall);
        let start_instant = provider.now();
        assert_eq!(provider.wallclock(), start_wall);

        let step = Duration::new(4 * 3600 + 13 * 60, 0);
        provider.advance_noyield(step);
        assert_eq!(provider.now(), start_instant + step);
        assert_eq!(provider.wallclock(), start_wall + step);

        let jumped_wall = start_wall + step * 3;
        provider.jump_to(jumped_wall);
        assert_eq!(provider.now(), start_instant + step);
        assert_eq!(provider.wallclock(), jumped_wall);
    }

    /// Sleepers of 1h, 3h and 5h wake one at a time as simulated time is
    /// advanced in 2h steps, without an hour of real time going by.
    #[test]
    fn time_moves_on() {
        test_with_all_runtimes!(|_| async {
            use oneshot_fused_workaround as oneshot;
            use std::sync::atomic::AtomicBool;
            use std::sync::atomic::Ordering;

            let provider = MockSleepProvider::new(SystemTime::now());
            let hour = Duration::new(3600, 0);

            let (done_1h_tx, done_1h_rx) = oneshot::channel();
            let (done_3h_tx, done_3h_rx) = oneshot::channel();
            let (done_5h_tx, done_5h_rx) = oneshot::channel();

            let woke_1h = AtomicBool::new(false);
            let woke_3h = AtomicBool::new(false);
            let woke_5h = AtomicBool::new(false);

            let real_start = Instant::now();

            futures::join!(
                async {
                    provider.sleep(hour).await;
                    woke_1h.store(true, Ordering::SeqCst);
                    done_1h_tx.send(()).unwrap();
                },
                async {
                    provider.sleep(hour * 3).await;
                    woke_3h.store(true, Ordering::SeqCst);
                    done_3h_tx.send(()).unwrap();
                },
                async {
                    provider.sleep(hour * 5).await;
                    woke_5h.store(true, Ordering::SeqCst);
                    done_5h_tx.send(()).unwrap();
                },
                async {
                    // Simulated time: 2h. Only the 1h sleeper is due.
                    provider.advance(hour * 2).await;
                    done_1h_rx.await.unwrap();
                    assert!(woke_1h.load(Ordering::SeqCst));
                    assert!(!woke_3h.load(Ordering::SeqCst));
                    assert!(!woke_5h.load(Ordering::SeqCst));

                    // Simulated time: 4h. The 3h sleeper is now due as well.
                    provider.advance(hour * 2).await;
                    done_3h_rx.await.unwrap();
                    assert!(woke_1h.load(Ordering::SeqCst));
                    assert!(woke_3h.load(Ordering::SeqCst));
                    assert!(!woke_5h.load(Ordering::SeqCst));

                    // Simulated time: 6h. Everything has fired.
                    provider.advance(hour * 2).await;
                    done_5h_rx.await.unwrap();
                    assert!(woke_1h.load(Ordering::SeqCst));
                    assert!(woke_3h.load(Ordering::SeqCst));
                    assert!(woke_5h.load(Ordering::SeqCst));
                    let real_end = Instant::now();

                    // None of that should have taken anything like real hours.
                    assert!(real_end.duration_since(real_start) < hour);
                }
            );
            std::io::Result::Ok(())
        });
    }
}