1
//! Implement half of log rate-limiting: the ability to cause the state of a
2
//! Loggable to get flushed at appropriate intervals.
3

            
4
use super::{Activity, Loggable};
5
use futures::task::SpawnExt as _;
6
use std::{
7
    sync::{Arc, Mutex},
8
    time::Duration,
9
};
10
use tor_error::ErrorReport;
11

            
12
/// Declare a dyn-safe trait for the parts of an asynchronous runtime so that we
13
/// can install it globally.
14
pub(crate) mod rt {
15
    use futures::{future::BoxFuture, task::Spawn};
16
    use once_cell::sync::OnceCell;
17
    use std::time::{Duration, Instant};
18

            
19
    /// A dyn-safe view of the parts of an async runtime that we need for rate-limiting.
20
    pub trait RuntimeSupport: Spawn + 'static + Sync + Send {
21
        /// Return a future that will yield () after `duration` has passed.
22
        fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()>;
23

            
24
        /// Return the current time as an Instant.
25
        fn now(&self) -> Instant;
26
    }
27

            
28
    impl<R: tor_rtcompat::Runtime> RuntimeSupport for R {
29
        fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()> {
30
            Box::pin(tor_rtcompat::SleepProvider::sleep(self, duration))
31
        }
32
        fn now(&self) -> Instant {
33
            tor_rtcompat::SleepProvider::now(self)
34
        }
35
    }
36

            
37
    /// A global view of our runtime, used for rate-limited logging.
38
    // TODO MSRV 1.70: We could use OnceSync instead.
39
    static RUNTIME_SUPPORT: OnceCell<Box<dyn RuntimeSupport>> = OnceCell::new();
40

            
41
    /// Try to install `runtime` as a global runtime to be used for rate-limited logging.
42
    ///
43
    /// Return an error (and make no changes) if there there was already a runtime installed.
44
    pub fn install_runtime<R: tor_rtcompat::Runtime>(
45
        runtime: R,
46
    ) -> Result<(), InstallRuntimeError> {
47
        let rt = Box::new(runtime);
48
        RUNTIME_SUPPORT
49
            .set(rt)
50
            .map_err(|_| InstallRuntimeError::DuplicateCall)
51
    }
52

            
53
    /// An error that occurs while installing a runtime.
54
    #[derive(Clone, Debug, thiserror::Error)]
55
    #[non_exhaustive]
56
    pub enum InstallRuntimeError {
57
        /// Tried to install a runtime when there was already one installed.
58
        #[error("Called tor_log_ratelim::install_runtime() more than once")]
59
        DuplicateCall,
60
    }
61

            
62
    /// Return the installed runtime, if there is one.
63
    pub fn rt_support() -> Option<&'static dyn RuntimeSupport> {
64
        RUNTIME_SUPPORT.get().map(Box::as_ref)
65
    }
66
}
67

            
68
/// A rate-limited wrapper around a [`Loggable`]` that ensures its events are
69
/// flushed from time to time.
70
pub struct RateLim<T> {
71
    /// The Loggable itself.
72
    inner: Mutex<Inner<T>>,
73
}
74

            
75
/// Lock-protected state belonging to a [`RateLim`].
struct Inner<T> {
    /// Whether a background task currently exists that collates and
    /// periodically flushes reports for `loggable`.
    task_running: bool,
    /// The loggable state whose reports are being rate-limited.
    loggable: T,
}
82

            
83
impl<T: Loggable> RateLim<T> {
84
    /// Create a new `RateLim` to flush events for `loggable`.
85
    pub fn new(loggable: T) -> Arc<Self> {
86
        Arc::new(RateLim {
87
            inner: Mutex::new(Inner {
88
                loggable,
89
                task_running: false,
90
            }),
91
        })
92
    }
93

            
94
    /// Adjust the status of this reporter's `Loggable` by calling `f` on it,
95
    /// but only if it is already scheduled to report itself.  Otherwise, do nothing.
96
    ///
97
    /// This is the appropriate function to use for tracking successes.f
98
    pub fn nonevent<F>(&self, f: F)
99
    where
100
        F: FnOnce(&mut T),
101
    {
102
        let mut inner = self.inner.lock().expect("lock poisoned");
103
        if inner.task_running {
104
            f(&mut inner.loggable);
105
        }
106
    }
107

            
108
    /// Add an event to this rate-limited reporter by calling `f` on it, and
109
    /// schedule it to be reported after an appropriate time.
110
    ///
111
    /// NOTE: This API is a big ugly. If we ever decide to make it non-hidden,
112
    /// we may want to make it call rt_support() directly again, as it did in
113
    /// earlier visions.
114
    pub fn event<F>(self: &Arc<Self>, rt: &'static dyn rt::RuntimeSupport, f: F)
115
    where
116
        F: FnOnce(&mut T),
117
    {
118
        let mut inner = self.inner.lock().expect("poisoned lock");
119
        f(&mut inner.loggable);
120

            
121
        if !inner.task_running {
122
            // Launch a task to make periodic reports on the state of our Loggable.
123
            inner.task_running = true;
124
            if let Err(e) = rt.spawn(Box::pin(run(rt, Arc::clone(self)))) {
125
                // We couldn't spawn a task; we have to flush the state
126
                // immediately.
127
                //
128
                // TODO: This behavior is undesirable if it causes us to spam
129
                // the log while we are shutting down.  On the other hand, it's
130
                // also undesirable if we suppress our logs while we're
131
                // shutting down.
132
                inner.loggable.flush(Duration::default());
133
                tracing::warn!("Also, unable to spawn a logging task: {}", e.report());
134
            }
135
        }
136
    }
137
}
138

            
139
/// Roughly how long the Loggable must keep reporting itself as dormant before
/// the flushing task gives up.  After that, the next event starts over from
/// the beginning of the timeout schedule.
///
/// Currently four hours.
const RESET_AFTER_DORMANT_FOR: Duration = Duration::from_secs(4 * 60 * 60);
142

            
143
/// Return an endless iterator of the intervals over which to summarize events.
///
/// The intervals start short and lengthen while the event keeps happening.
/// Once the ramp-up is exhausted, every further interval is one day long.
fn timeout_sequence() -> impl Iterator<Item = Duration> {
    /// Seconds in a minute.
    const MINUTE: u64 = 60;
    /// Seconds in an hour.
    const HOUR: u64 = 60 * MINUTE;
    /// Seconds in a day.
    const DAY: u64 = 24 * HOUR;
    /// The escalating schedule, in seconds, that we follow before falling
    /// back to one summary per day.
    static RAMP_UP: [u64; 9] = [
        5,
        MINUTE,
        5 * MINUTE,
        30 * MINUTE,
        30 * MINUTE,
        HOUR,
        HOUR,
        4 * HOUR,
        4 * HOUR,
    ];

    RAMP_UP
        .iter()
        .copied()
        .chain(std::iter::repeat(DAY))
        .map(Duration::from_secs)
}
169

            
170
/// Helper: runs in a background task, and periodically flushes the `Loggable`
171
/// in `ratelim`.  Exits after [`Loggable::flush`] returns [`Activity::Dormant`]
172
/// for "long enough".
173
async fn run<T>(rt_support: &dyn rt::RuntimeSupport, ratelim: Arc<RateLim<T>>)
174
// TODO : Perhaps instead of taking an Arc<RateLim<T>> we want sometimes to take
175
// a `&'static RateLim<T>``, so we don't need to mess about with `Arc`s needlessly.
176
where
177
    T: Loggable,
178
{
179
    let mut dormant_since = None;
180
    for duration in timeout_sequence() {
181
        rt_support.sleep(duration).await;
182
        {
183
            let mut inner = ratelim.inner.lock().expect("Lock poisoned");
184
            debug_assert!(inner.task_running);
185
            // NOTE: We say that we are summarizing "duration" on the theory
186
            // that we actually slept for "duration".  But maybe `sleep` slept
187
            // for a little more or less?  Nonetheless, we report this as if we
188
            // had slept for the exact amount, since the alternative appears to
189
            // be saying stuff like "this problem occurred 8/12 times in the
190
            // last 10min 0.0014ssec" instead of "10m".
191
            if inner.loggable.flush(duration) == Activity::Dormant {
192
                // TODO: This can tell the user several times that the problem
193
                // did not occur! Perhaps we only want to flush once on dormant,
194
                // and then not report the dormant condition again until we are
195
                // no longer tracking it.  Or perhaps we should lower the
196
                // responsibility for deciding when to log and when to uninstall
197
                // to the Loggable?
198
                match dormant_since {
199
                    Some(when) => {
200
                        if let Some(dormant_for) = rt_support.now().checked_duration_since(when) {
201
                            if dormant_for >= RESET_AFTER_DORMANT_FOR {
202
                                inner.task_running = false;
203
                                return;
204
                            }
205
                        }
206
                    }
207
                    None => {
208
                        dormant_since = Some(rt_support.now());
209
                    }
210
                }
211
            } else {
212
                dormant_since = None;
213
            }
214
        }
215
    }
216

            
217
    unreachable!("timeout_sequence returned a finite sequence");
218
}
219

            
220
// TODO : Write some tests.