//! Channel padding
//!
//! Tor spec `padding-spec.txt` section 2.
//! # Overview of channel padding control arrangements
//! 1. `tor_chanmgr::mgr::map` collates information about dormancy, netdir,
//! and overall client configuration, to maintain a
//! [`ChannelPaddingInstructions`](crate::channel::ChannelPaddingInstructions)
//! which is to be used for all relevant[^relevant] channels.
//! This is distributed to channel frontends (`Channel`s)
//! by calling `Channel::reparameterize`.
//! 2. Circuit and channel `get_or_launch` methods all take a `ChannelUsage`.
//! This is plumbed through the layers to `AbstractChanMgr::get_or_launch`,
//! which passes it to the channel frontend via `Channel::note_usage`.
//! 3. The `Channel` collates this information, and maintains an idea
//! of whether padding is relevant for this channel (`PaddingControlState`).
//! For channels where it *is* relevant, it sends `CtrlMsg::ConfigUpdate`
//! to the reactor.
//! 4. The reactor handles `CtrlMsg::ConfigUpdate` by reconfiguring its padding timer;
//! and by sending PADDING_NEGOTIATE cell(s).
//! [^relevant]: A "relevant" channel is one which is not excluded by the rules about
//! padding in padding-spec 2.2. Arti does not currently support acting as a relay,
//! so all our channels are client-to-guard or client-to-directory.
use std::pin::Pin;
// TODO, coarsetime maybe? But see arti#496 and also we want to use the mockable SleepProvider
use std::time::{Duration, Instant};
use derive_builder::Builder;
use educe::Educe;
use futures::future::{self, FusedFuture};
use futures::FutureExt;
use pin_project::pin_project;
use rand::distr::Distribution;
use tracing::error;
use tor_cell::chancell::msg::{Padding, PaddingNegotiate};
use tor_config::impl_standard_builder;
use tor_error::into_internal;
use tor_rtcompat::SleepProvider;
use tor_units::IntegerMilliseconds;
/// Timer that organises wakeups when channel padding should be sent
///
/// Use [`next()`](Timer::next) to find when to send padding, and
/// [`note_cell_sent()`](Timer::note_cell_sent) to reset the timeout when data flows.
/// A `Timer` can be in "disabled" state, in which case `next()` never completes.
/// `Timer` must be pinned before use
/// (this allows us to avoid involving the allocator when we reschedule).
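///
/// # Example
///
/// A minimal sketch of driving a `Timer` (not a compiled doctest, since `Timer`
/// is crate-private; `runtime` and `parameters` stand in for a [`SleepProvider`]
/// and a [`Parameters`] already in scope):
///
/// ```ignore
/// let timer = Timer::new(runtime.clone(), parameters)?;
/// pin!(timer); // `Timer` must be pinned before use
///
/// // Whenever real data is written to the channel:
/// timer.note_cell_sent();
///
/// // Resolves once the selected gap has elapsed with no data sent:
/// let padding = timer.as_mut().next().await;
/// ```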
#[pin_project(project = PaddingTimerProj)]
pub(crate) struct Timer<R: SleepProvider> {
/// [`SleepProvider`]
sleep_prov: R,
/// Parameters controlling distribution of padding time intervals
/// Can be `None` to mean the timing parameters are set to infinity.
parameters: Option<PreparedParameters>,
/// Gap that we intend to leave between last sent cell, and the padding
/// We only resample this (calculating a new random delay) after the previous
/// timeout actually expired.
/// `None` if the timer is disabled.
/// (This can be done explicitly, but also occurs on time calculation overflow.)
/// Invariants: this field may be `Some` or `None` regardless of the values
/// of other fields. If this field is `None` then the values in `trigger_at`
/// and `waker` are unspecified.
selected_timeout: Option<Duration>,
/// Absolute time at which we should send padding
/// `None` if cells more recently sent than we were polled.
/// That would mean that we are currently moving data out through this channel.
/// The absolute timeout will need to be recalculated when the data flow pauses.
/// `Some` means our `next` has been demanded recently.
/// Then `trigger_at` records the absolute timeout at which we should send padding,
/// which was calculated the first time we were polled (after data).
/// Invariants: the value in this field is meaningful only if `selected_timeout`
/// is `Some`.
/// If `selected_timeout` is `Some`, and `trigger_at` is therefore valid,
/// it is (obviously) no later than `selected_timeout` from now.
/// See also `waker`.
trigger_at: Option<Instant>,
/// Actual waker from the `SleepProvider`
/// This is created and updated lazily, because we suspect that with some runtimes
/// setting timeouts may be slow.
/// Lazy updating means that with intermittent data traffic, we do not keep scheduling,
/// descheduling, and adjusting, a wakeup time.
/// Invariants:
/// If `selected_timeout` is `Some`,
/// the time at which this waker will trigger here is never *later* than `trigger_at`,
/// and never *later* than `selected_timeout` from now.
/// The wakeup time here may well be earlier than `trigger_at`,
/// and sooner than `selected_timeout` from now. It may even be in the past.
/// When we wake up and discover this situation, we reschedule a new waker.
/// If `selected_timeout` is `None`, the value is unspecified.
/// We may retain a `Some` in this case so that if `SleepProvider` is enhanced to
/// support rescheduling, we can do that without making a new `SleepFuture`
/// (and without completely reorganising this the `Timer` state structure.)
#[pin]
waker: Option<R::SleepFuture>,
}
/// Timing parameters, as described in `padding-spec.txt`
#[derive(Debug, Copy, Clone, Eq, PartialEq, Builder)]
#[builder(build_fn(error = "ParametersError", private, name = "build_inner"))]
pub struct Parameters {
/// Low end of the distribution of `X`
#[builder(default = "1500.into()")]
pub(crate) low: IntegerMilliseconds<u32>,
/// High end of the distribution of `X` (inclusive)
#[builder(default = "9500.into()")]
pub(crate) high: IntegerMilliseconds<u32>,
}
/// An error that occurred while constructing padding parameters.
#[derive(Clone, Debug, thiserror::Error)]
#[non_exhaustive]
pub enum ParametersError {
/// Could not construct a range: there were no members between low and high.
#[error("Cannot construct padding parameters: low bound was above the high bound.")]
InvalidRange,
}
impl ParametersBuilder {
/// Try to construct a [`Parameters`] from this builder.
/// Returns an error if the distribution is ill-defined.
pub fn build(&self) -> Result<Parameters, ParametersError> {
let parameters = self.build_inner()?;
if parameters.low > parameters.high {
return Err(ParametersError::InvalidRange);
}
Ok(parameters)
}
}
impl_standard_builder! { Parameters: !Deserialize + !Builder + !Default }
impl Parameters {
/// Return a `PADDING_NEGOTIATE START` cell specifying precisely these parameters
/// This function does not take account of the need to avoid sending particular
/// parameters, and instead sending zeroes, if the requested padding is the consensus
/// default. The caller must take care of that.
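///
/// # Example
///
/// A minimal sketch (`ignore`d rather than compiled; error handling and the
/// consensus-default check described above are elided):
///
/// ```ignore
/// let params = Parameters::default_padding();
/// let cell = params.padding_negotiate_cell()?;
/// // queue `cell` for transmission on the channel ...
/// ```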
pub fn padding_negotiate_cell(&self) -> Result<PaddingNegotiate, tor_error::Bug> {
let get = |input: IntegerMilliseconds<u32>| {
input
.try_map(TryFrom::try_from)
.map_err(into_internal!("padding negotiate out of range"))
};
Ok(PaddingNegotiate::start(get(self.low)?, get(self.high)?))
}
/// Make a Parameters containing the specification-defined default parameters
pub fn default_padding() -> Self {
Parameters::builder().build().expect("build succeeded")
}
/// Make a Parameters sentinel value, with both fields set to zero, which means "no padding"
pub fn disabled() -> Self {
Parameters {
low: 0.into(),
high: 0.into(),
}
}
}
/// Timing parameters, "compiled" into a form which can be sampled more efficiently
/// According to the docs for [`rand::Rng::gen_range`],
/// it is better to construct a distribution
/// than to call `gen_range` repeatedly on the same range.
#[derive(Debug, Clone)]
struct PreparedParameters {
/// The distribution of `X` (not of the ultimate delay, which is `max(X1,X2)`)
x_distribution_ms: rand::distr::Uniform<u32>,
}
/// Return value from `prepare_to_sleep`: instructions for what caller ought to do
#[derive(Educe)]
#[educe(Debug)]
enum SleepInstructions<'f, R: SleepProvider> {
/// Caller should send padding immediately
Immediate {
/// The current `Instant`, returned so that the caller need not call `now` again
now: Instant,
},
/// Caller should wait forever
Forever,
/// Caller should `await` this
Waker(#[educe(Debug(ignore))] Pin<&'f mut R::SleepFuture>),
}
impl<R: SleepProvider> Timer<R> {
/// Create a new `Timer`
#[allow(dead_code)]
pub(crate) fn new(sleep_prov: R, parameters: Parameters) -> crate::Result<Self> {
let parameters = parameters.prepare()?;
let selected_timeout = parameters.select_timeout();
// Too different to new_disabled to share its code, sadly.
Ok(Timer {
sleep_prov,
parameters: Some(parameters),
selected_timeout: Some(selected_timeout),
trigger_at: None,
waker: None,
})
}
/// Create a new `Timer` which starts out disabled
pub(crate) fn new_disabled(sleep_prov: R, parameters: Option<Parameters>) -> crate::Result<Self> {
Ok(Timer {
sleep_prov,
parameters: parameters.map(|p| p.prepare()).transpose()?,
selected_timeout: None,
trigger_at: None,
waker: None,
})
}
/// Disable this `Timer`
/// Idempotent.
pub(crate) fn disable(self: &mut Pin<&mut Self>) {
*self.as_mut().project().selected_timeout = None;
}
/// Enable this `Timer`
/// (If the timer was disabled, the timeout will only start to run when `next()`
/// is next polled.)
pub(crate) fn enable(self: &mut Pin<&mut Self>) {
if !self.is_enabled() {
self.as_mut().select_fresh_timeout();
}
}
/// Set this `Timer`'s parameters
/// Will not enable or disable the timer; that must be done separately if desired.
/// The effect may not be immediate: if we are already in a gap between cells,
/// that existing gap may not be adjusted.
/// (We don't *restart* the timer since that would very likely result in a gap
/// longer than either of the configured values.)
pub(crate) fn reconfigure(
self: &mut Pin<&mut Self>,
parameters: &Parameters,
) -> crate::Result<()> {
*self.as_mut().project().parameters = Some(parameters.prepare()?);
Ok(())
}
/// Enquire whether this `Timer` is currently enabled
pub(crate) fn is_enabled(&self) -> bool {
self.selected_timeout.is_some()
}
/// Select a fresh timeout (and enable, if possible)
fn select_fresh_timeout(self: Pin<&mut Self>) {
let mut self_ = self.project();
let timeout = self_.parameters.as_ref().map(|p| p.select_timeout());
*self_.selected_timeout = timeout;
// This is no longer valid; recalculate it on next poll
*self_.trigger_at = None;
// Timeout might be earlier, so we will need a new waker too.
self_.waker.set(None);
}
/// Note that data has been sent (ie, reset the timeout, delaying the next padding)
pub(crate) fn note_cell_sent(self: &mut Pin<&mut Self>) {
// Fast path, does not need to do anything but clear the absolute expiry time
let self_ = self.as_mut().project();
*self_.trigger_at = None;
}
/// Calculate when to send padding, and return a suitable waker
/// In the usual case returns [`SleepInstructions::Waker`].
fn prepare_to_sleep(mut self: Pin<&mut Self>, now: Option<Instant>) -> SleepInstructions<R> {
let mut self_ = self.as_mut().project();
let timeout = match self_.selected_timeout {
None => return SleepInstructions::Forever,
Some(t) => *t,
};
if self_.waker.is_some() {
// We need to do this with is_some and expect because we need to consume self
// to get a return value with the right lifetimes.
let waker = self
.project()
.waker
.as_pin_mut()
.expect("None but we just checked");
return SleepInstructions::Waker(waker);
}
let now = now.unwrap_or_else(|| self_.sleep_prov.now());
let trigger_at = match self_.trigger_at {
Some(t) => t,
None => self_.trigger_at.insert(match now.checked_add(timeout) {
None => {
error!("bug: timeout overflowed computing next channel padding. Disabling.");
self.disable();
return SleepInstructions::Forever;
}
Some(r) => r,
}),
};
let remaining = trigger_at.checked_duration_since(now).unwrap_or_default();
if remaining.is_zero() {
return SleepInstructions::Immediate { now };
}
//dbg!(timeout, remaining, now, trigger_at);
// There is no Option::get_pin_mut_or_set_with
if self_.waker.is_none() {
self_.waker.set(Some(self_.sleep_prov.sleep(remaining)));
}
let waker = self.project().waker.as_pin_mut()
.expect("None but we just inserted!");
SleepInstructions::Waker(waker)
}
/// Wait until we should next send padding, and then return the padding message
/// Should be used as a low-priority branch within `select_biased!`.
/// (`next()` has to be selected on, along with other possible events, in the
/// main loop, so that the padding timer runs concurrently with other processing;
/// and it should be in a low-priority branch of `select_biased!` as an optimisation:
/// that avoids calculating timeouts etc. until necessary,
/// i.e. it calculates them only when the main loop would otherwise block.)
/// The returned future is async-cancel-safe,
/// but once it yields, the padding must actually be sent.
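///
/// A sketch of the intended shape of the caller's main loop (illustrative
/// only; the field and method names, and the other arms, stand in for whatever
/// the reactor actually handles):
///
/// ```ignore
/// loop {
///     select_biased! {
///         // ... channel cells, control messages, etc., go first ...
///         padding = self.padding_timer.as_mut().next() => {
///             // the padding must actually be sent now
///             self.send_msg(padding.into()).await?;
///         }
///     }
/// }
/// ```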
pub(crate) fn next(self: Pin<&mut Self>) -> impl FusedFuture<Output = Padding> + '_ {
self.next_inner().fuse()
}
/// Wait until we should next send padding (not `FusedFuture`)
/// Callers want a [`FusedFuture`] because `select!` needs one.
async fn next_inner(mut self: Pin<&mut Self>) -> Padding {
let now = loop {
match self.as_mut().prepare_to_sleep(None) {
SleepInstructions::Forever => future::pending().await,
SleepInstructions::Immediate { now } => break now,
SleepInstructions::Waker(waker) => waker.await,
}
// This timer has fired and has therefore been used up.
// When we go round again we will make a new one.
//
// TODO: have SleepProviders provide a reschedule function, and use it.
// That is likely to be faster where supported.
self.as_mut().project().waker.set(None);
};
// It's time to send padding.
// Firstly, calculate the new timeout for the *next* padding,
// so that we leave the `Timer` properly programmed.
// Bet that we will be going to sleep again, and set up the new trigger time
// and waker now. This will save us a future call to Instant::now.
self.as_mut().prepare_to_sleep(Some(now));
Padding::new()
}
}
impl Parameters {
/// "Compile" the parameters into a form which can be quickly sampled
fn prepare(self) -> Result<PreparedParameters, tor_error::Bug> {
Ok(PreparedParameters {
x_distribution_ms: rand::distr::Uniform::new_inclusive(
self.low.as_millis(),
self.high.as_millis(),
)
.map_err(into_internal!("Parameters were not a valid range."))?,
})
}
}
impl PreparedParameters {
/// Randomly select a timeout (as per `padding-spec.txt`)
fn select_timeout(&self) -> Duration {
let mut rng = rand::rng();
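// Per padding-spec: the gap is max(X1, X2), where X1 and X2 are independent
// samples from the (inclusive) uniform distribution over [low, high] milliseconds.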
let ms = std::cmp::max(
self.x_distribution_ms.sample(&mut rng),
self.x_distribution_ms.sample(&mut rng),
);
Duration::from_millis(ms.into())
}
}
#[cfg(test)]
mod test {
// @@ begin test lint list maintained by maint/add_warning @@
#![allow(clippy::bool_assert_comparison)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::dbg_macro)]
#![allow(clippy::mixed_attributes_style)]
#![allow(clippy::print_stderr)]
#![allow(clippy::print_stdout)]
#![allow(clippy::single_char_pattern)]
#![allow(clippy::unwrap_used)]
#![allow(clippy::unchecked_duration_subtraction)]
#![allow(clippy::useless_vec)]
#![allow(clippy::needless_pass_by_value)]
//! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
use super::*;
use futures::future::ready;
use futures::select_biased;
use itertools::{izip, Itertools};
use statrs::distribution::ContinuousCDF;
use tokio::pin;
use tokio_crate::{self as tokio};
use tor_rtcompat::*;
async fn assert_not_ready<R: Runtime>(timer: &mut Pin<&mut Timer<R>>) {
select_biased! {
_ = timer.as_mut().next() => panic!("unexpectedly ready"),
_ = ready(()) => { },
}
}
async fn assert_is_ready<R: Runtime>(timer: &mut Pin<&mut Timer<R>>) {
let _: Padding = select_biased! {
p = timer.as_mut().next() => p,
_ = ready(()) => panic!("pad timer failed to yield"),
};
}
#[test]
fn timer_impl() {
let runtime = tor_rtcompat::tokio::TokioNativeTlsRuntime::create().unwrap();
#[allow(deprecated)] // TODO #1885
let runtime = tor_rtmock::MockSleepRuntime::new(runtime);
let parameters = Parameters {
low: 1000.into(),
high: 1000.into(),
};
let () = runtime.block_on(async {
let timer = Timer::new(runtime.clone(), parameters).unwrap();
pin!(timer);
assert_eq! { true, timer.is_enabled() }
// expiry time not yet calculated
assert_eq! { timer.as_mut().trigger_at, None };
// ---------- timeout value ----------
// Just created, not ready yet
assert_not_ready(&mut timer).await;
runtime.advance(Duration::from_millis(999)).await;
// Not quite ready
assert_not_ready(&mut timer).await;
runtime.advance(Duration::from_millis(1)).await;
// Should go off precisely now
assert_is_ready(&mut timer).await;
runtime.advance(Duration::from_millis(1001)).await;
// Should go off 1ms ago, fine
assert_is_ready(&mut timer).await;
// ---------- various resets ----------
runtime.advance(Duration::from_millis(500)).await;
timer.note_cell_sent();
// This ought not to cause us to actually calculate the expiry time
let () = select_biased! {
_ = timer.as_mut().next() => panic!(),
_ = ready(()) => { },
};
// ---------- disable/enable ----------
timer.disable();
runtime.advance(Duration::from_millis(2000)).await;
assert_eq! { timer.as_mut().selected_timeout, None };
assert_eq! { false, timer.is_enabled() }
timer.enable();
runtime.advance(Duration::from_millis(3000)).await;
// Shouldn't be already ready, since we haven't polled yet
assert_not_ready(&mut timer).await;
runtime.advance(Duration::from_millis(1000)).await;
// *Now*
assert_is_ready(&mut timer).await;
});
assert! { timer.as_mut().selected_timeout.is_some() };
assert! { timer.as_mut().trigger_at.is_none() };
// Force an overflow by guddling
*timer.as_mut().project().selected_timeout = Some(Duration::MAX);
dbg!(timer.as_mut().project().trigger_at);
let timer = Timer::new_disabled(runtime.clone(), None).unwrap();
assert! { timer.parameters.is_none() };
assert! { timer.as_mut().selected_timeout.is_none() };
let timer = Timer::new_disabled(runtime.clone(), Some(parameters)).unwrap();
assert! { timer.parameters.is_some() };
timer.as_mut().enable();
}
#[test]
#[allow(clippy::print_stderr)]
fn timeout_distribution() {
// Test that the distribution of padding intervals is as we expect. This is not so
// straightforward. We need to deal with true randomness (since we can't plumb a
// testing RNG into the padding timer, and perhaps don't even *want* to make that a
// mockable interface). Measuring a distribution of random variables involves some
// statistics.
// The overall approach is:
// Use a fixed (but nontrivial) low to high range
// Sample N times into n equal sized buckets
// Calculate the expected number of samples in each bucket
// Do a chi^2 test. If it doesn't spot a potential difference, declare OK.
// If the chi^2 test does definitely declare a difference, declare failure.
// Otherwise increase N and go round again.
// This allows most runs to be fast without having an appreciable possibility of a
// false test failure and while being able to detect even quite small deviations.
// Notation from
// https://en.wikipedia.org/wiki/Pearson%27s_chi-squared_test#Calculating_the_test-statistic
// I haven't done a formal power calculation but empirically
// this detects the following most of the time:
// deviation of the CDF power from B^2 to B^1.98
// wrong minimum value by 25ms out of 12s, low_ms = min + 25
// wrong maximum value by 10ms out of 12s, high_ms = max -1 - 10
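// (Pearson's statistic: chi^2 = sum_i (O_i - E_i)^2 / E_i, where O_i is the
// observed count in bucket i and E_i = N * p_i is the count expected under
// the CDF above.)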
#[allow(non_snake_case)]
let mut N = 100_0000;
#[allow(non_upper_case_globals)]
const n: usize = 100;
const P_GOOD: f64 = 0.05; // Investigate further 5% of times (if all is actually well)
const P_BAD: f64 = 1e-12;
loop {
eprintln!("padding distribution test, n={} N={}", n, N);
let min = 5000;
let max = 17000; // Exclusive
assert_eq!(0, (max - min) % (n as u32)); // buckets must match up to integer boundaries
let cdf = (0..=n)
.map(|bi| {
let b = (bi as f64) / (n as f64);
// expected distribution:
// with B = bi / n
// P(X < B) == B
// P(max(X1,X2) < B) == B^2
b.powi(2)
})
.collect_vec();
let pdf = cdf
.iter()
.cloned()
.tuple_windows()
.map(|(p, q)| q - p)
.collect_vec();
let exp = pdf.iter().cloned().map(|p| p * f64::from(N)).collect_vec();
// chi-squared test only valid if every cell expects at least 5
assert!(exp.iter().cloned().all(|ei| ei >= 5.));
let mut obs = [0_u32; n];
let params = Parameters {
low: min.into(),
high: (max - 1).into(), // convert exclusive to inclusive
}
.prepare()
.unwrap();
for _ in 0..N {
let xx = params.select_timeout();
let ms = xx.as_millis();
let ms = u32::try_from(ms).unwrap();
assert!(ms >= min);
assert!(ms < max);
// Integer arithmetic ensures that we classify exactly
let bi = ((ms - min) * (n as u32)) / (max - min);
obs[bi as usize] += 1;
}
let chi2 = izip!(&obs, &exp)
.map(|(&oi, &ei)| (f64::from(oi) - ei).powi(2) / ei)
.sum::<f64>();
// n degrees of freedom, one-tailed test
// (since distro parameters are all fixed, not estimated from the sample)
let chi2_distr = statrs::distribution::ChiSquared::new(n as _).unwrap();
// probability of good code generating a result at least this bad
let p = 1. - chi2_distr.cdf(chi2);
eprintln!(
"padding distribution test, n={} N={} chi2={} p={}",
n, N, chi2, p
);
if p >= P_GOOD {
break;
}
for (i, (&oi, &ei)) in izip!(&obs, &exp).enumerate() {
eprintln!("bi={:4} OI={:4} EI={}", i, oi, ei);
if p < P_BAD {
panic!("distribution is wrong (p < {:e})", P_BAD);
// This is statistically rather cheaty: we keep trying until we get a definite
// answer! But we radically increase the power of the test each time.
// If the distribution is really wrong, this test ought to find it soon enough,
// especially since we run this repeatedly in CI.
N *= 10;
}
}
#[test]
fn parameters_range() {
let ms100 = IntegerMilliseconds::new(100);
let ms1000 = IntegerMilliseconds::new(1000);
let ms1500 = IntegerMilliseconds::new(1500);
let ms9500 = IntegerMilliseconds::new(9500);
// default
let p = Parameters::builder().build().unwrap();
assert_eq!(p, Parameters { low: ms1500, high: ms9500 });
assert!(p.prepare().is_ok());
// low < high
let mut pb = Parameters::builder();
pb.low(ms100);
pb.high(ms1000);
let p = pb.build().unwrap();
assert_eq!(p, Parameters { low: ms100, high: ms1000 });
let p = p.prepare().unwrap();
let range = Duration::try_from(ms100).unwrap()..=Duration::try_from(ms1000).unwrap();
for _ in 1..100 {
assert!(range.contains(&p.select_timeout()));
}
// low == high
pb.low(ms1000);
// low > high (error case)
pb.high(ms100);
let e = pb.build().unwrap_err();
assert!(matches!(e, ParametersError::InvalidRange));
}
}