1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
//! Utilities for dealing with periodic recurring tasks.

use crate::SleepProvider;
use futures::channel::mpsc;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::{Stream, StreamExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant, SystemTime};

use pin_project::pin_project;

/// An error returned while telling a [`TaskSchedule`] to sleep.
///
/// Unlike regular "sleep" functions, the sleep operations on a [`TaskSchedule`]
/// can fail because there are no [`TaskHandle`]s left.
///
/// Note that it is *not* an error if the `sleep` function is interrupted,
/// cancelled, or rescheduled for a later time: See [`TaskSchedule::sleep`]
/// for more information.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum SleepError {
    /// The final [`TaskHandle`] for this [`TaskSchedule`] has been dropped: the
    /// task should exit.
    ///
    /// This is produced when the schedule's stream ends instead of yielding.
    #[error("All task handles dropped: task exiting.")]
    ScheduleDropped,
}

/// A command sent from task handles to schedule objects.
///
/// Commands travel over an unbounded channel and are applied, in order, the
/// next time the schedule is polled.
#[derive(Copy, Clone)]
enum SchedulerCommand {
    /// Run the task now.
    ///
    /// Replaces any pending timer with an immediate firing.
    Fire,
    /// Run the task at the provided `Instant`.
    ///
    /// An `Instant` that is already in the past results in a zero-length wait.
    FireAt(Instant),
    /// Cancel a pending execution, if there is one.
    ///
    /// Clears both a queued immediate firing and any pending timer.
    Cancel,
    /// Pause execution without cancelling any running timers.  (Those timers
    /// will fire after we resume execution.)
    Suspend,
    /// Resume execution.  If there is a pending timer or a queued immediate
    /// firing, it takes effect again.
    ///
    /// Resuming only clears the "suspended" state: a schedule with nothing
    /// pending stays idle until it is told to fire.
    Resume,
}

/// A remotely-controllable trigger for recurring tasks.
///
/// This implements [`Stream`], and is intended to be used in a `while` loop; you should
/// wrap your recurring task in a `while schedule.next().await.is_some()` or similar.
///
/// The stream yields `()` whenever the task should run, and ends once every
/// [`TaskHandle`] for this schedule has been dropped.
#[pin_project(project = TaskScheduleP)]
pub struct TaskSchedule<R: SleepProvider> {
    /// If we're waiting for a deadline to expire, the future for that.
    ///
    /// This is `None` whenever no timer is pending.
    sleep: Option<Pin<Box<R::SleepFuture>>>,
    /// Receiver of scheduler commands from handles.
    ///
    /// When this channel closes (all handles dropped), the stream terminates.
    rx: UnboundedReceiver<SchedulerCommand>,
    /// Runtime.
    ///
    /// Used to create sleep futures and to read the current time.
    rt: R,
    /// Whether or not to yield a result immediately when polled, once.
    ///
    /// This is used to avoid having to create a `SleepFuture` with zero duration,
    /// which is potentially a bit wasteful.
    instant_fire: bool,
    /// Whether we are currently "suspended".  If we are suspended, we won't
    /// start executing again till we're explicitly "resumed".
    ///
    /// Suspension does not touch `sleep` or `instant_fire`; it only stops the
    /// stream from yielding.
    suspended: bool,
}

/// A handle used to control a [`TaskSchedule`].
///
/// When the final handle is dropped, the computation governed by the
/// `TaskSchedule` should terminate.
///
/// Handles are cheap to clone; every clone sends commands to the same schedule.
#[derive(Clone)]
pub struct TaskHandle {
    /// Sender of scheduler commands to the corresponding schedule.
    tx: UnboundedSender<SchedulerCommand>,
}

impl<R: SleepProvider> TaskSchedule<R> {
    /// Build a fresh schedule on top of `rt`, together with the handle that
    /// controls it.
    ///
    /// The returned schedule is ready right away: its first poll yields
    /// without waiting.
    pub fn new(rt: R) -> (Self, TaskHandle) {
        let (tx, rx) = mpsc::unbounded();
        let handle = TaskHandle { tx };
        let schedule = Self {
            sleep: None,
            rx,
            rt,
            // A new schedule fires on its very first poll.
            instant_fire: true,
            suspended: false,
        };
        (schedule, handle)
    }

    /// Arrange for the schedule to trigger once `dur` has passed.
    ///
    /// Any previously queued immediate firing or pending timer is replaced.
    pub fn fire_in(&mut self, dur: Duration) {
        self.instant_fire = false;
        let timer = Box::pin(self.rt.sleep(dur));
        self.sleep = Some(timer);
    }

    /// Arrange for the schedule to trigger on its next poll.
    ///
    /// Any pending timer is discarded.
    pub fn fire(&mut self) {
        self.sleep = None;
        self.instant_fire = true;
    }

    /// Wait until `dur` has elapsed.
    ///
    /// This behaves like [`SleepProvider::sleep`], except that the wait obeys
    /// whatever the schedule's associated [`TaskHandle`] tells it to do.
    ///
    /// Put differently, it is shorthand for
    /// `self.fire_in(dur); self.next().await;` with the intent spelled out.
    ///
    /// While the schedule is suspended through its [`TaskHandle`], this method
    /// does not return until the schedule has been resumed _and_ the timer has
    /// elapsed.  If the schedule is cancelled through its [`TaskHandle`], this
    /// method does not return at all until the schedule is re-activated with
    /// [`TaskHandle::fire`] or [`TaskHandle::fire_at`].
    ///
    /// # Errors
    ///
    /// Returns [`SleepError::ScheduleDropped`] once every associated
    /// [`TaskHandle`] has been dropped.
    pub async fn sleep(&mut self, dur: Duration) -> Result<(), SleepError> {
        self.fire_in(dur);
        match self.next().await {
            Some(()) => Ok(()),
            // The stream ended: no handle is left to drive this schedule.
            None => Err(SleepError::ScheduleDropped),
        }
    }

    /// Like
    /// [`sleep_until_wallclock`](crate::SleepProviderExt::sleep_until_wallclock),
    /// but honouring messages from this schedule's associated [`TaskHandle`].
    ///
    /// The wait is done in steps: each step asks `calc_next_delay` how long to
    /// sleep and whether that sleep is the last one needed to reach `when`.
    ///
    /// # Errors
    ///
    /// Returns [`SleepError::ScheduleDropped`] once every associated
    /// [`TaskHandle`] has been dropped.
    pub async fn sleep_until_wallclock(&mut self, when: SystemTime) -> Result<(), SleepError> {
        loop {
            let now = self.rt.wallclock();
            let (finished, delay) = crate::timer::calc_next_delay(now, when);
            self.sleep(delay).await?;
            if finished {
                break Ok(());
            }
        }
    }
}

impl TaskHandle {
    /// Ask the corresponding schedule to trigger right away.
    ///
    /// Returns `true` if the schedule still exists, and `false` otherwise.
    pub fn fire(&self) -> bool {
        let sent = self.tx.unbounded_send(SchedulerCommand::Fire);
        sent.is_ok()
    }

    /// Ask the corresponding schedule to trigger at `instant`.
    ///
    /// Returns `true` if the schedule still exists, and `false` otherwise.
    pub fn fire_at(&self, instant: Instant) -> bool {
        let command = SchedulerCommand::FireAt(instant);
        let sent = self.tx.unbounded_send(command);
        sent.is_ok()
    }

    /// Call off a pending firing of the corresponding schedule.
    ///
    /// Returns `true` if the schedule still exists, and `false` otherwise.
    pub fn cancel(&self) -> bool {
        let sent = self.tx.unbounded_send(SchedulerCommand::Cancel);
        sent.is_ok()
    }

    /// Pause the corresponding schedule.
    ///
    /// A schedule that is ready now becomes pending, and stays that way until
    /// `resume()` is called.  A schedule that is waiting on a timer keeps
    /// counting down, but it only becomes ready once the timer has elapsed
    /// _and_ `resume()` has been called.
    ///
    /// Returns `true` if the schedule still exists, and `false` otherwise.
    pub fn suspend(&self) -> bool {
        let sent = self.tx.unbounded_send(SchedulerCommand::Suspend);
        sent.is_ok()
    }

    /// Un-pause the corresponding schedule.
    ///
    /// This reverses an earlier `suspend()`: the schedule fires again if it is
    /// ready, or as soon as it becomes ready.
    ///
    /// A schedule that was already cancelled will not fire because of this
    /// call.  To make it fire, use the `fire()` or `fire_at()` methods.
    ///
    /// Returns `true` if the schedule still exists, and `false` otherwise.
    pub fn resume(&self) -> bool {
        let sent = self.tx.unbounded_send(SchedulerCommand::Resume);
        sent.is_ok()
    }
}

// NOTE(eta): this lives on the *pin projection* rather than on the original type, so that
//            we never need `R: Unpin`. As a result, every field here is a mutable reference.
impl<R: SleepProvider> TaskScheduleP<'_, R> {
    /// Apply one command received from a handle to the schedule's state.
    fn handle_command(&mut self, cmd: SchedulerCommand) {
        match cmd {
            // Suspension is tracked on its own; timers and queued firings are left alone.
            SchedulerCommand::Suspend => {
                *self.suspended = true;
            }
            SchedulerCommand::Resume => {
                *self.suspended = false;
            }
            // Drop whatever was pending, immediate or timed.
            SchedulerCommand::Cancel => {
                *self.sleep = None;
                *self.instant_fire = false;
            }
            // An immediate firing supersedes any timer.
            SchedulerCommand::Fire => {
                *self.sleep = None;
                *self.instant_fire = true;
            }
            // A timed firing supersedes any queued immediate firing.
            SchedulerCommand::FireAt(instant) => {
                // A deadline that has already passed saturates to a zero-length wait.
                let delay = instant.saturating_duration_since(self.rt.now());
                *self.instant_fire = false;
                *self.sleep = Some(Box::pin(self.rt.sleep(delay)));
            }
        }
    }
}

impl<R: SleepProvider> Stream for TaskSchedule<R> {
    type Item = ();

    /// Yield `Some(())` when the task should run, or `None` once every handle
    /// has been dropped.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // Apply every command that is already queued before deciding anything.
        loop {
            match this.rx.poll_next_unpin(cx) {
                Poll::Ready(Some(cmd)) => this.handle_command(cmd),
                // Every task handle is gone: signal end of stream.
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => break,
            }
        }

        // A suspended schedule never yields, whatever else is pending.
        if *this.suspended {
            return Poll::Pending;
        }

        // A queued immediate firing is consumed exactly once.
        if *this.instant_fire {
            *this.instant_fire = false;
            return Poll::Ready(Some(()));
        }

        // Otherwise, yield only if there is a timer and it has elapsed.
        if let Some(timer) = this.sleep.as_mut() {
            if timer.as_mut().poll(cx).is_ready() {
                *this.sleep = None;
                return Poll::Ready(Some(()));
            }
        }
        Poll::Pending
    }
}

// test_with_all_runtimes! only exists if these features are satisfied.
#[cfg(all(
    test,
    any(feature = "native-tls", feature = "rustls"),
    any(feature = "tokio", feature = "async-std"),
))]
mod test {
    // Throughout these tests, `sch.next().now_or_never()` polls the schedule
    // exactly once: `Some(_)` means the stream produced a result on that poll,
    // `None` means it was still pending.
    use crate::scheduler::TaskSchedule;
    use crate::{test_with_all_runtimes, SleepProvider};
    use futures::FutureExt;
    use futures::StreamExt;
    use std::time::{Duration, Instant};

    // A freshly created schedule produces a result on its very first poll.
    #[test]
    fn it_fires_immediately() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, _hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());
        });
    }

    // Once the only handle is dropped, the stream ends (yields `None`).
    #[test]
    #[allow(clippy::unwrap_used)]
    fn it_dies_if_dropped() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            drop(hdl);
            assert!(sch.next().now_or_never().unwrap().is_none());
        });
    }

    // After the initial firing, the schedule stays pending until the handle
    // fires it, and each `fire()` produces exactly one result.
    #[test]
    fn it_fires_on_demand() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            assert!(sch.next().now_or_never().is_none());
            assert!(hdl.fire());
            assert!(sch.next().now_or_never().is_some());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    // A `cancel()` sent after a `fire()` wins, since commands are applied in order.
    #[test]
    fn it_cancels_instant_firings() {
        // NOTE(eta): this test very much assumes that unbounded channels will
        //            transmit things instantly. If it breaks, that's probably why.
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            assert!(sch.next().now_or_never().is_none());
            assert!(hdl.fire());
            assert!(hdl.cancel());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    // A delay set directly on the schedule is not ready at once, but does
    // produce exactly one result when awaited.
    #[test]
    fn it_fires_after_self_reschedule() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, _hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            sch.fire_in(Duration::from_millis(100));

            assert!(sch.next().now_or_never().is_none());
            assert!(sch.next().await.is_some());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    // Same as above, but the delay is requested through the handle.
    #[test]
    fn it_fires_after_external_reschedule() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt);
            assert!(sch.next().now_or_never().is_some());

            hdl.fire_at(Instant::now() + Duration::from_millis(100));

            assert!(sch.next().now_or_never().is_none());
            assert!(sch.next().await.is_some());
            assert!(sch.next().now_or_never().is_none());
        });
    }

    // This test is disabled because it was flaky when the CI servers were
    // heavily loaded. (See #545.)
    //
    // TODO: Let's fix this test and make it more reliable, then re-enable it.
    #[test]
    #[ignore]
    fn it_cancels_delayed_firings() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            assert!(sch.next().now_or_never().is_some());

            hdl.fire_at(Instant::now() + Duration::from_millis(100));

            assert!(sch.next().now_or_never().is_none());

            // Halfway to the deadline: still nothing should have fired.
            rt.sleep(Duration::from_millis(50)).await;

            assert!(sch.next().now_or_never().is_none());

            hdl.cancel();

            assert!(sch.next().now_or_never().is_none());

            // Well past the original deadline: the cancelled timer must not fire.
            rt.sleep(Duration::from_millis(100)).await;

            assert!(sch.next().now_or_never().is_none());
        });
    }

    // A `fire()` sent after a `fire_at()` replaces the timer: the schedule
    // fires once immediately and not again when the old deadline passes.
    #[test]
    fn last_fire_wins() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            assert!(sch.next().now_or_never().is_some());

            hdl.fire_at(Instant::now() + Duration::from_millis(100));
            hdl.fire();

            assert!(sch.next().now_or_never().is_some());
            assert!(sch.next().now_or_never().is_none());

            rt.sleep(Duration::from_millis(150)).await;

            assert!(sch.next().now_or_never().is_none());
        });
    }

    // A queued immediate firing is held back while suspended and delivered
    // after `resume()`.
    #[test]
    fn suspend_and_resume_with_fire() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            hdl.fire();
            hdl.suspend();

            assert!(sch.next().now_or_never().is_none());
            hdl.resume();
            assert!(sch.next().now_or_never().is_some());
        });
    }

    // A pending timer survives suspend/resume: after resuming, the schedule
    // still waits for the timer before producing a result.
    #[test]
    fn suspend_and_resume_with_sleep() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            sch.fire_in(Duration::from_millis(100));
            hdl.suspend();

            assert!(sch.next().now_or_never().is_none());
            hdl.resume();
            assert!(sch.next().now_or_never().is_none());
            assert!(sch.next().await.is_some());
        });
    }

    // Suspending with nothing pending leaves the schedule pending; this test
    // does not poll again after `resume()`, so it only checks that the call
    // sequence runs cleanly.
    #[test]
    fn suspend_and_resume_with_nothing() {
        test_with_all_runtimes!(|rt| async move {
            let (mut sch, hdl) = TaskSchedule::new(rt.clone());
            assert!(sch.next().now_or_never().is_some());
            hdl.suspend();

            assert!(sch.next().now_or_never().is_none());
            hdl.resume();
        });
    }
}