tor_log_ratelim/
ratelim.rs

1//! Implement half of log rate-limiting: the ability to cause the state of a
2//! Loggable to get flushed at appropriate intervals.
3
4use super::{Activity, Loggable};
5use futures::task::SpawnExt as _;
6use std::{
7    sync::{Arc, Mutex},
8    time::Duration,
9};
10use tor_error::ErrorReport;
11
12/// Declare a dyn-safe trait for the parts of an asynchronous runtime so that we
13/// can install it globally.
14pub(crate) mod rt {
15    use futures::{future::BoxFuture, task::Spawn};
16    use std::sync::OnceLock;
17    use std::time::{Duration, Instant};
18
19    /// A dyn-safe view of the parts of an async runtime that we need for rate-limiting.
20    pub trait RuntimeSupport: Spawn + 'static + Sync + Send {
21        /// Return a future that will yield () after `duration` has passed.
22        fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()>;
23
24        /// Return the current time as an Instant.
25        fn now(&self) -> Instant;
26    }
27
28    impl<R: tor_rtcompat::Runtime> RuntimeSupport for R {
29        fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()> {
30            Box::pin(tor_rtcompat::SleepProvider::sleep(self, duration))
31        }
32        fn now(&self) -> Instant {
33            tor_rtcompat::SleepProvider::now(self)
34        }
35    }
36
37    /// A global view of our runtime, used for rate-limited logging.
38    static RUNTIME_SUPPORT: OnceLock<Box<dyn RuntimeSupport>> = OnceLock::new();
39
40    /// Try to install `runtime` as a global runtime to be used for rate-limited logging.
41    ///
42    /// Return an error (and make no changes) if there there was already a runtime installed.
43    pub fn install_runtime<R: tor_rtcompat::Runtime>(
44        runtime: R,
45    ) -> Result<(), InstallRuntimeError> {
46        let rt = Box::new(runtime);
47        RUNTIME_SUPPORT
48            .set(rt)
49            .map_err(|_| InstallRuntimeError::DuplicateCall)
50    }
51
52    /// An error that occurs while installing a runtime.
53    #[derive(Clone, Debug, thiserror::Error)]
54    #[non_exhaustive]
55    pub enum InstallRuntimeError {
56        /// Tried to install a runtime when there was already one installed.
57        #[error("Called tor_log_ratelim::install_runtime() more than once")]
58        DuplicateCall,
59    }
60
61    /// Return the installed runtime, if there is one.
62    pub fn rt_support() -> Option<&'static dyn RuntimeSupport> {
63        RUNTIME_SUPPORT.get().map(Box::as_ref)
64    }
65}
66
/// A rate-limited wrapper around a [`Loggable`] that ensures its events are
/// flushed from time to time.
///
/// Instances are created with [`RateLim::new`], which returns them inside an
/// [`Arc`].
pub struct RateLim<T> {
    /// The wrapped `Loggable` and its bookkeeping, protected by a lock.
    inner: Mutex<Inner<T>>,
}
73
/// The mutable state of a [`RateLim`].
///
/// It lives inside the `Mutex` held by the `RateLim`.
struct Inner<T> {
    /// The loggable state whose reports are rate-limited.
    loggable: T,
    /// True if we have a running task that is collating reports for `loggable`.
    ///
    /// `RateLim::event` sets this when it launches the flushing task, and the
    /// task clears it just before it exits.
    task_running: bool,
}
81
82impl<T: Loggable> RateLim<T> {
83    /// Create a new `RateLim` to flush events for `loggable`.
84    pub fn new(loggable: T) -> Arc<Self> {
85        Arc::new(RateLim {
86            inner: Mutex::new(Inner {
87                loggable,
88                task_running: false,
89            }),
90        })
91    }
92
93    /// Adjust the status of this reporter's `Loggable` by calling `f` on it,
94    /// but only if it is already scheduled to report itself.  Otherwise, do nothing.
95    ///
96    /// This is the appropriate function to use for tracking successes.f
97    pub fn nonevent<F>(&self, f: F)
98    where
99        F: FnOnce(&mut T),
100    {
101        let mut inner = self.inner.lock().expect("lock poisoned");
102        if inner.task_running {
103            f(&mut inner.loggable);
104        }
105    }
106
107    /// Add an event to this rate-limited reporter by calling `f` on it, and
108    /// schedule it to be reported after an appropriate time.
109    ///
110    /// NOTE: This API is a big ugly. If we ever decide to make it non-hidden,
111    /// we may want to make it call rt_support() directly again, as it did in
112    /// earlier visions.
113    pub fn event<F>(self: &Arc<Self>, rt: &'static dyn rt::RuntimeSupport, f: F)
114    where
115        F: FnOnce(&mut T),
116    {
117        let mut inner = self.inner.lock().expect("poisoned lock");
118        f(&mut inner.loggable);
119
120        if !inner.task_running {
121            // Launch a task to make periodic reports on the state of our Loggable.
122            inner.task_running = true;
123            if let Err(e) = rt.spawn(Box::pin(run(rt, Arc::clone(self)))) {
124                // We couldn't spawn a task; we have to flush the state
125                // immediately.
126                //
127                // TODO: This behavior is undesirable if it causes us to spam
128                // the log while we are shutting down.  On the other hand, it's
129                // also undesirable if we suppress our logs while we're
130                // shutting down.
131                inner.loggable.flush(Duration::default());
132                tracing::warn!("Also, unable to spawn a logging task: {}", e.report());
133            }
134        }
135    }
136}
137
/// Once we have had nothing to report for approximately this long (four
/// hours), we should reset our timeout schedule.
const RESET_AFTER_DORMANT_FOR: Duration = Duration::from_secs(4 * 60 * 60);
141
/// Return an unbounded iterator of the successive intervals to summarize.
///
/// The first intervals are short; as the event keeps happening we back off,
/// eventually settling on one summary every 24 hours.
fn timeout_sequence() -> impl Iterator<Item = Duration> {
    /// One second.
    const SECOND: Duration = Duration::from_secs(1);
    /// One minute.
    const MINUTE: Duration = Duration::from_secs(60);
    /// One hour.
    const HOUR: Duration = Duration::from_secs(60 * 60);

    // The initial back-off ramp; after it is exhausted we summarize daily.
    let ramp_up = [
        5 * SECOND,
        MINUTE,
        5 * MINUTE,
        30 * MINUTE,
        30 * MINUTE,
        HOUR,
        HOUR,
        4 * HOUR,
        4 * HOUR,
    ];
    let daily = std::iter::repeat(24 * HOUR);

    IntoIterator::into_iter(ramp_up).chain(daily)
}
168
169/// Helper: runs in a background task, and periodically flushes the `Loggable`
170/// in `ratelim`.  Exits after [`Loggable::flush`] returns [`Activity::Dormant`]
171/// for "long enough".
172async fn run<T>(rt_support: &dyn rt::RuntimeSupport, ratelim: Arc<RateLim<T>>)
173// TODO : Perhaps instead of taking an Arc<RateLim<T>> we want sometimes to take
174// a `&'static RateLim<T>``, so we don't need to mess about with `Arc`s needlessly.
175where
176    T: Loggable,
177{
178    let mut dormant_since = None;
179    for duration in timeout_sequence() {
180        rt_support.sleep(duration).await;
181        {
182            let mut inner = ratelim.inner.lock().expect("Lock poisoned");
183            debug_assert!(inner.task_running);
184            // NOTE: We say that we are summarizing "duration" on the theory
185            // that we actually slept for "duration".  But maybe `sleep` slept
186            // for a little more or less?  Nonetheless, we report this as if we
187            // had slept for the exact amount, since the alternative appears to
188            // be saying stuff like "this problem occurred 8/12 times in the
189            // last 10min 0.0014ssec" instead of "10m".
190            if inner.loggable.flush(duration) == Activity::Dormant {
191                // TODO: This can tell the user several times that the problem
192                // did not occur! Perhaps we only want to flush once on dormant,
193                // and then not report the dormant condition again until we are
194                // no longer tracking it.  Or perhaps we should lower the
195                // responsibility for deciding when to log and when to uninstall
196                // to the Loggable?
197                match dormant_since {
198                    Some(when) => {
199                        if let Some(dormant_for) = rt_support.now().checked_duration_since(when) {
200                            if dormant_for >= RESET_AFTER_DORMANT_FOR {
201                                inner.task_running = false;
202                                return;
203                            }
204                        }
205                    }
206                    None => {
207                        dormant_since = Some(rt_support.now());
208                    }
209                }
210            } else {
211                dormant_since = None;
212            }
213        }
214    }
215
216    unreachable!("timeout_sequence returned a finite sequence");
217}
218
219// TODO : Write some tests.