1//! Utilities for dealing with periodic recurring tasks.
23use crate::SleepProvider;
4use futures::channel::mpsc;
5use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
6use futures::{Stream, StreamExt};
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use std::time::{Duration, Instant, SystemTime};
1112use pin_project::pin_project;
1314/// An error returned while telling a [`TaskSchedule`] to sleep.
15///
16/// Unlike regular "sleep" functions, the sleep operations on a [`TaskSchedule`]
17/// can fail because there are no [`TaskHandle`]s left.
18///
19/// Note that it is *not* an error if the `sleep` function is interrupted,
20/// cancelled, or or rescheduled for a later time: See [`TaskSchedule::sleep`]
21/// for more information.
22#[derive(Clone, Debug, thiserror::Error)]
23#[non_exhaustive]
24pub enum SleepError {
25/// The final [`TaskHandle`] for this [`TaskSchedule`] has been dropped: the
26 /// task should exit.
27#[error("All task handles dropped: task exiting.")]
28ScheduleDropped,
29}
3031/// A command sent from task handles to schedule objects.
32#[derive(Copy, Clone)]
33enum SchedulerCommand {
34/// Run the task now.
35Fire,
36/// Run the task at the provided `Instant`.
37FireAt(Instant),
38/// Cancel a pending execution, if there is one.
39Cancel,
40/// Pause execution without cancelling any running timers. (Those timers
41 /// will fire after we resume execution.)
42Suspend,
43/// Resume execution. If there is a pending timer, start waiting for it again;
44 /// otherwise, fire immediately.
45Resume,
46}
4748/// A remotely-controllable trigger for recurring tasks.
49///
50/// This implements [`Stream`], and is intended to be used in a `while` loop; you should
51/// wrap your recurring task in a `while schedule.next().await.is_some()` or similar.
52#[pin_project(project = TaskScheduleP)]
53pub struct TaskSchedule<R: SleepProvider> {
54/// If we're waiting for a deadline to expire, the future for that.
55sleep: Option<Pin<Box<R::SleepFuture>>>,
56/// Receiver of scheduler commands from handles.
57rx: UnboundedReceiver<SchedulerCommand>,
58/// Runtime.
59rt: R,
60/// Whether or not to yield a result immediately when polled, once.
61 ///
62 /// This is used to avoid having to create a `SleepFuture` with zero duration,
63 /// which is potentially a bit wasteful.
64instant_fire: bool,
65/// Whether we are currently "suspended". If we are suspended, we won't
66 /// start executing again till we're explicitly "resumed".
67suspended: bool,
68}
6970/// A handle used to control a [`TaskSchedule`].
71///
72/// When the final handle is dropped, the computation governed by the
73/// `TaskSchedule` should terminate.
74#[derive(Clone)]
75pub struct TaskHandle {
76/// Sender of scheduler commands to the corresponding schedule.
77tx: UnboundedSender<SchedulerCommand>,
78}
7980impl<R: SleepProvider> TaskSchedule<R> {
81/// Create a new schedule, and corresponding handle.
82pub fn new(rt: R) -> (Self, TaskHandle) {
83let (tx, rx) = mpsc::unbounded();
84 (
85Self {
86 sleep: None,
87 rx,
88 rt,
89// Start off ready.
90instant_fire: true,
91 suspended: false,
92 },
93 TaskHandle { tx },
94 )
95 }
9697/// Trigger the schedule after `dur`.
98pub fn fire_in(&mut self, dur: Duration) {
99self.instant_fire = false;
100self.sleep = Some(Box::pin(self.rt.sleep(dur)));
101 }
102103/// Trigger the schedule instantly.
104pub fn fire(&mut self) {
105self.instant_fire = true;
106self.sleep = None;
107 }
108109/// Wait until `Dur` has elapsed.
110 ///
111 /// This call is equivalent to [`SleepProvider::sleep`], except that the
112 /// resulting future will respect calls to the functions on this schedule's
113 /// associated [`TaskHandle`].
114 ///
115 /// Alternatively, you can view this function as equivalent to
116 /// `self.fire_in(dur); self.next().await;`, only with the intent made more
117 /// explicit.
118 ///
119 /// If the associated [`TaskHandle`] for this schedule is suspended, then
120 /// this method will not return until the schedule is unsuspended _and_ the
121 /// timer elapses. If the associated [`TaskHandle`] is cancelled, then this
122 /// method will not return at all, until the schedule is re-activated by
123 /// [`TaskHandle::fire`] or [`TaskHandle::fire_at`].
124 ///
125 /// Finally, if every associated [`TaskHandle`] has been dropped, then this
126 /// method will return an error.
127pub async fn sleep(&mut self, dur: Duration) -> Result<(), SleepError> {
128self.fire_in(dur);
129self.next().await.ok_or(SleepError::ScheduleDropped)
130 }
131132/// As
133 /// [`sleep_until_wallclock`](crate::SleepProviderExt::sleep_until_wallclock),
134 /// but respect messages from this schedule's associated [`TaskHandle`].
135pub async fn sleep_until_wallclock(&mut self, when: SystemTime) -> Result<(), SleepError> {
136loop {
137let (finished, delay) = crate::timer::calc_next_delay(self.rt.wallclock(), when);
138self.sleep(delay).await?;
139if finished {
140return Ok(());
141 }
142 }
143 }
144}
145146impl TaskHandle {
147/// Trigger this handle's corresponding schedule now.
148 ///
149 /// Returns `true` if the schedule still exists, and `false` otherwise.
150pub fn fire(&self) -> bool {
151self.tx.unbounded_send(SchedulerCommand::Fire).is_ok()
152 }
153/// Trigger this handle's corresponding schedule at `instant`.
154 ///
155 /// Returns `true` if the schedule still exists, and `false` otherwise.
156pub fn fire_at(&self, instant: Instant) -> bool {
157self.tx
158 .unbounded_send(SchedulerCommand::FireAt(instant))
159 .is_ok()
160 }
161/// Cancel a pending firing of the handle's corresponding schedule.
162 ///
163 /// Returns `true` if the schedule still exists, and `false` otherwise.
164pub fn cancel(&self) -> bool {
165self.tx.unbounded_send(SchedulerCommand::Cancel).is_ok()
166 }
167168/// Suspend execution of the corresponding schedule.
169 ///
170 /// If the schedule is ready now, it will become pending; it won't become
171 /// ready again until `resume()` is called. If the schedule is waiting for a
172 /// timer, the timer will keep counting, but the schedule won't become ready
173 /// until the timer has elapsed _and_ `resume()` has been called.
174 ///
175 /// Returns `true` if the schedule still exists, and `false` otherwise.
176pub fn suspend(&self) -> bool {
177self.tx.unbounded_send(SchedulerCommand::Suspend).is_ok()
178 }
179180/// Resume execution of the corresponding schedule.
181 ///
182 /// This method undoes the effect of a call to `suspend()`: the schedule
183 /// will fire again if it is ready (or when it becomes ready).
184 ///
185 /// This method won't cause the schedule to fire if it was already
186 /// cancelled. For that, use the `fire()` or fire_at()` methods.
187 ///
188 /// Returns `true` if the schedule still exists, and `false` otherwise.
189pub fn resume(&self) -> bool {
190self.tx.unbounded_send(SchedulerCommand::Resume).is_ok()
191 }
192}
193194// NOTE(eta): implemented on the *pin projection*, not the original type, because we don't want
195// to require `R: Unpin`. Accordingly, all the fields are mutable references.
196impl<R: SleepProvider> TaskScheduleP<'_, R> {
197/// Handle an internal command.
198fn handle_command(&mut self, cmd: SchedulerCommand) {
199match cmd {
200 SchedulerCommand::Fire => {
201*self.instant_fire = true;
202*self.sleep = None;
203 }
204 SchedulerCommand::FireAt(instant) => {
205let now = self.rt.now();
206let dur = instant.saturating_duration_since(now);
207*self.instant_fire = false;
208*self.sleep = Some(Box::pin(self.rt.sleep(dur)));
209 }
210 SchedulerCommand::Cancel => {
211*self.instant_fire = false;
212*self.sleep = None;
213 }
214 SchedulerCommand::Suspend => {
215*self.suspended = true;
216 }
217 SchedulerCommand::Resume => {
218*self.suspended = false;
219 }
220 }
221 }
222}
223224impl<R: SleepProvider> Stream for TaskSchedule<R> {
225type Item = ();
226227fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
228let mut this = self.project();
229while let Poll::Ready(maybe_cmd) = this.rx.poll_next_unpin(cx) {
230match maybe_cmd {
231Some(c) => this.handle_command(c),
232None => {
233// All task handles dropped; return end of stream.
234return Poll::Ready(None);
235 }
236 }
237 }
238if *this.suspended {
239return Poll::Pending;
240 }
241if *this.instant_fire {
242*this.instant_fire = false;
243return Poll::Ready(Some(()));
244 }
245if this
246 .sleep
247 .as_mut()
248 .map(|x| x.as_mut().poll(cx).is_ready())
249 .unwrap_or(false)
250 {
251*this.sleep = None;
252return Poll::Ready(Some(()));
253 }
254 Poll::Pending
255 }
256}
257258// test_with_all_runtimes! only exists if these features are satisfied.
259#[cfg(all(
260 test,
261 any(feature = "native-tls", feature = "rustls"),
262 any(feature = "tokio", feature = "async-std"),
263 not(miri), // Several of these use real SystemTime
264))]
265mod test {
266use crate::scheduler::TaskSchedule;
267use crate::{test_with_all_runtimes, SleepProvider};
268use futures::FutureExt;
269use futures::StreamExt;
270use std::time::{Duration, Instant};
271272#[test]
273fn it_fires_immediately() {
274test_with_all_runtimes!(|rt| async move {
275let (mut sch, _hdl) = TaskSchedule::new(rt);
276assert!(sch.next().now_or_never().is_some());
277 });
278 }
279280#[test]
281 #[allow(clippy::unwrap_used)]
282fn it_dies_if_dropped() {
283test_with_all_runtimes!(|rt| async move {
284let (mut sch, hdl) = TaskSchedule::new(rt);
285 drop(hdl);
286assert!(sch.next().now_or_never().unwrap().is_none());
287 });
288 }
289290#[test]
291fn it_fires_on_demand() {
292test_with_all_runtimes!(|rt| async move {
293let (mut sch, hdl) = TaskSchedule::new(rt);
294assert!(sch.next().now_or_never().is_some());
295296assert!(sch.next().now_or_never().is_none());
297assert!(hdl.fire());
298assert!(sch.next().now_or_never().is_some());
299assert!(sch.next().now_or_never().is_none());
300 });
301 }
302303#[test]
304fn it_cancels_instant_firings() {
305// NOTE(eta): this test very much assumes that unbounded channels will
306 // transmit things instantly. If it breaks, that's probably why.
307test_with_all_runtimes!(|rt| async move {
308let (mut sch, hdl) = TaskSchedule::new(rt);
309assert!(sch.next().now_or_never().is_some());
310311assert!(sch.next().now_or_never().is_none());
312assert!(hdl.fire());
313assert!(hdl.cancel());
314assert!(sch.next().now_or_never().is_none());
315 });
316 }
317318#[test]
319fn it_fires_after_self_reschedule() {
320test_with_all_runtimes!(|rt| async move {
321let (mut sch, _hdl) = TaskSchedule::new(rt);
322assert!(sch.next().now_or_never().is_some());
323324 sch.fire_in(Duration::from_millis(100));
325326assert!(sch.next().now_or_never().is_none());
327assert!(sch.next().await.is_some());
328assert!(sch.next().now_or_never().is_none());
329 });
330 }
331332#[test]
333fn it_fires_after_external_reschedule() {
334test_with_all_runtimes!(|rt| async move {
335let (mut sch, hdl) = TaskSchedule::new(rt);
336assert!(sch.next().now_or_never().is_some());
337338 hdl.fire_at(Instant::now() + Duration::from_millis(100));
339340assert!(sch.next().now_or_never().is_none());
341assert!(sch.next().await.is_some());
342assert!(sch.next().now_or_never().is_none());
343 });
344 }
345346// This test is disabled because it was flaky when the CI servers were
347 // heavily loaded. (See #545.)
348 //
349 // TODO: Let's fix this test and make it more reliable, then re-enable it.
350#[test]
351 #[ignore]
352fn it_cancels_delayed_firings() {
353test_with_all_runtimes!(|rt| async move {
354let (mut sch, hdl) = TaskSchedule::new(rt.clone());
355assert!(sch.next().now_or_never().is_some());
356357 hdl.fire_at(Instant::now() + Duration::from_millis(100));
358359assert!(sch.next().now_or_never().is_none());
360361 rt.sleep(Duration::from_millis(50)).await;
362363assert!(sch.next().now_or_never().is_none());
364365 hdl.cancel();
366367assert!(sch.next().now_or_never().is_none());
368369 rt.sleep(Duration::from_millis(100)).await;
370371assert!(sch.next().now_or_never().is_none());
372 });
373 }
374375#[test]
376fn last_fire_wins() {
377test_with_all_runtimes!(|rt| async move {
378let (mut sch, hdl) = TaskSchedule::new(rt.clone());
379assert!(sch.next().now_or_never().is_some());
380381 hdl.fire_at(Instant::now() + Duration::from_millis(100));
382 hdl.fire();
383384assert!(sch.next().now_or_never().is_some());
385assert!(sch.next().now_or_never().is_none());
386387 rt.sleep(Duration::from_millis(150)).await;
388389assert!(sch.next().now_or_never().is_none());
390 });
391 }
392393#[test]
394fn suspend_and_resume_with_fire() {
395test_with_all_runtimes!(|rt| async move {
396let (mut sch, hdl) = TaskSchedule::new(rt.clone());
397 hdl.fire();
398 hdl.suspend();
399400assert!(sch.next().now_or_never().is_none());
401 hdl.resume();
402assert!(sch.next().now_or_never().is_some());
403 });
404 }
405406#[test]
407fn suspend_and_resume_with_sleep() {
408test_with_all_runtimes!(|rt| async move {
409let (mut sch, hdl) = TaskSchedule::new(rt.clone());
410 sch.fire_in(Duration::from_millis(100));
411 hdl.suspend();
412413assert!(sch.next().now_or_never().is_none());
414 hdl.resume();
415assert!(sch.next().now_or_never().is_none());
416assert!(sch.next().await.is_some());
417 });
418 }
419420#[test]
421fn suspend_and_resume_with_nothing() {
422test_with_all_runtimes!(|rt| async move {
423let (mut sch, hdl) = TaskSchedule::new(rt.clone());
424assert!(sch.next().now_or_never().is_some());
425 hdl.suspend();
426427assert!(sch.next().now_or_never().is_none());
428 hdl.resume();
429 });
430 }
431}