tor_rtcompat/
scheduler.rs

1//! Utilities for dealing with periodic recurring tasks.
2
3use crate::SleepProvider;
4use futures::channel::mpsc;
5use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
6use futures::{Stream, StreamExt};
7use std::future::Future;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10use std::time::{Duration, Instant, SystemTime};
11
12use pin_project::pin_project;
13
14/// An error returned while telling a [`TaskSchedule`] to sleep.
15///
16/// Unlike regular "sleep" functions, the sleep operations on a [`TaskSchedule`]
17/// can fail because there are no [`TaskHandle`]s left.
18///
19/// Note that it is *not* an error if the `sleep` function is interrupted,
20/// cancelled, or  or rescheduled for a later time: See [`TaskSchedule::sleep`]
21/// for more information.
22#[derive(Clone, Debug, thiserror::Error)]
23#[non_exhaustive]
24pub enum SleepError {
25    /// The final [`TaskHandle`] for this [`TaskSchedule`] has been dropped: the
26    /// task should exit.
27    #[error("All task handles dropped: task exiting.")]
28    ScheduleDropped,
29}
30
31/// A command sent from task handles to schedule objects.
32#[derive(Copy, Clone)]
33enum SchedulerCommand {
34    /// Run the task now.
35    Fire,
36    /// Run the task at the provided `Instant`.
37    FireAt(Instant),
38    /// Cancel a pending execution, if there is one.
39    Cancel,
40    /// Pause execution without cancelling any running timers.  (Those timers
41    /// will fire after we resume execution.)
42    Suspend,
43    /// Resume execution.  If there is a pending timer, start waiting for it again;
44    /// otherwise, fire immediately.
45    Resume,
46}
47
48/// A remotely-controllable trigger for recurring tasks.
49///
50/// This implements [`Stream`], and is intended to be used in a `while` loop; you should
51/// wrap your recurring task in a `while schedule.next().await.is_some()` or similar.
52#[pin_project(project = TaskScheduleP)]
53pub struct TaskSchedule<R: SleepProvider> {
54    /// If we're waiting for a deadline to expire, the future for that.
55    sleep: Option<Pin<Box<R::SleepFuture>>>,
56    /// Receiver of scheduler commands from handles.
57    rx: UnboundedReceiver<SchedulerCommand>,
58    /// Runtime.
59    rt: R,
60    /// Whether or not to yield a result immediately when polled, once.
61    ///
62    /// This is used to avoid having to create a `SleepFuture` with zero duration,
63    /// which is potentially a bit wasteful.
64    instant_fire: bool,
65    /// Whether we are currently "suspended".  If we are suspended, we won't
66    /// start executing again till we're explicitly "resumed".
67    suspended: bool,
68}
69
70/// A handle used to control a [`TaskSchedule`].
71///
72/// When the final handle is dropped, the computation governed by the
73/// `TaskSchedule` should terminate.
74#[derive(Clone)]
75pub struct TaskHandle {
76    /// Sender of scheduler commands to the corresponding schedule.
77    tx: UnboundedSender<SchedulerCommand>,
78}
79
80impl<R: SleepProvider> TaskSchedule<R> {
81    /// Create a new schedule, and corresponding handle.
82    pub fn new(rt: R) -> (Self, TaskHandle) {
83        let (tx, rx) = mpsc::unbounded();
84        (
85            Self {
86                sleep: None,
87                rx,
88                rt,
89                // Start off ready.
90                instant_fire: true,
91                suspended: false,
92            },
93            TaskHandle { tx },
94        )
95    }
96
97    /// Trigger the schedule after `dur`.
98    pub fn fire_in(&mut self, dur: Duration) {
99        self.instant_fire = false;
100        self.sleep = Some(Box::pin(self.rt.sleep(dur)));
101    }
102
103    /// Trigger the schedule instantly.
104    pub fn fire(&mut self) {
105        self.instant_fire = true;
106        self.sleep = None;
107    }
108
109    /// Wait until `Dur` has elapsed.
110    ///
111    /// This call is equivalent to [`SleepProvider::sleep`], except that the
112    /// resulting future will respect calls to the functions on this schedule's
113    /// associated [`TaskHandle`].
114    ///
115    /// Alternatively, you can view this function as equivalent to
116    /// `self.fire_in(dur); self.next().await;`, only  with the intent made more
117    /// explicit.
118    ///
119    /// If the associated [`TaskHandle`] for this schedule is suspended, then
120    /// this method will not return until the schedule is unsuspended _and_ the
121    /// timer elapses.  If the associated [`TaskHandle`] is cancelled, then this
122    /// method will not return at all, until the schedule is re-activated by
123    /// [`TaskHandle::fire`] or [`TaskHandle::fire_at`].
124    ///
125    /// Finally, if every associated [`TaskHandle`] has been dropped, then this
126    /// method will return an error.
127    pub async fn sleep(&mut self, dur: Duration) -> Result<(), SleepError> {
128        self.fire_in(dur);
129        self.next().await.ok_or(SleepError::ScheduleDropped)
130    }
131
132    /// As
133    /// [`sleep_until_wallclock`](crate::SleepProviderExt::sleep_until_wallclock),
134    /// but respect messages from this schedule's associated [`TaskHandle`].
135    pub async fn sleep_until_wallclock(&mut self, when: SystemTime) -> Result<(), SleepError> {
136        loop {
137            let (finished, delay) = crate::timer::calc_next_delay(self.rt.wallclock(), when);
138            self.sleep(delay).await?;
139            if finished {
140                return Ok(());
141            }
142        }
143    }
144}
145
146impl TaskHandle {
147    /// Trigger this handle's corresponding schedule now.
148    ///
149    /// Returns `true` if the schedule still exists, and `false` otherwise.
150    pub fn fire(&self) -> bool {
151        self.tx.unbounded_send(SchedulerCommand::Fire).is_ok()
152    }
153    /// Trigger this handle's corresponding schedule at `instant`.
154    ///
155    /// Returns `true` if the schedule still exists, and `false` otherwise.
156    pub fn fire_at(&self, instant: Instant) -> bool {
157        self.tx
158            .unbounded_send(SchedulerCommand::FireAt(instant))
159            .is_ok()
160    }
161    /// Cancel a pending firing of the handle's corresponding schedule.
162    ///
163    /// Returns `true` if the schedule still exists, and `false` otherwise.
164    pub fn cancel(&self) -> bool {
165        self.tx.unbounded_send(SchedulerCommand::Cancel).is_ok()
166    }
167
168    /// Suspend execution of the corresponding schedule.
169    ///
170    /// If the schedule is ready now, it will become pending; it won't become
171    /// ready again until `resume()` is called. If the schedule is waiting for a
172    /// timer, the timer will keep counting, but the schedule won't become ready
173    /// until the timer has elapsed _and_ `resume()` has been called.
174    ///
175    /// Returns `true` if the schedule still exists, and `false` otherwise.
176    pub fn suspend(&self) -> bool {
177        self.tx.unbounded_send(SchedulerCommand::Suspend).is_ok()
178    }
179
180    /// Resume execution of the corresponding schedule.
181    ///
182    /// This method undoes the effect of a call to `suspend()`: the schedule
183    /// will fire again if it is ready (or when it becomes ready).
184    ///
185    /// This method won't cause the schedule to fire if it was already
186    /// cancelled. For that, use the `fire()` or fire_at()` methods.
187    ///
188    /// Returns `true` if the schedule still exists, and `false` otherwise.
189    pub fn resume(&self) -> bool {
190        self.tx.unbounded_send(SchedulerCommand::Resume).is_ok()
191    }
192}
193
194// NOTE(eta): implemented on the *pin projection*, not the original type, because we don't want
195//            to require `R: Unpin`. Accordingly, all the fields are mutable references.
196impl<R: SleepProvider> TaskScheduleP<'_, R> {
197    /// Handle an internal command.
198    fn handle_command(&mut self, cmd: SchedulerCommand) {
199        match cmd {
200            SchedulerCommand::Fire => {
201                *self.instant_fire = true;
202                *self.sleep = None;
203            }
204            SchedulerCommand::FireAt(instant) => {
205                let now = self.rt.now();
206                let dur = instant.saturating_duration_since(now);
207                *self.instant_fire = false;
208                *self.sleep = Some(Box::pin(self.rt.sleep(dur)));
209            }
210            SchedulerCommand::Cancel => {
211                *self.instant_fire = false;
212                *self.sleep = None;
213            }
214            SchedulerCommand::Suspend => {
215                *self.suspended = true;
216            }
217            SchedulerCommand::Resume => {
218                *self.suspended = false;
219            }
220        }
221    }
222}
223
224impl<R: SleepProvider> Stream for TaskSchedule<R> {
225    type Item = ();
226
227    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
228        let mut this = self.project();
229        while let Poll::Ready(maybe_cmd) = this.rx.poll_next_unpin(cx) {
230            match maybe_cmd {
231                Some(c) => this.handle_command(c),
232                None => {
233                    // All task handles dropped; return end of stream.
234                    return Poll::Ready(None);
235                }
236            }
237        }
238        if *this.suspended {
239            return Poll::Pending;
240        }
241        if *this.instant_fire {
242            *this.instant_fire = false;
243            return Poll::Ready(Some(()));
244        }
245        if this
246            .sleep
247            .as_mut()
248            .map(|x| x.as_mut().poll(cx).is_ready())
249            .unwrap_or(false)
250        {
251            *this.sleep = None;
252            return Poll::Ready(Some(()));
253        }
254        Poll::Pending
255    }
256}
257
258// test_with_all_runtimes! only exists if these features are satisfied.
259#[cfg(all(
260    test,
261    any(feature = "native-tls", feature = "rustls"),
262    any(feature = "tokio", feature = "async-std"),
263    not(miri), // Several of these use real SystemTime
264))]
265mod test {
266    use crate::scheduler::TaskSchedule;
267    use crate::{test_with_all_runtimes, SleepProvider};
268    use futures::FutureExt;
269    use futures::StreamExt;
270    use std::time::{Duration, Instant};
271
272    #[test]
273    fn it_fires_immediately() {
274        test_with_all_runtimes!(|rt| async move {
275            let (mut sch, _hdl) = TaskSchedule::new(rt);
276            assert!(sch.next().now_or_never().is_some());
277        });
278    }
279
280    #[test]
281    #[allow(clippy::unwrap_used)]
282    fn it_dies_if_dropped() {
283        test_with_all_runtimes!(|rt| async move {
284            let (mut sch, hdl) = TaskSchedule::new(rt);
285            drop(hdl);
286            assert!(sch.next().now_or_never().unwrap().is_none());
287        });
288    }
289
290    #[test]
291    fn it_fires_on_demand() {
292        test_with_all_runtimes!(|rt| async move {
293            let (mut sch, hdl) = TaskSchedule::new(rt);
294            assert!(sch.next().now_or_never().is_some());
295
296            assert!(sch.next().now_or_never().is_none());
297            assert!(hdl.fire());
298            assert!(sch.next().now_or_never().is_some());
299            assert!(sch.next().now_or_never().is_none());
300        });
301    }
302
303    #[test]
304    fn it_cancels_instant_firings() {
305        // NOTE(eta): this test very much assumes that unbounded channels will
306        //            transmit things instantly. If it breaks, that's probably why.
307        test_with_all_runtimes!(|rt| async move {
308            let (mut sch, hdl) = TaskSchedule::new(rt);
309            assert!(sch.next().now_or_never().is_some());
310
311            assert!(sch.next().now_or_never().is_none());
312            assert!(hdl.fire());
313            assert!(hdl.cancel());
314            assert!(sch.next().now_or_never().is_none());
315        });
316    }
317
318    #[test]
319    fn it_fires_after_self_reschedule() {
320        test_with_all_runtimes!(|rt| async move {
321            let (mut sch, _hdl) = TaskSchedule::new(rt);
322            assert!(sch.next().now_or_never().is_some());
323
324            sch.fire_in(Duration::from_millis(100));
325
326            assert!(sch.next().now_or_never().is_none());
327            assert!(sch.next().await.is_some());
328            assert!(sch.next().now_or_never().is_none());
329        });
330    }
331
332    #[test]
333    fn it_fires_after_external_reschedule() {
334        test_with_all_runtimes!(|rt| async move {
335            let (mut sch, hdl) = TaskSchedule::new(rt);
336            assert!(sch.next().now_or_never().is_some());
337
338            hdl.fire_at(Instant::now() + Duration::from_millis(100));
339
340            assert!(sch.next().now_or_never().is_none());
341            assert!(sch.next().await.is_some());
342            assert!(sch.next().now_or_never().is_none());
343        });
344    }
345
346    // This test is disabled because it was flaky when the CI servers were
347    // heavily loaded. (See #545.)
348    //
349    // TODO: Let's fix this test and make it more reliable, then re-enable it.
350    #[test]
351    #[ignore]
352    fn it_cancels_delayed_firings() {
353        test_with_all_runtimes!(|rt| async move {
354            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
355            assert!(sch.next().now_or_never().is_some());
356
357            hdl.fire_at(Instant::now() + Duration::from_millis(100));
358
359            assert!(sch.next().now_or_never().is_none());
360
361            rt.sleep(Duration::from_millis(50)).await;
362
363            assert!(sch.next().now_or_never().is_none());
364
365            hdl.cancel();
366
367            assert!(sch.next().now_or_never().is_none());
368
369            rt.sleep(Duration::from_millis(100)).await;
370
371            assert!(sch.next().now_or_never().is_none());
372        });
373    }
374
375    #[test]
376    fn last_fire_wins() {
377        test_with_all_runtimes!(|rt| async move {
378            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
379            assert!(sch.next().now_or_never().is_some());
380
381            hdl.fire_at(Instant::now() + Duration::from_millis(100));
382            hdl.fire();
383
384            assert!(sch.next().now_or_never().is_some());
385            assert!(sch.next().now_or_never().is_none());
386
387            rt.sleep(Duration::from_millis(150)).await;
388
389            assert!(sch.next().now_or_never().is_none());
390        });
391    }
392
393    #[test]
394    fn suspend_and_resume_with_fire() {
395        test_with_all_runtimes!(|rt| async move {
396            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
397            hdl.fire();
398            hdl.suspend();
399
400            assert!(sch.next().now_or_never().is_none());
401            hdl.resume();
402            assert!(sch.next().now_or_never().is_some());
403        });
404    }
405
406    #[test]
407    fn suspend_and_resume_with_sleep() {
408        test_with_all_runtimes!(|rt| async move {
409            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
410            sch.fire_in(Duration::from_millis(100));
411            hdl.suspend();
412
413            assert!(sch.next().now_or_never().is_none());
414            hdl.resume();
415            assert!(sch.next().now_or_never().is_none());
416            assert!(sch.next().await.is_some());
417        });
418    }
419
420    #[test]
421    fn suspend_and_resume_with_nothing() {
422        test_with_all_runtimes!(|rt| async move {
423            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
424            assert!(sch.next().now_or_never().is_some());
425            hdl.suspend();
426
427            assert!(sch.next().now_or_never().is_none());
428            hdl.resume();
429        });
430    }
431}