//! Implement half of log rate-limiting: the ability to cause the state of a
//! [`Loggable`] to be flushed at appropriate intervals.
3

            
4
use super::{Activity, Loggable};
5
use futures::task::SpawnExt as _;
6
use std::{
7
    sync::{Arc, Mutex},
8
    time::Duration,
9
};
10
use tor_error::ErrorReport;
11

            
12
/// Declare a dyn-safe trait for the parts of an asynchronous runtime so that we
13
/// can install it globally.
14
pub(crate) mod rt {
15
    use futures::{future::BoxFuture, task::Spawn};
16
    use std::sync::OnceLock;
17
    use std::time::{Duration, Instant};
18

            
19
    /// A dyn-safe view of the parts of an async runtime that we need for rate-limiting.
20
    pub trait RuntimeSupport: Spawn + 'static + Sync + Send {
21
        /// Return a future that will yield () after `duration` has passed.
22
        fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()>;
23

            
24
        /// Return the current time as an Instant.
25
        fn now(&self) -> Instant;
26
    }
27

            
28
    impl<R: tor_rtcompat::Runtime> RuntimeSupport for R {
29
        fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()> {
30
            Box::pin(tor_rtcompat::SleepProvider::sleep(self, duration))
31
        }
32
        fn now(&self) -> Instant {
33
            tor_rtcompat::SleepProvider::now(self)
34
        }
35
    }
36

            
37
    /// A global view of our runtime, used for rate-limited logging.
38
    static RUNTIME_SUPPORT: OnceLock<Box<dyn RuntimeSupport>> = OnceLock::new();
39

            
40
    /// Try to install `runtime` as a global runtime to be used for rate-limited logging.
41
    ///
42
    /// Return an error (and make no changes) if there there was already a runtime installed.
43
    pub fn install_runtime<R: tor_rtcompat::Runtime>(
44
        runtime: R,
45
    ) -> Result<(), InstallRuntimeError> {
46
        let rt = Box::new(runtime);
47
        RUNTIME_SUPPORT
48
            .set(rt)
49
            .map_err(|_| InstallRuntimeError::DuplicateCall)
50
    }
51

            
52
    /// An error that occurs while installing a runtime.
53
    #[derive(Clone, Debug, thiserror::Error)]
54
    #[non_exhaustive]
55
    pub enum InstallRuntimeError {
56
        /// Tried to install a runtime when there was already one installed.
57
        #[error("Called tor_log_ratelim::install_runtime() more than once")]
58
        DuplicateCall,
59
    }
60

            
61
    /// Return the installed runtime, if there is one.
62
5390
    pub fn rt_support() -> Option<&'static dyn RuntimeSupport> {
63
5390
        RUNTIME_SUPPORT.get().map(Box::as_ref)
64
5390
    }
65
}
66

            
67
/// A rate-limited wrapper around a [`Loggable`]` that ensures its events are
68
/// flushed from time to time.
69
pub struct RateLim<T> {
70
    /// The Loggable itself.
71
    inner: Mutex<Inner<T>>,
72
}
73

            
74
/// Lock-protected state of a [`RateLim`].
struct Inner<T> {
    /// Set while a background task is collating and flushing reports for
    /// `loggable`; cleared when no such task exists.
    task_running: bool,
    /// The loggable whose reports are being rate-limited.
    loggable: T,
}
81

            
82
impl<T: Loggable> RateLim<T> {
83
    /// Create a new `RateLim` to flush events for `loggable`.
84
    pub fn new(loggable: T) -> Arc<Self> {
85
        Arc::new(RateLim {
86
            inner: Mutex::new(Inner {
87
                loggable,
88
                task_running: false,
89
            }),
90
        })
91
    }
92

            
93
    /// Adjust the status of this reporter's `Loggable` by calling `f` on it,
94
    /// but only if it is already scheduled to report itself.  Otherwise, do nothing.
95
    ///
96
    /// This is the appropriate function to use for tracking successes.f
97
    pub fn nonevent<F>(&self, f: F)
98
    where
99
        F: FnOnce(&mut T),
100
    {
101
        let mut inner = self.inner.lock().expect("lock poisoned");
102
        if inner.task_running {
103
            f(&mut inner.loggable);
104
        }
105
    }
106

            
107
    /// Add an event to this rate-limited reporter by calling `f` on it, and
108
    /// schedule it to be reported after an appropriate time.
109
    ///
110
    /// NOTE: This API is a big ugly. If we ever decide to make it non-hidden,
111
    /// we may want to make it call rt_support() directly again, as it did in
112
    /// earlier visions.
113
    pub fn event<F>(self: &Arc<Self>, rt: &'static dyn rt::RuntimeSupport, f: F)
114
    where
115
        F: FnOnce(&mut T),
116
    {
117
        let mut inner = self.inner.lock().expect("poisoned lock");
118
        f(&mut inner.loggable);
119

            
120
        if !inner.task_running {
121
            // Launch a task to make periodic reports on the state of our Loggable.
122
            inner.task_running = true;
123
            if let Err(e) = rt.spawn(Box::pin(run(rt, Arc::clone(self)))) {
124
                // We couldn't spawn a task; we have to flush the state
125
                // immediately.
126
                //
127
                // TODO: This behavior is undesirable if it causes us to spam
128
                // the log while we are shutting down.  On the other hand, it's
129
                // also undesirable if we suppress our logs while we're
130
                // shutting down.
131
                inner.loggable.flush(Duration::default());
132
                tracing::warn!("Also, unable to spawn a logging task: {}", e.report());
133
            }
134
        }
135
    }
136
}
137

            
138
/// How long (four hours) a `Loggable` must keep reporting itself dormant
/// before its background task exits and its timeout schedule starts over.
///
/// This is only checked after each flush, so the actual wait can be somewhat
/// longer.
const RESET_AFTER_DORMANT_FOR: Duration = Duration::from_secs(4 * 60 * 60);
141

            
142
/// Yield the successive lengths of time over which to summarize a `Loggable`.
///
/// The intervals start short, so that a new problem is reported promptly, and
/// grow while the problem persists.  After the initial schedule is exhausted,
/// the sequence repeats a 24-hour interval forever.
fn timeout_sequence() -> impl Iterator<Item = Duration> {
    /// One minute, in seconds.
    const MINUTE: u64 = 60;
    /// One hour, in seconds.
    const HOUR: u64 = 60 * MINUTE;
    /// One day, in seconds.
    const DAY: u64 = 24 * HOUR;

    /// Escalating intervals (in seconds) used before settling on daily summaries.
    const BACKOFF_SECS: [u64; 9] = [
        5,
        MINUTE,
        5 * MINUTE,
        30 * MINUTE,
        30 * MINUTE,
        HOUR,
        HOUR,
        4 * HOUR,
        4 * HOUR,
    ];

    BACKOFF_SECS
        .into_iter()
        .chain(std::iter::repeat(DAY))
        .map(Duration::from_secs)
}
168

            
169
/// Helper: runs in a background task, and periodically flushes the `Loggable`
170
/// in `ratelim`.  Exits after [`Loggable::flush`] returns [`Activity::Dormant`]
171
/// for "long enough".
172
async fn run<T>(rt_support: &dyn rt::RuntimeSupport, ratelim: Arc<RateLim<T>>)
173
// TODO : Perhaps instead of taking an Arc<RateLim<T>> we want sometimes to take
174
// a `&'static RateLim<T>``, so we don't need to mess about with `Arc`s needlessly.
175
where
176
    T: Loggable,
177
{
178
    let mut dormant_since = None;
179
    for duration in timeout_sequence() {
180
        rt_support.sleep(duration).await;
181
        {
182
            let mut inner = ratelim.inner.lock().expect("Lock poisoned");
183
            debug_assert!(inner.task_running);
184
            // NOTE: We say that we are summarizing "duration" on the theory
185
            // that we actually slept for "duration".  But maybe `sleep` slept
186
            // for a little more or less?  Nonetheless, we report this as if we
187
            // had slept for the exact amount, since the alternative appears to
188
            // be saying stuff like "this problem occurred 8/12 times in the
189
            // last 10min 0.0014ssec" instead of "10m".
190
            if inner.loggable.flush(duration) == Activity::Dormant {
191
                // TODO: This can tell the user several times that the problem
192
                // did not occur! Perhaps we only want to flush once on dormant,
193
                // and then not report the dormant condition again until we are
194
                // no longer tracking it.  Or perhaps we should lower the
195
                // responsibility for deciding when to log and when to uninstall
196
                // to the Loggable?
197
                match dormant_since {
198
                    Some(when) => {
199
                        if let Some(dormant_for) = rt_support.now().checked_duration_since(when) {
200
                            if dormant_for >= RESET_AFTER_DORMANT_FOR {
201
                                inner.task_running = false;
202
                                return;
203
                            }
204
                        }
205
                    }
206
                    None => {
207
                        dormant_since = Some(rt_support.now());
208
                    }
209
                }
210
            } else {
211
                dormant_since = None;
212
            }
213
        }
214
    }
215

            
216
    unreachable!("timeout_sequence returned a finite sequence");
217
}
218

            
219
// TODO : Write some tests.