Lines
3.23 %
Functions
1.15 %
Branches
100 %
//! Implement half of log rate-limiting: the ability to cause the state of a
//! Loggable to get flushed at appropriate intervals.
use super::{Activity, Loggable};
use futures::task::SpawnExt as _;
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tor_error::ErrorReport;
/// Declare a dyn-safe trait for the parts of an asynchronous runtime so that we
/// can install it globally.
pub(crate) mod rt {
use futures::{future::BoxFuture, task::Spawn};
use std::sync::OnceLock;
use std::time::{Duration, Instant};
/// A dyn-safe view of the parts of an async runtime that we need for rate-limiting.
pub trait RuntimeSupport: Spawn + 'static + Sync + Send {
/// Return a future that will yield () after `duration` has passed.
fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()>;
/// Return the current time as an Instant.
fn now(&self) -> Instant;
}
impl<R: tor_rtcompat::Runtime> RuntimeSupport for R {
fn sleep(&self, duration: Duration) -> BoxFuture<'_, ()> {
Box::pin(tor_rtcompat::SleepProvider::sleep(self, duration))
fn now(&self) -> Instant {
tor_rtcompat::SleepProvider::now(self)
/// A global view of our runtime, used for rate-limited logging.
static RUNTIME_SUPPORT: OnceLock<Box<dyn RuntimeSupport>> = OnceLock::new();
/// Try to install `runtime` as a global runtime to be used for rate-limited logging.
///
/// Return an error (and make no changes) if there there was already a runtime installed.
pub fn install_runtime<R: tor_rtcompat::Runtime>(
runtime: R,
) -> Result<(), InstallRuntimeError> {
let rt = Box::new(runtime);
RUNTIME_SUPPORT
.set(rt)
.map_err(|_| InstallRuntimeError::DuplicateCall)
/// An error that occurs while installing a runtime.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum InstallRuntimeError {
/// Tried to install a runtime when there was already one installed.
#[error("Called tor_log_ratelim::install_runtime() more than once")]
DuplicateCall,
/// Return the installed runtime, if there is one.
pub fn rt_support() -> Option<&'static dyn RuntimeSupport> {
RUNTIME_SUPPORT.get().map(Box::as_ref)
/// A rate-limited wrapper around a [`Loggable`]` that ensures its events are
/// flushed from time to time.
pub struct RateLim<T> {
/// The Loggable itself.
inner: Mutex<Inner<T>>,
/// State of a [`RateLim`] that changes over time; always accessed through
/// the `RateLim`'s mutex.
struct Inner<T> {
    /// Set while a background task is collating and flushing reports for
    /// `loggable`; cleared when that task exits.
    task_running: bool,
    /// The state whose reports are being rate-limited.
    loggable: T,
}
impl<T: Loggable> RateLim<T> {
/// Create a new `RateLim` to flush events for `loggable`.
pub fn new(loggable: T) -> Arc<Self> {
Arc::new(RateLim {
inner: Mutex::new(Inner {
loggable,
task_running: false,
}),
})
/// Adjust the status of this reporter's `Loggable` by calling `f` on it,
/// but only if it is already scheduled to report itself. Otherwise, do nothing.
/// This is the appropriate function to use for tracking successes.f
pub fn nonevent<F>(&self, f: F)
where
F: FnOnce(&mut T),
{
let mut inner = self.inner.lock().expect("lock poisoned");
if inner.task_running {
f(&mut inner.loggable);
/// Add an event to this rate-limited reporter by calling `f` on it, and
/// schedule it to be reported after an appropriate time.
/// NOTE: This API is a big ugly. If we ever decide to make it non-hidden,
/// we may want to make it call rt_support() directly again, as it did in
/// earlier visions.
pub fn event<F>(self: &Arc<Self>, rt: &'static dyn rt::RuntimeSupport, f: F)
let mut inner = self.inner.lock().expect("poisoned lock");
if !inner.task_running {
// Launch a task to make periodic reports on the state of our Loggable.
inner.task_running = true;
if let Err(e) = rt.spawn(Box::pin(run(rt, Arc::clone(self)))) {
// We couldn't spawn a task; we have to flush the state
// immediately.
//
// TODO: This behavior is undesirable if it causes us to spam
// the log while we are shutting down. On the other hand, it's
// also undesirable if we suppress our logs while we're
// shutting down.
inner.loggable.flush(Duration::default());
tracing::warn!("Also, unable to spawn a logging task: {}", e.report());
/// How long (approximately) a reporting task keeps running while it has had
/// nothing to report. After this, it exits, and the next event starts the
/// timeout schedule over from the beginning. Four hours.
const RESET_AFTER_DORMANT_FOR: Duration = Duration::from_secs(4 * 60 * 60);
/// Produce the endless series of intervals over which events are summarized.
///
/// Intervals start short and lengthen while the event keeps happening:
/// 5 seconds, 1 minute, 5 minutes, 30 minutes, 1 hour, 4 hours, and then
/// 24 hours for every interval after that.
fn timeout_sequence() -> impl Iterator<Item = Duration> {
    /// Seconds in one minute.
    const MINUTE: u64 = 60;
    /// Seconds in one hour.
    const HOUR: u64 = 60 * MINUTE;
    /// Interval used forever once `BACKOFF` is exhausted, in seconds.
    const STEADY: u64 = 24 * HOUR;
    /// The initial back-off schedule, in seconds.
    static BACKOFF: [u64; 6] = [5, MINUTE, 5 * MINUTE, 30 * MINUTE, HOUR, 4 * HOUR];

    BACKOFF
        .iter()
        .copied()
        .chain(std::iter::repeat(STEADY))
        .map(Duration::from_secs)
}
/// Helper: runs in a background task, and periodically flushes the `Loggable`
/// in `ratelim`. Exits after [`Loggable::flush`] returns [`Activity::Dormant`]
/// for "long enough".
async fn run<T>(rt_support: &dyn rt::RuntimeSupport, ratelim: Arc<RateLim<T>>)
// TODO : Perhaps instead of taking an Arc<RateLim<T>> we want sometimes to take
// a `&'static RateLim<T>``, so we don't need to mess about with `Arc`s needlessly.
T: Loggable,
let mut dormant_since = None;
for duration in timeout_sequence() {
rt_support.sleep(duration).await;
let mut inner = ratelim.inner.lock().expect("Lock poisoned");
debug_assert!(inner.task_running);
// NOTE: We say that we are summarizing "duration" on the theory
// that we actually slept for "duration". But maybe `sleep` slept
// for a little more or less? Nonetheless, we report this as if we
// had slept for the exact amount, since the alternative appears to
// be saying stuff like "this problem occurred 8/12 times in the
// last 10min 0.0014ssec" instead of "10m".
if inner.loggable.flush(duration) == Activity::Dormant {
// TODO: This can tell the user several times that the problem
// did not occur! Perhaps we only want to flush once on dormant,
// and then not report the dormant condition again until we are
// no longer tracking it. Or perhaps we should lower the
// responsibility for deciding when to log and when to uninstall
// to the Loggable?
match dormant_since {
Some(when) => {
if let Some(dormant_for) = rt_support.now().checked_duration_since(when) {
if dormant_for >= RESET_AFTER_DORMANT_FOR {
inner.task_running = false;
return;
None => {
dormant_since = Some(rt_support.now());
} else {
dormant_since = None;
unreachable!("timeout_sequence returned a finite sequence");
// TODO: Write some tests.