tor_log_ratelim/ratelim.rs
1//! Implement half of log rate-limiting: the ability to cause the state of a
2//! Loggable to get flushed at appropriate intervals.
3
4use super::{Activity, Loggable};
5use futures::task::SpawnExt as _;
6use std::{
7 sync::{Arc, Mutex},
8 time::Duration,
9};
10use tor_error::ErrorReport;
11
12/// Declare a dyn-safe trait for the parts of an asynchronous runtime so that we
13/// can install it globally.
14pub(crate) mod rt {
15 use futures::{future::BoxFuture, task::Spawn};
16 use std::sync::OnceLock;
17 use std::time::{Duration, Instant};
18
19 /// A dyn-safe view of the parts of an async runtime that we need for rate-limiting.
20 pub trait RuntimeSupport: Spawn + 'static + Sync + Send {
21 /// Return a future that will yield () after `duration` has passed.
22 fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()>;
23
24 /// Return the current time as an Instant.
25 fn now(&self) -> Instant;
26 }
27
28 impl<R: tor_rtcompat::Runtime> RuntimeSupport for R {
29 fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()> {
30 Box::pin(tor_rtcompat::SleepProvider::sleep(self, duration))
31 }
32 fn now(&self) -> Instant {
33 tor_rtcompat::SleepProvider::now(self)
34 }
35 }
36
37 /// A global view of our runtime, used for rate-limited logging.
38 static RUNTIME_SUPPORT: OnceLock<Box<dyn RuntimeSupport>> = OnceLock::new();
39
40 /// Try to install `runtime` as a global runtime to be used for rate-limited logging.
41 ///
42 /// Return an error (and make no changes) if there there was already a runtime installed.
43 pub fn install_runtime<R: tor_rtcompat::Runtime>(
44 runtime: R,
45 ) -> Result<(), InstallRuntimeError> {
46 let rt = Box::new(runtime);
47 RUNTIME_SUPPORT
48 .set(rt)
49 .map_err(|_| InstallRuntimeError::DuplicateCall)
50 }
51
52 /// An error that occurs while installing a runtime.
53 #[derive(Clone, Debug, thiserror::Error)]
54 #[non_exhaustive]
55 pub enum InstallRuntimeError {
56 /// Tried to install a runtime when there was already one installed.
57 #[error("Called tor_log_ratelim::install_runtime() more than once")]
58 DuplicateCall,
59 }
60
61 /// Return the installed runtime, if there is one.
62 pub fn rt_support() -> Option<&'static dyn RuntimeSupport> {
63 RUNTIME_SUPPORT.get().map(Box::as_ref)
64 }
65}
66
67/// A rate-limited wrapper around a [`Loggable`]` that ensures its events are
68/// flushed from time to time.
69pub struct RateLim<T> {
70 /// The Loggable itself.
71 inner: Mutex<Inner<T>>,
72}
73
/// The state of a [`RateLim`] that lives behind its mutex.
struct Inner<T> {
    /// Set while a background task exists that is collating and flushing
    /// reports for `loggable`; cleared when that task exits.
    task_running: bool,
    /// The state whose reports we rate-limit.
    loggable: T,
}
81
82impl<T: Loggable> RateLim<T> {
83 /// Create a new `RateLim` to flush events for `loggable`.
84 pub fn new(loggable: T) -> Arc<Self> {
85 Arc::new(RateLim {
86 inner: Mutex::new(Inner {
87 loggable,
88 task_running: false,
89 }),
90 })
91 }
92
93 /// Adjust the status of this reporter's `Loggable` by calling `f` on it,
94 /// but only if it is already scheduled to report itself. Otherwise, do nothing.
95 ///
96 /// This is the appropriate function to use for tracking successes.f
97 pub fn nonevent<F>(&self, f: F)
98 where
99 F: FnOnce(&mut T),
100 {
101 let mut inner = self.inner.lock().expect("lock poisoned");
102 if inner.task_running {
103 f(&mut inner.loggable);
104 }
105 }
106
107 /// Add an event to this rate-limited reporter by calling `f` on it, and
108 /// schedule it to be reported after an appropriate time.
109 ///
110 /// NOTE: This API is a big ugly. If we ever decide to make it non-hidden,
111 /// we may want to make it call rt_support() directly again, as it did in
112 /// earlier visions.
113 pub fn event<F>(self: &Arc<Self>, rt: &'static dyn rt::RuntimeSupport, f: F)
114 where
115 F: FnOnce(&mut T),
116 {
117 let mut inner = self.inner.lock().expect("poisoned lock");
118 f(&mut inner.loggable);
119
120 if !inner.task_running {
121 // Launch a task to make periodic reports on the state of our Loggable.
122 inner.task_running = true;
123 if let Err(e) = rt.spawn(Box::pin(run(rt, Arc::clone(self)))) {
124 // We couldn't spawn a task; we have to flush the state
125 // immediately.
126 //
127 // TODO: This behavior is undesirable if it causes us to spam
128 // the log while we are shutting down. On the other hand, it's
129 // also undesirable if we suppress our logs while we're
130 // shutting down.
131 inner.loggable.flush(Duration::default());
132 tracing::warn!("Also, unable to spawn a logging task: {}", e.report());
133 }
134 }
135 }
136}
137
/// How long a [`Loggable`] must stay dormant (roughly; it is only checked
/// after each flush) before the background task stops and the timeout
/// schedule is reset. Four hours.
const RESET_AFTER_DORMANT_FOR: Duration = Duration::from_secs(4 * 60 * 60);
141
/// Produce the lengths of the successive intervals that we summarize.
///
/// The schedule starts short (5 seconds) and backs off while the event keeps
/// happening, up to 4 hours. Once that initial schedule is used up, it
/// yields 24 hours forever, so the returned iterator never ends.
fn timeout_sequence() -> impl Iterator<Item = Duration> {
    /// Seconds in a minute.
    const MINUTE: u64 = 60;
    /// Seconds in an hour.
    const HOUR: u64 = 60 * MINUTE;
    /// Seconds in a day.
    const DAY: u64 = 24 * HOUR;
    /// The initial back-off schedule, in seconds.
    const SCHEDULE: [u64; 9] = [
        5,
        MINUTE,
        5 * MINUTE,
        30 * MINUTE,
        30 * MINUTE,
        HOUR,
        HOUR,
        4 * HOUR,
        4 * HOUR,
    ];

    let steady_state = std::iter::repeat(DAY);
    IntoIterator::into_iter(SCHEDULE)
        .chain(steady_state)
        .map(Duration::from_secs)
}
168
169/// Helper: runs in a background task, and periodically flushes the `Loggable`
170/// in `ratelim`. Exits after [`Loggable::flush`] returns [`Activity::Dormant`]
171/// for "long enough".
172async fn run<T>(rt_support: &dyn rt::RuntimeSupport, ratelim: Arc<RateLim<T>>)
173// TODO : Perhaps instead of taking an Arc<RateLim<T>> we want sometimes to take
174// a `&'static RateLim<T>``, so we don't need to mess about with `Arc`s needlessly.
175where
176 T: Loggable,
177{
178 let mut dormant_since = None;
179 for duration in timeout_sequence() {
180 rt_support.sleep(duration).await;
181 {
182 let mut inner = ratelim.inner.lock().expect("Lock poisoned");
183 debug_assert!(inner.task_running);
184 // NOTE: We say that we are summarizing "duration" on the theory
185 // that we actually slept for "duration". But maybe `sleep` slept
186 // for a little more or less? Nonetheless, we report this as if we
187 // had slept for the exact amount, since the alternative appears to
188 // be saying stuff like "this problem occurred 8/12 times in the
189 // last 10min 0.0014ssec" instead of "10m".
190 if inner.loggable.flush(duration) == Activity::Dormant {
191 // TODO: This can tell the user several times that the problem
192 // did not occur! Perhaps we only want to flush once on dormant,
193 // and then not report the dormant condition again until we are
194 // no longer tracking it. Or perhaps we should lower the
195 // responsibility for deciding when to log and when to uninstall
196 // to the Loggable?
197 match dormant_since {
198 Some(when) => {
199 if let Some(dormant_for) = rt_support.now().checked_duration_since(when) {
200 if dormant_for >= RESET_AFTER_DORMANT_FOR {
201 inner.task_running = false;
202 return;
203 }
204 }
205 }
206 None => {
207 dormant_since = Some(rt_support.now());
208 }
209 }
210 } else {
211 dormant_since = None;
212 }
213 }
214 }
215
216 unreachable!("timeout_sequence returned a finite sequence");
217}
218
219// TODO : Write some tests.