1
//! Utilities for dealing with periodic recurring tasks.
2

            
3
use crate::SleepProvider;
4
use futures::channel::mpsc;
5
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
6
use futures::{Stream, StreamExt};
7
use std::future::Future;
8
use std::pin::Pin;
9
use std::task::{Context, Poll};
10
use std::time::{Duration, Instant, SystemTime};
11

            
12
use pin_project::pin_project;
13

            
14
/// An error returned while telling a [`TaskSchedule`] to sleep.
15
///
16
/// Unlike regular "sleep" functions, the sleep operations on a [`TaskSchedule`]
17
/// can fail because there are no [`TaskHandle`]s left.
18
///
19
/// Note that it is *not* an error if the `sleep` function is interrupted,
20
/// cancelled, or  or rescheduled for a later time: See [`TaskSchedule::sleep`]
21
/// for more information.
22
#[derive(Clone, Debug, thiserror::Error)]
23
#[non_exhaustive]
24
pub enum SleepError {
25
    /// The final [`TaskHandle`] for this [`TaskSchedule`] has been dropped: the
26
    /// task should exit.
27
    #[error("All task handles dropped: task exiting.")]
28
    ScheduleDropped,
29
}
30

            
31
/// A command sent from task handles to schedule objects.
///
/// Commands are applied in the order they are received, so a later command
/// overrides the effect of an earlier one on the same piece of state.
#[derive(Copy, Clone, Debug)]
enum SchedulerCommand {
    /// Run the task now, discarding any pending timer.
    Fire,
    /// Run the task at the provided `Instant`, replacing any pending timer or
    /// pending instant firing.
    FireAt(Instant),
    /// Cancel a pending execution, if there is one.
    Cancel,
    /// Pause execution without cancelling any running timers.  (Those timers
    /// will fire after we resume execution.)
    Suspend,
    /// Resume execution after a `Suspend`.
    ///
    /// A pending instant firing or timer takes effect again once resumed.
    /// Resuming does not by itself schedule a firing: if nothing is pending
    /// (for example, after a `Cancel`), the schedule stays idle until a
    /// `Fire` or `FireAt` arrives.
    Resume,
}
47

            
48
/// A remotely-controllable trigger for recurring tasks.
49
///
50
/// This implements [`Stream`], and is intended to be used in a `while` loop; you should
51
/// wrap your recurring task in a `while schedule.next().await.is_some()` or similar.
52
346
#[pin_project(project = TaskScheduleP)]
53
pub struct TaskSchedule<R: SleepProvider> {
54
    /// If we're waiting for a deadline to expire, the future for that.
55
    sleep: Option<Pin<Box<R::SleepFuture>>>,
56
    /// Receiver of scheduler commands from handles.
57
    rx: UnboundedReceiver<SchedulerCommand>,
58
    /// Runtime.
59
    rt: R,
60
    /// Whether or not to yield a result immediately when polled, once.
61
    ///
62
    /// This is used to avoid having to create a `SleepFuture` with zero duration,
63
    /// which is potentially a bit wasteful.
64
    instant_fire: bool,
65
    /// Whether we are currently "suspended".  If we are suspended, we won't
66
    /// start executing again till we're explicitly "resumed".
67
    suspended: bool,
68
}
69

            
70
/// A handle used to control a [`TaskSchedule`].
71
///
72
/// When the final handle is dropped, the computation governed by the
73
/// `TaskSchedule` should terminate.
74
#[derive(Clone)]
75
pub struct TaskHandle {
76
    /// Sender of scheduler commands to the corresponding schedule.
77
    tx: UnboundedSender<SchedulerCommand>,
78
}
79

            
80
impl<R: SleepProvider> TaskSchedule<R> {
81
    /// Create a new schedule, and corresponding handle.
82
158
    pub fn new(rt: R) -> (Self, TaskHandle) {
83
158
        let (tx, rx) = mpsc::unbounded();
84
158
        (
85
158
            Self {
86
158
                sleep: None,
87
158
                rx,
88
158
                rt,
89
158
                // Start off ready.
90
158
                instant_fire: true,
91
158
                suspended: false,
92
158
            },
93
158
            TaskHandle { tx },
94
158
        )
95
158
    }
96

            
97
    /// Trigger the schedule after `dur`.
98
46
    pub fn fire_in(&mut self, dur: Duration) {
99
46
        self.instant_fire = false;
100
46
        self.sleep = Some(Box::pin(self.rt.sleep(dur)));
101
46
    }
102

            
103
    /// Trigger the schedule instantly.
104
    pub fn fire(&mut self) {
105
        self.instant_fire = true;
106
        self.sleep = None;
107
    }
108

            
109
    /// Wait until `Dur` has elapsed.
110
    ///
111
    /// This call is equivalent to [`SleepProvider::sleep`], except that the
112
    /// resulting future will respect calls to the functions on this schedule's
113
    /// associated [`TaskHandle`].
114
    ///
115
    /// Alternatively, you can view this function as equivalent to
116
    /// `self.fire_in(dur); self.next().await;`, only  with the intent made more
117
    /// explicit.
118
    ///
119
    /// If the associated [`TaskHandle`] for this schedule is suspended, then
120
    /// this method will not return until the schedule is unsuspended _and_ the
121
    /// timer elapses.  If the associated [`TaskHandle`] is cancelled, then this
122
    /// method will not return at all, until the schedule is re-activated by
123
    /// [`TaskHandle::fire`] or [`TaskHandle::fire_at`].
124
    ///
125
    /// Finally, if every associated [`TaskHandle`] has been dropped, then this
126
    /// method will return an error.
127
    pub async fn sleep(&mut self, dur: Duration) -> Result<(), SleepError> {
128
        self.fire_in(dur);
129
        self.next().await.ok_or(SleepError::ScheduleDropped)
130
    }
131

            
132
    /// As
133
    /// [`sleep_until_wallclock`](crate::SleepProviderExt::sleep_until_wallclock),
134
    /// but respect messages from this schedule's associated [`TaskHandle`].
135
2
    pub async fn sleep_until_wallclock(&mut self, when: SystemTime) -> Result<(), SleepError> {
136
        loop {
137
            let (finished, delay) = crate::timer::calc_next_delay(self.rt.wallclock(), when);
138
            self.sleep(delay).await?;
139
            if finished {
140
                return Ok(());
141
            }
142
        }
143
    }
144
}
145

            
146
impl TaskHandle {
147
    /// Trigger this handle's corresponding schedule now.
148
    ///
149
    /// Returns `true` if the schedule still exists, and `false` otherwise.
150
252
    pub fn fire(&self) -> bool {
151
252
        self.tx.unbounded_send(SchedulerCommand::Fire).is_ok()
152
252
    }
153
    /// Trigger this handle's corresponding schedule at `instant`.
154
    ///
155
    /// Returns `true` if the schedule still exists, and `false` otherwise.
156
16
    pub fn fire_at(&self, instant: Instant) -> bool {
157
16
        self.tx
158
16
            .unbounded_send(SchedulerCommand::FireAt(instant))
159
16
            .is_ok()
160
16
    }
161
    /// Cancel a pending firing of the handle's corresponding schedule.
162
    ///
163
    /// Returns `true` if the schedule still exists, and `false` otherwise.
164
8
    pub fn cancel(&self) -> bool {
165
8
        self.tx.unbounded_send(SchedulerCommand::Cancel).is_ok()
166
8
    }
167

            
168
    /// Suspend execution of the corresponding schedule.
169
    ///
170
    /// If the schedule is ready now, it will become pending; it won't become
171
    /// ready again until `resume()` is called. If the schedule is waiting for a
172
    /// timer, the timer will keep counting, but the schedule won't become ready
173
    /// until the timer has elapsed _and_ `resume()` has been called.
174
    ///
175
    /// Returns `true` if the schedule still exists, and `false` otherwise.
176
24
    pub fn suspend(&self) -> bool {
177
24
        self.tx.unbounded_send(SchedulerCommand::Suspend).is_ok()
178
24
    }
179

            
180
    /// Resume execution of the corresponding schedule.
181
    ///
182
    /// This method undoes the effect of a call to `suspend()`: the schedule
183
    /// will fire again if it is ready (or when it becomes ready).
184
    ///
185
    /// This method won't cause the schedule to fire if it was already
186
    /// cancelled. For that, use the `fire()` or fire_at()` methods.
187
    ///
188
    /// Returns `true` if the schedule still exists, and `false` otherwise.
189
24
    pub fn resume(&self) -> bool {
190
24
        self.tx.unbounded_send(SchedulerCommand::Resume).is_ok()
191
24
    }
192
}
193

            
194
// NOTE(eta): implemented on the *pin projection*, not the original type, because we don't want
195
//            to require `R: Unpin`. Accordingly, all the fields are mutable references.
196
impl<R: SleepProvider> TaskScheduleP<'_, R> {
197
    /// Handle an internal command.
198
104
    fn handle_command(&mut self, cmd: SchedulerCommand) {
199
104
        match cmd {
200
40
            SchedulerCommand::Fire => {
201
40
                *self.instant_fire = true;
202
40
                *self.sleep = None;
203
40
            }
204
16
            SchedulerCommand::FireAt(instant) => {
205
16
                let now = self.rt.now();
206
16
                let dur = instant.saturating_duration_since(now);
207
16
                *self.instant_fire = false;
208
16
                *self.sleep = Some(Box::pin(self.rt.sleep(dur)));
209
16
            }
210
8
            SchedulerCommand::Cancel => {
211
8
                *self.instant_fire = false;
212
8
                *self.sleep = None;
213
8
            }
214
24
            SchedulerCommand::Suspend => {
215
24
                *self.suspended = true;
216
24
            }
217
16
            SchedulerCommand::Resume => {
218
16
                *self.suspended = false;
219
16
            }
220
        }
221
104
    }
222
}
223

            
224
impl<R: SleepProvider> Stream for TaskSchedule<R> {
225
    type Item = ();
226

            
227
346
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
228
346
        let mut this = self.project();
229
450
        while let Poll::Ready(maybe_cmd) = this.rx.poll_next_unpin(cx) {
230
140
            match maybe_cmd {
231
104
                Some(c) => this.handle_command(c),
232
                None => {
233
                    // All task handles dropped; return end of stream.
234
36
                    return Poll::Ready(None);
235
                }
236
            }
237
        }
238
310
        if *this.suspended {
239
24
            return Poll::Pending;
240
286
        }
241
286
        if *this.instant_fire {
242
126
            *this.instant_fire = false;
243
126
            return Poll::Ready(Some(()));
244
160
        }
245
160
        if this
246
160
            .sleep
247
160
            .as_mut()
248
160
            .map(|x| x.as_mut().poll(cx).is_ready())
249
160
            .unwrap_or(false)
250
        {
251
24
            *this.sleep = None;
252
24
            return Poll::Ready(Some(()));
253
136
        }
254
136
        Poll::Pending
255
346
    }
256
}
257

            
258
// test_with_all_runtimes! only exists if these features are satisfied.
#[cfg(all(
    test,
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std"),
    not(miri), // Several of these use real SystemTime
))]
mod test {
    use crate::scheduler::TaskSchedule;
    use crate::{test_with_all_runtimes, SleepProvider};
    use futures::FutureExt;
    use futures::StreamExt;
    use std::time::{Duration, Instant};

    #[test]
    fn it_fires_immediately() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, _hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());
        });
    }

    #[test]
    #[allow(clippy::unwrap_used)]
    fn it_dies_if_dropped() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            drop(hdl);
            assert!(sch.next().now_or_never().unwrap().is_none());
        });
    }

    #[test]
    fn it_fires_on_demand() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            assert!(sch.next().now_or_never().is_none());
            assert!(hdl.fire());
            assert!(sch.next().now_or_never().is_some());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    #[test]
    fn it_cancels_instant_firings() {
        // NOTE(eta): this test very much assumes that unbounded channels will
        //            transmit things instantly. If it breaks, that's probably why.
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            assert!(sch.next().now_or_never().is_none());
            assert!(hdl.fire());
            assert!(hdl.cancel());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    #[test]
    fn it_fires_after_self_reschedule() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, _hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            sch.fire_in(Duration::from_millis(100));

            assert!(sch.next().now_or_never().is_none());
            assert!(sch.next().await.is_some());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    #[test]
    fn it_fires_after_external_reschedule() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            hdl.fire_at(Instant::now() + Duration::from_millis(100));

            assert!(sch.next().now_or_never().is_none());
            assert!(sch.next().await.is_some());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    // This test is disabled because it was flaky when the CI servers were
    // heavily loaded. (See #545.)
    //
    // TODO: Let's fix this test and make it more reliable, then re-enable it.
    #[test]
    #[ignore]
    fn it_cancels_delayed_firings() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            assert!(sch.next().now_or_never().is_some());

            hdl.fire_at(Instant::now() + Duration::from_millis(100));

            assert!(sch.next().now_or_never().is_none());

            rt.sleep(Duration::from_millis(50)).await;

            assert!(sch.next().now_or_never().is_none());

            hdl.cancel();

            assert!(sch.next().now_or_never().is_none());

            rt.sleep(Duration::from_millis(100)).await;

            assert!(sch.next().now_or_never().is_none());
        });
    }

    #[test]
    fn last_fire_wins() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            assert!(sch.next().now_or_never().is_some());

            hdl.fire_at(Instant::now() + Duration::from_millis(100));
            hdl.fire();

            assert!(sch.next().now_or_never().is_some());
            assert!(sch.next().now_or_never().is_none());

            rt.sleep(Duration::from_millis(150)).await;

            assert!(sch.next().now_or_never().is_none());
        });
    }

    #[test]
    fn suspend_and_resume_with_fire() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            hdl.fire();
            hdl.suspend();

            assert!(sch.next().now_or_never().is_none());
            hdl.resume();
            assert!(sch.next().now_or_never().is_some());
        });
    }

    #[test]
    fn suspend_and_resume_with_sleep() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            sch.fire_in(Duration::from_millis(100));
            hdl.suspend();

            assert!(sch.next().now_or_never().is_none());
            hdl.resume();
            assert!(sch.next().now_or_never().is_none());
            assert!(sch.next().await.is_some());
        });
    }

    #[test]
    fn suspend_and_resume_with_nothing() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            assert!(sch.next().now_or_never().is_some());
            hdl.suspend();

            assert!(sch.next().now_or_never().is_none());
            hdl.resume();
            // Resuming with nothing pending must not fire on its own.
            assert!(sch.next().now_or_never().is_none());
        });
    }
}