Lines
100 %
Functions
97.56 %
Branches
//! Simple provider of simulated time
//!
//! See [`SimpleMockTimeProvider`]
use std::cmp::Reverse;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant, SystemTime};
use derive_more::AsMut;
use priority_queue::priority_queue::PriorityQueue;
use slotmap_careful::DenseSlotMap;
use tor_rtcompat::CoarseInstant;
use tor_rtcompat::CoarseTimeProvider;
use tor_rtcompat::SleepProvider;
use crate::time_core::MockTimeCore;
/// Simple provider of simulated time
///
/// Maintains a mocked view of the current [`Instant`] and [`SystemTime`].
/// The simulated time advances only when explicitly instructed,
/// by calling [`.advance()`](Provider::advance).
/// The wallclock time can be warped with
/// [`.jump_wallclock()`](Provider::jump_wallclock),
/// allowing simulation of wall clock non-monotonicity.
/// # Panics and aborts
/// Panics on time under/overflow.
/// May cause an abort if the [`SimpleMockTimeProvider`] implementation contains bugs.
#[derive(Clone, Debug)]
pub struct SimpleMockTimeProvider {
/// The actual state
state: Arc<Mutex<State>>,
}
/// Convenience abbreviation
pub(crate) use SimpleMockTimeProvider as Provider;
/// Identifier of a [`SleepFuture`]
type Id = slotmap_careful::DefaultKey;
/// Future for `sleep`
/// Iff this struct exists, there is an entry for `id` in `prov.futures`.
/// (It might contain `None`.)
pub struct SleepFuture {
/// Reference to our state
prov: Provider,
/// Which `SleepFuture` are we
id: Id,
/// Mutable state for a [`Provider`]
/// Each sleep ([`Id`], [`SleepFuture`]) is in one of the following states:
/// | state | [`SleepFuture`] | `futures` | `unready` |
/// |-------------|------------------|------------------|--------------------|
/// | UNPOLLLED | exists | present, `None` | present, `> now` |
/// | WAITING | exists | present, `Some` | present, `> now` |
/// | READY | exists | present, `None` | absent |
/// | DROPPED | dropped | absent | absent |
#[derive(Debug, AsMut)]
struct State {
/// Current time (coarse)
core: MockTimeCore,
/// Futures; record of every existing [`SleepFuture`], including any `Waker`
/// Entry exists iff `SleepFuture` exists.
/// Contains `None` if we haven't polled the future;
/// `Some` if we have.
/// We could use a `Vec` or `TiVec`
/// but using a slotmap is more robust against bugs here.
futures: DenseSlotMap<Id, Option<Waker>>,
/// Priority queue
/// Subset of `futures`.
/// An entry is present iff the `Instant` is *strictly* after `State.now`,
/// in which case that's when the future should be woken.
/// `PriorityQueue` is a max-heap but we want earliest times, hence `Reverse`
unready: PriorityQueue<Id, Reverse<Instant>>,
/// `Default` makes a `Provider` which starts at whatever the current real time is
impl Default for Provider {
fn default() -> Self {
Self::from_real()
impl Provider {
/// Return a new mock time provider starting at a specified point in time
pub fn new(now: Instant, wallclock: SystemTime) -> Self {
let state = State {
core: MockTimeCore::new(now, wallclock),
futures: Default::default(),
unready: Default::default(),
};
Provider {
state: Arc::new(Mutex::new(state)),
/// Return a new mock time provider starting at the current actual (non-mock) time
/// Like any [`SimpleMockTimeProvider`], the time is frozen and only changes
/// due to calls to `advance`.
pub fn from_real() -> Self {
Provider::from_wallclock(SystemTime::now())
/// Return a new mock time provider starting at a specified wallclock time
/// The monotonic time ([`Instant`]) starts at the current actual (non-mock) time.
/// (Absolute values of the real monotonic time are not readily
/// observable or distinguishable from Rust,
/// nor can a fixed `Instant` be constructed,
/// so this is usually sufficient for a reproducible test.)
pub fn from_wallclock(wallclock: SystemTime) -> Self {
Provider::new(Instant::now(), wallclock)
/// Advance the simulated time by `d`
/// This advances both the `Instant` (monotonic time)
/// and `SystemTime` (wallclock time)
/// by the same amount.
/// Will wake sleeping [`SleepFuture`]s, as appropriate.
/// Note that the tasks which were waiting on those now-expired `SleepFuture`s
/// will only actually execute when they are next polled.
/// `advance` does not yield to the executor or poll any futures.
/// The executor will (presumably) poll those woken tasks, when it regains control.
/// But the order in which the tasks run will depend on its scheduling policy,
/// and might be different to the order implied by the futures' timeout values.
/// To simulate normal time advancement, wakeups, and task activations,
/// use [`MockExecutor::advance_*()`](crate::MockRuntime).
pub fn advance(&self, d: Duration) {
let mut state = self.lock();
state.core.advance(d);
state.wake_any();
/// Warp the wallclock time
/// This has no effect on any sleeping futures.
/// It only affects the return value from [`.wallclock()`](Provider::wallclock).
pub fn jump_wallclock(&self, new_wallclock: SystemTime) {
self.lock().core.jump_wallclock(new_wallclock);
// Really we ought to wake people up, here.
// But absolutely every Rust API is wrong: none offer a way to sleep until a SystemTime.
// (There might be some less-portable non-Rust APIs for that.)
/// When will the next timeout occur?
/// Returns the duration until the next [`SleepFuture`] should wake up.
/// Advancing time by at least this amount will wake up that future,
/// and any others with the same wakeup time.
/// Will never return `Some(ZERO)`:
/// any future that is supposed to wake up now (or earlier) has indeed already been woken,
/// so it is no longer sleeping and isn't included in the calculation.
pub fn time_until_next_timeout(&self) -> Option<Duration> {
let state = self.lock();
let Reverse(until) = state.unready.peek()?.1;
// The invariant (see `State`) guarantees that entries in `unready` are always `> now`,
// so we don't whether duration_since would panic or saturate.
let d = until.duration_since(state.core.instant());
Some(d)
/// Convenience function to lock the state
fn lock(&self) -> MutexGuard<'_, State> {
self.state.lock().expect("simple time state poisoned")
impl SleepProvider for Provider {
type SleepFuture = SleepFuture;
fn sleep(&self, d: Duration) -> SleepFuture {
let until = state.core.instant() + d;
let id = state.futures.insert(None);
state.unready.push(id, Reverse(until));
let fut = SleepFuture {
id,
prov: self.clone(),
// This sleep is now UNPOLLLED, except that its time might be `<= now`:
// Possibly, `until` isn't *strictly* after than `state.now`, since d might be 0.
// If so, .wake_any() will restore the invariant by immediately waking.
// This sleep is now UNPOLLED or READY, according to whether duration was 0.
fut
fn now(&self) -> Instant {
self.lock().core.instant()
fn wallclock(&self) -> SystemTime {
self.lock().core.wallclock()
impl CoarseTimeProvider for Provider {
fn now_coarse(&self) -> CoarseInstant {
self.lock().core.coarse().now_coarse()
impl Future for SleepFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut state = self.prov.lock();
if let Some((_, Reverse(scheduled))) = state.unready.get(&self.id) {
// Presence of this entry implies scheduled > now: we are UNPOLLED or WAITING
assert!(*scheduled > state.core.instant());
let waker = Some(cx.waker().clone());
// Make this be WAITING. (If we're re-polled, we simply drop any previous waker.)
*state
.futures
.get_mut(self.id)
.expect("polling futures entry") = waker;
Poll::Pending
} else {
// Absence implies scheduled (no longer stored) <= now: we are READY
Poll::Ready(())
impl State {
/// Restore the invariant for `unready` after `now` has been increased
/// Ie, ensures that any sleeps which are
/// WAITING/UNPOLLED except that they are `<= now`,
/// are moved to state READY.
fn wake_any(&mut self) {
loop {
match self.unready.peek() {
// Keep picking off entries with scheduled <= now
Some((_, Reverse(scheduled))) if *scheduled <= self.core.instant() => {
let (id, _) = self.unready.pop().expect("vanished");
// We can .take() the waker since this can only ever run once
// per sleep future (since it happens when we pop it from unready).
let futures_entry = self.futures.get_mut(id).expect("stale unready entry");
if let Some(waker) = futures_entry.take() {
waker.wake();
_ => break,
impl Drop for SleepFuture {
fn drop(&mut self) {
let _: Option<Waker> = state.futures.remove(self.id).expect("entry vanished");
let _: Option<(Id, Reverse<Instant>)> = state.unready.remove(&self.id);
// Now it is DROPPED.
#[cfg(test)]
mod test {
// @@ begin test lint list maintained by maint/add_warning @@
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_duration_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
use super::*;
use crate::task::MockExecutor;
use futures::poll;
use humantime::parse_rfc3339;
use tor_rtcompat::ToplevelBlockOn as _;
use Poll::*;
fn ms(ms: u64) -> Duration {
Duration::from_millis(ms)
fn run_test<FUT>(f: impl FnOnce(Provider, MockExecutor) -> FUT)
where
FUT: Future<Output = ()>,
{
let sp = Provider::new(
Instant::now(), // it would have been nice to make this fixed for the test
parse_rfc3339("2000-01-01T00:00:00Z").unwrap(),
);
let exec = MockExecutor::new();
exec.block_on(f(sp, exec.clone()));
#[test]
fn simple() {
run_test(|sp, _exec| async move {
let n1 = sp.now();
let w1 = sp.wallclock();
let mut f1 = sp.sleep(ms(500));
let mut f2 = sp.sleep(ms(1500));
assert_eq!(poll!(&mut f1), Pending);
sp.advance(ms(200));
assert_eq!(n1 + ms(200), sp.now());
assert_eq!(w1 + ms(200), sp.wallclock());
assert_eq!(poll!(&mut f2), Pending);
drop(f2);
sp.jump_wallclock(w1 + ms(10_000));
sp.advance(ms(300));
assert_eq!(n1 + ms(500), sp.now());
assert_eq!(w1 + ms(10_300), sp.wallclock());
assert_eq!(poll!(&mut f1), Ready(()));
let mut f0 = sp.sleep(ms(0));
assert_eq!(poll!(&mut f0), Ready(()));
});
fn task() {
run_test(|sp, exec| async move {
let st = Arc::new(Mutex::new(0_i8));
exec.spawn_identified("test task", {
let st = st.clone();
let sp = sp.clone();
async move {
*st.lock().unwrap() = 1;
sp.sleep(ms(500)).await;
*st.lock().unwrap() = 2;
sp.sleep(ms(300)).await;
*st.lock().unwrap() = 3;
let st = move || *st.lock().unwrap();
assert_eq!(st(), 0);
exec.progress_until_stalled().await;
assert_eq!(st(), 1);
assert_eq!(sp.time_until_next_timeout(), Some(ms(500)));
sp.advance(ms(500));
assert_eq!(sp.time_until_next_timeout(), None);
assert_eq!(st(), 2);
assert_eq!(sp.time_until_next_timeout(), Some(ms(300)));
assert_eq!(st(), 3);