1//! Definitions for [`SleepProviderExt`] and related types.
23use crate::traits::SleepProvider;
4use futures::{Future, FutureExt};
5use pin_project::pin_project;
6use std::{
7 pin::Pin,
8 task::{Context, Poll},
9 time::{Duration, SystemTime},
10};
1112/// An error value given when a function times out.
13///
14/// This value is generated when the timeout from
15/// [`SleepProviderExt::timeout`] expires before the provided future
16/// is ready.
17#[derive(Copy, Clone, Debug, Eq, PartialEq)]
18#[allow(clippy::exhaustive_structs)]
19pub struct TimeoutError;
20impl std::error::Error for TimeoutError {}
21impl std::fmt::Display for TimeoutError {
22fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23write!(f, "Timeout expired")
24 }
25}
2627impl From<TimeoutError> for std::io::Error {
28fn from(err: TimeoutError) -> std::io::Error {
29 std::io::Error::new(std::io::ErrorKind::TimedOut, err)
30 }
31}
3233/// An extension trait on [`SleepProvider`] for timeouts and clock delays.
34pub trait SleepProviderExt: SleepProvider {
35/// Wrap a [`Future`] with a timeout.
36 ///
37 /// The output of the new future will be the returned value of
38 /// `future` if it completes within `duration`. Otherwise, it
39 /// will be `Err(TimeoutError)`.
40 ///
41 /// # Limitations
42 ///
43 /// This uses [`SleepProvider::sleep`] for its timer, and is
44 /// subject to the same limitations.
45#[must_use = "timeout() returns a future, which does nothing unless used"]
46fn timeout<F: Future>(&self, duration: Duration, future: F) -> Timeout<F, Self::SleepFuture> {
47let sleep_future = self.sleep(duration);
4849 Timeout {
50 future,
51 sleep_future,
52 }
53 }
5455/// Pause until the wall-clock is at `when` or later, trying to
56 /// recover from clock jumps.
57 ///
58 /// Unlike [`SleepProvider::sleep()`], the future returned by this function will
59 /// wake up periodically to check the current time, and see if
60 /// it is at or past the target.
61 ///
62 /// # Limitations
63 ///
64 /// The ability of this function to detect clock jumps is limited
65 /// to its granularity; it may finish a while after the declared
66 /// wallclock time if the system clock jumps forward.
67 ///
68 /// This function does not detect backward clock jumps; arguably,
69 /// we should have another function to do that.
70 ///
71 /// This uses [`SleepProvider::sleep`] for its timer, and is
72 /// subject to the same limitations.
73#[must_use = "sleep_until_wallclock() returns a future, which does nothing unless used"]
74fn sleep_until_wallclock(&self, when: SystemTime) -> SleepUntilWallclock<'_, Self> {
75 SleepUntilWallclock {
76 provider: self,
77 target: when,
78 sleep_future: None,
79 }
80 }
81}
8283impl<T: SleepProvider> SleepProviderExt for T {}
8485/// A timeout returned by [`SleepProviderExt::timeout`].
86#[pin_project]
87pub struct Timeout<T, S> {
88/// The future we want to execute.
89#[pin]
90future: T,
91/// The future implementing the timeout.
92#[pin]
93sleep_future: S,
94}
9596impl<T, S> Future for Timeout<T, S>
97where
98T: Future,
99 S: Future<Output = ()>,
100{
101type Output = Result<T::Output, TimeoutError>;
102103fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
104let this = self.project();
105if let Poll::Ready(x) = this.future.poll(cx) {
106return Poll::Ready(Ok(x));
107 }
108109match this.sleep_future.poll(cx) {
110 Poll::Pending => Poll::Pending,
111 Poll::Ready(()) => Poll::Ready(Err(TimeoutError)),
112 }
113 }
114}
115116/// A future implementing [`SleepProviderExt::sleep_until_wallclock`].
117pub struct SleepUntilWallclock<'a, SP: SleepProvider> {
118/// Reference to the provider that we use to make new SleepFutures.
119provider: &'a SP,
120/// The time that we are waiting for.
121target: SystemTime,
122/// The future representing our current delay.
123sleep_future: Option<Pin<Box<SP::SleepFuture>>>,
124}
125126impl<'a, SP> Future for SleepUntilWallclock<'a, SP>
127where
128SP: SleepProvider,
129{
130type Output = ();
131132fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
133// Strategy: we implement sleep_until_wallclock by
134 // waiting in increments of up to MAX_SLEEP, checking the
135 // wall clock before and after each increment. This makes
136 // us wake up a bit more frequently, but enables us to detect it
137 // if the system clock jumps forward.
138let target = self.target;
139loop {
140let now = self.provider.wallclock();
141if now >= target {
142return Poll::Ready(());
143 }
144145let (last_delay, delay) = calc_next_delay(now, target);
146147// Note that we store this future to keep it from being
148 // cancelled, even though we don't ever poll it more than
149 // once.
150 //
151 // TODO: I'm not sure that it's actually necessary to keep
152 // this future around.
153self.sleep_future.take();
154155let mut sleep_future = Box::pin(self.provider.sleep(delay));
156match sleep_future.poll_unpin(cx) {
157 Poll::Pending => {
158self.sleep_future = Some(sleep_future);
159return Poll::Pending;
160 }
161 Poll::Ready(()) => {
162if last_delay {
163return Poll::Ready(());
164 }
165 }
166 }
167 }
168 }
169}
170171/// We never sleep more than this much, in case our system clock jumps.
172///
173/// Note that there's a tradeoff here: Making this duration
174/// shorter helps our accuracy, but makes us wake up more
175/// frequently and consume more CPU.
176const MAX_SLEEP: Duration = Duration::from_secs(600);
177178/// Return the amount of time we should wait next, when running
179/// sleep_until_wallclock(). Also return a boolean indicating whether we
180/// expect this to be the final delay.
181///
182/// (This is a separate function for testing.)
183pub(crate) fn calc_next_delay(now: SystemTime, when: SystemTime) -> (bool, Duration) {
184let remainder = when
185 .duration_since(now)
186 .unwrap_or_else(|_| Duration::from_secs(0));
187if remainder > MAX_SLEEP {
188 (false, MAX_SLEEP)
189 } else {
190 (true, remainder)
191 }
192}
193194#[cfg(test)]
195mod test {
196// @@ begin test lint list maintained by maint/add_warning @@
197#![allow(clippy::bool_assert_comparison)]
198 #![allow(clippy::clone_on_copy)]
199 #![allow(clippy::dbg_macro)]
200 #![allow(clippy::mixed_attributes_style)]
201 #![allow(clippy::print_stderr)]
202 #![allow(clippy::print_stdout)]
203 #![allow(clippy::single_char_pattern)]
204 #![allow(clippy::unwrap_used)]
205 #![allow(clippy::unchecked_duration_subtraction)]
206 #![allow(clippy::useless_vec)]
207 #![allow(clippy::needless_pass_by_value)]
208//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
209#![allow(clippy::erasing_op)]
210211 #[cfg(not(miri))]
212use super::*;
213214#[cfg(not(miri))] // This uses a real SystemTime, which doesn't work in miri
215#[test]
216fn sleep_delay() {
217fn calc(now: SystemTime, when: SystemTime) -> Duration {
218 calc_next_delay(now, when).1
219}
220let minute = Duration::from_secs(60);
221let second = Duration::from_secs(1);
222let start = SystemTime::now();
223224let target = start + 30 * minute;
225226assert_eq!(calc(start, target), minute * 10);
227assert_eq!(calc(target + minute, target), minute * 0);
228assert_eq!(calc(target, target), minute * 0);
229assert_eq!(calc(target - second, target), second);
230assert_eq!(calc(target - minute * 9, target), minute * 9);
231assert_eq!(calc(target - minute * 11, target), minute * 10);
232 }
233}